Source code for cfxdb.realmstore._publication

##############################################################################
#
#                        Crossbar.io Database
#     Copyright (c) typedef int GmbH. Licensed under MIT.
#
##############################################################################

import pprint

import cbor2
from zlmdb import MapOidFlatBuffers, flatbuffers, table

from cfxdb.gen.realmstore import Publication as PublicationGen


[docs] class _Publication(PublicationGen.Publication): """ Expand methods on the class code generated by flatc. FIXME: comes up with a PR for flatc to generated this stuff automatically. """ @classmethod
[docs] def GetRootAsPublication(cls, buf, offset): n = flatbuffers.encode.Get(flatbuffers.packer.uoffset, buf, offset) x = _Publication() x.Init(buf, n + offset) return x
[docs] def ArgsAsBytes(self): o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(12)) if o != 0: _off = self._tab.Vector(o) _len = self._tab.VectorLen(o) return memoryview(self._tab.Bytes)[_off : _off + _len] return None
[docs] def KwargsAsBytes(self): o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(14)) if o != 0: _off = self._tab.Vector(o) _len = self._tab.VectorLen(o) return memoryview(self._tab.Bytes)[_off : _off + _len] return None
[docs] def PayloadAsBytes(self): o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(16)) if o != 0: _off = self._tab.Vector(o) _len = self._tab.VectorLen(o) return memoryview(self._tab.Bytes)[_off : _off + _len] return None
[docs] def EncKeyAsBytes(self): o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(38)) if o != 0: _off = self._tab.Vector(o) _len = self._tab.VectorLen(o) return memoryview(self._tab.Bytes)[_off : _off + _len] return None
[docs] class Publication(object): """ Persisted publication database object. """
[docs] ENC_ALGO_NONE = 0
[docs] ENC_ALGO_CRYPTOBOX = 1
[docs] ENC_ALGO_MQTT = 2
[docs] ENC_ALGO_XBR = 3
[docs] ENC_SER_NONE = 0
[docs] ENC_SER_JSON = 1
[docs] ENC_SER_MSGPACK = 2
[docs] ENC_SER_CBOR = 3
[docs] ENC_SER_UBJSON = 4
[docs] ENC_SER_OPAQUE = 5
[docs] ENC_SER_FLATBUFFERS = 6
[docs] __slots__ = ( "_from_fbs", "_timestamp", "_publication", "_publisher", "_topic", "_args", "_kwargs", "_payload", "_acknowledge", "_retain", "_exclude_me", "_exclude", "_exclude_authid", "_exclude_authrole", "_eligible", "_eligible_authid", "_eligible_authrole", "_enc_algo", "_enc_key", "_enc_serializer", )
def __init__(self, from_fbs=None):
[docs] self._from_fbs = from_fbs
[docs] self._timestamp = None
[docs] self._publication = None
[docs] self._publisher = None
[docs] self._topic = None
[docs] self._args = None
[docs] self._kwargs = None
[docs] self._payload = None
[docs] self._acknowledge = None
[docs] self._retain = None
[docs] self._exclude_me = None
[docs] self._exclude = None
[docs] self._exclude_authid = None
[docs] self._exclude_authrole = None
[docs] self._eligible = None
[docs] self._eligible_authid = None
[docs] self._eligible_authrole = None
[docs] self._enc_algo = None
[docs] self._enc_key = None
[docs] self._enc_serializer = None
[docs] def marshal(self): obj = { "timestamp": self.timestamp, "publication": self.publication, "publisher": self.publisher, "topic": self.topic, "args": self.args, "kwargs": self.kwargs, "payload": self.payload, "acknowledge": self.acknowledge, "retain": self.retain, "exclude_me": self.exclude_me, "exclude": self.exclude, "exclude_authid": self.exclude_authid, "exclude_authrole": self.exclude_authrole, "eligible": self.eligible, "eligible_authid": self.eligible_authid, "eligible_authrole": self.eligible_authrole, "enc_algo": self.enc_algo, "enc_key": self.enc_key, "enc_serializer": self.enc_serializer, } return obj
[docs] def __str__(self): return "\n{}\n".format(pprint.pformat(self.marshal()))
@property
[docs] def timestamp(self): """ Timestamp when the publication was accepted by the broker. Epoch time in ns. :returns: epoch time in ns :rtype: int """ if self._timestamp is None and self._from_fbs: self._timestamp = self._from_fbs.Timestamp() return self._timestamp
@timestamp.setter def timestamp(self, value): assert value is None or type(value) == int self._timestamp = value @property
[docs] def publication(self): """ WAMP publication ID that was assigned by the broker. :returns: publication ID :rtype: int """ if self._publication is None and self._from_fbs: self._publication = self._from_fbs.Publication() return self._publication
@publication.setter def publication(self, value): assert value is None or type(value) == int self._publication = value @property
[docs] def publisher(self): """ WAMP session ID of the publisher. :returns: publisher ID :rtype: int """ if self._publisher is None and self._from_fbs: self._publisher = self._from_fbs.Publisher() return self._publisher
@publisher.setter def publisher(self, value): assert value is None or type(value) == int self._publisher = value @property
[docs] def topic(self): """ The WAMP or application URI of the PubSub topic the event was published to. :returns: topic (URI) published to :rtype: str """ if self._topic is None and self._from_fbs: self._topic = self._from_fbs.Topic().decode("utf8") return self._topic
@topic.setter def topic(self, value): assert value is None or type(value) == str self._topic = value # # args, kwargs, payload # @property
[docs] def args(self): """ Positional values for application-defined event payload. :returns: positional arguments (app payload) of the event (if any) :rtype: None or list """ if self._args is None and self._from_fbs: if self._from_fbs.ArgsLength(): self._args = cbor2.loads(bytes(self._from_fbs.ArgsAsBytes())) return self._args
@args.setter def args(self, value): assert value is None or type(value) == list self._args = value @property
[docs] def kwargs(self): """ Keyword values for application-defined event payload. :returns: keyword arguments (app payload) of the event (if any) :rtype: None or dict """ if self._kwargs is None and self._from_fbs: if self._from_fbs.KwargsLength(): self._kwargs = cbor2.loads(bytes(self._from_fbs.KwargsAsBytes())) return self._kwargs
@kwargs.setter def kwargs(self, value): assert value is None or type(value) == dict self._kwargs = value @property
[docs] def payload(self): """ Alternative, transparent payload. If given, ``args`` and ``kwargs`` must be left unset. :returns: Transparent binary payload (see ``enc_algo``) if applicable :rtype: None or bytes """ if self._payload is None and self._from_fbs: if self._from_fbs.PayloadLength(): self._payload = self._from_fbs.PayloadAsBytes() return self._payload
@payload.setter def payload(self, value): assert value is None or type(value) == bytes self._payload = value # # acknowledge, retain, exclude_me # @property
[docs] def acknowledge(self): """ If ``True``, the broker was asked to acknowledge the publication with a success or error response. :returns: acknowledge flag :rtype: None or bool """ if self._acknowledge is None and self._from_fbs: self._acknowledge = self._from_fbs.Acknowledge() return self._acknowledge
@acknowledge.setter def acknowledge(self, value): assert value is None or type(value) == bool self._acknowledge = value @property
[docs] def retain(self): """ If ``True``, the broker was requested to retain this event. :returns: retain flag :rtype: None or bool """ if self._retain is None and self._from_fbs: self._retain = self._from_fbs.Retain() return self._retain
@retain.setter def retain(self, value): assert value is None or type(value) == bool self._retain = value @property
[docs] def exclude_me(self): """ If ``True``, the broker was asked to exclude the publisher from receiving the event. :returns: exclude_me flag :rtype: None or bool """ if self._exclude_me is None and self._from_fbs: self._exclude_me = self._from_fbs.ExcludeMe() return self._exclude_me
@exclude_me.setter def exclude_me(self, value): assert value is None or type(value) == bool self._exclude_me = value # # exclude, exclude_authid, exclude_authrole # @property
[docs] def exclude(self): """ List of WAMP session IDs to exclude from receiving this event. :returns: list of excluded session IDs :rtype: list[int] """ if self._exclude is None and self._from_fbs: if self._from_fbs.ExcludeLength(): exclude = [] for j in range(self._from_fbs.ExcludeLength()): exclude.append(self._from_fbs.Exclude(j)) self._exclude = exclude return self._exclude
@exclude.setter def exclude(self, value): assert value is None or type(value) == list if value: for x in value: assert type(x) == int self._exclude = value @property
[docs] def exclude_authid(self): """ List of WAMP authids to exclude from receiving this event. :returns: list of excluded authids :rtype: list[str] """ if self._exclude_authid is None and self._from_fbs: if self._from_fbs.ExcludeAuthidLength(): exclude_authid = [] for j in range(self._from_fbs.ExcludeAuthidLength()): exclude_authid.append(self._from_fbs.ExcludeAuthid(j).decode("utf8")) self._exclude_authid = exclude_authid return self._exclude_authid
@exclude_authid.setter def exclude_authid(self, value): assert value is None or type(value) == list if value: for x in value: assert type(x) == str self._exclude_authid = value @property
[docs] def exclude_authrole(self): """ List of WAMP authroles to exclude from receiving this event. :returns: list of excluded authroles :rtype: list[str] """ if self._exclude_authrole is None and self._from_fbs: if self._from_fbs.ExcludeAuthroleLength(): exclude_authrole = [] for j in range(self._from_fbs.ExcludeAuthroleLength()): exclude_authrole.append(self._from_fbs.ExcludeAuthrole(j).decode("utf8")) self._exclude_authrole = exclude_authrole return self._exclude_authrole
@exclude_authrole.setter def exclude_authrole(self, value): assert value is None or type(value) == list if value: for x in value: assert type(x) == str self._exclude_authrole = value # # eligible, eligible_authid, eligible_authrole # @property
[docs] def eligible(self): """ List of WAMP session IDs eligible to receive this event. :returns: list of eligible session IDs :rtype: list[int] """ if self._eligible is None and self._from_fbs: if self._from_fbs.EligibleLength(): eligible = [] for j in range(self._from_fbs.EligibleLength()): eligible.append(self._from_fbs.Eligible(j)) self._eligible = eligible return self._eligible
@eligible.setter def eligible(self, value): assert value is None or type(value) == list if value: for x in value: assert type(x) == int self._eligible = value @property
[docs] def eligible_authid(self): """ List of WAMP authids eligible to receive this event. :returns: list of eligible authids :rtype: list[str] """ if self._eligible_authid is None and self._from_fbs: if self._from_fbs.EligibleAuthidLength(): eligible_authid = [] for j in range(self._from_fbs.EligibleAuthidLength()): eligible_authid.append(self._from_fbs.EligibleAuthid(j).decode("utf8")) self._eligible_authid = eligible_authid return self._eligible_authid
@eligible_authid.setter def eligible_authid(self, value): assert value is None or type(value) == list if value: for x in value: assert type(x) == str self._eligible_authid = value @property
[docs] def eligible_authrole(self): """ List of WAMP authroles eligible to receive this event. :returns: list of eligible authroles :rtype: list[str] """ if self._eligible_authrole is None and self._from_fbs: if self._from_fbs.EligibleAuthroleLength(): eligible_authrole = [] for j in range(self._from_fbs.EligibleAuthroleLength()): eligible_authrole.append(self._from_fbs.EligibleAuthrole(j).decode("utf8")) self._eligible_authrole = eligible_authrole return self._eligible_authrole
@eligible_authrole.setter def eligible_authrole(self, value): assert value is None or type(value) == list if value: for x in value: assert type(x) == str self._eligible_authrole = value # # encryption # @property
[docs] def enc_algo(self): """ When using payload transparency, the encoding algorithm that was used to encode the payload. :returns: payload encryption algorithm :rtype: int """ if self._enc_algo is None and self._from_fbs: self._enc_algo = self._from_fbs.EncAlgo() return self._enc_algo
@enc_algo.setter def enc_algo(self, value): assert value is None or value in [self.ENC_ALGO_CRYPTOBOX, self.ENC_ALGO_MQTT, self.ENC_ALGO_XBR] self._enc_algo = value @property
[docs] def enc_key(self): """ When using payload transparency with an encryption algorithm, the payload encryption key. :returns: payload key :rtype: None or bytes """ if self._enc_key is None and self._from_fbs: if self._from_fbs.EncKeyLength(): self._enc_key = self._from_fbs.EncKeyAsBytes() return self._enc_key
@enc_key.setter def enc_key(self, value): assert value is None or type(value) == bytes self._enc_key = value @property
[docs] def enc_serializer(self): """ When using payload transparency, the payload object serializer that was used encoding the payload. :returns: payload serializer :rtype: int """ if self._enc_serializer is None and self._from_fbs: self._enc_serializer = self._from_fbs.EncSerializer() return self._enc_serializer
@enc_serializer.setter def enc_serializer(self, value): assert value is None or value in [ self.ENC_SER_JSON, self.ENC_SER_MSGPACK, self.ENC_SER_CBOR, self.ENC_SER_UBJSON, ] self._enc_serializer = value @staticmethod
[docs] def cast(buf): return Publication(_Publication.GetRootAsPublication(buf, 0))
[docs] def build(self, builder): args = self.args if args: args = builder.CreateString(cbor2.dumps(args)) kwargs = self.kwargs if kwargs: kwargs = builder.CreateString(cbor2.dumps(kwargs)) payload = self.payload if payload: payload = builder.CreateString(payload) topic = self.topic if topic: topic = builder.CreateString(topic) enc_key = self.enc_key if enc_key: enc_key = builder.CreateString(enc_key) # exclude: [int] exclude = self.exclude if exclude: PublicationGen.PublicationStartExcludeAuthidVector(builder, len(exclude)) for session in reversed(exclude): builder.PrependUint64(session) exclude = builder.EndVector() # exclude_authid: [string] exclude_authid = self.exclude_authid if exclude_authid: _exclude_authid = [] for authid in exclude_authid: _exclude_authid.append(builder.CreateString(authid)) PublicationGen.PublicationStartExcludeAuthidVector(builder, len(_exclude_authid)) for o in reversed(_exclude_authid): builder.PrependUOffsetTRelative(o) exclude_authid = builder.EndVector() # exclude_authrole: [string] exclude_authrole = self.exclude_authrole if exclude_authid: _exclude_authrole = [] for authrole in exclude_authrole: _exclude_authrole.append(builder.CreateString(authrole)) PublicationGen.PublicationStartExcludeAuthroleVector(builder, len(_exclude_authrole)) for o in reversed(_exclude_authrole): builder.PrependUOffsetTRelative(o) exclude_authrole = builder.EndVector() # eligible: [int] eligible = self.eligible if eligible: PublicationGen.PublicationStartEligibleAuthidVector(builder, len(eligible)) for session in reversed(eligible): builder.PrependUint64(session) eligible = builder.EndVector() # eligible_authid: [string] eligible_authid = self.eligible_authid if eligible_authid: _eligible_authid = [] for authid in eligible_authid: _eligible_authid.append(builder.CreateString(authid)) PublicationGen.PublicationStartEligibleAuthidVector(builder, len(_eligible_authid)) for o in reversed(_eligible_authid): builder.PrependUOffsetTRelative(o) eligible_authid = builder.EndVector() # eligible_authrole: [string] eligible_authrole = self.eligible_authrole if eligible_authrole: _eligible_authrole = [] for authrole in eligible_authrole: _eligible_authrole.append(builder.CreateString(authrole)) PublicationGen.PublicationStartEligibleAuthroleVector(builder, len(_eligible_authrole)) for o in reversed(_eligible_authrole): builder.PrependUOffsetTRelative(o) eligible_authrole = builder.EndVector() # now start and build a new object .. PublicationGen.PublicationStart(builder) if self.timestamp: PublicationGen.PublicationAddTimestamp(builder, self.timestamp) if self.publication: PublicationGen.PublicationAddPublication(builder, self.publication) if self.publisher: PublicationGen.PublicationAddPublisher(builder, self.publisher) if topic: PublicationGen.PublicationAddTopic(builder, topic) if args: PublicationGen.PublicationAddArgs(builder, args) if kwargs: PublicationGen.PublicationAddKwargs(builder, kwargs) if payload is not None: PublicationGen.PublicationAddPayload(builder, payload) if self.acknowledge is not None: PublicationGen.PublicationAddAcknowledge(builder, self.acknowledge) if self.retain is not None: PublicationGen.PublicationAddRetain(builder, self.retain) if self.exclude_me is not None: PublicationGen.PublicationAddExcludeMe(builder, self.exclude_me) if exclude: PublicationGen.PublicationAddExclude(builder, exclude) if exclude_authid: PublicationGen.PublicationAddExcludeAuthid(builder, exclude_authid) if exclude_authrole: PublicationGen.PublicationAddExcludeAuthrole(builder, exclude_authrole) if eligible: PublicationGen.PublicationAddEligible(builder, eligible) if eligible_authid: PublicationGen.PublicationAddEligibleAuthid(builder, eligible_authid) if eligible_authrole: PublicationGen.PublicationAddEligibleAuthrole(builder, eligible_authrole) if self.enc_algo: PublicationGen.PublicationAddEncAlgo(builder, self.enc_algo) if enc_key: PublicationGen.PublicationAddEncKey(builder, enc_key) if self.enc_serializer: PublicationGen.PublicationAddEncSerializer(builder, self.enc_serializer) # finish the object. final = PublicationGen.PublicationEnd(builder) return final
@table("dd04931a-753b-4fde-8140-d66b93519c73", build=Publication.build, cast=Publication.cast)
[docs] class Publications(MapOidFlatBuffers): """ Persisted publications archive. Map :class:`zlmdb.MapOidFlatBuffers` from ``publication`` to :class:`cfxdb.eventstore.Publication`. """