Source code for cfxdb.realmstore._publication

##############################################################################
#
#                        Crossbar.io Database
#     Copyright (c) Crossbar.io Technologies GmbH. Licensed under MIT.
#
##############################################################################

import pprint

import cbor2
import flatbuffers

from zlmdb import table, MapOidFlatBuffers

from cfxdb.gen.realmstore import Publication as PublicationGen


class _Publication(PublicationGen.Publication):
    """
    Expand methods on the class code generated by flatc.

    FIXME: comes up with a PR for flatc to generated this stuff automatically.
    """
    @classmethod
    def GetRootAsPublication(cls, buf, offset):
        n = flatbuffers.encode.Get(flatbuffers.packer.uoffset, buf, offset)
        x = _Publication()
        x.Init(buf, n + offset)
        return x

    def ArgsAsBytes(self):
        o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(12))
        if o != 0:
            _off = self._tab.Vector(o)
            _len = self._tab.VectorLen(o)
            return memoryview(self._tab.Bytes)[_off:_off + _len]
        return None

    def KwargsAsBytes(self):
        o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(14))
        if o != 0:
            _off = self._tab.Vector(o)
            _len = self._tab.VectorLen(o)
            return memoryview(self._tab.Bytes)[_off:_off + _len]
        return None

    def PayloadAsBytes(self):
        o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(16))
        if o != 0:
            _off = self._tab.Vector(o)
            _len = self._tab.VectorLen(o)
            return memoryview(self._tab.Bytes)[_off:_off + _len]
        return None

    def EncKeyAsBytes(self):
        o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(38))
        if o != 0:
            _off = self._tab.Vector(o)
            _len = self._tab.VectorLen(o)
            return memoryview(self._tab.Bytes)[_off:_off + _len]
        return None


[docs]class Publication(object): """ Persisted publication database object. """ ENC_ALGO_NONE = 0 ENC_ALGO_CRYPTOBOX = 1 ENC_ALGO_MQTT = 2 ENC_ALGO_XBR = 3 ENC_SER_NONE = 0 ENC_SER_JSON = 1 ENC_SER_MSGPACK = 2 ENC_SER_CBOR = 3 ENC_SER_UBJSON = 4 ENC_SER_OPAQUE = 5 ENC_SER_FLATBUFFERS = 6 __slots__ = ( '_from_fbs', '_timestamp', '_publication', '_publisher', '_topic', '_args', '_kwargs', '_payload', '_acknowledge', '_retain', '_exclude_me', '_exclude', '_exclude_authid', '_exclude_authrole', '_eligible', '_eligible_authid', '_eligible_authrole', '_enc_algo', '_enc_key', '_enc_serializer', ) def __init__(self, from_fbs=None): self._from_fbs = from_fbs self._timestamp = None self._publication = None self._publisher = None self._topic = None self._args = None self._kwargs = None self._payload = None self._acknowledge = None self._retain = None self._exclude_me = None self._exclude = None self._exclude_authid = None self._exclude_authrole = None self._eligible = None self._eligible_authid = None self._eligible_authrole = None self._enc_algo = None self._enc_key = None self._enc_serializer = None
[docs] def marshal(self): obj = { 'timestamp': self.timestamp, 'publication': self.publication, 'publisher': self.publisher, 'topic': self.topic, 'args': self.args, 'kwargs': self.kwargs, 'payload': self.payload, 'acknowledge': self.acknowledge, 'retain': self.retain, 'exclude_me': self.exclude_me, 'exclude': self.exclude, 'exclude_authid': self.exclude_authid, 'exclude_authrole': self.exclude_authrole, 'eligible': self.eligible, 'eligible_authid': self.eligible_authid, 'eligible_authrole': self.eligible_authrole, 'enc_algo': self.enc_algo, 'enc_key': self.enc_key, 'enc_serializer': self.enc_serializer, } return obj
def __str__(self): return '\n{}\n'.format(pprint.pformat(self.marshal())) @property def timestamp(self): """ Timestamp when the publication was accepted by the broker. Epoch time in ns. :returns: epoch time in ns :rtype: int """ if self._timestamp is None and self._from_fbs: self._timestamp = self._from_fbs.Timestamp() return self._timestamp @timestamp.setter def timestamp(self, value): assert value is None or type(value) == int self._timestamp = value @property def publication(self): """ WAMP publication ID that was assigned by the broker. :returns: publication ID :rtype: int """ if self._publication is None and self._from_fbs: self._publication = self._from_fbs.Publication() return self._publication @publication.setter def publication(self, value): assert value is None or type(value) == int self._publication = value @property def publisher(self): """ WAMP session ID of the publisher. :returns: publisher ID :rtype: int """ if self._publisher is None and self._from_fbs: self._publisher = self._from_fbs.Publisher() return self._publisher @publisher.setter def publisher(self, value): assert value is None or type(value) == int self._publisher = value @property def topic(self): """ The WAMP or application URI of the PubSub topic the event was published to. :returns: topic (URI) published to :rtype: str """ if self._topic is None and self._from_fbs: self._topic = self._from_fbs.Topic().decode('utf8') return self._topic @topic.setter def topic(self, value): assert value is None or type(value) == str self._topic = value # # args, kwargs, payload # @property def args(self): """ Positional values for application-defined event payload. :returns: positional arguments (app payload) of the event (if any) :rtype: None or list """ if self._args is None and self._from_fbs: if self._from_fbs.ArgsLength(): self._args = cbor2.loads(bytes(self._from_fbs.ArgsAsBytes())) return self._args @args.setter def args(self, value): assert value is None or type(value) == list self._args = value @property def kwargs(self): """ Keyword values for application-defined event payload. :returns: keyword arguments (app payload) of the event (if any) :rtype: None or dict """ if self._kwargs is None and self._from_fbs: if self._from_fbs.KwargsLength(): self._kwargs = cbor2.loads(bytes(self._from_fbs.KwargsAsBytes())) return self._kwargs @kwargs.setter def kwargs(self, value): assert value is None or type(value) == dict self._kwargs = value @property def payload(self): """ Alternative, transparent payload. If given, ``args`` and ``kwargs`` must be left unset. :returns: Transparent binary payload (see ``enc_algo``) if applicable :rtype: None or bytes """ if self._payload is None and self._from_fbs: if self._from_fbs.PayloadLength(): self._payload = self._from_fbs.PayloadAsBytes() return self._payload @payload.setter def payload(self, value): assert value is None or type(value) == bytes self._payload = value # # acknowledge, retain, exclude_me # @property def acknowledge(self): """ If ``True``, the broker was asked to acknowledge the publication with a success or error response. :returns: acknowledge flag :rtype: None or bool """ if self._acknowledge is None and self._from_fbs: self._acknowledge = self._from_fbs.Acknowledge() return self._acknowledge @acknowledge.setter def acknowledge(self, value): assert value is None or type(value) == bool self._acknowledge = value @property def retain(self): """ If ``True``, the broker was requested to retain this event. :returns: retain flag :rtype: None or bool """ if self._retain is None and self._from_fbs: self._retain = self._from_fbs.Retain() return self._retain @retain.setter def retain(self, value): assert value is None or type(value) == bool self._retain = value @property def exclude_me(self): """ If ``True``, the broker was asked to exclude the publisher from receiving the event. :returns: exclude_me flag :rtype: None or bool """ if self._exclude_me is None and self._from_fbs: self._exclude_me = self._from_fbs.ExcludeMe() return self._exclude_me @exclude_me.setter def exclude_me(self, value): assert value is None or type(value) == bool self._exclude_me = value # # exclude, exclude_authid, exclude_authrole # @property def exclude(self): """ List of WAMP session IDs to exclude from receiving this event. :returns: list of excluded session IDs :rtype: list[int] """ if self._exclude is None and self._from_fbs: if self._from_fbs.ExcludeLength(): exclude = [] for j in range(self._from_fbs.ExcludeLength()): exclude.append(self._from_fbs.Exclude(j)) self._exclude = exclude return self._exclude @exclude.setter def exclude(self, value): assert value is None or type(value) == list if value: for x in value: assert type(x) == int self._exclude = value @property def exclude_authid(self): """ List of WAMP authids to exclude from receiving this event. :returns: list of excluded authids :rtype: list[str] """ if self._exclude_authid is None and self._from_fbs: if self._from_fbs.ExcludeAuthidLength(): exclude_authid = [] for j in range(self._from_fbs.ExcludeAuthidLength()): exclude_authid.append(self._from_fbs.ExcludeAuthid(j).decode('utf8')) self._exclude_authid = exclude_authid return self._exclude_authid @exclude_authid.setter def exclude_authid(self, value): assert value is None or type(value) == list if value: for x in value: assert type(x) == str self._exclude_authid = value @property def exclude_authrole(self): """ List of WAMP authroles to exclude from receiving this event. :returns: list of excluded authroles :rtype: list[str] """ if self._exclude_authrole is None and self._from_fbs: if self._from_fbs.ExcludeAuthroleLength(): exclude_authrole = [] for j in range(self._from_fbs.ExcludeAuthroleLength()): exclude_authrole.append(self._from_fbs.ExcludeAuthrole(j).decode('utf8')) self._exclude_authrole = exclude_authrole return self._exclude_authrole @exclude_authrole.setter def exclude_authrole(self, value): assert value is None or type(value) == list if value: for x in value: assert type(x) == str self._exclude_authrole = value # # eligible, eligible_authid, eligible_authrole # @property def eligible(self): """ List of WAMP session IDs eligible to receive this event. :returns: list of eligible session IDs :rtype: list[int] """ if self._eligible is None and self._from_fbs: if self._from_fbs.EligibleLength(): eligible = [] for j in range(self._from_fbs.EligibleLength()): eligible.append(self._from_fbs.Eligible(j)) self._eligible = eligible return self._eligible @eligible.setter def eligible(self, value): assert value is None or type(value) == list if value: for x in value: assert type(x) == int self._eligible = value @property def eligible_authid(self): """ List of WAMP authids eligible to receive this event. :returns: list of eligible authids :rtype: list[str] """ if self._eligible_authid is None and self._from_fbs: if self._from_fbs.EligibleAuthidLength(): eligible_authid = [] for j in range(self._from_fbs.EligibleAuthidLength()): eligible_authid.append(self._from_fbs.EligibleAuthid(j).decode('utf8')) self._eligible_authid = eligible_authid return self._eligible_authid @eligible_authid.setter def eligible_authid(self, value): assert value is None or type(value) == list if value: for x in value: assert type(x) == str self._eligible_authid = value @property def eligible_authrole(self): """ List of WAMP authroles eligible to receive this event. :returns: list of eligible authroles :rtype: list[str] """ if self._eligible_authrole is None and self._from_fbs: if self._from_fbs.EligibleAuthroleLength(): eligible_authrole = [] for j in range(self._from_fbs.EligibleAuthroleLength()): eligible_authrole.append(self._from_fbs.EligibleAuthrole(j).decode('utf8')) self._eligible_authrole = eligible_authrole return self._eligible_authrole @eligible_authrole.setter def eligible_authrole(self, value): assert value is None or type(value) == list if value: for x in value: assert type(x) == str self._eligible_authrole = value # # encryption # @property def enc_algo(self): """ When using payload transparency, the encoding algorithm that was used to encode the payload. :returns: payload encryption algorithm :rtype: int """ if self._enc_algo is None and self._from_fbs: self._enc_algo = self._from_fbs.EncAlgo() return self._enc_algo @enc_algo.setter def enc_algo(self, value): assert value is None or value in [self.ENC_ALGO_CRYPTOBOX, self.ENC_ALGO_MQTT, self.ENC_ALGO_XBR] self._enc_algo = value @property def enc_key(self): """ When using payload transparency with an encryption algorithm, the payload encryption key. :returns: payload key :rtype: None or bytes """ if self._enc_key is None and self._from_fbs: if self._from_fbs.EncKeyLength(): self._enc_key = self._from_fbs.EncKeyAsBytes() return self._enc_key @enc_key.setter def enc_key(self, value): assert value is None or type(value) == bytes self._enc_key = value @property def enc_serializer(self): """ When using payload transparency, the payload object serializer that was used encoding the payload. :returns: payload serializer :rtype: int """ if self._enc_serializer is None and self._from_fbs: self._enc_serializer = self._from_fbs.EncSerializer() return self._enc_serializer @enc_serializer.setter def enc_serializer(self, value): assert value is None or value in [ self.ENC_SER_JSON, self.ENC_SER_MSGPACK, self.ENC_SER_CBOR, self.ENC_SER_UBJSON ] self._enc_serializer = value
[docs] @staticmethod def cast(buf): return Publication(_Publication.GetRootAsPublication(buf, 0))
[docs] def build(self, builder): args = self.args if args: args = builder.CreateString(cbor2.dumps(args)) kwargs = self.kwargs if kwargs: kwargs = builder.CreateString(cbor2.dumps(kwargs)) payload = self.payload if payload: payload = builder.CreateString(payload) topic = self.topic if topic: topic = builder.CreateString(topic) enc_key = self.enc_key if enc_key: enc_key = builder.CreateString(enc_key) # exclude: [int] exclude = self.exclude if exclude: PublicationGen.PublicationStartExcludeAuthidVector(builder, len(exclude)) for session in reversed(exclude): builder.PrependUint64(session) exclude = builder.EndVector() # exclude_authid: [string] exclude_authid = self.exclude_authid if exclude_authid: _exclude_authid = [] for authid in exclude_authid: _exclude_authid.append(builder.CreateString(authid)) PublicationGen.PublicationStartExcludeAuthidVector(builder, len(_exclude_authid)) for o in reversed(_exclude_authid): builder.PrependUOffsetTRelative(o) exclude_authid = builder.EndVector() # exclude_authrole: [string] exclude_authrole = self.exclude_authrole if exclude_authid: _exclude_authrole = [] for authrole in exclude_authrole: _exclude_authrole.append(builder.CreateString(authrole)) PublicationGen.PublicationStartExcludeAuthroleVector(builder, len(_exclude_authrole)) for o in reversed(_exclude_authrole): builder.PrependUOffsetTRelative(o) exclude_authrole = builder.EndVector() # eligible: [int] eligible = self.eligible if eligible: PublicationGen.PublicationStartEligibleAuthidVector(builder, len(eligible)) for session in reversed(eligible): builder.PrependUint64(session) eligible = builder.EndVector() # eligible_authid: [string] eligible_authid = self.eligible_authid if eligible_authid: _eligible_authid = [] for authid in eligible_authid: _eligible_authid.append(builder.CreateString(authid)) PublicationGen.PublicationStartEligibleAuthidVector(builder, len(_eligible_authid)) for o in reversed(_eligible_authid): builder.PrependUOffsetTRelative(o) eligible_authid = builder.EndVector() # eligible_authrole: [string] eligible_authrole = self.eligible_authrole if eligible_authrole: _eligible_authrole = [] for authrole in eligible_authrole: _eligible_authrole.append(builder.CreateString(authrole)) PublicationGen.PublicationStartEligibleAuthroleVector(builder, len(_eligible_authrole)) for o in reversed(_eligible_authrole): builder.PrependUOffsetTRelative(o) eligible_authrole = builder.EndVector() # now start and build a new object .. PublicationGen.PublicationStart(builder) if self.timestamp: PublicationGen.PublicationAddTimestamp(builder, self.timestamp) if self.publication: PublicationGen.PublicationAddPublication(builder, self.publication) if self.publisher: PublicationGen.PublicationAddPublisher(builder, self.publisher) if topic: PublicationGen.PublicationAddTopic(builder, topic) if args: PublicationGen.PublicationAddArgs(builder, args) if kwargs: PublicationGen.PublicationAddKwargs(builder, kwargs) if payload is not None: PublicationGen.PublicationAddPayload(builder, payload) if self.acknowledge is not None: PublicationGen.PublicationAddAcknowledge(builder, self.acknowledge) if self.retain is not None: PublicationGen.PublicationAddRetain(builder, self.retain) if self.exclude_me is not None: PublicationGen.PublicationAddExcludeMe(builder, self.exclude_me) if exclude: PublicationGen.PublicationAddExclude(builder, exclude) if exclude_authid: PublicationGen.PublicationAddExcludeAuthid(builder, exclude_authid) if exclude_authrole: PublicationGen.PublicationAddExcludeAuthrole(builder, exclude_authrole) if eligible: PublicationGen.PublicationAddEligible(builder, eligible) if eligible_authid: PublicationGen.PublicationAddEligibleAuthid(builder, eligible_authid) if eligible_authrole: PublicationGen.PublicationAddEligibleAuthrole(builder, eligible_authrole) if self.enc_algo: PublicationGen.PublicationAddEncAlgo(builder, self.enc_algo) if enc_key: PublicationGen.PublicationAddEncKey(builder, enc_key) if self.enc_serializer: PublicationGen.PublicationAddEncSerializer(builder, self.enc_serializer) # finish the object. final = PublicationGen.PublicationEnd(builder) return final
[docs]@table('dd04931a-753b-4fde-8140-d66b93519c73', build=Publication.build, cast=Publication.cast) class Publications(MapOidFlatBuffers): """ Persisted publications archive. Map :class:`zlmdb.MapOidFlatBuffers` from ``publication`` to :class:`cfxdb.eventstore.Publication`. """