"""Basic in-memory storage implementation (non-wallet)."""
from collections import OrderedDict
from typing import Mapping, Sequence
from .base import BaseStorage, BaseStorageRecordSearch
from .error import (
StorageError,
StorageDuplicateError,
StorageNotFoundError,
StorageSearchError,
)
from .record import StorageRecord
from ..wallet.base import BaseWallet
[docs]class BasicStorage(BaseStorage):
"""Basic in-memory storage class."""
def __init__(self, _wallet: BaseWallet = None):
"""
Initialize a `BasicStorage` instance.
Args:
_wallet: The wallet implementation to use
"""
self._records = OrderedDict()
[docs] async def add_record(self, record: StorageRecord):
"""
Add a new record to the store.
Args:
record: `StorageRecord` to be stored
Raises:
StorageError: If no record is provided
StorageError: If the record has no ID
"""
if not record:
raise StorageError("No record provided")
if not record.id:
raise StorageError("Record has no ID")
if record.id in self._records:
raise StorageDuplicateError("Duplicate record")
self._records[record.id] = record
[docs] async def get_record(
self, record_type: str, record_id: str, options: Mapping = None
) -> StorageRecord:
"""
Fetch a record from the store by type and ID.
Args:
record_type: The record type
record_id: The record id
options: A dictionary of backend-specific options
Returns:
A `StorageRecord` instance
Raises:
StorageNotFoundError: If the record is not found
"""
row = self._records.get(record_id)
if row and row.type == record_type:
return row
if not row:
raise StorageNotFoundError("Record not found: {}".format(record_id))
[docs] async def update_record_value(self, record: StorageRecord, value: str):
"""
Update an existing stored record's value.
Args:
record: `StorageRecord` to update
value: The new value
Raises:
StorageNotFoundError: If record not found
"""
oldrec = self._records.get(record.id)
if not oldrec:
raise StorageNotFoundError("Record not found: {}".format(record.id))
self._records[record.id] = oldrec._replace(value=value)
[docs] async def delete_record(self, record: StorageRecord):
"""
Delete a record.
Args:
record: `StorageRecord` to delete
Raises:
StorageNotFoundError: If record not found
"""
if record.id not in self._records:
raise StorageNotFoundError("Record not found: {}".format(record.id))
del self._records[record.id]
[docs] def search_records(
self,
type_filter: str,
tag_query: Mapping = None,
page_size: int = None,
options: Mapping = None,
) -> "BasicStorageRecordSearch":
"""
Search stored records.
Args:
type_filter: Filter string
tag_query: Tags to query
page_size: Page size
options: Dictionary of backend-specific options
Returns:
An instance of `BaseStorageRecordSearch`
"""
return BasicStorageRecordSearch(
self, type_filter, tag_query, page_size, options
)
[docs]def basic_tag_value_match(value: str, match: dict) -> bool:
"""Match a single tag against a tag subquery.
TODO: What type coercion is needed? (support int or float values?)
"""
if len(match) != 1:
raise StorageSearchError("Unsupported subquery: {}".format(match))
if value is None:
return False
op = list(match.keys())[0]
cmp_val = match[op]
if op == "$in":
if not isinstance(cmp_val, list):
raise StorageSearchError("Expected list for $in value")
chk = value in cmp_val
else:
if not isinstance(cmp_val, str):
raise StorageSearchError("Expected string for filter value")
if op == "$neq":
chk = value != cmp_val
elif op == "$gt":
chk = value > cmp_val
elif op == "$gte":
chk = value >= cmp_val
elif op == "$lt":
chk = value < cmp_val
elif op == "$lte":
chk = value <= cmp_val
# elif op == "$like": NYI
else:
raise StorageSearchError("Unsupported match operator: ".format(op))
return chk
[docs]def basic_tag_query_match(tags: dict, tag_query: dict) -> bool:
"""Match simple tag filters (string values)."""
result = True
if not tags:
tags = {}
if tag_query:
for k, v in tag_query.items():
if k == "$or":
if not isinstance(v, list):
raise StorageSearchError("Expected list for $or filter value")
chk = False
for opt in v:
if basic_tag_query_match(tags, opt):
chk = True
break
elif k == "$not":
if not isinstance(v, dict):
raise StorageSearchError("Expected dict for $not filter value")
chk = not basic_tag_query_match(tags, v)
elif k[0] == "$":
raise StorageSearchError("Unexpected filter operator: {}".format(k))
elif isinstance(v, str):
chk = tags.get(k) == v
elif isinstance(v, dict):
chk = basic_tag_value_match(tags.get(k), v)
else:
raise StorageSearchError(
"Expected string or dict for filter value, got {}".format(v)
)
if not chk:
result = False
break
return result
[docs]class BasicStorageRecordSearch(BaseStorageRecordSearch):
"""Represent an active stored records search."""
def __init__(
self,
store: BasicStorage,
type_filter: str,
tag_query: Mapping,
page_size: int = None,
options: Mapping = None,
):
"""
Initialize a `BasicStorageRecordSearch` instance.
Args:
store: `BaseStorage` to search
type_filter: Filter string
tag_query: Tags to search
page_size: Size of page to return
options: Dictionary of backend-specific options
"""
super(BasicStorageRecordSearch, self).__init__(
store, type_filter, tag_query, page_size, options
)
self._cache = None
self._iter = None
@property
def opened(self) -> bool:
"""
Accessor for open state.
Returns:
True if opened, else False
"""
return self._cache is not None
[docs] async def fetch(self, max_count: int) -> Sequence[StorageRecord]:
"""
Fetch the next list of results from the store.
Args:
max_count: Max number of records to return
Returns:
A list of `StorageRecord`
Raises:
StorageSearchError: If the search query has not been opened
"""
if not self.opened:
raise StorageSearchError("Search query has not been opened")
ret = []
check_type = self.type_filter
i = max_count
while i > 0:
try:
id = next(self._iter)
except StopIteration:
break
record = self._cache[id]
if record.type == check_type and basic_tag_query_match(
record.tags, self.tag_query
):
ret.append(record)
i -= 1
return ret
[docs] async def open(self):
"""Start the search query."""
self._cache = self._store._records.copy()
self._iter = iter(self._cache)
[docs] async def close(self):
"""Dispose of the search query."""
self._cache = None