##############################################################################
#
# Crossbar.io Database
# Copyright (c) Crossbar.io Technologies GmbH. Licensed under MIT.
#
##############################################################################
from typing import Optional
import pprint
import binascii
from uuid import UUID
import numpy as np
from cfxdb._exception import InvalidConfigException
[docs]class Credential(object):
"""
Credentials created for use with WAMP authentication.
"""
def __init__(self,
oid: Optional[UUID] = None,
created: Optional[np.datetime64] = None,
authmethod: Optional[str] = None,
authid: Optional[str] = None,
realm: Optional[str] = None,
authconfig: Optional[dict] = None,
principal_oid: Optional[UUID] = None,
_unknown=None):
"""
:param oid: Object ID of this credential object
:param created: Timestamp when credential was created.
:param authmethod: WAMP authentication method offered by the authenticating client.
:param realm: WAMP realm requested by the authenticating client.
:param authid: WAMP authid announced by the authenticating client.
:param authconfig: Authentication method specific configuration.
:param principal_oid: ID of the principal this credential resolves to upon successful authentication.
"""
self.oid = oid
self.created = created
self.authmethod = authmethod
self.realm = realm
self.authid = authid
self.authconfig = authconfig
self.principal_oid = principal_oid
# private member with unknown/untouched data passing through
self._unknown = _unknown
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
if other.oid != self.oid:
return False
if other.created != self.created:
return False
if other.authmethod != self.authmethod:
return False
if other.realm != self.realm:
return False
if other.authid != self.authid:
return False
if other.authconfig != self.authconfig:
return False
if other.principal_oid != self.principal_oid:
return False
return True
def __ne__(self, other):
return not self.__eq__(other)
def __str__(self):
return pprint.pformat(self.marshal())
[docs] def copy(self, other, overwrite=False):
"""
Copy over other object.
:param other: Other credential to copy data from.
:type other: instance of :class:`ManagementRealm`
:return:
"""
if (not self.oid and other.oid) or overwrite:
self.oid = other.oid
if (not self.created and other.created) or overwrite:
self.created = other.created
if (not self.authmethod and other.authmethod) or overwrite:
self.authmethod = other.authmethod
if (not self.realm and other.realm) or overwrite:
self.realm = other.realm
if (not self.authid and other.authid) or overwrite:
self.authid = other.authid
if (not self.authconfig and other.authconfig) or overwrite:
self.authconfig = other.authconfig
if (not self.principal_oid and other.principal_oid) or overwrite:
self.principal_oid = other.principal_oid
# _unknown is not copied!
[docs] def marshal(self):
"""
Marshal this object to a generic host language object.
:return: dict
"""
obj = {
'oid': str(self.oid) if self.oid else None,
'created': int(self.created) if self.created else None,
'authmethod': self.authmethod,
'realm': self.realm,
'authid': self.authid,
'authconfig': self.authconfig,
'principal_oid': str(self.principal_oid) if self.principal_oid else None,
}
if self._unknown:
# pass through all attributes unknown
obj.update(self._unknown)
return obj
[docs] @staticmethod
def parse(data):
"""
Parse generic host language object into an object of this class.
:param data: Generic host language object
:type data: dict
:return: instance of :class:`ManagementRealm`
"""
assert type(data) == dict
# future attributes (yet unknown) are not only ignored, but passed through!
_unknown = {}
for k in data:
if k not in ['oid', 'created', 'authmethod', 'realm', 'authid', 'authconfig', 'principal_oid']:
_unknown[k] = data[k]
oid = data.get('oid', None)
assert oid is None or type(oid) == str
if oid:
oid = UUID(oid)
created = data.get('created', None)
assert created is None or type(created) == int
if created:
created = np.datetime64(created, 'ns')
authmethod = data.get('authmethod', None)
assert authmethod is None or type(authmethod) == str
realm = data.get('realm', None)
assert realm is None or type(realm) == str
authid = data.get('authid', None)
assert authid is None or type(authid) == str
principal_oid = data.get('principal_oid', None)
assert principal_oid is None or type(principal_oid) == str
if principal_oid:
principal_oid = UUID(principal_oid)
authconfig = data.get('authconfig', None)
assert authconfig is None or type(authconfig) == dict
if authconfig:
if authmethod == 'cryptosign':
# check allowed keys
for key in authconfig:
if key not in ['authorized_keys']:
raise InvalidConfigException(
'invalid attribute "{}" in cryptosign config'.format(key))
# check required keys
if 'authorized_keys' not in authconfig or type(authconfig['authorized_keys']) != list:
raise InvalidConfigException(
'invalid type "{}" for authorized_keys in cryptosign config'.format(
type(authconfig['authorized_keys'])))
if len(authconfig['authorized_keys']) == 0:
raise InvalidConfigException(
'need at least one key in authorized_keys in cryptosign config')
authorized_keys = authconfig['authorized_keys']
for k in authorized_keys:
if type(k) != str or len(k) != 64:
raise InvalidConfigException(
'key in autheorized_keys must have type str[64] (was {}) in authorized_keys in cryptosign config'
.format(type(k)))
try:
binascii.a2b_hex(k)
except Exception as e:
raise InvalidConfigException(
'invalid key in autheorized_keys in authorized_keys in cryptosign config: {}'.
format(e))
elif authmethod == 'ticket':
# check allowed keys
for key in authconfig:
if key not in ['secret']:
raise InvalidConfigException('invalid attribute "{}" in ticket config'.format(key))
# check required keys
if 'secret' not in authconfig or type(authconfig['secret']) != str:
raise InvalidConfigException(
'invalid type "{}" for authorized_keys in ticket config'.format(
type(authconfig['authorized_keys'])))
elif authmethod == 'wampcra':
# check allowed keys
for key in authconfig:
if key not in ['secret', 'salt', 'iterations', 'keylen']:
raise InvalidConfigException('invalid attribute "{}" in wampcra config'.format(key))
# check required keys
if 'secret' not in authconfig or type(authconfig['secret']) != str:
raise InvalidConfigException('invalid type "{}" for secret in wampcra config'.format(
type(authconfig['secret'])))
if 'salt' in authconfig and type(authconfig['salt']) != str:
raise InvalidConfigException('invalid type "{}" for salt in wampcra config'.format(
type(authconfig['salt'])))
if 'iterations' in authconfig and type(authconfig['iterations']) != int:
raise InvalidConfigException('invalid type "{}" for iterations in wampcra config'.format(
type(authconfig['iterations'])))
if 'keylen' in authconfig and type(authconfig['keylen']) != int:
raise InvalidConfigException('invalid type "{}" for keylen in wampcra config'.format(
type(authconfig['keylen'])))
elif authmethod == 'tls':
raise NotImplementedError('FIXME: check tls authmethod configuration')
elif authmethod == 'scram':
# check allowed keys
for key in authconfig:
if key not in ['kdf', 'iterations', 'memory', 'salt', 'stored-key', 'server-key']:
raise InvalidConfigException('invalid attribute "{}" in scram config'.format(key))
# check required keys
for key, Type in [('kdf', str), ('iterations', int), ('memory', int), ('salt', str),
('stored-key', str), ('server-key', str)]:
if key not in authconfig or type(authconfig[key]) != Type:
raise InvalidConfigException('invalid type "{}" for secret in scram config'.format(
type(authconfig[key])))
elif authmethod == 'cookie':
raise NotImplementedError('FIXME: check cookie authmethod configuration')
elif authmethod == 'anonymous':
# there is nothing to configure for authmethod==anonymous (authid/authrole actually assigned
# is defined already "outside" this config dict)
for key in authconfig:
if key not in []:
raise InvalidConfigException('invalid attribute "{}" in anonymous config'.format(key))
else:
raise InvalidConfigException('invalid authmethod "{}"'.format(authmethod))
obj = Credential(oid=oid,
created=created,
authmethod=authmethod,
realm=realm,
authid=authid,
authconfig=authconfig,
principal_oid=principal_oid,
_unknown=_unknown)
return obj