Source code for aries_cloudagent.storage.in_memory

"""Basic in-memory storage implementation (non-wallet)."""

from typing import Mapping, Sequence

from ..core.in_memory import InMemoryProfile

from .base import (
    DEFAULT_PAGE_SIZE,
    BaseStorage,
    BaseStorageSearch,
    BaseStorageSearchSession,
    validate_record,
)
from .error import (
    StorageDuplicateError,
    StorageNotFoundError,
    StorageSearchError,
)
from .record import StorageRecord


[docs]class InMemoryStorage(BaseStorage, BaseStorageSearch): """Basic in-memory storage class.""" def __init__(self, profile: InMemoryProfile): """Initialize a `InMemoryStorage` instance. Args: profile: The in-memory profile instance """ self.profile = profile
[docs] async def add_record(self, record: StorageRecord): """Add a new record to the store. Args: record: `StorageRecord` to be stored Raises: StorageError: If no record is provided StorageError: If the record has no ID """ validate_record(record) if record.id in self.profile.records: raise StorageDuplicateError("Duplicate record") self.profile.records[record.id] = record
[docs] async def get_record( self, record_type: str, record_id: str, options: Mapping = None ) -> StorageRecord: """Fetch a record from the store by type and ID. Args: record_type: The record type record_id: The record id options: A dictionary of backend-specific options Returns: A `StorageRecord` instance Raises: StorageNotFoundError: If the record is not found """ row = self.profile.records.get(record_id) if row and row.type == record_type: return row raise StorageNotFoundError("Record not found: {}".format(record_id))
[docs] async def update_record(self, record: StorageRecord, value: str, tags: Mapping): """Update an existing stored record's value. Args: record: `StorageRecord` to update value: The new value tags: The new tags Raises: StorageNotFoundError: If record not found """ validate_record(record) oldrec = self.profile.records.get(record.id) if not oldrec: raise StorageNotFoundError("Record not found: {}".format(record.id)) self.profile.records[record.id] = oldrec._replace(value=value, tags=tags)
[docs] async def delete_record(self, record: StorageRecord): """Delete a record. Args: record: `StorageRecord` to delete Raises: StorageNotFoundError: If record not found """ validate_record(record, delete=True) if record.id not in self.profile.records: raise StorageNotFoundError("Record not found: {}".format(record.id)) del self.profile.records[record.id]
[docs] async def find_all_records( self, type_filter: str, tag_query: Mapping = None, options: Mapping = None, ): """Retrieve all records matching a particular type filter and tag query.""" results = [] for record in self.profile.records.values(): if record.type == type_filter and tag_query_match(record.tags, tag_query): results.append(record) return results
[docs] async def delete_all_records( self, type_filter: str, tag_query: Mapping = None, ): """Remove all records matching a particular type filter and tag query.""" ids = [] for record_id, record in self.profile.records.items(): if record.type == type_filter and tag_query_match(record.tags, tag_query): ids.append(record_id) for record_id in ids: del self.profile.records[record_id]
[docs] def search_records( self, type_filter: str, tag_query: Mapping = None, page_size: int = None, options: Mapping = None, ) -> "InMemoryStorageSearch": """Search stored records. Args: type_filter: Filter string tag_query: Tags to query page_size: Page size options: Dictionary of backend-specific options Returns: An instance of `InMemoryStorageSearch` """ return InMemoryStorageSearch( self.profile, type_filter, tag_query, page_size, options )
[docs]def tag_value_match(value: str, match: dict) -> bool: """Match a single tag against a tag subquery. TODO: What type coercion is needed? (support int or float values?) """ if len(match) != 1: raise StorageSearchError("Unsupported subquery: {}".format(match)) if value is None: return False op = list(match.keys())[0] cmp_val = match[op] if op == "$in": if not isinstance(cmp_val, list): raise StorageSearchError("Expected list for $in value") chk = value in cmp_val else: if not isinstance(cmp_val, str): raise StorageSearchError("Expected string for filter value") if op == "$neq": chk = value != cmp_val elif op == "$gt": chk = float(value) > float(cmp_val) elif op == "$gte": chk = float(value) >= float(cmp_val) elif op == "$lt": chk = float(value) < float(cmp_val) elif op == "$lte": chk = float(value) <= float(cmp_val) # elif op == "$like": NYI else: raise StorageSearchError(f"Unsupported match operator: {op}") return chk
[docs]def tag_query_match(tags: dict, tag_query: dict) -> bool: """Match simple tag filters (string values).""" result = True if not tags: tags = {} if tag_query: for k, v in tag_query.items(): if k == "$or": if not isinstance(v, list): raise StorageSearchError("Expected list for $or filter value") chk = False for opt in v: if tag_query_match(tags, opt): chk = True break elif k == "$and": if not isinstance(v, list): raise StorageSearchError("Expected list for $and filter value") chk = False for opt in v: if not tag_query_match(tags, opt): return False chk = True elif k == "$not": if not isinstance(v, dict): raise StorageSearchError("Expected dict for $not filter value") chk = not tag_query_match(tags, v) elif k[0] == "$": raise StorageSearchError("Unexpected filter operator: {}".format(k)) elif isinstance(v, str): chk = tags.get(k) == v elif isinstance(v, dict): chk = tag_value_match(tags.get(k), v) else: raise StorageSearchError( "Expected string or dict for filter value, got {}".format(v) ) if not chk: result = False break return result
[docs]class InMemoryStorageSearch(BaseStorageSearchSession): """Represent an active stored records search.""" def __init__( self, profile: InMemoryProfile, type_filter: str, tag_query: Mapping, page_size: int = None, options: Mapping = None, ): """Initialize a `InMemoryStorageSearch` instance. Args: profile: The in-memory profile to search type_filter: Filter string tag_query: Tags to search page_size: Size of page to return options: Dictionary of backend-specific options """ self._cache = profile.records.copy() self._iter = iter(self._cache) self.page_size = page_size or DEFAULT_PAGE_SIZE self.tag_query = tag_query self.type_filter = type_filter self._done = False
[docs] async def fetch(self, max_count: int = None) -> Sequence[StorageRecord]: """Fetch the next list of results from the store. Args: max_count: Max number of records to return. If not provided, defaults to the backend's preferred page size Returns: A list of `StorageRecord` instances Raises: StorageSearchError: If the search query has not been opened """ if self._cache is None and self._done: raise StorageSearchError("Search query is complete") ret = [] check_type = self.type_filter i = max_count or self.page_size while i > 0: try: id = next(self._iter) except StopIteration: break record = self._cache[id] if record.type == check_type and tag_query_match( record.tags, self.tag_query ): ret.append(record) i -= 1 if not ret: self._cache = None self._done = True return ret
[docs] async def close(self): """Dispose of the search query.""" self._cache = None self._done = True