Source code for py_abac.policy.rules

"""
    Policy rules class
"""

from typing import Union, List, Dict

from marshmallow import Schema, fields, post_load

from .conditions.attribute.base import validate_path
from .conditions.schema import ConditionSchema
from ..context import EvaluationContext


class Rules(object):
    """
        Policy rules

        Holds the conditions a policy places on each of the four access control
        elements: subject, resource, action and context. The conditions of one
        element are either a dict mapping attribute paths to condition objects
        (every condition must hold) or a list of such dicts (at least one dict
        must hold).
    """

    def __init__(
            self,
            subject: Union[List, Dict],
            resource: Union[List, Dict],
            action: Union[List, Dict],
            context: Union[List, Dict]
    ):
        self.subject = subject
        self.resource = resource
        self.action = action
        self.context = context

    def is_satisfied(self, ctx: EvaluationContext):
        """
            Check if request satisfies the conditions of every access control element

            Elements are checked in the order subject, resource, action, context and
            checking stops at the first element that is not satisfied.

            :param ctx: policy evaluation context
            :return: True if satisfied else False
        """
        for element in ("subject", "resource", "action", "context"):
            if not self._is_satisfied(element, getattr(self, element), ctx):
                return False
        return True

    def _is_satisfied(self, ace_name: str, ace_conditions, ctx: EvaluationContext):
        """
            Check if a single access control element satisfies request

            :param ace_name: access control element name
            :param ace_conditions: access control element conditions, a list of
                                   dicts (OR) or a single dict (AND)
            :param ctx: policy evaluation context
            :return: True if satisfied else False
        """
        if isinstance(ace_conditions, list):
            evaluate = self._implicit_or
        elif isinstance(ace_conditions, dict):
            evaluate = self._implicit_and
        else:
            # Conditions in any other format are treated as not satisfied. This is
            # only a safeguard and is not expected to happen.
            return False  # pragma: no cover
        return evaluate(ace_name, ace_conditions, ctx)

    def _implicit_or(self, ace_name: str, ace_conditions: list, ctx: EvaluationContext):
        """
            Check if at least one block of conditions in the list is satisfied

            An empty list is never satisfied.
        """
        return any(
            self._implicit_and(ace_name, block, ctx)
            for block in ace_conditions
        )

    @staticmethod
    def _implicit_and(ace_name: str, ace_conditions: dict, ctx: EvaluationContext):
        """
            Check if every condition in the dict is satisfied

            An empty dict is always satisfied.
        """

        def holds(path, cond):
            # Point the context at the element and attribute being tested before
            # the condition is evaluated against it.
            ctx.ace = ace_name
            ctx.attribute_path = path
            return cond.is_satisfied(ctx)

        # all() stops at the first condition that is not satisfied
        return all(holds(path, cond) for path, cond in ace_conditions.items())
class RuleField(fields.Field):
    """
        Marshmallow field class for rules

        A rule value is either a dict of attribute path to condition, or a list
        of such dicts. The field hands the value to the matching marshmallow
        field below depending on which of the two shapes it has.
    """
    # Single block of conditions: validated attribute path -> condition
    _implicit_and_field = fields.Dict(
        keys=fields.String(validate=validate_path),
        values=fields.Nested(ConditionSchema)
    )
    # List of condition blocks, each shaped like the dict above
    _implicit_or_field = fields.List(
        fields.Dict(
            keys=fields.String(validate=validate_path),
            values=fields.Nested(ConditionSchema)
        )
    )

    def _serialize(self, value, attr, obj, **kwargs):
        # Lists go to the list field, everything else to the dict field
        delegate = self._implicit_or_field if isinstance(value, list) else self._implicit_and_field
        return delegate._serialize(value, attr, obj, **kwargs)  # pylint: disable=protected-access

    def _deserialize(self, value, attr, data, **kwargs):
        # Lists go to the list field, everything else to the dict field
        delegate = self._implicit_or_field if isinstance(value, list) else self._implicit_and_field
        return delegate.deserialize(value, attr, data, **kwargs)
class RulesSchema(Schema):
    """
        JSON schema for rules

        Each access control element is optional and falls back to an empty dict
        of conditions. Loading produces a ``Rules`` object.
    """
    # The fallbacks are given as the callable ``dict`` rather than a ``{}``
    # literal. Marshmallow returns a non-callable default as-is, so a literal
    # would hand the very same dict object to every Rules instance loaded
    # without that key; a callable is invoked each time and yields a fresh dict.
    subject = RuleField(default=dict, missing=dict)
    resource = RuleField(default=dict, missing=dict)
    action = RuleField(default=dict, missing=dict)
    context = RuleField(default=dict, missing=dict)

    @post_load
    def post_load(self, data, **_):  # pylint: disable=no-self-use
        """
            Build the rules object from the deserialized data

            :param data: deserialized fields keyed by access control element name
            :return: ``Rules`` instance constructed from ``data``
        """
        return Rules(**data)