Source code for cfxdb.mrealm.permission

##############################################################################
#
#                        Crossbar.io Database
#     Copyright (c) typedef int GmbH. Licensed under MIT.
#
##############################################################################

import pprint
from typing import List, Optional
from uuid import UUID

import numpy as np

from cfxdb.common import ConfigurationElement


[docs] class Permission(ConfigurationElement): """ Role permission database object. """
[docs] MATCH_TYPE_NONE: int = 0
[docs] MATCH_TYPE_EXACT: int = 1
[docs] MATCH_TYPE_PREFIX: int = 2
[docs] MATCH_TYPE_WILDCARD: int = 3
[docs] MATCH_TYPES_TOSTR = { 0: None, 1: "exact", 2: "prefix", 3: "wildcard", }
[docs] MATCH_TYPES_FROMSTR = {val: key for key, val in MATCH_TYPES_TOSTR.items()}
[docs] URI_CHECK_LEVEL_NONE: int = 0
[docs] URI_CHECK_LEVEL_STRICT: int = 1
[docs] URI_CHECK_LEVEL_LOOSE: int = 2
[docs] URI_CHECK_LEVELS_TOSTR = {0: None, 1: "strict", 2: "loose"}
[docs] URI_CHECK_LEVELS_FROMSTR = {val: key for key, val in URI_CHECK_LEVELS_TOSTR.items()}
def __init__( self, oid: Optional[UUID] = None, label: Optional[str] = None, description: Optional[str] = None, tags: Optional[List[str]] = None, role_oid: Optional[UUID] = None, uri: Optional[str] = None, uri_check_level: Optional[int] = None, match: Optional[int] = None, allow_call: Optional[bool] = None, allow_register: Optional[bool] = None, allow_publish: Optional[bool] = None, allow_subscribe: Optional[bool] = None, disclose_caller: Optional[bool] = None, disclose_publisher: Optional[bool] = None, cache: Optional[bool] = None, created: Optional[np.datetime64] = None, owner: Optional[UUID] = None, _unknown=None, ): """ :param oid: Object ID of this permission object :param label: Optional user label of permission :param description: Optional user description of permission :param tags: Optional list of user tags on permission :param role_oid: Object ID of role this permission applies to. :param uri: URI matched for permission. :param created: Timestamp when the permission was created :param owner: Owning user (object ID) """ ConfigurationElement.__init__(self, oid=oid, label=label, description=description, tags=tags)
[docs] self.role_oid = role_oid
[docs] self.uri = uri
[docs] self.uri_check_level = uri_check_level
[docs] self.match = match
[docs] self.allow_call = allow_call
[docs] self.allow_register = allow_register
[docs] self.allow_publish = allow_publish
[docs] self.allow_subscribe = allow_subscribe
[docs] self.disclose_caller = disclose_caller
[docs] self.disclose_publisher = disclose_publisher
[docs] self.cache = cache
[docs] self.created = created
[docs] self.owner = owner
# private member with unknown/untouched data passing through
[docs] self._unknown = _unknown
[docs] def __eq__(self, other): if not isinstance(other, self.__class__): return False if not ConfigurationElement.__eq__(self, other): return False if other.role_oid != self.role_oid: return False if other.uri != self.uri: return False if other.uri_check_level != self.uri_check_level: return False if other.match != self.match: return False if other.allow_call != self.allow_call: return False if other.allow_register != self.allow_register: return False if other.allow_publish != self.allow_publish: return False if other.allow_subscribe != self.allow_subscribe: return False if other.disclose_caller != self.disclose_caller: return False if other.disclose_publisher != self.disclose_publisher: return False if other.cache != self.cache: return False if other.created != self.created: return False if other.owner != self.owner: return False return True
[docs] def __ne__(self, other): return not self.__eq__(other)
[docs] def __str__(self): return pprint.pformat(self.marshal())
[docs] def copy(self, other, overwrite=False): """ Copy over other object. :param other: Other permission to copy data from. :type other: instance of :class:`ManagementRealm` :return: """ ConfigurationElement.copy(self, other, overwrite=overwrite) if (not self.role_oid and other.role_oid) or overwrite: self.role_oid = other.role_oid if (not self.uri and other.uri) or overwrite: self.uri = other.uri if (not self.uri_check_level and other.uri_check_level) or overwrite: self.uri_check_level = other.uri_check_level if (not self.match and other.match) or overwrite: self.match = other.match if (not self.allow_call and other.allow_call) or overwrite: self.allow_call = other.allow_call if (not self.allow_register and other.allow_register) or overwrite: self.allow_register = other.allow_register if (not self.allow_publish and other.allow_publish) or overwrite: self.allow_publish = other.allow_publish if (not self.allow_subscribe and other.allow_subscribe) or overwrite: self.allow_subscribe = other.allow_subscribe if (not self.disclose_caller and other.disclose_caller) or overwrite: self.disclose_caller = other.disclose_caller if (not self.disclose_publisher and other.disclose_publisher) or overwrite: self.disclose_publisher = other.disclose_publisher if (not self.cache and other.cache) or overwrite: self.cache = other.cache if (not self.created and other.created) or overwrite: self.created = other.created if (not self.owner and other.owner) or overwrite: self.owner = other.owner
# _unknown is not copied!
[docs] def marshal(self): """ Marshal this object to a generic host language object. :return: dict """ obj = ConfigurationElement.marshal(self) obj.update( { "oid": str(self.oid) if self.oid else None, "role_oid": str(self.role_oid) if self.role_oid else None, "uri": self.uri, "uri_check_level": Permission.URI_CHECK_LEVELS_TOSTR[self.uri_check_level] if self.uri_check_level else None, "match": Permission.MATCH_TYPES_TOSTR[self.match] if self.match else None, "allow_call": self.allow_call, "allow_register": self.allow_register, "allow_publish": self.allow_publish, "allow_subscribe": self.allow_subscribe, "disclose_caller": self.disclose_caller, "disclose_publisher": self.disclose_publisher, "cache": self.cache, "created": int(self.created) if self.created else None, "owner": str(self.owner) if self.owner else None, } ) if self._unknown: # pass through all attributes unknown obj.update(self._unknown) return obj
@staticmethod
[docs] def parse(data): """ Parse generic host language object into an object of this class. :param data: Generic host language object :type data: dict :return: instance of :class:`ManagementRealm` """ assert type(data) == dict obj = ConfigurationElement.parse(data) data = obj._unknown # future attributes (yet unknown) are not only ignored, but passed through! _unknown = {} for k in data: if k not in [ "oid", "role_oid", "uri", "uri_check_level", "match", "allow_call", "allow_register", "allow_publish", "allow_subscribe", "disclose_caller", "disclose_publisher", "cache", "owner", "created", ]: _unknown[k] = data[k] role_oid = data.get("role_oid", None) assert role_oid is None or type(role_oid) == str if role_oid: role_oid = UUID(role_oid) uri = data.get("uri", None) assert uri is None or type(uri) == str uri_check_level = data.get("uri_check_level", None) assert uri_check_level is None or type(uri_check_level) == str if uri_check_level: assert uri_check_level in Permission.URI_CHECK_LEVELS_FROMSTR uri_check_level = Permission.URI_CHECK_LEVELS_FROMSTR[uri_check_level] match = data.get("match", None) assert match is None or type(match) == str if match: assert match in Permission.MATCH_TYPES_FROMSTR match = Permission.MATCH_TYPES_FROMSTR[match] allow_call = data.get("allow_call", None) assert allow_call is None or type(allow_call) == bool allow_register = data.get("allow_register", None) assert allow_register is None or type(allow_register) == bool allow_publish = data.get("allow_publish", None) assert allow_publish is None or type(allow_publish) == bool allow_subscribe = data.get("allow_subscribe", None) assert allow_subscribe is None or type(allow_subscribe) == bool disclose_caller = data.get("disclose_caller", None) assert disclose_caller is None or type(disclose_caller) == bool disclose_publisher = data.get("disclose_publisher", None) assert disclose_publisher is None or type(disclose_publisher) == bool cache = data.get("cache", None) assert cache is None or type(cache) == bool created = data.get("created", None) assert created is None or type(created) == int if created: created = np.datetime64(created, "ns") owner = data.get("owner", None) assert owner is None or type(owner) == str if owner: owner = UUID(owner) obj = Permission( oid=obj.oid, label=obj.label, description=obj.description, tags=obj.tags, role_oid=role_oid, uri=uri, uri_check_level=uri_check_level, match=match, allow_call=allow_call, allow_register=allow_register, allow_publish=allow_publish, allow_subscribe=allow_subscribe, disclose_caller=disclose_caller, disclose_publisher=disclose_publisher, cache=cache, created=created, owner=owner, _unknown=_unknown, ) return obj