# Source code for cfxdb.xbrnetwork.vaction

##############################################################################
#
#                        Crossbar.io Database
#     Copyright (c) Crossbar.io Technologies GmbH. Licensed under MIT.
#
##############################################################################

import pprint
import uuid

import cbor2
import flatbuffers
import numpy as np
from cfxdb.gen.xbrnetwork import VerifiedAction as VerifiedActionGen
from zlmdb import table, MapUuidFlatBuffers


class _VerifiedActionGen(VerifiedActionGen.VerifiedAction):
    """
    Expand methods on the class code generated by flatc.

    Adds ``*AsBytes`` accessors which return the byte-vector fields as
    ``memoryview`` slices of the table's underlying buffer instead of going
    through per-element access.

    FIXME: come up with a PR for flatc to generate this stuff automatically.
    """
    @classmethod
    def GetRootAsVerifiedAction(cls, buf, offset):
        """
        Create an accessor object for the table rooted in ``buf``.

        :param buf: Buffer holding the serialized FlatBuffers data.
        :param offset: Position in ``buf`` at which the root offset is stored.
        :return: A new instance of ``cls`` initialized on the root table.
        """
        n = flatbuffers.encode.Get(flatbuffers.packer.uoffset, buf, offset)
        # instantiate via ``cls`` so that subclasses get instances of themselves
        x = cls()
        x.Init(buf, n + offset)
        return x

    def _vector_memoryview(self, slot):
        """
        Return the byte vector stored in table slot ``slot`` as a memoryview.

        :param slot: Slot number passed to ``self._tab.Offset()``.
        :return: ``memoryview`` slice over ``self._tab.Bytes`` covering the
            vector, or ``None`` if the offset lookup yields 0 (field not set).
        """
        o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(slot))
        if o == 0:
            return None
        _off = self._tab.Vector(o)
        _len = self._tab.VectorLen(o)
        return memoryview(self._tab.Bytes)[_off:_off + _len]

    def OidAsBytes(self):
        """
        Return the ``oid`` field (slot 4) as a memoryview, or ``None`` if unset.
        """
        return self._vector_memoryview(4)

    def VerifiedOidAsBytes(self):
        """
        Return the ``verified_oid`` field (slot 14) as a memoryview, or ``None`` if unset.
        """
        return self._vector_memoryview(14)

    def VerifiedDataAsBytes(self):
        """
        Return the ``verified_data`` field (slot 16) as a memoryview, or ``None`` if unset.
        """
        return self._vector_memoryview(16)


class VerificationType(object):
    """
    Symbolic names for the kinds of actions that are verified via email.
    """

    NONE = 0
    """
    Unset
    """

    MEMBER_ONBOARD_EMAIL = 1
    """
    Member onboarded, using verification via member primary email.
    """

    MEMBER_LOGIN_EMAIL = 2
    """
    Member logged in, using verification via member primary email.
    """

    CREATE_MARKET_EMAIL = 3
    """
    Member created a market, using verification via member primary email.
    """


class VerifiedAction(object):
    """
    User actions (such as "on-board new user") yet to be verified. Actions to be verified are identified
    using their "Verification Action ID". A verification email is sent to the (primary) email
    address of the user, and to verify, the user must click the verification link contained in the email.
    The verification link contains the verification action ID and code.

    Instances are either created empty and filled via the property setters, or wrap
    a FlatBuffers accessor (``from_fbs``), in which case each field is decoded on
    first property access and cached on the instance.
    """

    VERIFICATION_TYPE_NONE = 0
    """
    Unset verification action type ("null").
    """

    VERIFICATION_TYPE_ONBOARD_MEMBER = 1
    """
    Verification action type for on-boarding new members via email verification.
    """

    VERIFICATION_TYPE_LOGIN_MEMBER = 2
    """
    Verification action type for login of member (client) via email verification.
    """

    VERIFICATION_TYPE_CREATE_MARKET = 3
    """
    Verification action type for creation of new data market via email verification.
    """

    VERIFICATION_TYPE_JOIN_MARKET = 4
    """
    Verification action type for joining an existing market via email verification.
    """

    VERIFICATION_TYPE_CREATE_CATALOG = 5
    """
    Verification action type for creation of a catalog via email verification.
    """

    # NOTE: must be distinct from VERIFICATION_TYPE_CREATE_CATALOG; together with
    # VERIFICATION_TYPE below (0..6) this gives every action type its own code.
    VERIFICATION_TYPE_PUBLISH_API = 6
    """
    Verification action type for publishing an API via email verification.
    """

    VERIFICATION_TYPE = list(range(7))
    """
    All valid verification action types.
    """

    VERIFICATION_STATUS_NONE = 0
    """
    Unset verification action status ("null").
    """

    VERIFICATION_STATUS_PENDING = 1
    """
    Verification is still pending.
    """

    VERIFICATION_STATUS_VERIFIED = 2
    """
    Verification has been successfully completed.
    """

    VERIFICATION_STATUS_FAILED = 3
    """
    Verification has failed.
    """

    VERIFICATION_STATUS_EXPIRED = 4
    """
    Action cannot be verified any longer since it has expired.
    """

    VERIFICATION_STATUS = list(range(5))
    """
    All valid verification status codes.
    """

    def __init__(self, from_fbs=None):
        """
        :param from_fbs: Optional FlatBuffers accessor object the field values
            are read from when they have not been set on this instance.
        """
        self._from_fbs = from_fbs

        # Globally unique and static ID of action.
        # [uint8] (uuid)
        self._oid = None

        # Timestamp (epoch time in ns) of initial creation of this record.
        # uint64 (timestamp)
        self._created = None

        # Verification type.
        # VerificationType (uint8)
        self._vtype = None

        # Verification status.
        # VerificationStatus (uint8)
        self._vstatus = None

        # Verification code sent.
        # string
        self._vcode = None

        # ID of object of verified action.
        # [uint8] (uuid)
        self._verified_oid = None

        # Action data, serialized in CBOR.
        # [uint8]
        self._verified_data = None

    def marshal(self):
        """
        Return the field values as a plain dict.

        UUIDs are emitted as their 16 raw bytes, ``created`` as an integer and
        ``verified_data`` as a CBOR-encoded byte string. Unset (or empty) values
        are emitted as ``None``.

        :return: Dict with keys ``oid``, ``created``, ``vtype``, ``vstatus``,
            ``vcode``, ``verified_oid`` and ``verified_data``.
        """
        obj = {
            'oid': self.oid.bytes if self.oid else None,
            'created': int(self.created) if self.created else None,
            'vtype': self.vtype,
            'vstatus': self.vstatus,
            'vcode': self.vcode,
            # test the UUID itself, not ".bytes": verified_oid may be None
            'verified_oid': self.verified_oid.bytes if self.verified_oid else None,
            'verified_data': cbor2.dumps(self.verified_data) if self.verified_data else None,
        }
        return obj

    def __str__(self):
        """
        Return the pretty-printed :meth:`marshal` dict, wrapped in newlines.
        """
        return '\n{}\n'.format(pprint.pformat(self.marshal()))

    @property
    def oid(self) -> uuid.UUID:
        """
        Globally unique and static ID of action.
        """
        if self._oid is None and self._from_fbs:
            if self._from_fbs.OidLength():
                _oid = self._from_fbs.OidAsBytes()
                self._oid = uuid.UUID(bytes=bytes(_oid))
            else:
                # null UUID; uuid.UUID requires exactly 16 bytes
                self._oid = uuid.UUID(bytes=b'\x00' * 16)
        return self._oid

    @oid.setter
    def oid(self, value: uuid.UUID):
        assert value is None or isinstance(value, uuid.UUID)
        self._oid = value

    @property
    def created(self) -> np.datetime64:
        """
        Timestamp (epoch time in ns) of initial creation of this record.
        """
        if self._created is None and self._from_fbs:
            self._created = np.datetime64(self._from_fbs.Created(), 'ns')
        return self._created

    @created.setter
    def created(self, value: np.datetime64):
        assert value is None or isinstance(value, np.datetime64)
        self._created = value

    @property
    def vtype(self) -> int:
        """
        Verification type. Reads as ``0`` when unset.
        """
        if self._vtype is None and self._from_fbs:
            self._vtype = self._from_fbs.Vtype()
        return self._vtype or 0

    @vtype.setter
    def vtype(self, value: int):
        assert value is None or (type(value) is int and value in self.VERIFICATION_TYPE)
        self._vtype = value

    @property
    def vstatus(self) -> int:
        """
        Verification status. Reads as ``0`` when unset.
        """
        if self._vstatus is None and self._from_fbs:
            self._vstatus = self._from_fbs.Vstatus()
        return self._vstatus or 0

    @vstatus.setter
    def vstatus(self, value: int):
        assert value is None or (type(value) is int and value in self.VERIFICATION_STATUS)
        self._vstatus = value

    @property
    def vcode(self) -> str:
        """
        Verification code sent.
        """
        if self._vcode is None and self._from_fbs:
            vcode = self._from_fbs.Vcode()
            if vcode:
                self._vcode = vcode.decode('utf8')
        return self._vcode

    @vcode.setter
    def vcode(self, value: str):
        assert value is None or type(value) is str
        self._vcode = value

    @property
    def verified_oid(self) -> uuid.UUID:
        """
        ID of object of verified action.
        """
        if self._verified_oid is None and self._from_fbs:
            if self._from_fbs.VerifiedOidLength():
                _verified_oid = self._from_fbs.VerifiedOidAsBytes()
                self._verified_oid = uuid.UUID(bytes=bytes(_verified_oid))
            else:
                self._verified_oid = uuid.UUID(bytes=b'\x00' * 16)
        return self._verified_oid

    @verified_oid.setter
    def verified_oid(self, value: uuid.UUID):
        assert value is None or isinstance(value, uuid.UUID)
        self._verified_oid = value

    @property
    def verified_data(self) -> dict:
        """
        Action data, serialized in CBOR.
        """
        if self._verified_data is None and self._from_fbs:
            _verified_data = self._from_fbs.VerifiedDataAsBytes()
            if _verified_data:
                self._verified_data = cbor2.loads(_verified_data)
            else:
                self._verified_data = {}
        return self._verified_data

    @verified_data.setter
    def verified_data(self, value: dict):
        assert value is None or type(value) is dict
        self._verified_data = value

    @staticmethod
    def cast(buf):
        """
        Wrap a serialized buffer in a :class:`VerifiedAction`.

        :param buf: Buffer holding the serialized FlatBuffers data.
        :return: New instance reading its fields from ``buf``.
        """
        return VerifiedAction(_VerifiedActionGen.GetRootAsVerifiedAction(buf, 0))

    def build(self, builder):
        """
        Serialize this object using the given FlatBuffers builder.

        Only fields with a truthy value are added to the table.

        :param builder: FlatBuffers builder to write into.
        :return: The value returned by ``VerifiedActionEnd(builder)``.
        """
        # variable-length values are created first, before the table is started;
        # the raw byte payloads are written via builder.CreateString as well

        oid = self.oid.bytes if self.oid else None
        if oid:
            oid = builder.CreateString(oid)

        vcode = self.vcode
        if vcode:
            vcode = builder.CreateString(vcode)

        verified_oid = self.verified_oid.bytes if self.verified_oid else None
        if verified_oid:
            verified_oid = builder.CreateString(verified_oid)

        verified_data = self.verified_data
        if verified_data:
            verified_data = builder.CreateString(cbor2.dumps(verified_data))

        VerifiedActionGen.VerifiedActionStart(builder)

        if oid:
            VerifiedActionGen.VerifiedActionAddOid(builder, oid)

        if self.created:
            VerifiedActionGen.VerifiedActionAddCreated(builder, int(self.created))

        if self.vtype:
            VerifiedActionGen.VerifiedActionAddVtype(builder, self.vtype)

        if self.vstatus:
            VerifiedActionGen.VerifiedActionAddVstatus(builder, self.vstatus)

        if vcode:
            VerifiedActionGen.VerifiedActionAddVcode(builder, vcode)

        if verified_oid:
            VerifiedActionGen.VerifiedActionAddVerifiedOid(builder, verified_oid)

        if verified_data:
            VerifiedActionGen.VerifiedActionAddVerifiedData(builder, verified_data)

        final = VerifiedActionGen.VerifiedActionEnd(builder)

        return final


@table('84d805a7-c012-4dfa-9b95-e34760767f82', build=VerifiedAction.build, cast=VerifiedAction.cast)
class VerifiedActions(MapUuidFlatBuffers):
    """
    Database table for verification/verified actions, eg on-boarding new XBR members.
    """
    @staticmethod
    def parse(data):
        """
        Create a :class:`VerifiedAction` from a plain dict, such as the one
        produced by ``VerifiedAction.marshal()``.

        Keys that are missing, or whose value is ``None``, leave the respective
        attribute unset.

        :param data: Dict which may contain ``oid`` (16 bytes), ``created`` (int,
            passed to ``np.datetime64(..., 'ns')``), ``vtype`` (int), ``vstatus``
            (int), ``vcode`` (str), ``verified_oid`` (16 bytes) and
            ``verified_data`` (CBOR-encoded bytes).
        :return: The new :class:`VerifiedAction` object.
        :raises AssertionError: If a value that is present has the wrong type
            or length.
        """
        def _checked(key, expected_type, expected_len=None):
            # Return data[key] after validating it, or None if absent/None.
            value = data[key] if key in data else None
            if value is None:
                return None
            assert isinstance(value, expected_type), \
                '"{}" must be of type {}, not {}'.format(key, expected_type.__name__, type(value).__name__)
            if expected_len is not None:
                assert len(value) == expected_len, \
                    '"{}" must have length {}, not {}'.format(key, expected_len, len(value))
            return value

        oid = _checked('oid', bytes, 16)
        created = _checked('created', int)
        vtype = _checked('vtype', int)
        vstatus = _checked('vstatus', int)
        vcode = _checked('vcode', str)
        verified_oid = _checked('verified_oid', bytes, 16)
        verified_data = _checked('verified_data', bytes)

        obj = VerifiedAction()
        obj.oid = uuid.UUID(bytes=oid) if oid is not None else None
        obj.created = np.datetime64(created, 'ns') if created is not None else None
        obj.vtype = vtype
        obj.vstatus = vstatus
        obj.vcode = vcode
        obj.verified_oid = uuid.UUID(bytes=verified_oid) if verified_oid is not None else None
        obj.verified_data = cbor2.loads(verified_data) if verified_data is not None else None
        return obj