# Source code for cfxdb.mrealm.permission

##############################################################################
#
#                        Crossbar.io Database
#     Copyright (c) Crossbar.io Technologies GmbH. Licensed under MIT.
#
##############################################################################

from typing import Optional, List
import pprint
from uuid import UUID

import numpy as np

from cfxdb.common import ConfigurationElement


class Permission(ConfigurationElement):
    """
    Role permission database object.

    A permission is attached to a role (``role_oid``) and carries a URI, a
    URI match type, a URI check level and a set of boolean flags. ``None`` is
    used throughout as the "attribute not set" value.
    """

    # How ``uri`` is matched (integer codes stored on the object).
    MATCH_TYPE_NONE: int = 0
    MATCH_TYPE_EXACT: int = 1
    MATCH_TYPE_PREFIX: int = 2
    MATCH_TYPE_WILDCARD: int = 3

    # Integer code <-> string used in the marshaled (dict) representation.
    MATCH_TYPES_TOSTR = {
        0: None,
        1: 'exact',
        2: 'prefix',
        3: 'wildcard',
    }
    MATCH_TYPES_FROMSTR = {val: key for key, val in MATCH_TYPES_TOSTR.items()}

    # How strictly ``uri`` is checked (integer codes stored on the object).
    URI_CHECK_LEVEL_NONE: int = 0
    URI_CHECK_LEVEL_STRICT: int = 1
    URI_CHECK_LEVEL_LOOSE: int = 2

    # Integer code <-> string used in the marshaled (dict) representation.
    URI_CHECK_LEVELS_TOSTR = {0: None, 1: 'strict', 2: 'loose'}
    URI_CHECK_LEVELS_FROMSTR = {val: key for key, val in URI_CHECK_LEVELS_TOSTR.items()}

    # Boolean attributes: ``False`` is a meaningful value here, so "unset"
    # must be detected with ``is None`` rather than truthiness.
    _FLAG_ATTRS = (
        'allow_call',
        'allow_register',
        'allow_publish',
        'allow_subscribe',
        'disclose_caller',
        'disclose_publisher',
        'cache',
    )

    # Remaining (non-boolean) attributes added by this class.
    _VALUE_ATTRS = (
        'role_oid',
        'uri',
        'uri_check_level',
        'match',
        'created',
        'owner',
    )

    # Keys of the marshaled dict that parse() interprets itself; every other
    # key is passed through untouched via ``_unknown``.
    _KNOWN_KEYS = frozenset(('oid', ) + _VALUE_ATTRS + _FLAG_ATTRS)

    def __init__(self,
                 oid: Optional[UUID] = None,
                 label: Optional[str] = None,
                 description: Optional[str] = None,
                 tags: Optional[List[str]] = None,
                 role_oid: Optional[UUID] = None,
                 uri: Optional[str] = None,
                 uri_check_level: Optional[int] = None,
                 match: Optional[int] = None,
                 allow_call: Optional[bool] = None,
                 allow_register: Optional[bool] = None,
                 allow_publish: Optional[bool] = None,
                 allow_subscribe: Optional[bool] = None,
                 disclose_caller: Optional[bool] = None,
                 disclose_publisher: Optional[bool] = None,
                 cache: Optional[bool] = None,
                 created: Optional[np.datetime64] = None,
                 owner: Optional[UUID] = None,
                 _unknown=None):
        """

        :param oid: Object ID of this permission object
        :param label: Optional user label of permission
        :param description: Optional user description of permission
        :param tags: Optional list of user tags on permission
        :param role_oid: Object ID of role this permission applies to.
        :param uri: URI matched for permission.
        :param uri_check_level: URI check level, one of the
            ``URI_CHECK_LEVEL_*`` constants.
        :param match: URI match type, one of the ``MATCH_TYPE_*`` constants.
        :param allow_call: Flag for the "call" action on the URI.
        :param allow_register: Flag for the "register" action on the URI.
        :param allow_publish: Flag for the "publish" action on the URI.
        :param allow_subscribe: Flag for the "subscribe" action on the URI.
        :param disclose_caller: Flag controlling caller disclosure.
        :param disclose_publisher: Flag controlling publisher disclosure.
        :param cache: Caching flag of the permission.
        :param created: Timestamp when the permission was created
        :param owner: Owning user (object ID)
        :param _unknown: Private: dict of unknown attributes that is passed
            through unmodified by :meth:`marshal`.
        """
        ConfigurationElement.__init__(self, oid=oid, label=label, description=description, tags=tags)
        self.role_oid = role_oid
        self.uri = uri
        self.uri_check_level = uri_check_level
        self.match = match
        self.allow_call = allow_call
        self.allow_register = allow_register
        self.allow_publish = allow_publish
        self.allow_subscribe = allow_subscribe
        self.disclose_caller = disclose_caller
        self.disclose_publisher = disclose_publisher
        self.cache = cache
        self.created = created
        self.owner = owner

        # private member with unknown/untouched data passing through
        self._unknown = _unknown

    def __eq__(self, other):
        """
        Compare with another permission attribute by attribute.

        ``_unknown`` pass-through data does not take part in the comparison.
        """
        if not isinstance(other, self.__class__):
            return False
        if not ConfigurationElement.__eq__(self, other):
            return False
        for name in Permission._VALUE_ATTRS + Permission._FLAG_ATTRS:
            if getattr(other, name) != getattr(self, name):
                return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    def __str__(self):
        return pprint.pformat(self.marshal())

    def copy(self, other, overwrite=False):
        """
        Copy over other object.

        With ``overwrite=False`` only attributes that are not yet set on this
        object are filled in from ``other``; with ``overwrite=True`` every
        attribute is taken from ``other``.

        For the boolean flags "not set" means ``None``: an explicit ``False``
        on this object is preserved, and an explicit ``False`` on ``other``
        fills in a missing value. For all other attributes a falsy value on
        this object counts as "not set" and only truthy values are copied.

        :param other: Other permission to copy data from.
        :type other: instance of :class:`Permission`
        :return:
        """
        ConfigurationElement.copy(self, other, overwrite=overwrite)

        for name in Permission._VALUE_ATTRS:
            if overwrite or (not getattr(self, name) and getattr(other, name)):
                setattr(self, name, getattr(other, name))

        # Truthiness must not be used here: it would silently replace an
        # explicit ``False`` (e.g. allow_call=False) with ``True`` from other.
        for name in Permission._FLAG_ATTRS:
            if overwrite or (getattr(self, name) is None and getattr(other, name) is not None):
                setattr(self, name, getattr(other, name))

        # _unknown is not copied!

    def marshal(self):
        """
        Marshal this object to a generic host language object.

        UUIDs are rendered as strings, ``uri_check_level`` and ``match`` as
        their string names, and ``created`` as an integer. Attributes that
        were not understood by :meth:`parse` are merged into the result last.

        :return: dict
        """
        obj = ConfigurationElement.marshal(self)

        obj.update({
            'oid': str(self.oid) if self.oid else None,
            'role_oid': str(self.role_oid) if self.role_oid else None,
            'uri': self.uri,
            'uri_check_level': Permission.URI_CHECK_LEVELS_TOSTR[self.uri_check_level] if self.uri_check_level else None,
            'match': Permission.MATCH_TYPES_TOSTR[self.match] if self.match else None,
            'allow_call': self.allow_call,
            'allow_register': self.allow_register,
            'allow_publish': self.allow_publish,
            'allow_subscribe': self.allow_subscribe,
            'disclose_caller': self.disclose_caller,
            'disclose_publisher': self.disclose_publisher,
            'cache': self.cache,
            'created': int(self.created) if self.created else None,
            'owner': str(self.owner) if self.owner else None,
        })

        if self._unknown:
            # pass through all attributes unknown
            obj.update(self._unknown)

        return obj

    @staticmethod
    def parse(data):
        """
        Parse generic host language object into an object of this class.

        :param data: Generic host language object
        :type data: dict

        :return: instance of :class:`Permission`
        :raises AssertionError: if ``data`` or one of the known attributes
            has an unexpected type or value.
        """
        assert isinstance(data, dict)

        base = ConfigurationElement.parse(data)

        # Continue with whatever the base class did not consume.
        # NOTE(review): the base parser presumably always sets a dict here;
        # the ``or {}`` only guards against it being None.
        data = base._unknown or {}

        # future attributes (yet unknown) are not only ignored, but passed through!
        _unknown = {k: v for k, v in data.items() if k not in Permission._KNOWN_KEYS}

        role_oid = data.get('role_oid', None)
        assert role_oid is None or isinstance(role_oid, str)
        if role_oid:
            role_oid = UUID(role_oid)

        uri = data.get('uri', None)
        assert uri is None or isinstance(uri, str)

        uri_check_level = data.get('uri_check_level', None)
        assert uri_check_level is None or isinstance(uri_check_level, str)
        if uri_check_level:
            assert uri_check_level in Permission.URI_CHECK_LEVELS_FROMSTR
            uri_check_level = Permission.URI_CHECK_LEVELS_FROMSTR[uri_check_level]

        match = data.get('match', None)
        assert match is None or isinstance(match, str)
        if match:
            assert match in Permission.MATCH_TYPES_FROMSTR
            match = Permission.MATCH_TYPES_FROMSTR[match]

        # All boolean flags are validated the same way: absent/None or a bool.
        flags = {}
        for name in Permission._FLAG_ATTRS:
            value = data.get(name, None)
            assert value is None or isinstance(value, bool)
            flags[name] = value

        created = data.get('created', None)
        # Exact type check on purpose: isinstance() would also accept bool.
        assert created is None or type(created) is int
        if created:
            created = np.datetime64(created, 'ns')

        owner = data.get('owner', None)
        assert owner is None or isinstance(owner, str)
        if owner:
            owner = UUID(owner)

        return Permission(oid=base.oid,
                          label=base.label,
                          description=base.description,
                          tags=base.tags,
                          role_oid=role_oid,
                          uri=uri,
                          uri_check_level=uri_check_level,
                          match=match,
                          created=created,
                          owner=owner,
                          _unknown=_unknown,
                          **flags)