##############################################################################
#
# Crossbar.io Database
# Copyright (c) typedef int GmbH. Licensed under MIT.
#
##############################################################################
import pprint
from typing import List, Optional
from uuid import UUID
import numpy as np
from cfxdb.common import ConfigurationElement
[docs]
class Permission(ConfigurationElement):
"""
Role permission database object.
"""
[docs]
MATCH_TYPE_NONE: int = 0
[docs]
MATCH_TYPE_EXACT: int = 1
[docs]
MATCH_TYPE_PREFIX: int = 2
[docs]
MATCH_TYPE_WILDCARD: int = 3
[docs]
MATCH_TYPES_TOSTR = {
0: None,
1: "exact",
2: "prefix",
3: "wildcard",
}
[docs]
MATCH_TYPES_FROMSTR = {val: key for key, val in MATCH_TYPES_TOSTR.items()}
[docs]
URI_CHECK_LEVEL_NONE: int = 0
[docs]
URI_CHECK_LEVEL_STRICT: int = 1
[docs]
URI_CHECK_LEVEL_LOOSE: int = 2
[docs]
URI_CHECK_LEVELS_TOSTR = {0: None, 1: "strict", 2: "loose"}
[docs]
URI_CHECK_LEVELS_FROMSTR = {val: key for key, val in URI_CHECK_LEVELS_TOSTR.items()}
def __init__(
self,
oid: Optional[UUID] = None,
label: Optional[str] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
role_oid: Optional[UUID] = None,
uri: Optional[str] = None,
uri_check_level: Optional[int] = None,
match: Optional[int] = None,
allow_call: Optional[bool] = None,
allow_register: Optional[bool] = None,
allow_publish: Optional[bool] = None,
allow_subscribe: Optional[bool] = None,
disclose_caller: Optional[bool] = None,
disclose_publisher: Optional[bool] = None,
cache: Optional[bool] = None,
created: Optional[np.datetime64] = None,
owner: Optional[UUID] = None,
_unknown=None,
):
"""
:param oid: Object ID of this permission object
:param label: Optional user label of permission
:param description: Optional user description of permission
:param tags: Optional list of user tags on permission
:param role_oid: Object ID of role this permission applies to.
:param uri: URI matched for permission.
:param created: Timestamp when the permission was created
:param owner: Owning user (object ID)
"""
ConfigurationElement.__init__(self, oid=oid, label=label, description=description, tags=tags)
[docs]
self.role_oid = role_oid
[docs]
self.uri_check_level = uri_check_level
[docs]
self.allow_call = allow_call
[docs]
self.allow_register = allow_register
[docs]
self.allow_publish = allow_publish
[docs]
self.allow_subscribe = allow_subscribe
[docs]
self.disclose_caller = disclose_caller
[docs]
self.disclose_publisher = disclose_publisher
# private member with unknown/untouched data passing through
[docs]
self._unknown = _unknown
[docs]
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
if not ConfigurationElement.__eq__(self, other):
return False
if other.role_oid != self.role_oid:
return False
if other.uri != self.uri:
return False
if other.uri_check_level != self.uri_check_level:
return False
if other.match != self.match:
return False
if other.allow_call != self.allow_call:
return False
if other.allow_register != self.allow_register:
return False
if other.allow_publish != self.allow_publish:
return False
if other.allow_subscribe != self.allow_subscribe:
return False
if other.disclose_caller != self.disclose_caller:
return False
if other.disclose_publisher != self.disclose_publisher:
return False
if other.cache != self.cache:
return False
if other.created != self.created:
return False
if other.owner != self.owner:
return False
return True
[docs]
def __ne__(self, other):
return not self.__eq__(other)
[docs]
def __str__(self):
return pprint.pformat(self.marshal())
[docs]
def copy(self, other, overwrite=False):
"""
Copy over other object.
:param other: Other permission to copy data from.
:type other: instance of :class:`ManagementRealm`
:return:
"""
ConfigurationElement.copy(self, other, overwrite=overwrite)
if (not self.role_oid and other.role_oid) or overwrite:
self.role_oid = other.role_oid
if (not self.uri and other.uri) or overwrite:
self.uri = other.uri
if (not self.uri_check_level and other.uri_check_level) or overwrite:
self.uri_check_level = other.uri_check_level
if (not self.match and other.match) or overwrite:
self.match = other.match
if (not self.allow_call and other.allow_call) or overwrite:
self.allow_call = other.allow_call
if (not self.allow_register and other.allow_register) or overwrite:
self.allow_register = other.allow_register
if (not self.allow_publish and other.allow_publish) or overwrite:
self.allow_publish = other.allow_publish
if (not self.allow_subscribe and other.allow_subscribe) or overwrite:
self.allow_subscribe = other.allow_subscribe
if (not self.disclose_caller and other.disclose_caller) or overwrite:
self.disclose_caller = other.disclose_caller
if (not self.disclose_publisher and other.disclose_publisher) or overwrite:
self.disclose_publisher = other.disclose_publisher
if (not self.cache and other.cache) or overwrite:
self.cache = other.cache
if (not self.created and other.created) or overwrite:
self.created = other.created
if (not self.owner and other.owner) or overwrite:
self.owner = other.owner
# _unknown is not copied!
[docs]
def marshal(self):
"""
Marshal this object to a generic host language object.
:return: dict
"""
obj = ConfigurationElement.marshal(self)
obj.update(
{
"oid": str(self.oid) if self.oid else None,
"role_oid": str(self.role_oid) if self.role_oid else None,
"uri": self.uri,
"uri_check_level": Permission.URI_CHECK_LEVELS_TOSTR[self.uri_check_level]
if self.uri_check_level
else None,
"match": Permission.MATCH_TYPES_TOSTR[self.match] if self.match else None,
"allow_call": self.allow_call,
"allow_register": self.allow_register,
"allow_publish": self.allow_publish,
"allow_subscribe": self.allow_subscribe,
"disclose_caller": self.disclose_caller,
"disclose_publisher": self.disclose_publisher,
"cache": self.cache,
"created": int(self.created) if self.created else None,
"owner": str(self.owner) if self.owner else None,
}
)
if self._unknown:
# pass through all attributes unknown
obj.update(self._unknown)
return obj
@staticmethod
[docs]
def parse(data):
"""
Parse generic host language object into an object of this class.
:param data: Generic host language object
:type data: dict
:return: instance of :class:`ManagementRealm`
"""
assert type(data) == dict
obj = ConfigurationElement.parse(data)
data = obj._unknown
# future attributes (yet unknown) are not only ignored, but passed through!
_unknown = {}
for k in data:
if k not in [
"oid",
"role_oid",
"uri",
"uri_check_level",
"match",
"allow_call",
"allow_register",
"allow_publish",
"allow_subscribe",
"disclose_caller",
"disclose_publisher",
"cache",
"owner",
"created",
]:
_unknown[k] = data[k]
role_oid = data.get("role_oid", None)
assert role_oid is None or type(role_oid) == str
if role_oid:
role_oid = UUID(role_oid)
uri = data.get("uri", None)
assert uri is None or type(uri) == str
uri_check_level = data.get("uri_check_level", None)
assert uri_check_level is None or type(uri_check_level) == str
if uri_check_level:
assert uri_check_level in Permission.URI_CHECK_LEVELS_FROMSTR
uri_check_level = Permission.URI_CHECK_LEVELS_FROMSTR[uri_check_level]
match = data.get("match", None)
assert match is None or type(match) == str
if match:
assert match in Permission.MATCH_TYPES_FROMSTR
match = Permission.MATCH_TYPES_FROMSTR[match]
allow_call = data.get("allow_call", None)
assert allow_call is None or type(allow_call) == bool
allow_register = data.get("allow_register", None)
assert allow_register is None or type(allow_register) == bool
allow_publish = data.get("allow_publish", None)
assert allow_publish is None or type(allow_publish) == bool
allow_subscribe = data.get("allow_subscribe", None)
assert allow_subscribe is None or type(allow_subscribe) == bool
disclose_caller = data.get("disclose_caller", None)
assert disclose_caller is None or type(disclose_caller) == bool
disclose_publisher = data.get("disclose_publisher", None)
assert disclose_publisher is None or type(disclose_publisher) == bool
cache = data.get("cache", None)
assert cache is None or type(cache) == bool
created = data.get("created", None)
assert created is None or type(created) == int
if created:
created = np.datetime64(created, "ns")
owner = data.get("owner", None)
assert owner is None or type(owner) == str
if owner:
owner = UUID(owner)
obj = Permission(
oid=obj.oid,
label=obj.label,
description=obj.description,
tags=obj.tags,
role_oid=role_oid,
uri=uri,
uri_check_level=uri_check_level,
match=match,
allow_call=allow_call,
allow_register=allow_register,
allow_publish=allow_publish,
allow_subscribe=allow_subscribe,
disclose_caller=disclose_caller,
disclose_publisher=disclose_publisher,
cache=cache,
created=created,
owner=owner,
_unknown=_unknown,
)
return obj