##############################################################################
#
# Crossbar.io Database
# Copyright (c) typedef int GmbH. Licensed under MIT.
#
##############################################################################
import pprint
import cbor2
from zlmdb import MapOidFlatBuffers, flatbuffers, table
from cfxdb.gen.realmstore import Publication as PublicationGen
[docs]
class _Publication(PublicationGen.Publication):
"""
Expand methods on the class code generated by flatc.
FIXME: comes up with a PR for flatc to generated this stuff automatically.
"""
@classmethod
[docs]
def GetRootAsPublication(cls, buf, offset):
n = flatbuffers.encode.Get(flatbuffers.packer.uoffset, buf, offset)
x = _Publication()
x.Init(buf, n + offset)
return x
[docs]
def ArgsAsBytes(self):
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(12))
if o != 0:
_off = self._tab.Vector(o)
_len = self._tab.VectorLen(o)
return memoryview(self._tab.Bytes)[_off : _off + _len]
return None
[docs]
def KwargsAsBytes(self):
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(14))
if o != 0:
_off = self._tab.Vector(o)
_len = self._tab.VectorLen(o)
return memoryview(self._tab.Bytes)[_off : _off + _len]
return None
[docs]
def PayloadAsBytes(self):
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(16))
if o != 0:
_off = self._tab.Vector(o)
_len = self._tab.VectorLen(o)
return memoryview(self._tab.Bytes)[_off : _off + _len]
return None
[docs]
def EncKeyAsBytes(self):
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(38))
if o != 0:
_off = self._tab.Vector(o)
_len = self._tab.VectorLen(o)
return memoryview(self._tab.Bytes)[_off : _off + _len]
return None
[docs]
class Publication(object):
"""
Persisted publication database object.
"""
[docs]
ENC_SER_FLATBUFFERS = 6
[docs]
__slots__ = (
"_from_fbs",
"_timestamp",
"_publication",
"_publisher",
"_topic",
"_args",
"_kwargs",
"_payload",
"_acknowledge",
"_retain",
"_exclude_me",
"_exclude",
"_exclude_authid",
"_exclude_authrole",
"_eligible",
"_eligible_authid",
"_eligible_authrole",
"_enc_algo",
"_enc_key",
"_enc_serializer",
)
def __init__(self, from_fbs=None):
[docs]
self._from_fbs = from_fbs
[docs]
self._publication = None
[docs]
self._acknowledge = None
[docs]
self._exclude_me = None
[docs]
self._exclude_authid = None
[docs]
self._exclude_authrole = None
[docs]
self._eligible_authid = None
[docs]
self._eligible_authrole = None
[docs]
self._enc_serializer = None
[docs]
def marshal(self):
obj = {
"timestamp": self.timestamp,
"publication": self.publication,
"publisher": self.publisher,
"topic": self.topic,
"args": self.args,
"kwargs": self.kwargs,
"payload": self.payload,
"acknowledge": self.acknowledge,
"retain": self.retain,
"exclude_me": self.exclude_me,
"exclude": self.exclude,
"exclude_authid": self.exclude_authid,
"exclude_authrole": self.exclude_authrole,
"eligible": self.eligible,
"eligible_authid": self.eligible_authid,
"eligible_authrole": self.eligible_authrole,
"enc_algo": self.enc_algo,
"enc_key": self.enc_key,
"enc_serializer": self.enc_serializer,
}
return obj
[docs]
def __str__(self):
return "\n{}\n".format(pprint.pformat(self.marshal()))
@property
[docs]
def timestamp(self):
"""
Timestamp when the publication was accepted by the broker. Epoch time in ns.
:returns: epoch time in ns
:rtype: int
"""
if self._timestamp is None and self._from_fbs:
self._timestamp = self._from_fbs.Timestamp()
return self._timestamp
@timestamp.setter
def timestamp(self, value):
assert value is None or type(value) == int
self._timestamp = value
@property
[docs]
def publication(self):
"""
WAMP publication ID that was assigned by the broker.
:returns: publication ID
:rtype: int
"""
if self._publication is None and self._from_fbs:
self._publication = self._from_fbs.Publication()
return self._publication
@publication.setter
def publication(self, value):
assert value is None or type(value) == int
self._publication = value
@property
[docs]
def publisher(self):
"""
WAMP session ID of the publisher.
:returns: publisher ID
:rtype: int
"""
if self._publisher is None and self._from_fbs:
self._publisher = self._from_fbs.Publisher()
return self._publisher
@publisher.setter
def publisher(self, value):
assert value is None or type(value) == int
self._publisher = value
@property
[docs]
def topic(self):
"""
The WAMP or application URI of the PubSub topic the event was published to.
:returns: topic (URI) published to
:rtype: str
"""
if self._topic is None and self._from_fbs:
self._topic = self._from_fbs.Topic().decode("utf8")
return self._topic
@topic.setter
def topic(self, value):
assert value is None or type(value) == str
self._topic = value
#
# args, kwargs, payload
#
@property
[docs]
def args(self):
"""
Positional values for application-defined event payload.
:returns: positional arguments (app payload) of the event (if any)
:rtype: None or list
"""
if self._args is None and self._from_fbs:
if self._from_fbs.ArgsLength():
self._args = cbor2.loads(bytes(self._from_fbs.ArgsAsBytes()))
return self._args
@args.setter
def args(self, value):
assert value is None or type(value) == list
self._args = value
@property
[docs]
def kwargs(self):
"""
Keyword values for application-defined event payload.
:returns: keyword arguments (app payload) of the event (if any)
:rtype: None or dict
"""
if self._kwargs is None and self._from_fbs:
if self._from_fbs.KwargsLength():
self._kwargs = cbor2.loads(bytes(self._from_fbs.KwargsAsBytes()))
return self._kwargs
@kwargs.setter
def kwargs(self, value):
assert value is None or type(value) == dict
self._kwargs = value
@property
[docs]
def payload(self):
"""
Alternative, transparent payload. If given, ``args`` and ``kwargs`` must be left unset.
:returns: Transparent binary payload (see ``enc_algo``) if applicable
:rtype: None or bytes
"""
if self._payload is None and self._from_fbs:
if self._from_fbs.PayloadLength():
self._payload = self._from_fbs.PayloadAsBytes()
return self._payload
@payload.setter
def payload(self, value):
assert value is None or type(value) == bytes
self._payload = value
#
# acknowledge, retain, exclude_me
#
@property
[docs]
def acknowledge(self):
"""
If ``True``, the broker was asked to acknowledge the publication with a success or error response.
:returns: acknowledge flag
:rtype: None or bool
"""
if self._acknowledge is None and self._from_fbs:
self._acknowledge = self._from_fbs.Acknowledge()
return self._acknowledge
@acknowledge.setter
def acknowledge(self, value):
assert value is None or type(value) == bool
self._acknowledge = value
@property
[docs]
def retain(self):
"""
If ``True``, the broker was requested to retain this event.
:returns: retain flag
:rtype: None or bool
"""
if self._retain is None and self._from_fbs:
self._retain = self._from_fbs.Retain()
return self._retain
@retain.setter
def retain(self, value):
assert value is None or type(value) == bool
self._retain = value
@property
[docs]
def exclude_me(self):
"""
If ``True``, the broker was asked to exclude the publisher from receiving the event.
:returns: exclude_me flag
:rtype: None or bool
"""
if self._exclude_me is None and self._from_fbs:
self._exclude_me = self._from_fbs.ExcludeMe()
return self._exclude_me
@exclude_me.setter
def exclude_me(self, value):
assert value is None or type(value) == bool
self._exclude_me = value
#
# exclude, exclude_authid, exclude_authrole
#
@property
[docs]
def exclude(self):
"""
List of WAMP session IDs to exclude from receiving this event.
:returns: list of excluded session IDs
:rtype: list[int]
"""
if self._exclude is None and self._from_fbs:
if self._from_fbs.ExcludeLength():
exclude = []
for j in range(self._from_fbs.ExcludeLength()):
exclude.append(self._from_fbs.Exclude(j))
self._exclude = exclude
return self._exclude
@exclude.setter
def exclude(self, value):
assert value is None or type(value) == list
if value:
for x in value:
assert type(x) == int
self._exclude = value
@property
[docs]
def exclude_authid(self):
"""
List of WAMP authids to exclude from receiving this event.
:returns: list of excluded authids
:rtype: list[str]
"""
if self._exclude_authid is None and self._from_fbs:
if self._from_fbs.ExcludeAuthidLength():
exclude_authid = []
for j in range(self._from_fbs.ExcludeAuthidLength()):
exclude_authid.append(self._from_fbs.ExcludeAuthid(j).decode("utf8"))
self._exclude_authid = exclude_authid
return self._exclude_authid
@exclude_authid.setter
def exclude_authid(self, value):
assert value is None or type(value) == list
if value:
for x in value:
assert type(x) == str
self._exclude_authid = value
@property
[docs]
def exclude_authrole(self):
"""
List of WAMP authroles to exclude from receiving this event.
:returns: list of excluded authroles
:rtype: list[str]
"""
if self._exclude_authrole is None and self._from_fbs:
if self._from_fbs.ExcludeAuthroleLength():
exclude_authrole = []
for j in range(self._from_fbs.ExcludeAuthroleLength()):
exclude_authrole.append(self._from_fbs.ExcludeAuthrole(j).decode("utf8"))
self._exclude_authrole = exclude_authrole
return self._exclude_authrole
@exclude_authrole.setter
def exclude_authrole(self, value):
assert value is None or type(value) == list
if value:
for x in value:
assert type(x) == str
self._exclude_authrole = value
#
# eligible, eligible_authid, eligible_authrole
#
@property
[docs]
def eligible(self):
"""
List of WAMP session IDs eligible to receive this event.
:returns: list of eligible session IDs
:rtype: list[int]
"""
if self._eligible is None and self._from_fbs:
if self._from_fbs.EligibleLength():
eligible = []
for j in range(self._from_fbs.EligibleLength()):
eligible.append(self._from_fbs.Eligible(j))
self._eligible = eligible
return self._eligible
@eligible.setter
def eligible(self, value):
assert value is None or type(value) == list
if value:
for x in value:
assert type(x) == int
self._eligible = value
@property
[docs]
def eligible_authid(self):
"""
List of WAMP authids eligible to receive this event.
:returns: list of eligible authids
:rtype: list[str]
"""
if self._eligible_authid is None and self._from_fbs:
if self._from_fbs.EligibleAuthidLength():
eligible_authid = []
for j in range(self._from_fbs.EligibleAuthidLength()):
eligible_authid.append(self._from_fbs.EligibleAuthid(j).decode("utf8"))
self._eligible_authid = eligible_authid
return self._eligible_authid
@eligible_authid.setter
def eligible_authid(self, value):
assert value is None or type(value) == list
if value:
for x in value:
assert type(x) == str
self._eligible_authid = value
@property
[docs]
def eligible_authrole(self):
"""
List of WAMP authroles eligible to receive this event.
:returns: list of eligible authroles
:rtype: list[str]
"""
if self._eligible_authrole is None and self._from_fbs:
if self._from_fbs.EligibleAuthroleLength():
eligible_authrole = []
for j in range(self._from_fbs.EligibleAuthroleLength()):
eligible_authrole.append(self._from_fbs.EligibleAuthrole(j).decode("utf8"))
self._eligible_authrole = eligible_authrole
return self._eligible_authrole
@eligible_authrole.setter
def eligible_authrole(self, value):
assert value is None or type(value) == list
if value:
for x in value:
assert type(x) == str
self._eligible_authrole = value
#
# encryption
#
@property
[docs]
def enc_algo(self):
"""
When using payload transparency, the encoding algorithm that was used to encode the payload.
:returns: payload encryption algorithm
:rtype: int
"""
if self._enc_algo is None and self._from_fbs:
self._enc_algo = self._from_fbs.EncAlgo()
return self._enc_algo
@enc_algo.setter
def enc_algo(self, value):
assert value is None or value in [self.ENC_ALGO_CRYPTOBOX, self.ENC_ALGO_MQTT, self.ENC_ALGO_XBR]
self._enc_algo = value
@property
[docs]
def enc_key(self):
"""
When using payload transparency with an encryption algorithm, the payload encryption key.
:returns: payload key
:rtype: None or bytes
"""
if self._enc_key is None and self._from_fbs:
if self._from_fbs.EncKeyLength():
self._enc_key = self._from_fbs.EncKeyAsBytes()
return self._enc_key
@enc_key.setter
def enc_key(self, value):
assert value is None or type(value) == bytes
self._enc_key = value
@property
[docs]
def enc_serializer(self):
"""
When using payload transparency, the payload object serializer that was used encoding the payload.
:returns: payload serializer
:rtype: int
"""
if self._enc_serializer is None and self._from_fbs:
self._enc_serializer = self._from_fbs.EncSerializer()
return self._enc_serializer
@enc_serializer.setter
def enc_serializer(self, value):
assert value is None or value in [
self.ENC_SER_JSON,
self.ENC_SER_MSGPACK,
self.ENC_SER_CBOR,
self.ENC_SER_UBJSON,
]
self._enc_serializer = value
@staticmethod
[docs]
def cast(buf):
return Publication(_Publication.GetRootAsPublication(buf, 0))
[docs]
def build(self, builder):
args = self.args
if args:
args = builder.CreateString(cbor2.dumps(args))
kwargs = self.kwargs
if kwargs:
kwargs = builder.CreateString(cbor2.dumps(kwargs))
payload = self.payload
if payload:
payload = builder.CreateString(payload)
topic = self.topic
if topic:
topic = builder.CreateString(topic)
enc_key = self.enc_key
if enc_key:
enc_key = builder.CreateString(enc_key)
# exclude: [int]
exclude = self.exclude
if exclude:
PublicationGen.PublicationStartExcludeAuthidVector(builder, len(exclude))
for session in reversed(exclude):
builder.PrependUint64(session)
exclude = builder.EndVector()
# exclude_authid: [string]
exclude_authid = self.exclude_authid
if exclude_authid:
_exclude_authid = []
for authid in exclude_authid:
_exclude_authid.append(builder.CreateString(authid))
PublicationGen.PublicationStartExcludeAuthidVector(builder, len(_exclude_authid))
for o in reversed(_exclude_authid):
builder.PrependUOffsetTRelative(o)
exclude_authid = builder.EndVector()
# exclude_authrole: [string]
exclude_authrole = self.exclude_authrole
if exclude_authid:
_exclude_authrole = []
for authrole in exclude_authrole:
_exclude_authrole.append(builder.CreateString(authrole))
PublicationGen.PublicationStartExcludeAuthroleVector(builder, len(_exclude_authrole))
for o in reversed(_exclude_authrole):
builder.PrependUOffsetTRelative(o)
exclude_authrole = builder.EndVector()
# eligible: [int]
eligible = self.eligible
if eligible:
PublicationGen.PublicationStartEligibleAuthidVector(builder, len(eligible))
for session in reversed(eligible):
builder.PrependUint64(session)
eligible = builder.EndVector()
# eligible_authid: [string]
eligible_authid = self.eligible_authid
if eligible_authid:
_eligible_authid = []
for authid in eligible_authid:
_eligible_authid.append(builder.CreateString(authid))
PublicationGen.PublicationStartEligibleAuthidVector(builder, len(_eligible_authid))
for o in reversed(_eligible_authid):
builder.PrependUOffsetTRelative(o)
eligible_authid = builder.EndVector()
# eligible_authrole: [string]
eligible_authrole = self.eligible_authrole
if eligible_authrole:
_eligible_authrole = []
for authrole in eligible_authrole:
_eligible_authrole.append(builder.CreateString(authrole))
PublicationGen.PublicationStartEligibleAuthroleVector(builder, len(_eligible_authrole))
for o in reversed(_eligible_authrole):
builder.PrependUOffsetTRelative(o)
eligible_authrole = builder.EndVector()
# now start and build a new object ..
PublicationGen.PublicationStart(builder)
if self.timestamp:
PublicationGen.PublicationAddTimestamp(builder, self.timestamp)
if self.publication:
PublicationGen.PublicationAddPublication(builder, self.publication)
if self.publisher:
PublicationGen.PublicationAddPublisher(builder, self.publisher)
if topic:
PublicationGen.PublicationAddTopic(builder, topic)
if args:
PublicationGen.PublicationAddArgs(builder, args)
if kwargs:
PublicationGen.PublicationAddKwargs(builder, kwargs)
if payload is not None:
PublicationGen.PublicationAddPayload(builder, payload)
if self.acknowledge is not None:
PublicationGen.PublicationAddAcknowledge(builder, self.acknowledge)
if self.retain is not None:
PublicationGen.PublicationAddRetain(builder, self.retain)
if self.exclude_me is not None:
PublicationGen.PublicationAddExcludeMe(builder, self.exclude_me)
if exclude:
PublicationGen.PublicationAddExclude(builder, exclude)
if exclude_authid:
PublicationGen.PublicationAddExcludeAuthid(builder, exclude_authid)
if exclude_authrole:
PublicationGen.PublicationAddExcludeAuthrole(builder, exclude_authrole)
if eligible:
PublicationGen.PublicationAddEligible(builder, eligible)
if eligible_authid:
PublicationGen.PublicationAddEligibleAuthid(builder, eligible_authid)
if eligible_authrole:
PublicationGen.PublicationAddEligibleAuthrole(builder, eligible_authrole)
if self.enc_algo:
PublicationGen.PublicationAddEncAlgo(builder, self.enc_algo)
if enc_key:
PublicationGen.PublicationAddEncKey(builder, enc_key)
if self.enc_serializer:
PublicationGen.PublicationAddEncSerializer(builder, self.enc_serializer)
# finish the object.
final = PublicationGen.PublicationEnd(builder)
return final
@table("dd04931a-753b-4fde-8140-d66b93519c73", build=Publication.build, cast=Publication.cast)
[docs]
class Publications(MapOidFlatBuffers):
"""
Persisted publications archive.
Map :class:`zlmdb.MapOidFlatBuffers` from ``publication`` to :class:`cfxdb.eventstore.Publication`.
"""