Source code for cfxdb.realmstore._session

##############################################################################
#
#                        Crossbar.io Database
#     Copyright (c) Crossbar.io Technologies GmbH. Licensed under MIT.
#
##############################################################################

import uuid
import cbor2
from typing import Optional, Dict, Any
import pprint

import flatbuffers
import numpy as np

from zlmdb import table, MapUuidFlatBuffers, MapUint64TimestampUuid
from cfxdb.gen.realmstore import Session as SessionGen


class _SessionGen(SessionGen.Session):
    @classmethod
    def GetRootAs(cls, buf, offset=0):
        n = flatbuffers.encode.Get(flatbuffers.packer.uoffset, buf, offset)
        x = _SessionGen()
        x.Init(buf, n + offset)
        return x

    def ArealmOidAsBytes(self):
        o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(4))
        if o != 0:
            _off = self._tab.Vector(o)
            _len = self._tab.VectorLen(o)
            return memoryview(self._tab.Bytes)[_off:_off + _len]
        return None

    def OidAsBytes(self):
        o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(6))
        if o != 0:
            _off = self._tab.Vector(o)
            _len = self._tab.VectorLen(o)
            return memoryview(self._tab.Bytes)[_off:_off + _len]
        return None

    def NodeOidAsBytes(self):
        o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(14))
        if o != 0:
            _off = self._tab.Vector(o)
            _len = self._tab.VectorLen(o)
            return memoryview(self._tab.Bytes)[_off:_off + _len]
        return None

    def TransportAsBytes(self):
        o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(22))
        if o != 0:
            _off = self._tab.Vector(o)
            _len = self._tab.VectorLen(o)
            return memoryview(self._tab.Bytes)[_off:_off + _len]
        return None

    def ProxyNodeOidAsBytes(self):
        o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(24))
        if o != 0:
            _off = self._tab.Vector(o)
            _len = self._tab.VectorLen(o)
            return memoryview(self._tab.Bytes)[_off:_off + _len]
        return None

    def ProxyTransportAsBytes(self):
        o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(32))
        if o != 0:
            _off = self._tab.Vector(o)
            _len = self._tab.VectorLen(o)
            return memoryview(self._tab.Bytes)[_off:_off + _len]
        return None

    def AuthextraAsBytes(self):
        o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(44))
        if o != 0:
            _off = self._tab.Vector(o)
            _len = self._tab.VectorLen(o)
            return memoryview(self._tab.Bytes)[_off:_off + _len]
        return None


[docs]class Session(object): """ Persisted session database object. """ __slots__ = ( '_from_fbs', '_arealm_oid', '_oid', '_session', '_joined_at', '_left_at', '_node_oid', '_node_authid', '_worker_name', '_worker_pid', '_transport', '_proxy_node_oid', '_proxy_node_authid', '_proxy_worker_name', '_proxy_worker_pid', '_proxy_transport', '_realm', '_authid', '_authrole', '_authmethod', '_authprovider', '_authextra', ) def __init__(self, from_fbs: Optional[_SessionGen] = None): self._from_fbs = from_fbs # [uint8] (uuid) self._arealm_oid: Optional[uuid.UUID] = None # [uint8] (uuid) self._oid: Optional[uuid.UUID] = None # uint64 self._session: Optional[int] = None # uint64 (timestamp) self._joined_at: Optional[np.datetime64] = None # uint64 (timestamp) self._left_at: Optional[np.datetime64] = None # [uint8] (uuid) self._node_oid: Optional[uuid.UUID] = None # string self._node_authid: Optional[str] = None # string self._worker_name: Optional[str] = None # int32 self._worker_pid: Optional[int] = None # [uint8] (cbor) self._transport: Optional[Dict[str, Any]] = None # [uint8] (uuid) self._proxy_node_oid: Optional[uuid.UUID] = None # string self._proxy_node_authid: Optional[str] = None # string self._proxy_worker_name: Optional[str] = None # int32 self._proxy_worker_pid: Optional[int] = None # [uint8] (cbor) self._proxy_transport: Optional[Dict[str, Any]] = None # string self._realm: Optional[str] = None # string self._authid: Optional[str] = None # string self._authrole: Optional[str] = None # string self._authmethod: Optional[str] = None # string self._authprovider: Optional[str] = None # [uint8] (cbor) self._authextra: Optional[Dict[str, Any]] = None
[docs] def marshal(self): obj = { 'arealm_oid': self.arealm_oid.bytes if self.arealm_oid else None, 'oid': self.oid.bytes if self.oid else None, 'session': self.session, 'joined_at': int(self.joined_at) if self.joined_at else None, 'left_at': int(self.left_at) if self.left_at else None, 'node_oid': self.node_oid.bytes if self.node_oid else None, 'node_authid': self.node_authid, 'worker_name': self.worker_name, 'worker_pid': self.worker_pid, 'transport': self.transport, 'proxy_node_oid': self.proxy_node_oid.bytes if self.proxy_node_oid else None, 'proxy_node_authid': self.proxy_node_authid, 'proxy_worker_name': self.proxy_worker_name, 'proxy_worker_pid': self.proxy_worker_pid, 'proxy_transport': self.proxy_transport, 'realm': self.realm, 'authid': self.authid, 'authrole': self.authrole, 'authmethod': self.authmethod, 'authprovider': self.authprovider, 'authextra': self.authextra, } return obj
def __str__(self): return '\n{}\n'.format(pprint.pformat(self.marshal())) @property def arealm_oid(self) -> Optional[uuid.UUID]: """ OID of the application realm this session is/was joined on. """ if self._arealm_oid is None and self._from_fbs: if self._from_fbs.ArealmOidLength(): _arealm_oid = self._from_fbs.ArealmOidAsBytes() self._arealm_oid = uuid.UUID(bytes=bytes(_arealm_oid)) return self._arealm_oid @arealm_oid.setter def arealm_oid(self, value: Optional[uuid.UUID]): assert value is None or isinstance(value, uuid.UUID) self._arealm_oid = value @property def oid(self) -> Optional[uuid.UUID]: """ Unlimited time, globally unique, long-term OID of this session. The pair of WAMP session ID and join time ``(session, joined_at)`` bidirectionally maps to session ``oid``. """ if self._oid is None and self._from_fbs: if self._from_fbs.OidLength(): _oid = self._from_fbs.OidAsBytes() self._oid = uuid.UUID(bytes=bytes(_oid)) return self._oid @oid.setter def oid(self, value: Optional[uuid.UUID]): assert value is None or isinstance(value, uuid.UUID) self._oid = value @property def session(self) -> Optional[int]: """ The WAMP session_id of the session. """ if self._session is None and self._from_fbs: self._session = self._from_fbs.Session() return self._session @session.setter def session(self, value: Optional[int]): assert value is None or type(value) == int self._session = value @property def joined_at(self) -> Optional[np.datetime64]: """ Timestamp when the session was joined by the router. Epoch time in ns. """ if self._joined_at is None and self._from_fbs: self._joined_at = np.datetime64(self._from_fbs.JoinedAt(), 'ns') return self._joined_at @joined_at.setter def joined_at(self, value: Optional[np.datetime64]): assert value is None or isinstance(value, np.datetime64) self._joined_at = value @property def left_at(self) -> Optional[np.datetime64]: """ Timestamp when the session left the router. Epoch time in ns. """ if self._left_at is None and self._from_fbs: self._left_at = np.datetime64(self._from_fbs.LeftAt(), 'ns') return self._left_at @left_at.setter def left_at(self, value: Optional[np.datetime64]): assert value is None or isinstance(value, np.datetime64) self._left_at = value @property def node_oid(self) -> Optional[uuid.UUID]: """ OID of the node of the router worker hosting this session. """ if self._node_oid is None and self._from_fbs: if self._from_fbs.NodeOidLength(): _node_oid = self._from_fbs.NodeOidAsBytes() self._node_oid = uuid.UUID(bytes=bytes(_node_oid)) return self._node_oid @node_oid.setter def node_oid(self, value: Optional[uuid.UUID]): assert value is None or isinstance(value, uuid.UUID) self._node_oid = value @property def node_authid(self) -> Optional[str]: """ Name (management realm WAMP authid) of the node of the router worker hosting this session. """ if self._node_authid is None and self._from_fbs: _node_authid = self._from_fbs.NodeAuthid() if _node_authid: self._node_authid = _node_authid.decode('utf8') return self._node_authid @node_authid.setter def node_authid(self, value: Optional[str]): self._node_authid = value @property def worker_name(self) -> Optional[str]: """ Local worker name of the router worker hosting this session. """ if self._worker_name is None and self._from_fbs: _worker_name = self._from_fbs.WorkerName() if _worker_name: self._worker_name = _worker_name.decode('utf8') return self._worker_name @worker_name.setter def worker_name(self, value: Optional[str]): self._worker_name = value @property def worker_pid(self) -> Optional[int]: """ Local worker PID of the router worker hosting this session. """ if self._worker_pid is None and self._from_fbs: self._worker_pid = self._from_fbs.WorkerPid() return self._worker_pid @worker_pid.setter def worker_pid(self, value: Optional[int]): self._worker_pid = value @property def transport(self) -> Optional[Dict[str, Any]]: """ Session transport information. """ if self._transport is None and self._from_fbs: _transport = self._from_fbs.TransportAsBytes() if _transport: self._transport = cbor2.loads(_transport) else: self._transport = {} return self._transport @transport.setter def transport(self, value: Optional[Dict[str, Any]]): assert value is None or type(value) == dict self._transport = value @property def proxy_node_oid(self) -> Optional[uuid.UUID]: """ From proxy (in proxy-router cluster setups): OID of the node of the proxy worker hosting this session. """ if self._proxy_node_oid is None and self._from_fbs: if self._from_fbs.ProxyNodeOidLength(): _proxy_node_oid = self._from_fbs.ProxyNodeOidAsBytes() self._proxy_node_oid = uuid.UUID(bytes=bytes(_proxy_node_oid)) return self._proxy_node_oid @proxy_node_oid.setter def proxy_node_oid(self, value: Optional[uuid.UUID]): assert value is None or isinstance(value, uuid.UUID) self._proxy_node_oid = value @property def proxy_node_authid(self) -> Optional[str]: """ From proxy (in proxy-router cluster setups): Name (management realm WAMP authid) of the node of the proxy worker hosting this session. """ if self._proxy_node_authid is None and self._from_fbs: _proxy_node_authid = self._from_fbs.ProxyNodeAuthid() if _proxy_node_authid: self._proxy_node_authid = _proxy_node_authid.decode('utf8') return self._proxy_node_authid @proxy_node_authid.setter def proxy_node_authid(self, value: Optional[str]): self._proxy_node_authid = value @property def proxy_worker_name(self) -> Optional[str]: """ From proxy (in proxy-router cluster setups): Local worker name of the proxy worker hosting this session. """ if self._proxy_worker_name is None and self._from_fbs: _proxy_worker_name = self._from_fbs.ProxyWorkerName() if _proxy_worker_name: self._proxy_worker_name = _proxy_worker_name.decode('utf8') return self._proxy_worker_name @proxy_worker_name.setter def proxy_worker_name(self, value: Optional[str]): self._proxy_worker_name = value @property def proxy_worker_pid(self) -> Optional[int]: """ From proxy (in proxy-router cluster setups): Local worker PID of the proxy worker hosting this session. """ if self._proxy_worker_pid is None and self._from_fbs: self._proxy_worker_pid = self._from_fbs.ProxyWorkerPid() return self._proxy_worker_pid @proxy_worker_pid.setter def proxy_worker_pid(self, value: Optional[int]): self._proxy_worker_pid = value @property def proxy_transport(self) -> Optional[Dict[str, Any]]: """ From proxy (in proxy-router cluster setups): Session transport information, the transport from the proxy to the backend router. """ if self._proxy_transport is None and self._from_fbs: _proxy_transport = self._from_fbs.ProxyTransportAsBytes() if _proxy_transport: self._proxy_transport = cbor2.loads(_proxy_transport) else: self._proxy_transport = {} return self._proxy_transport @proxy_transport.setter def proxy_transport(self, value: Optional[Dict[str, Any]]): assert value is None or type(value) == dict self._proxy_transport = value @property def realm(self) -> Optional[str]: """ The WAMP realm the session is/was joined on. """ if self._realm is None and self._from_fbs: self._realm = self._from_fbs.Realm().decode('utf8') return self._realm @realm.setter def realm(self, value: Optional[str]): assert value is None or type(value) == str self._realm = value @property def authid(self) -> Optional[str]: """ The WAMP authid the session was authenticated under. """ if self._authid is None and self._from_fbs: _authid = self._from_fbs.Authid() if _authid: self._authid = _authid.decode('utf8') return self._authid @authid.setter def authid(self, value: Optional[str]): self._authid = value @property def authrole(self) -> Optional[str]: """ The WAMP authrole the session was authenticated under. """ if self._authrole is None and self._from_fbs: _authrole = self._from_fbs.Authrole() if _authrole: self._authrole = _authrole.decode('utf8') return self._authrole @authrole.setter def authrole(self, value: Optional[str]): self._authrole = value @property def authmethod(self) -> Optional[str]: """ The WAMP authmethod uses to authenticate the session. """ if self._authmethod is None and self._from_fbs: _authmethod = self._from_fbs.Authmethod() if _authmethod: self._authmethod = _authmethod.decode('utf8') return self._authmethod @authmethod.setter def authmethod(self, value: Optional[str]): self._authmethod = value @property def authprovider(self) -> Optional[str]: """ The WAMP authprovider that was handling the session authentication. """ if self._authprovider is None and self._from_fbs: _authprovider = self._from_fbs.Authprovider() if _authprovider: self._authprovider = _authprovider.decode('utf8') return self._authprovider @authprovider.setter def authprovider(self, value: Optional[str]): self._authprovider = value @property def authextra(self) -> Optional[Dict[str, Any]]: """ The WAMP authextra as provided to the authenticated session. """ if self._authextra is None and self._from_fbs: _authextra = self._from_fbs.AuthextraAsBytes() if _authextra: self._authextra = cbor2.loads(_authextra) else: self._authextra = {} return self._authextra @authextra.setter def authextra(self, value: Optional[Dict[str, Any]]): assert value is None or type(value) == dict self._authextra = value
[docs] @staticmethod def cast(buf) -> 'Session': return Session(_SessionGen.GetRootAsSession(buf, 0))
[docs] def build(self, builder): arealm_oid = self.arealm_oid.bytes if self.arealm_oid else None if arealm_oid: arealm_oid = builder.CreateString(arealm_oid) oid = self.oid.bytes if self.oid else None if oid: oid = builder.CreateString(oid) node_oid = self.node_oid.bytes if self.node_oid else None if node_oid: node_oid = builder.CreateString(node_oid) node_authid = self.node_authid if node_authid: node_authid = builder.CreateString(node_authid) worker_name = self.worker_name if worker_name: worker_name = builder.CreateString(worker_name) transport = self.transport if transport: transport = builder.CreateString(cbor2.dumps(transport)) proxy_node_oid = self.proxy_node_oid.bytes if self.proxy_node_oid else None if proxy_node_oid: proxy_node_oid = builder.CreateString(proxy_node_oid) proxy_node_authid = self.proxy_node_authid if proxy_node_authid: proxy_node_authid = builder.CreateString(proxy_node_authid) proxy_worker_name = self.proxy_worker_name if proxy_worker_name: proxy_worker_name = builder.CreateString(proxy_worker_name) proxy_transport = self.proxy_transport if proxy_transport: proxy_transport = builder.CreateString(cbor2.dumps(proxy_transport)) realm = self.realm if realm: realm = builder.CreateString(realm) authid = self.authid if authid: authid = builder.CreateString(authid) authrole = self.authrole if authrole: authrole = builder.CreateString(authrole) authmethod = self.authmethod if authmethod: authmethod = builder.CreateString(authmethod) authprovider = self.authprovider if authprovider: authprovider = builder.CreateString(authprovider) authextra = self.authextra if authextra: authextra = builder.CreateString(cbor2.dumps(authextra)) SessionGen.SessionStart(builder) if arealm_oid: SessionGen.SessionAddArealmOid(builder, arealm_oid) if oid: SessionGen.SessionAddOid(builder, oid) if self.session: SessionGen.SessionAddSession(builder, self.session) if self.joined_at: SessionGen.SessionAddJoinedAt(builder, int(self.joined_at)) if self.left_at: SessionGen.SessionAddLeftAt(builder, int(self.left_at)) if node_oid: SessionGen.SessionAddNodeOid(builder, node_oid) if node_authid: SessionGen.SessionAddNodeAuthid(builder, node_authid) if worker_name: SessionGen.SessionAddWorkerName(builder, worker_name) if self.worker_pid: SessionGen.SessionAddWorkerPid(builder, self.worker_pid) if transport: SessionGen.SessionAddTransport(builder, transport) if proxy_node_oid: SessionGen.SessionAddProxyNodeOid(builder, proxy_node_oid) if proxy_node_authid: SessionGen.SessionAddProxyNodeAuthid(builder, proxy_node_authid) if proxy_worker_name: SessionGen.SessionAddProxyWorkerName(builder, proxy_worker_name) if self.proxy_worker_pid: SessionGen.SessionAddProxyWorkerPid(builder, self.proxy_worker_pid) if proxy_transport: SessionGen.SessionAddProxyTransport(builder, proxy_transport) if realm: SessionGen.SessionAddRealm(builder, realm) if authid: SessionGen.SessionAddAuthid(builder, authid) if authrole: SessionGen.SessionAddAuthrole(builder, authrole) if authmethod: SessionGen.SessionAddAuthmethod(builder, authmethod) if authprovider: SessionGen.SessionAddAuthprovider(builder, authprovider) if authextra: SessionGen.SessionAddAuthextra(builder, authextra) final = SessionGen.SessionEnd(builder) return final
[docs]@table('403ecc06-f564-4ea9-92f2-c4c13bd2ba5a', build=Session.build, cast=Session.cast) class Sessions(MapUuidFlatBuffers): """ Persisted session information table. Map :class:`zlmdb.MapUuidFlatBuffers` from ``session_oid`` to :class:`cfxdb.realmstore.Session` """
[docs]@table('0ea1ea1a-45f2-4352-a4a0-1fafff099c96') class IndexSessionsBySessionId(MapUint64TimestampUuid): """ Index: ``(sessionid, joined_at) -> session_oid`` """