"""Classes for managing profile information within a request context."""
import logging
from abc import ABC, abstractmethod
from typing import Any, Mapping, Optional, Type
from weakref import ref
from ..config.base import InjectionError
from ..config.injection_context import InjectionContext
from ..config.injector import BaseInjector, InjectType
from ..config.provider import BaseProvider
from ..config.settings import BaseSettings
from ..database_manager.db_types import Entry, EntryList
from ..utils.classloader import ClassLoader, ClassNotFoundError
from .error import ProfileSessionInactiveError
from .event_bus import Event, EventBus
LOGGER = logging.getLogger(__name__)
[docs]
class Profile(ABC):
"""Base abstraction for handling identity-related state."""
BACKEND_NAME: Optional[str] = None
DEFAULT_NAME: str = "default"
def __init__(
self,
*,
context: Optional[InjectionContext] = None,
name: Optional[str] = None,
created: bool = False,
):
"""Initialize a base profile."""
self._created = created
self._name = name or Profile.DEFAULT_NAME
context = context or InjectionContext()
self._context = context.start_scope()
self._context.injector.bind_instance(Profile, ref(self))
@property
def backend(self) -> str:
"""Accessor for the backend implementation name."""
return self.__class__.BACKEND_NAME or ""
@property
def is_anoncreds(self) -> bool:
"""Check if this profile uses an AnonCreds-compatible backend."""
return "anoncreds" in self.backend.lower()
@property
def context(self) -> InjectionContext:
"""Accessor for the injection context."""
return self._context
@property
def created(self) -> bool:
"""Accessor for the created flag indicating a new profile."""
return self._created
@property
def name(self) -> str:
"""Accessor for the profile name."""
return self._name
@property
def settings(self) -> BaseSettings:
"""Accessor for scope-specific settings."""
return self._context.settings
[docs]
@abstractmethod
def session(self, context: Optional[InjectionContext] = None) -> "ProfileSession":
"""Start a new interactive session with no transaction support requested."""
[docs]
@abstractmethod
def transaction(self, context: Optional[InjectionContext] = None) -> "ProfileSession":
"""Start a new interactive session with commit and rollback support.
If the current backend does not support transactions, then commit
and rollback operations of the session will not have any effect.
"""
[docs]
def inject(
self,
base_cls: Type[InjectType],
settings: Mapping[str, object] = None,
) -> InjectType:
"""Get the provided instance of a given class identifier.
Args:
cls: The base class to retrieve an instance of
base_cls: The base class to retrieve
settings: An optional mapping providing configuration to the provider
Returns:
An instance of the base class, or None
"""
return self._context.inject(base_cls, settings)
[docs]
def inject_or(
self,
base_cls: Type[InjectType],
settings: Mapping[str, object] = None,
default: Optional[InjectType] = None,
) -> Optional[InjectType]:
"""Get the provided instance of a given class identifier or default if not found.
Args:
base_cls: The base class to retrieve an instance of
settings: An optional dict providing configuration to the provider
default: default return value if no instance is found
Returns:
An instance of the base class, or None
"""
return self._context.inject_or(base_cls, settings, default)
[docs]
async def close(self):
"""Close the profile instance."""
# Shutdown the EventBus to clean up background tasks
event_bus = self.inject_or(EventBus)
if event_bus:
await event_bus.shutdown()
[docs]
async def remove(self):
"""Remove the profile."""
[docs]
async def notify(self, topic: str, payload: Any):
"""Signal an event."""
event_bus = self.inject_or(EventBus)
if event_bus:
await event_bus.notify(self, Event(topic, payload))
else:
LOGGER.warning("No event bus found for profile %s", self.name)
def __repr__(self) -> str:
"""Get a human readable string."""
return "<{}(backend={}, name={})>".format(
self.__class__.__name__, self.backend, self.name
)
def __eq__(self, other) -> bool:
"""Equality checks for profiles.
Multiple profile instances can exist at the same time but point to the
same profile. This allows us to test equality based on the profile
pointed to by the instance rather than by object reference comparison.
"""
if not isinstance(other, Profile):
return False
if type(self) is not type(other):
return False
return self.name == other.name
[docs]
class ProfileManager(ABC):
"""Handle provision and open for profile instances."""
def __init__(self):
"""Initialize the profile manager."""
[docs]
@abstractmethod
async def provision(
self, context: InjectionContext, config: Mapping[str, Any] = None
) -> Profile:
"""Provision a new instance of a profile."""
[docs]
@abstractmethod
async def open(
self, context: InjectionContext, config: Mapping[str, Any] = None
) -> Profile:
"""Open an instance of an existing profile."""
[docs]
class ProfileSessionHandle(ABC):
"""Abstract interface for profile session handles.
This interface defines the common methods that are available across different
session handle implementations (aries_askar.Session and DBStoreSession).
"""
@property
@abstractmethod
def is_transaction(self) -> bool:
"""Check if the session supports commit and rollback operations."""
[docs]
@abstractmethod
async def count(self, category: str, tag_filter: str | dict = None) -> int:
"""Count the records matching a category and tag filter."""
[docs]
@abstractmethod
async def fetch(
self, category: str, name: str, *, for_update: bool = False
) -> Optional[Entry]:
"""Fetch a record from the store by category and name."""
[docs]
@abstractmethod
async def fetch_all(
self,
category: str = None,
tag_filter: str | dict = None,
limit: int = None,
*,
order_by: Optional[str] = None,
descending: bool = False,
for_update: bool = False,
) -> EntryList:
"""Fetch all records matching a category and tag filter."""
[docs]
@abstractmethod
async def insert(
self,
category: str,
name: str,
value: str | bytes = None,
tags: dict = None,
expiry_ms: int = None,
value_json=None,
) -> None:
"""Insert a new record into the store."""
[docs]
@abstractmethod
async def replace(
self,
category: str,
name: str,
value: str | bytes = None,
tags: dict = None,
expiry_ms: int = None,
value_json=None,
) -> None:
"""Replace a record in the store matching a category and name."""
[docs]
@abstractmethod
async def remove(
self,
category: str,
name: str,
) -> None:
"""Remove a record by category and name."""
[docs]
@abstractmethod
async def remove_all(
self,
category: str = None,
tag_filter: str | dict = None,
) -> int:
"""Remove all records matching a category and tag filter."""
[docs]
@abstractmethod
async def commit(self) -> None:
"""Commit any updates performed within the transaction."""
[docs]
@abstractmethod
async def rollback(self) -> None:
"""Roll back any updates performed within the transaction."""
[docs]
@abstractmethod
async def close(self) -> None:
"""Close the session."""
[docs]
class ProfileSession(ABC):
"""An active connection to the profile management backend."""
def __init__(
self,
profile: Profile,
*,
context: Optional[InjectionContext] = None,
settings: Mapping[str, Any] = None,
):
"""Initialize a base profile session."""
self._active = False
self._awaited = False
self._entered = 0
self._context = (context or profile.context).start_scope(settings)
self._profile = profile
self._events = []
self._context.injector.bind_instance(ProfileSession, ref(self))
async def _setup(self):
"""Create the underlying session or transaction."""
async def _teardown(self, commit: Optional[bool] = None):
"""Dispose of the underlying session or transaction."""
def __await__(self):
"""Coroutine magic method.
A session must be awaited or used as an async context manager.
"""
async def _init():
if not self._active:
await self._setup()
self._active = True
self._awaited = True
return self
return _init().__await__()
async def __aenter__(self):
"""Async context manager entry."""
if not self._active:
await self._setup()
self._active = True
self._entered += 1
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
self._entered -= 1
if not self._awaited and not self._entered:
await self._teardown()
self._active = False
@property
def active(self) -> bool:
"""Accessor for the session active state."""
return self._active
@property
def context(self) -> InjectionContext:
"""Accessor for the associated injection context."""
return self._context
@property
def settings(self) -> BaseSettings:
"""Accessor for scope-specific settings."""
return self._context.settings
@property
def is_transaction(self) -> bool:
"""Check if the session supports commit and rollback operations."""
return False
@property
def profile(self) -> Profile:
"""Accessor for the associated profile instance."""
return self._profile
@property
def handle(self) -> ProfileSessionHandle:
"""Accessor for the session handle."""
[docs]
async def commit(self):
"""Commit any updates performed within the transaction.
If the current session is not a transaction, then nothing is performed.
"""
if not self._active:
raise ProfileSessionInactiveError()
await self._teardown(commit=True)
# emit any pending events
for event in self._events:
await self.emit_event(event["topic"], event["payload"], force_emit=True)
self._events = []
self._active = False
[docs]
async def rollback(self):
"""Roll back any updates performed within the transaction.
If the current session is not a transaction, then nothing is performed.
"""
if not self._active:
raise ProfileSessionInactiveError()
await self._teardown(commit=False)
# clear any pending events
self._events = []
self._active = False
[docs]
async def emit_event(self, topic: str, payload: Any, force_emit: bool = False):
"""Emit an event.
If we are in an active transaction, just queue the event, otherwise emit it.
Args:
topic (str): The topic of the event.
payload (Any): The payload of the event.
force_emit (bool, optional): If True, force the event to be emitted even
if there is an active transaction. Defaults to False.
"""
if force_emit or (not self.is_transaction):
# just emit directly
await self.profile.notify(topic, payload)
else:
# add to queue
self._events.append(
{
"topic": topic,
"payload": payload,
}
)
[docs]
def inject(
self,
base_cls: Type[InjectType],
settings: Mapping[str, object] = None,
) -> InjectType:
"""Get the provided instance of a given class identifier.
Args:
base_cls (Type[InjectType]): The base class to retrieve an instance of.
settings (Mapping[str, object], optional): An optional mapping providing
configuration to the provider.
Returns:
InjectType: An instance of the base class, or None.
Raises:
ProfileSessionInactiveError: If the profile session is inactive.
"""
if not self._active:
raise ProfileSessionInactiveError()
return self._context.inject(base_cls, settings)
[docs]
def inject_or(
self,
base_cls: Type[InjectType],
settings: Mapping[str, object] = None,
default: Optional[InjectType] = None,
) -> Optional[InjectType]:
"""Get the provided instance of a given class identifier or default if not found.
Args:
base_cls: The base class to retrieve an instance of
settings: An optional dict providing configuration to the provider
default: default return value if no instance is found
Returns:
An instance of the base class, or None
"""
if not self._active:
raise ProfileSessionInactiveError()
return self._context.inject_or(base_cls, settings, default)
def __repr__(self) -> str:
"""Get a human readable string."""
return "<{}(active={}, is_transaction={})>".format(
self.__class__.__name__, self._active, self.is_transaction
)
[docs]
class ProfileManagerProvider(BaseProvider):
"""The standard profile manager provider which keys off the selected wallet type."""
MANAGER_TYPES = {
"askar": "acapy_agent.askar.profile.AskarProfileManager",
"askar-anoncreds": "acapy_agent.askar.profile_anon.AskarAnonProfileManager",
"kanon-anoncreds": "acapy_agent.kanon.profile_anon_kanon.KanonAnonProfileManager",
}
def __init__(self):
"""Initialize the profile manager provider."""
self._inst = {}
[docs]
def provide(self, settings: BaseSettings, injector: BaseInjector):
"""Create the profile manager instance."""
mgr_type = settings.get_value("wallet.type", default="askar")
# mgr_type may be a fully qualified class name
mgr_class = self.MANAGER_TYPES.get(mgr_type.lower(), mgr_type)
if mgr_class not in self._inst:
LOGGER.info("Create profile manager: %s", mgr_type)
try:
self._inst[mgr_class] = ClassLoader.load_class(mgr_class)()
except ClassNotFoundError as err:
raise InjectionError(f"Unknown profile manager: {mgr_type}") from err
return self._inst[mgr_class]