"""Abstract base classes for cache."""
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Sequence, Text, Union
from ..core.error import BaseError
[docs]class CacheError(BaseError):
"""Base class for cache-related errors."""
[docs]class BaseCache(ABC):
"""Abstract cache interface."""
def __init__(self):
"""Initialize the cache instance."""
self._key_locks = {}
[docs] @abstractmethod
async def get(self, key: Text):
"""
Get an item from the cache.
Args:
key: the key to retrieve an item for
Returns:
The record found or `None`
"""
[docs] @abstractmethod
async def set(self, keys: Union[Text, Sequence[Text]], value: Any, ttl: int = None):
"""
Add an item to the cache with an optional ttl.
Args:
keys: the key or keys for which to set an item
value: the value to store in the cache
ttl: number of second that the record should persist
"""
[docs] @abstractmethod
async def clear(self, key: Text):
"""
Remove an item from the cache, if present.
Args:
key: the key to remove
"""
[docs] @abstractmethod
async def flush(self):
"""Remove all items from the cache."""
[docs] def acquire(self, key: Text):
"""Acquire a lock on a given cache key."""
result = CacheKeyLock(self, key)
first = self._key_locks.setdefault(key, result)
if first is not result:
result.parent = first
return result
[docs] def release(self, key: Text):
"""Release the lock on a given cache key."""
if key in self._key_locks:
del self._key_locks[key]
def __repr__(self) -> str:
"""Human readable representation of `BaseStorageRecordSearch`."""
return "<{}>".format(self.__class__.__name__)
[docs]class CacheKeyLock:
"""
A lock on a particular cache key.
Used to prevent multiple async threads from generating
or querying the same semi-expensive data. Not thread safe.
"""
def __init__(self, cache: BaseCache, key: Text):
"""Initialize the key lock."""
self.cache = cache
self.exception: BaseException = None
self.key = key
self.released = False
self._future: asyncio.Future = asyncio.get_event_loop().create_future()
self._parent: "CacheKeyLock" = None
@property
def done(self) -> bool:
"""Accessor for the done state."""
return self._future.done()
@property
def future(self) -> asyncio.Future:
"""Fetch the result in the form of an awaitable future."""
return self._future
@property
def result(self) -> Any:
"""Fetch the current result, if any."""
if self.done:
return self._future.result()
@property
def parent(self) -> "CacheKeyLock":
"""Accessor for the parent key lock, if any."""
return self._parent
@parent.setter
def parent(self, parent: "CacheKeyLock"):
"""Setter for the parent lock."""
self._parent = parent
parent._future.add_done_callback(self._handle_parent_done)
def _handle_parent_done(self, fut: asyncio.Future):
"""Handle completion of parent's future."""
result = fut.result()
if result:
self._future.set_result(fut.result())
[docs] async def set_result(self, value: Any, ttl: int = None):
"""Set the result, updating the cache and any waiters."""
if self.done and value:
raise CacheError("Result already set")
self._future.set_result(value)
if not self._parent or self._parent.done:
await self.cache.set(self.key, value, ttl)
def __await__(self):
"""Wait for a result to be produced."""
return (yield from self._future)
async def __aenter__(self):
"""Async context manager entry."""
result = None
if self.parent:
result = await self.parent
if result:
await self # wait for parent's done handler to complete
if not result:
found = await self.cache.get(self.key)
if found:
self._future.set_result(found)
return self
[docs] def release(self):
"""Release the cache lock."""
if not self.parent and not self.released:
self.cache.release(self.key)
self.released = True
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""
Async context manager exit.
`None` is returned to any waiters if no value is produced.
"""
if exc_val:
self.exception = exc_val
if not self.done:
self._future.set_result(None)
self.release()
def __del__(self):
"""Handle deletion."""
self.release()