##############################################################################
#
# Crossbar.io Database
# Copyright (c) typedef int GmbH. Licensed under MIT.
#
##############################################################################
import binascii
import pprint
from typing import Optional
from uuid import UUID
import numpy as np
from cfxdb._exception import InvalidConfigException
class Credential(object):
    """
    Credentials created for use with WAMP authentication.

    Instances are plain value objects: they can be compared, merged with
    :meth:`copy`, serialized with :meth:`marshal` and re-created from a
    marshaled dict with :meth:`parse`.
    """

    # Attributes understood by this class. Any other key found by parse() is
    # kept in ``_unknown`` and written back unchanged by marshal().
    _KNOWN_ATTRIBUTES = (
        "oid",
        "created",
        "authmethod",
        "realm",
        "authid",
        "authconfig",
        "principal_oid",
    )

    def __init__(
        self,
        oid: Optional[UUID] = None,
        created: Optional[np.datetime64] = None,
        authmethod: Optional[str] = None,
        authid: Optional[str] = None,
        realm: Optional[str] = None,
        authconfig: Optional[dict] = None,
        principal_oid: Optional[UUID] = None,
        _unknown=None,
    ):
        """
        :param oid: Object ID of this credential object
        :param created: Timestamp when credential was created.
        :param authmethod: WAMP authentication method offered by the authenticating client.
        :param realm: WAMP realm requested by the authenticating client.
        :param authid: WAMP authid announced by the authenticating client.
        :param authconfig: Authentication method specific configuration.
        :param principal_oid: ID of the principal this credential resolves to upon successful authentication.
        """
        # Every attribute read by __eq__/copy/marshal must exist on the
        # instance, so all of them are assigned here (possibly to None).
        self.oid = oid
        self.created = created
        self.authmethod = authmethod
        self.realm = realm
        self.authid = authid
        self.authconfig = authconfig
        self.principal_oid = principal_oid

        # private member with unknown/untouched data passing through
        self._unknown = _unknown

    def __eq__(self, other):
        """
        Compare all known attributes; ``_unknown`` is not part of the comparison.

        :return: ``True`` if ``other`` is an instance of this class and all
            known attributes are equal, else ``False``.
        """
        if not isinstance(other, self.__class__):
            return False
        for name in self._KNOWN_ATTRIBUTES:
            if getattr(other, name) != getattr(self, name):
                return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    def __str__(self):
        return pprint.pformat(self.marshal())

    def copy(self, other, overwrite=False):
        """
        Copy over other object.

        An attribute is taken from ``other`` when it is unset (falsy) on this
        object and set on ``other``, or unconditionally when ``overwrite`` is
        true. ``_unknown`` is never copied.

        :param other: Other credential to copy data from.
        :type other: instance of :class:`Credential`
        :param overwrite: If true, replace all attributes with those of ``other``.
        :return: ``None``
        """
        for name in self._KNOWN_ATTRIBUTES:
            theirs = getattr(other, name)
            if overwrite or (not getattr(self, name) and theirs):
                setattr(self, name, theirs)

        # _unknown is not copied!

    def marshal(self):
        """
        Marshal this object to a generic host language object.

        UUIDs are rendered as strings and the timestamp as an integer;
        pass-through attributes from ``_unknown`` are merged into the result.

        :return: dict
        """
        obj = {
            "oid": str(self.oid) if self.oid else None,
            "created": int(self.created) if self.created else None,
            "authmethod": self.authmethod,
            "realm": self.realm,
            "authid": self.authid,
            "authconfig": self.authconfig,
            "principal_oid": str(self.principal_oid) if self.principal_oid else None,
        }
        if self._unknown:
            # pass through all attributes unknown
            obj.update(self._unknown)
        return obj

    @staticmethod
    def _check_allowed_keys(authconfig, allowed, authmethod):
        """Raise InvalidConfigException if ``authconfig`` has a key not in ``allowed``."""
        for key in authconfig:
            if key not in allowed:
                raise InvalidConfigException('invalid attribute "{}" in {} config'.format(key, authmethod))

    @staticmethod
    def _check_attribute(authconfig, key, expected, authmethod, required=True):
        """Raise InvalidConfigException unless ``authconfig[key]`` has exactly type ``expected``.

        A missing key is only accepted when ``required`` is false.
        """
        if key not in authconfig and not required:
            return
        # .get() instead of subscripting: a missing required key must be
        # reported as InvalidConfigException, not leak out as KeyError.
        value = authconfig.get(key)
        if type(value) != expected:
            raise InvalidConfigException(
                'invalid type "{}" for {} in {} config'.format(type(value), key, authmethod)
            )

    @staticmethod
    def _validate_authconfig(authmethod, authconfig):
        """Validate the authmethod specific configuration dict.

        :raises InvalidConfigException: on unknown/missing/ill-typed attributes
            or an unknown ``authmethod``.
        :raises NotImplementedError: for ``tls`` and ``cookie``, whose
            configuration checks are not implemented.
        """
        if authmethod == "cryptosign":
            Credential._check_allowed_keys(authconfig, ["authorized_keys"], authmethod)
            Credential._check_attribute(authconfig, "authorized_keys", list, authmethod)
            authorized_keys = authconfig["authorized_keys"]
            if len(authorized_keys) == 0:
                raise InvalidConfigException("need at least one key in authorized_keys in cryptosign config")
            for k in authorized_keys:
                # each key must be a string of exactly 64 hex characters
                if type(k) != str or len(k) != 64:
                    raise InvalidConfigException(
                        "key in authorized_keys must have type str[64] (was {}) in cryptosign config".format(
                            type(k)
                        )
                    )
                try:
                    binascii.a2b_hex(k)
                except ValueError as e:
                    # binascii.Error is a ValueError subclass
                    raise InvalidConfigException(
                        "invalid key in authorized_keys in cryptosign config: {}".format(e)
                    ) from e

        elif authmethod == "ticket":
            Credential._check_allowed_keys(authconfig, ["secret"], authmethod)
            Credential._check_attribute(authconfig, "secret", str, authmethod)

        elif authmethod == "wampcra":
            Credential._check_allowed_keys(authconfig, ["secret", "salt", "iterations", "keylen"], authmethod)
            Credential._check_attribute(authconfig, "secret", str, authmethod)
            Credential._check_attribute(authconfig, "salt", str, authmethod, required=False)
            Credential._check_attribute(authconfig, "iterations", int, authmethod, required=False)
            Credential._check_attribute(authconfig, "keylen", int, authmethod, required=False)

        elif authmethod == "tls":
            raise NotImplementedError("FIXME: check tls authmethod configuration")

        elif authmethod == "scram":
            required = [
                ("kdf", str),
                ("iterations", int),
                ("memory", int),
                ("salt", str),
                ("stored-key", str),
                ("server-key", str),
            ]
            Credential._check_allowed_keys(authconfig, [key for key, _ in required], authmethod)
            for key, expected in required:
                Credential._check_attribute(authconfig, key, expected, authmethod)

        elif authmethod == "cookie":
            raise NotImplementedError("FIXME: check cookie authmethod configuration")

        elif authmethod == "anonymous":
            # there is nothing to configure for authmethod==anonymous (authid/authrole actually assigned
            # is defined already "outside" this config dict)
            Credential._check_allowed_keys(authconfig, [], authmethod)

        else:
            raise InvalidConfigException('invalid authmethod "{}"'.format(authmethod))

    @staticmethod
    def parse(data):
        """
        Parse generic host language object into an object of this class.

        :param data: Generic host language object
        :type data: dict
        :return: instance of :class:`Credential`
        :raises AssertionError: if ``data`` or one of the known attributes has
            an unexpected type.
        :raises InvalidConfigException: if a non-empty ``authconfig`` is not
            valid for the given ``authmethod``.
        :raises NotImplementedError: for a non-empty ``authconfig`` with
            ``authmethod`` ``tls`` or ``cookie``.
        """
        assert type(data) == dict

        # future attributes (yet unknown) are not only ignored, but passed through!
        _unknown = {k: v for k, v in data.items() if k not in Credential._KNOWN_ATTRIBUTES}

        oid = data.get("oid", None)
        assert oid is None or type(oid) == str
        if oid:
            oid = UUID(oid)

        created = data.get("created", None)
        assert created is None or type(created) == int
        if created:
            created = np.datetime64(created, "ns")

        authmethod = data.get("authmethod", None)
        assert authmethod is None or type(authmethod) == str

        realm = data.get("realm", None)
        assert realm is None or type(realm) == str

        authid = data.get("authid", None)
        assert authid is None or type(authid) == str

        principal_oid = data.get("principal_oid", None)
        assert principal_oid is None or type(principal_oid) == str
        if principal_oid:
            principal_oid = UUID(principal_oid)

        authconfig = data.get("authconfig", None)
        assert authconfig is None or type(authconfig) == dict
        # NOTE: an absent or empty config dict is accepted without validation
        if authconfig:
            Credential._validate_authconfig(authmethod, authconfig)

        return Credential(
            oid=oid,
            created=created,
            authmethod=authmethod,
            realm=realm,
            authid=authid,
            authconfig=authconfig,
            principal_oid=principal_oid,
            _unknown=_unknown,
        )