Source code for py_abac.storage.file.storage

"""
    File storage implementation
"""

import logging
import os
import shelve
from itertools import islice
from typing import Union, Generator

from ..base import Storage
from ...exceptions import PolicyExistsError
from ...policy import Policy

LOG = logging.getLogger(__name__)


[docs]class FileStorage(Storage): """ Stores and retrieves policies from files on disk. .. important:: This storage does not yet perform ACID transactions. It uses the python shelve package for storage which does not support transaction based access to stored objects. Thus so not use it in distributed applications. There is a separate project under development to address this issue. :param storage_dir: path to directory where storage files are to be saved. """ # Policy storage file name POLICY_FILE = "policies" def __init__(self, storage_dir: str): # Create path directory if not exists os.makedirs(storage_dir, exist_ok=True) self._file = "{}/{}".format(os.path.abspath(storage_dir), self.POLICY_FILE)
[docs] def add(self, policy: Policy): """ Store a policy """ with shelve.open(self._file, flag='c', writeback=True) as curr: if policy.uid in curr: raise PolicyExistsError(policy.uid) curr[policy.uid] = policy.to_json() LOG.info('Added Policy: %s', policy)
[docs] def get(self, uid: str) -> Union[Policy, None]: """ Get specific policy """ with shelve.open(self._file, flag='r') as curr: policy_json = curr.get(uid, None) policy = Policy.from_json(policy_json) if policy_json else None return policy
[docs] def get_all(self, limit: int, offset: int) -> Generator[Policy, None, None]: """ Retrieve all the policies within a window. .. note: Currently all policies are retrieved from storage and then sliced to given limit and offset. This issue will get resolved once indexing is supported for file storage. """ self._check_limit_and_offset(limit, offset) with shelve.open(self._file, flag="r") as curr: # Note: python by default sorts dict by key policies = islice(curr.values(), offset, offset + limit) for policy_json in policies: yield Policy.from_json(policy_json)
[docs] def get_for_target( self, subject_id: str, resource_id: str, action_id: str ) -> Generator[Policy, None, None]: """ Get all policies for given target IDs. .. note: Currently all policies are returned for evaluation. """ # TODO: Create glob match based topologically sorted graph index for filtering with shelve.open(self._file, flag="r") as curr: for policy_json in curr.values(): yield Policy.from_json(policy_json)
[docs] def update(self, policy: Policy): """ Update a policy """ with shelve.open(self._file, flag='c', writeback=True) as curr: if policy.uid not in curr: raise ValueError("Policy with UID='{}' does not exist.".format(policy.uid)) curr[policy.uid] = policy.to_json() LOG.info('Updated Policy with UID=%s. New value is: %s', policy.uid, policy)
[docs] def delete(self, uid: str): """ Delete a policy """ with shelve.open(self._file, flag='c', writeback=True) as curr: if uid not in curr: raise ValueError("Policy with UID='{}' does not exist.".format(uid)) curr.pop(uid) LOG.info('Deleted Policy with UID=%s.', uid)