Source code for cfxdb.realmstore._session

##############################################################################
#
#                        Crossbar.io Database
#     Copyright (c) typedef int GmbH. Licensed under MIT.
#
##############################################################################

import pprint
import uuid
from typing import Any, Dict, Optional

import cbor2
import numpy as np
from zlmdb import MapUint64TimestampUuid, MapUuidFlatBuffers, flatbuffers, table

from cfxdb.gen.realmstore import Session as SessionGen


[docs] class _SessionGen(SessionGen.Session): @classmethod
[docs] def GetRootAs(cls, buf, offset=0): n = flatbuffers.encode.Get(flatbuffers.packer.uoffset, buf, offset) x = _SessionGen() x.Init(buf, n + offset) return x
[docs] def ArealmOidAsBytes(self): o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(4)) if o != 0: _off = self._tab.Vector(o) _len = self._tab.VectorLen(o) return memoryview(self._tab.Bytes)[_off : _off + _len] return None
[docs] def OidAsBytes(self): o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(6)) if o != 0: _off = self._tab.Vector(o) _len = self._tab.VectorLen(o) return memoryview(self._tab.Bytes)[_off : _off + _len] return None
[docs] def NodeOidAsBytes(self): o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(14)) if o != 0: _off = self._tab.Vector(o) _len = self._tab.VectorLen(o) return memoryview(self._tab.Bytes)[_off : _off + _len] return None
[docs] def TransportAsBytes(self): o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(22)) if o != 0: _off = self._tab.Vector(o) _len = self._tab.VectorLen(o) return memoryview(self._tab.Bytes)[_off : _off + _len] return None
[docs] def ProxyNodeOidAsBytes(self): o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(24)) if o != 0: _off = self._tab.Vector(o) _len = self._tab.VectorLen(o) return memoryview(self._tab.Bytes)[_off : _off + _len] return None
[docs] def ProxyTransportAsBytes(self): o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(32)) if o != 0: _off = self._tab.Vector(o) _len = self._tab.VectorLen(o) return memoryview(self._tab.Bytes)[_off : _off + _len] return None
[docs] def AuthextraAsBytes(self): o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(44)) if o != 0: _off = self._tab.Vector(o) _len = self._tab.VectorLen(o) return memoryview(self._tab.Bytes)[_off : _off + _len] return None
[docs] class Session(object): """ Persisted session database object. """
[docs] __slots__ = ( "_from_fbs", "_arealm_oid", "_oid", "_session", "_joined_at", "_left_at", "_node_oid", "_node_authid", "_worker_name", "_worker_pid", "_transport", "_proxy_node_oid", "_proxy_node_authid", "_proxy_worker_name", "_proxy_worker_pid", "_proxy_transport", "_realm", "_authid", "_authrole", "_authmethod", "_authprovider", "_authextra", )
def __init__(self, from_fbs: Optional[_SessionGen] = None):
[docs] self._from_fbs = from_fbs
# [uint8] (uuid)
[docs] self._arealm_oid: Optional[uuid.UUID] = None
# [uint8] (uuid)
[docs] self._oid: Optional[uuid.UUID] = None
# uint64
[docs] self._session: Optional[int] = None
# uint64 (timestamp)
[docs] self._joined_at: Optional[np.datetime64] = None
# uint64 (timestamp)
[docs] self._left_at: Optional[np.datetime64] = None
# [uint8] (uuid)
[docs] self._node_oid: Optional[uuid.UUID] = None
# string
[docs] self._node_authid: Optional[str] = None
# string
[docs] self._worker_name: Optional[str] = None
# int32
[docs] self._worker_pid: Optional[int] = None
# [uint8] (cbor)
[docs] self._transport: Optional[Dict[str, Any]] = None
# [uint8] (uuid)
[docs] self._proxy_node_oid: Optional[uuid.UUID] = None
# string
[docs] self._proxy_node_authid: Optional[str] = None
# string
[docs] self._proxy_worker_name: Optional[str] = None
# int32
[docs] self._proxy_worker_pid: Optional[int] = None
# [uint8] (cbor)
[docs] self._proxy_transport: Optional[Dict[str, Any]] = None
# string
[docs] self._realm: Optional[str] = None
# string
[docs] self._authid: Optional[str] = None
# string
[docs] self._authrole: Optional[str] = None
# string
[docs] self._authmethod: Optional[str] = None
# string
[docs] self._authprovider: Optional[str] = None
# [uint8] (cbor)
[docs] self._authextra: Optional[Dict[str, Any]] = None
[docs] def marshal(self): obj = { "arealm_oid": self.arealm_oid.bytes if self.arealm_oid else None, "oid": self.oid.bytes if self.oid else None, "session": self.session, "joined_at": int(self.joined_at) if self.joined_at else None, "left_at": int(self.left_at) if self.left_at else None, "node_oid": self.node_oid.bytes if self.node_oid else None, "node_authid": self.node_authid, "worker_name": self.worker_name, "worker_pid": self.worker_pid, "transport": self.transport, "proxy_node_oid": self.proxy_node_oid.bytes if self.proxy_node_oid else None, "proxy_node_authid": self.proxy_node_authid, "proxy_worker_name": self.proxy_worker_name, "proxy_worker_pid": self.proxy_worker_pid, "proxy_transport": self.proxy_transport, "realm": self.realm, "authid": self.authid, "authrole": self.authrole, "authmethod": self.authmethod, "authprovider": self.authprovider, "authextra": self.authextra, } return obj
[docs] def __str__(self): return "\n{}\n".format(pprint.pformat(self.marshal()))
@property
[docs] def arealm_oid(self) -> Optional[uuid.UUID]: """ OID of the application realm this session is/was joined on. """ if self._arealm_oid is None and self._from_fbs: if self._from_fbs.ArealmOidLength(): _arealm_oid = self._from_fbs.ArealmOidAsBytes() self._arealm_oid = uuid.UUID(bytes=bytes(_arealm_oid)) return self._arealm_oid
@arealm_oid.setter def arealm_oid(self, value: Optional[uuid.UUID]): assert value is None or isinstance(value, uuid.UUID) self._arealm_oid = value @property
[docs] def oid(self) -> Optional[uuid.UUID]: """ Unlimited time, globally unique, long-term OID of this session. The pair of WAMP session ID and join time ``(session, joined_at)`` bidirectionally maps to session ``oid``. """ if self._oid is None and self._from_fbs: if self._from_fbs.OidLength(): _oid = self._from_fbs.OidAsBytes() self._oid = uuid.UUID(bytes=bytes(_oid)) return self._oid
@oid.setter def oid(self, value: Optional[uuid.UUID]): assert value is None or isinstance(value, uuid.UUID) self._oid = value @property
[docs] def session(self) -> Optional[int]: """ The WAMP session_id of the session. """ if self._session is None and self._from_fbs: self._session = self._from_fbs.Session() return self._session
@session.setter def session(self, value: Optional[int]): assert value is None or type(value) == int self._session = value @property
[docs] def joined_at(self) -> Optional[np.datetime64]: """ Timestamp when the session was joined by the router. Epoch time in ns. """ if self._joined_at is None and self._from_fbs: self._joined_at = np.datetime64(self._from_fbs.JoinedAt(), "ns") return self._joined_at
@joined_at.setter def joined_at(self, value: Optional[np.datetime64]): assert value is None or isinstance(value, np.datetime64) self._joined_at = value @property
[docs] def left_at(self) -> Optional[np.datetime64]: """ Timestamp when the session left the router. Epoch time in ns. """ if self._left_at is None and self._from_fbs: self._left_at = np.datetime64(self._from_fbs.LeftAt(), "ns") return self._left_at
@left_at.setter def left_at(self, value: Optional[np.datetime64]): assert value is None or isinstance(value, np.datetime64) self._left_at = value @property
[docs] def node_oid(self) -> Optional[uuid.UUID]: """ OID of the node of the router worker hosting this session. """ if self._node_oid is None and self._from_fbs: if self._from_fbs.NodeOidLength(): _node_oid = self._from_fbs.NodeOidAsBytes() self._node_oid = uuid.UUID(bytes=bytes(_node_oid)) return self._node_oid
@node_oid.setter def node_oid(self, value: Optional[uuid.UUID]): assert value is None or isinstance(value, uuid.UUID) self._node_oid = value @property
[docs] def node_authid(self) -> Optional[str]: """ Name (management realm WAMP authid) of the node of the router worker hosting this session. """ if self._node_authid is None and self._from_fbs: _node_authid = self._from_fbs.NodeAuthid() if _node_authid: self._node_authid = _node_authid.decode("utf8") return self._node_authid
@node_authid.setter def node_authid(self, value: Optional[str]): self._node_authid = value @property
[docs] def worker_name(self) -> Optional[str]: """ Local worker name of the router worker hosting this session. """ if self._worker_name is None and self._from_fbs: _worker_name = self._from_fbs.WorkerName() if _worker_name: self._worker_name = _worker_name.decode("utf8") return self._worker_name
@worker_name.setter def worker_name(self, value: Optional[str]): self._worker_name = value @property
[docs] def worker_pid(self) -> Optional[int]: """ Local worker PID of the router worker hosting this session. """ if self._worker_pid is None and self._from_fbs: self._worker_pid = self._from_fbs.WorkerPid() return self._worker_pid
@worker_pid.setter def worker_pid(self, value: Optional[int]): self._worker_pid = value @property
[docs] def transport(self) -> Optional[Dict[str, Any]]: """ Session transport information. """ if self._transport is None and self._from_fbs: _transport = self._from_fbs.TransportAsBytes() if _transport: self._transport = cbor2.loads(_transport) else: self._transport = {} return self._transport
@transport.setter def transport(self, value: Optional[Dict[str, Any]]): assert value is None or type(value) == dict self._transport = value @property
[docs] def proxy_node_oid(self) -> Optional[uuid.UUID]: """ From proxy (in proxy-router cluster setups): OID of the node of the proxy worker hosting this session. """ if self._proxy_node_oid is None and self._from_fbs: if self._from_fbs.ProxyNodeOidLength(): _proxy_node_oid = self._from_fbs.ProxyNodeOidAsBytes() self._proxy_node_oid = uuid.UUID(bytes=bytes(_proxy_node_oid)) return self._proxy_node_oid
@proxy_node_oid.setter def proxy_node_oid(self, value: Optional[uuid.UUID]): assert value is None or isinstance(value, uuid.UUID) self._proxy_node_oid = value @property
[docs] def proxy_node_authid(self) -> Optional[str]: """ From proxy (in proxy-router cluster setups): Name (management realm WAMP authid) of the node of the proxy worker hosting this session. """ if self._proxy_node_authid is None and self._from_fbs: _proxy_node_authid = self._from_fbs.ProxyNodeAuthid() if _proxy_node_authid: self._proxy_node_authid = _proxy_node_authid.decode("utf8") return self._proxy_node_authid
@proxy_node_authid.setter def proxy_node_authid(self, value: Optional[str]): self._proxy_node_authid = value @property
[docs] def proxy_worker_name(self) -> Optional[str]: """ From proxy (in proxy-router cluster setups): Local worker name of the proxy worker hosting this session. """ if self._proxy_worker_name is None and self._from_fbs: _proxy_worker_name = self._from_fbs.ProxyWorkerName() if _proxy_worker_name: self._proxy_worker_name = _proxy_worker_name.decode("utf8") return self._proxy_worker_name
@proxy_worker_name.setter def proxy_worker_name(self, value: Optional[str]): self._proxy_worker_name = value @property
[docs] def proxy_worker_pid(self) -> Optional[int]: """ From proxy (in proxy-router cluster setups): Local worker PID of the proxy worker hosting this session. """ if self._proxy_worker_pid is None and self._from_fbs: self._proxy_worker_pid = self._from_fbs.ProxyWorkerPid() return self._proxy_worker_pid
@proxy_worker_pid.setter def proxy_worker_pid(self, value: Optional[int]): self._proxy_worker_pid = value @property
[docs] def proxy_transport(self) -> Optional[Dict[str, Any]]: """ From proxy (in proxy-router cluster setups): Session transport information, the transport from the proxy to the backend router. """ if self._proxy_transport is None and self._from_fbs: _proxy_transport = self._from_fbs.ProxyTransportAsBytes() if _proxy_transport: self._proxy_transport = cbor2.loads(_proxy_transport) else: self._proxy_transport = {} return self._proxy_transport
@proxy_transport.setter def proxy_transport(self, value: Optional[Dict[str, Any]]): assert value is None or type(value) == dict self._proxy_transport = value @property
[docs] def realm(self) -> Optional[str]: """ The WAMP realm the session is/was joined on. """ if self._realm is None and self._from_fbs: self._realm = self._from_fbs.Realm().decode("utf8") return self._realm
@realm.setter def realm(self, value: Optional[str]): assert value is None or type(value) == str self._realm = value @property
[docs] def authid(self) -> Optional[str]: """ The WAMP authid the session was authenticated under. """ if self._authid is None and self._from_fbs: _authid = self._from_fbs.Authid() if _authid: self._authid = _authid.decode("utf8") return self._authid
@authid.setter def authid(self, value: Optional[str]): self._authid = value @property
[docs] def authrole(self) -> Optional[str]: """ The WAMP authrole the session was authenticated under. """ if self._authrole is None and self._from_fbs: _authrole = self._from_fbs.Authrole() if _authrole: self._authrole = _authrole.decode("utf8") return self._authrole
@authrole.setter def authrole(self, value: Optional[str]): self._authrole = value @property
[docs] def authmethod(self) -> Optional[str]: """ The WAMP authmethod uses to authenticate the session. """ if self._authmethod is None and self._from_fbs: _authmethod = self._from_fbs.Authmethod() if _authmethod: self._authmethod = _authmethod.decode("utf8") return self._authmethod
@authmethod.setter def authmethod(self, value: Optional[str]): self._authmethod = value @property
[docs] def authprovider(self) -> Optional[str]: """ The WAMP authprovider that was handling the session authentication. """ if self._authprovider is None and self._from_fbs: _authprovider = self._from_fbs.Authprovider() if _authprovider: self._authprovider = _authprovider.decode("utf8") return self._authprovider
@authprovider.setter def authprovider(self, value: Optional[str]): self._authprovider = value @property
[docs] def authextra(self) -> Optional[Dict[str, Any]]: """ The WAMP authextra as provided to the authenticated session. """ if self._authextra is None and self._from_fbs: _authextra = self._from_fbs.AuthextraAsBytes() if _authextra: self._authextra = cbor2.loads(_authextra) else: self._authextra = {} return self._authextra
@authextra.setter def authextra(self, value: Optional[Dict[str, Any]]): assert value is None or type(value) == dict self._authextra = value @staticmethod
[docs] def cast(buf) -> "Session": return Session(_SessionGen.GetRootAsSession(buf, 0))
[docs] def build(self, builder): arealm_oid = self.arealm_oid.bytes if self.arealm_oid else None if arealm_oid: arealm_oid = builder.CreateString(arealm_oid) oid = self.oid.bytes if self.oid else None if oid: oid = builder.CreateString(oid) node_oid = self.node_oid.bytes if self.node_oid else None if node_oid: node_oid = builder.CreateString(node_oid) node_authid = self.node_authid if node_authid: node_authid = builder.CreateString(node_authid) worker_name = self.worker_name if worker_name: worker_name = builder.CreateString(worker_name) transport = self.transport if transport: transport = builder.CreateString(cbor2.dumps(transport)) proxy_node_oid = self.proxy_node_oid.bytes if self.proxy_node_oid else None if proxy_node_oid: proxy_node_oid = builder.CreateString(proxy_node_oid) proxy_node_authid = self.proxy_node_authid if proxy_node_authid: proxy_node_authid = builder.CreateString(proxy_node_authid) proxy_worker_name = self.proxy_worker_name if proxy_worker_name: proxy_worker_name = builder.CreateString(proxy_worker_name) proxy_transport = self.proxy_transport if proxy_transport: proxy_transport = builder.CreateString(cbor2.dumps(proxy_transport)) realm = self.realm if realm: realm = builder.CreateString(realm) authid = self.authid if authid: authid = builder.CreateString(authid) authrole = self.authrole if authrole: authrole = builder.CreateString(authrole) authmethod = self.authmethod if authmethod: authmethod = builder.CreateString(authmethod) authprovider = self.authprovider if authprovider: authprovider = builder.CreateString(authprovider) authextra = self.authextra if authextra: authextra = builder.CreateString(cbor2.dumps(authextra)) SessionGen.SessionStart(builder) if arealm_oid: SessionGen.SessionAddArealmOid(builder, arealm_oid) if oid: SessionGen.SessionAddOid(builder, oid) if self.session: SessionGen.SessionAddSession(builder, self.session) if self.joined_at: SessionGen.SessionAddJoinedAt(builder, int(self.joined_at)) if self.left_at: SessionGen.SessionAddLeftAt(builder, int(self.left_at)) if node_oid: SessionGen.SessionAddNodeOid(builder, node_oid) if node_authid: SessionGen.SessionAddNodeAuthid(builder, node_authid) if worker_name: SessionGen.SessionAddWorkerName(builder, worker_name) if self.worker_pid: SessionGen.SessionAddWorkerPid(builder, self.worker_pid) if transport: SessionGen.SessionAddTransport(builder, transport) if proxy_node_oid: SessionGen.SessionAddProxyNodeOid(builder, proxy_node_oid) if proxy_node_authid: SessionGen.SessionAddProxyNodeAuthid(builder, proxy_node_authid) if proxy_worker_name: SessionGen.SessionAddProxyWorkerName(builder, proxy_worker_name) if self.proxy_worker_pid: SessionGen.SessionAddProxyWorkerPid(builder, self.proxy_worker_pid) if proxy_transport: SessionGen.SessionAddProxyTransport(builder, proxy_transport) if realm: SessionGen.SessionAddRealm(builder, realm) if authid: SessionGen.SessionAddAuthid(builder, authid) if authrole: SessionGen.SessionAddAuthrole(builder, authrole) if authmethod: SessionGen.SessionAddAuthmethod(builder, authmethod) if authprovider: SessionGen.SessionAddAuthprovider(builder, authprovider) if authextra: SessionGen.SessionAddAuthextra(builder, authextra) final = SessionGen.SessionEnd(builder) return final
@table("403ecc06-f564-4ea9-92f2-c4c13bd2ba5a", build=Session.build, cast=Session.cast)
[docs] class Sessions(MapUuidFlatBuffers): """ Persisted session information table. Map :class:`zlmdb.MapUuidFlatBuffers` from ``session_oid`` to :class:`cfxdb.realmstore.Session` """
@table("0ea1ea1a-45f2-4352-a4a0-1fafff099c96")
[docs] class IndexSessionsBySessionId(MapUint64TimestampUuid): """ Index: ``(sessionid, joined_at) -> session_oid`` """