# Source code for cfxdb.xbrmm.userkey
##############################################################################
#
# Crossbar.io Database
# Copyright (c) typedef int GmbH. Licensed under MIT.
#
##############################################################################
import pprint
import uuid
import numpy as np
from zlmdb import MapBytes32FlatBuffers, MapUuidTimestampBytes32, flatbuffers, table
from cfxdb.gen.xbrmm import UserKey as UserKeyGen
class _UserKeyGen(UserKeyGen.UserKey):
    """
    Extension of the flatc-generated ``UserKey`` accessor class.

    Adds a root constructor that returns this subclass, and ``*AsBytes``
    accessors that hand out the byte-vector fields as ``memoryview`` slices
    of the table's underlying buffer.

    FIXME: come up with a PR for flatc to generate this stuff automatically.
    """

    @classmethod
    def GetRootAsUserKey(cls, buf, offset):
        """
        Build a ``_UserKeyGen`` positioned on the root table stored in ``buf``.

        :param buf: Buffer holding the serialized table.
        :param offset: Position in ``buf`` at which the root offset is read.
        :returns: A ``_UserKeyGen`` initialised via ``Init``.
        """
        root = flatbuffers.encode.Get(flatbuffers.packer.uoffset, buf, offset)
        instance = _UserKeyGen()
        instance.Init(buf, root + offset)
        return instance

    def _vector_as_bytes(self, vtable_offset):
        """
        Return the byte vector found at ``vtable_offset`` as a ``memoryview``.

        :param vtable_offset: Value passed to ``self._tab.Offset`` to locate
            the field.
        :returns: A ``memoryview`` slice of ``self._tab.Bytes``, or ``None``
            when ``Offset`` reports ``0`` for the field.
        """
        field = flatbuffers.number_types.UOffsetTFlags.py_type(
            self._tab.Offset(vtable_offset)
        )
        if field == 0:
            return None
        start = self._tab.Vector(field)
        size = self._tab.VectorLen(field)
        return memoryview(self._tab.Bytes)[start : start + size]

    def PubkeyAsBytes(self):
        """Return the ``pubkey`` vector as a ``memoryview``, or ``None``."""
        return self._vector_as_bytes(4)

    def OwnerAsBytes(self):
        """Return the ``owner`` vector as a ``memoryview``, or ``None``."""
        return self._vector_as_bytes(8)

    def WalletAddressAsBytes(self):
        """Return the ``wallet_address`` vector as a ``memoryview``, or ``None``."""
        return self._vector_as_bytes(10)

    def SignatureAsBytes(self):
        """Return the ``signature`` vector as a ``memoryview``, or ``None``."""
        return self._vector_as_bytes(12)
class UserKey:
    """
    User client (public) keys.

    An instance is either populated through its property setters, or wraps a
    FlatBuffers accessor (``from_fbs``) from which each field is read lazily
    on first access and then cached on the instance.
    """

    def __init__(self, from_fbs=None):
        """
        :param from_fbs: Optional FlatBuffers accessor object to lazily read
            field values from. When ``None``, all fields start out unset.
        """
        self._from_fbs = from_fbs

        # User key - a Ed25519 public key - for authenticating using WAMP-cryptosign.
        # [uint8] (uint256)
        self._pubkey = None

        # Timestamp (epoch time in ns) of initial creation of this record.
        # uint64 (timestamp)
        self._created = None

        # ID of user account this user key is owned by.
        # [uint8] (uuid)
        self._owner = None

        # Wallet address of user account this key is owned by.
        # [uint8] (address)
        self._wallet_address = None

        # Signature which resulted in this key being saved.
        # [uint8] (ethsig)
        self._signature = None

    def marshal(self):
        """
        Return the record as a plain ``dict``.

        Byte fields are normalised to ``bytes`` (the lazy getters may hand
        back ``memoryview`` slices), ``created`` becomes an ``int`` and
        ``owner`` the 16 raw UUID bytes. Unset fields are reported as
        ``None`` so that a partially populated record can still be
        marshalled and printed.

        :returns: Dict with keys ``pubkey``, ``created``, ``owner``,
            ``wallet_address`` and ``signature``.
        """
        pubkey = self.pubkey
        created = self.created
        owner = self.owner
        wallet_address = self.wallet_address
        signature = self.signature
        return {
            "pubkey": bytes(pubkey) if pubkey else None,
            "created": int(created) if created is not None else None,
            "owner": owner.bytes if owner else None,
            "wallet_address": bytes(wallet_address) if wallet_address else None,
            "signature": bytes(signature) if signature else None,
        }

    def __str__(self):
        """Return the pretty-printed marshalled record, framed by newlines."""
        return "\n{}\n".format(pprint.pformat(self.marshal()))

    @property
    def pubkey(self) -> bytes:
        """
        User key - a Ed25519 public key - for authenticating using WAMP-cryptosign.

        May be a ``memoryview`` when read from the FlatBuffers accessor, and
        ``None`` when unset.
        """
        if self._pubkey is None and self._from_fbs:
            if self._from_fbs.PubkeyLength():
                self._pubkey = self._from_fbs.PubkeyAsBytes()
        return self._pubkey

    @pubkey.setter
    def pubkey(self, value: bytes):
        assert value is None or (isinstance(value, bytes) and len(value) == 32)
        self._pubkey = value

    @property
    def created(self) -> np.datetime64:
        """
        Timestamp (epoch time in ns) of initial creation of this record.
        """
        if self._created is None and self._from_fbs:
            self._created = np.datetime64(self._from_fbs.Created(), "ns")
        return self._created

    @created.setter
    def created(self, value: np.datetime64):
        assert value is None or isinstance(value, np.datetime64)
        self._created = value

    @property
    def owner(self) -> uuid.UUID:
        """
        ID of user account this user key is owned by.

        When the FlatBuffers accessor carries no owner, the all-zero UUID is
        returned.
        """
        if self._owner is None and self._from_fbs:
            if self._from_fbs.OwnerLength():
                raw_owner = self._from_fbs.OwnerAsBytes()
                self._owner = uuid.UUID(bytes=bytes(raw_owner))
            else:
                # uuid.UUID(bytes=...) only accepts exactly 16 bytes
                self._owner = uuid.UUID(bytes=b"\x00" * 16)
        return self._owner

    @owner.setter
    def owner(self, value: uuid.UUID):
        assert value is None or isinstance(value, uuid.UUID)
        self._owner = value

    @property
    def wallet_address(self) -> bytes:
        """
        Wallet address of user account this key is owned by.

        May be a ``memoryview`` when read from the FlatBuffers accessor, and
        ``None`` when unset.
        """
        if self._wallet_address is None and self._from_fbs:
            if self._from_fbs.WalletAddressLength():
                self._wallet_address = self._from_fbs.WalletAddressAsBytes()
        return self._wallet_address

    @wallet_address.setter
    def wallet_address(self, value: bytes):
        assert value is None or (isinstance(value, bytes) and len(value) == 20)
        self._wallet_address = value

    @property
    def signature(self) -> bytes:
        """
        Signature which resulted in this key being saved.

        May be a ``memoryview`` when read from the FlatBuffers accessor, and
        ``None`` when unset.
        """
        if self._signature is None and self._from_fbs:
            if self._from_fbs.SignatureLength():
                self._signature = self._from_fbs.SignatureAsBytes()
        return self._signature

    @signature.setter
    def signature(self, value: bytes):
        assert value is None or (isinstance(value, bytes) and len(value) == 65)
        self._signature = value

    @staticmethod
    def cast(buf):
        """
        Wrap a serialized record in a lazily-reading ``UserKey``.

        :param buf: Buffer holding the serialized record.
        :returns: A ``UserKey`` backed by the accessor for ``buf``.
        """
        return UserKey(_UserKeyGen.GetRootAsUserKey(buf, 0))

    def build(self, builder):
        """
        Serialize this record using the given FlatBuffers builder.

        :param builder: FlatBuffers builder to write into.
        :returns: Whatever ``UserKeyGen.UserKeyEnd`` returns for the
            finished table.
        """
        # Read every field once up front; the getters may hit the accessor.
        owner = self.owner
        pubkey = self.pubkey
        wallet_address = self.wallet_address
        signature = self.signature
        created = self.created

        # All byte vectors are created before the table is started. Values
        # are passed through bytes() because the lazy getters may return
        # memoryview slices rather than bytes objects.
        owner_ref = builder.CreateString(owner.bytes) if owner else None
        pubkey_ref = builder.CreateString(bytes(pubkey)) if pubkey else None
        wallet_address_ref = (
            builder.CreateString(bytes(wallet_address)) if wallet_address else None
        )
        signature_ref = builder.CreateString(bytes(signature)) if signature else None

        UserKeyGen.UserKeyStart(builder)
        if pubkey_ref:
            UserKeyGen.UserKeyAddPubkey(builder, pubkey_ref)
        if created:
            UserKeyGen.UserKeyAddCreated(builder, int(created))
        if owner_ref:
            UserKeyGen.UserKeyAddOwner(builder, owner_ref)
        if wallet_address_ref:
            UserKeyGen.UserKeyAddWalletAddress(builder, wallet_address_ref)
        if signature_ref:
            UserKeyGen.UserKeyAddSignature(builder, signature_ref)
        return UserKeyGen.UserKeyEnd(builder)
@table(
    "62d140f6-c1f4-4f38-bfde-e450f29d0f95",
    build=UserKey.build,
    cast=UserKey.cast,
)
class UserKeys(MapBytes32FlatBuffers):
    """
    Database table for user client keys.

    Records are serialized with ``UserKey.build`` and read back with
    ``UserKey.cast``.
    """
@table("d4476238-c642-4625-8844-780137152ddd")
class IndexUserKeyByMember(MapUuidTimestampBytes32):
    """
    Database (index) table mapping ``(member_oid, created)`` to a user key.
    """