"""Manage Kanon (DBStore and Aries-Askar) profile interaction."""
import asyncio
import logging
import time
from typing import Any, Mapping, Optional
from weakref import ref
from aries_askar import AskarError, Session
from aries_askar import Store as AskarStore
from ..cache.base import BaseCache
from ..config.injection_context import InjectionContext
from ..config.provider import ClassProvider
from ..core.error import ProfileError
from ..core.profile import Profile, ProfileManager, ProfileSession
from ..database_manager.db_errors import DBError
from ..database_manager.dbstore import DBStore, DBStoreError, DBStoreSession
from ..indy.holder import IndyHolder
from ..indy.issuer import IndyIssuer
from ..ledger.base import BaseLedger
from ..ledger.indy_vdr import IndyVdrLedger, IndyVdrLedgerPool
from ..storage.base import BaseStorage, BaseStorageSearch
from ..storage.vc_holder.base import VCHolder
from ..utils.multi_ledger import get_write_ledger_config_for_profile
from ..wallet.base import BaseWallet
from ..wallet.crypto import validate_seed
from .store_kanon import KanonOpenStore, KanonStoreConfig
# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)
class KanonAnonCredsProfile(Profile):
    """Kanon AnonCreds profile implementation.

    Backed by a single ``KanonOpenStore`` which exposes a DBStore (``db_store``)
    and an Askar store (``askar_store``).
    """

    BACKEND_NAME = "kanon-anoncreds"
    TEST_PROFILE_NAME = "test-profile"

    def __init__(
        self,
        opened: KanonOpenStore,
        context: Optional[InjectionContext] = None,
        *,
        profile_id: Optional[str] = None,
    ):
        """Initialize the KanonAnonCredsProfile with a store and context.

        Args:
            opened: The opened Kanon store wrapping both underlying stores
            context: Injection context handed to the base ``Profile``
            profile_id: Optional profile identifier; when set it is used as the
                profile name instead of ``opened.name``

        """
        super().__init__(
            context=context, name=profile_id or opened.name, created=opened.created
        )
        self.opened = opened  # Store the single KanonOpenStore instance
        self.ledger_pool: Optional[IndyVdrLedgerPool] = None
        self.profile_id = profile_id
        self.init_ledger_pool()
        self.bind_providers()

    @property
    def name(self) -> str:
        """Accessor for the profile name."""
        return self.profile_id or self.opened.name

    @property
    def store(self) -> DBStore:
        """Accessor for the opened DBStore instance."""
        return self.opened.db_store

    async def remove(self):
        """Remove the profile from both the DBStore and the Askar store.

        Does nothing when no ``profile_id`` is set. Removal is attempted on both
        stores even when the first attempt fails, and every failure is reported
        in a single error.

        Raises:
            ProfileError: If removal failed in either store; the first underlying
                exception is attached as the cause

        """
        if not self.profile_id:
            return  # Nothing to remove

        failures = []  # (message, exception) pairs, in the order they occurred

        # Catching Exception is deliberate here: this is a best-effort cleanup and
        # the second store must still be attempted whatever the first one raised.
        try:
            await self.opened.db_store.remove_profile(self.profile_id)
        except Exception as err:
            failures.append((f"Failed to remove profile from DBStore: {err}", err))

        try:
            await self.opened.askar_store.remove_profile(self.profile_id)
        except Exception as err:
            failures.append((f"Failed to remove profile from Askar: {err}", err))

        if failures:
            raise ProfileError(
                "Errors occurred while removing profile: "
                + "; ".join(message for message, _ in failures)
            ) from failures[0][1]

    def init_ledger_pool(self):
        """Initialize the ledger pool.

        Leaves ``ledger_pool`` unset when ledger support is disabled or when no
        genesis transactions are configured.
        """
        if self.settings.get("ledger.disabled"):
            LOGGER.info("Ledger support is disabled")
            return

        genesis_transactions = self.settings.get("ledger.genesis_transactions")
        if not genesis_transactions:
            return

        pool_name = self.settings.get("ledger.pool_name", "default")
        keepalive = int(self.settings.get("ledger.keepalive", 5))
        read_only = bool(self.settings.get("ledger.read_only", False))
        socks_proxy = self.settings.get("ledger.socks_proxy")
        if read_only:
            LOGGER.warning("Note: setting ledger to read-only mode")
        cache = self.context.injector.inject_or(BaseCache)
        self.ledger_pool = IndyVdrLedgerPool(
            pool_name,
            keepalive=keepalive,
            cache=cache,
            genesis_transactions=genesis_transactions,
            read_only=read_only,
            socks_proxy=socks_proxy,
        )

    def bind_providers(self):
        """Initialize the profile-level instance providers.

        Binds storage search, VC holder, Indy holder and Indy issuer providers,
        each receiving a weak reference to this profile. A ``BaseLedger``
        provider is bound from the multi-ledger configuration when one is
        present, otherwise from ``ledger_pool`` when that has been initialized.
        """
        injector = self._context.injector

        injector.bind_provider(
            BaseStorageSearch,
            ClassProvider(
                "acapy_agent.storage.kanon_storage.KanonStorageSearch", ref(self)
            ),
        )
        injector.bind_provider(
            VCHolder,
            ClassProvider(
                "acapy_agent.storage.vc_holder.kanon.KanonVCHolder",
                ref(self),
            ),
        )
        injector.bind_provider(
            IndyHolder,
            ClassProvider(
                "acapy_agent.indy.credx.holder_kanon.KanonIndyCredxHolder",
                ref(self),
            ),
        )
        injector.bind_provider(
            IndyIssuer,
            ClassProvider(
                "acapy_agent.indy.credx.issuer_kanon.KanonIndyCredxIssuer", ref(self)
            ),
        )

        if self.settings.get("ledger.ledger_config_list"):
            # Multi-ledger setup: the write ledger gets its own pool
            write_ledger_config = get_write_ledger_config_for_profile(
                settings=self.settings
            )
            cache = self.context.injector.inject_or(BaseCache)
            injector.bind_provider(
                BaseLedger,
                ClassProvider(
                    IndyVdrLedger,
                    IndyVdrLedgerPool(
                        write_ledger_config.get("pool_name")
                        or write_ledger_config.get("id"),
                        keepalive=write_ledger_config.get("keepalive"),
                        cache=cache,
                        genesis_transactions=write_ledger_config.get(
                            "genesis_transactions"
                        ),
                        read_only=write_ledger_config.get("read_only"),
                        socks_proxy=write_ledger_config.get("socks_proxy"),
                    ),
                    ref(self),
                ),
            )
            self.settings["ledger.write_ledger"] = write_ledger_config.get("id")
            # Endorser settings are copied only when both values are configured
            if (
                "endorser_alias" in write_ledger_config
                and "endorser_did" in write_ledger_config
            ):
                self.settings["endorser.endorser_alias"] = write_ledger_config.get(
                    "endorser_alias"
                )
                self.settings["endorser.endorser_public_did"] = write_ledger_config.get(
                    "endorser_did"
                )
        elif self.ledger_pool:
            injector.bind_provider(
                BaseLedger, ClassProvider(IndyVdrLedger, self.ledger_pool, ref(self))
            )

    def session(self, context: Optional[InjectionContext] = None) -> ProfileSession:
        """Create a new session."""
        return KanonAnonCredsProfileSession(self, False, context=context)

    def transaction(self, context: Optional[InjectionContext] = None) -> ProfileSession:
        """Create a new transaction."""
        return KanonAnonCredsProfileSession(self, True, context=context)

    async def close(self):
        """Close the opened Kanon store, if it has not been closed already."""
        if self.opened:
            await self.opened.close()
            # Drop the reference so that a repeated close() is a no-op
            self.opened = None
class KanonAnonCredsProfileSession(ProfileSession):
    """An active connection to the profile management backend.

    Holds one DBStore handle and one Askar handle, opened and closed together.
    """

    def __init__(
        self,
        profile: KanonAnonCredsProfile,
        is_txn: bool,
        *,
        context: Optional[InjectionContext] = None,
        settings: Optional[Mapping[str, Any]] = None,
    ):
        """Create a new KanonAnonCredsProfileSession instance.

        Args:
            profile: The profile whose stores this session connects to
            is_txn: Open transactions on both stores instead of plain sessions
            context: Optional injection context passed to the base session
            settings: Optional settings passed to the base session

        """
        super().__init__(profile=profile, context=context, settings=settings)
        if is_txn:
            self._dbstore_opener = profile.opened.db_store.transaction(profile.profile_id)
            self._askar_opener = profile.opened.askar_store.transaction(
                profile.profile_id
            )
        else:
            self._dbstore_opener = profile.opened.db_store.session(profile.profile_id)
            self._askar_opener = profile.opened.askar_store.session(profile.profile_id)
        self._profile = profile
        self._dbstore_handle: Optional[DBStoreSession] = None
        self._askar_handle: Optional[Session] = None
        self._acquire_start: Optional[float] = None
        self._acquire_end: Optional[float] = None

    # NOTE: `handle` is only used by acapy_agent.wallet.anoncreds_upgrade,
    # which needs a handle for the DBStore only.
    @property
    def handle(self) -> DBStoreSession:
        """Accessor for the DBStore session instance."""
        return self._dbstore_handle

    @property
    def dbstore_handle(self) -> DBStoreSession:
        """Accessor for DBStore session."""
        return self._dbstore_handle

    @property
    def askar_handle(self) -> Session:
        """Accessor for Askar session."""
        return self._askar_handle

    @property
    def store(self) -> DBStore:
        """Get store instance."""
        return self._profile and self._profile.store

    @property
    def is_transaction(self) -> bool:
        """Check if this is a transaction.

        Answers from the open handles when available, otherwise from the pending
        openers. Both stores must report a transaction for the result to be truthy.

        Raises:
            ProfileError: If neither handles nor openers are available

        """
        if self._dbstore_handle and self._askar_handle:
            return (
                self._dbstore_handle.is_transaction and self._askar_handle.is_transaction
            )
        if self._dbstore_opener and self._askar_opener:
            return (
                self._dbstore_opener.is_transaction and self._askar_opener.is_transaction
            )
        raise ProfileError("Session not open")

    async def _setup(self):
        """Open both store sessions and bind the session-level providers.

        Each opener is given up to 60 seconds. If the second store fails to open,
        the handle already obtained from the first is closed before the error
        propagates, so no half-open session is left behind.

        Raises:
            asyncio.TimeoutError: If a store did not open in time
            ProfileError: If either store reported an error while opening

        """
        self._acquire_start = time.perf_counter()
        try:
            self._dbstore_handle = await asyncio.wait_for(self._dbstore_opener, 60)
            self._askar_handle = await asyncio.wait_for(self._askar_opener, 60)
        except asyncio.TimeoutError:
            LOGGER.error("Timeout waiting for store session")
            await self._release_handles()
            raise
        except (DBError, AskarError) as err:
            LOGGER.error("Error opening store session: %s", str(err))
            await self._release_handles()
            raise ProfileError("Error opening store session") from err
        self._acquire_end = time.perf_counter()
        self._dbstore_opener = None
        self._askar_opener = None

        injector = self._context.injector
        injector.bind_provider(
            BaseWallet,
            ClassProvider("acapy_agent.wallet.kanon_wallet.KanonWallet", ref(self)),
        )
        injector.bind_provider(
            BaseStorage,
            ClassProvider("acapy_agent.storage.kanon_storage.KanonStorage", ref(self)),
        )

    async def _release_handles(self):
        """Close whichever store handles are open and forget them.

        Used to unwind a partially completed ``_setup``. Errors raised while
        closing are logged and suppressed so they cannot mask the original failure.
        """
        for attr in ("_dbstore_handle", "_askar_handle"):
            handle = getattr(self, attr)
            setattr(self, attr, None)
            if not handle:
                continue
            try:
                await handle.close()
            except Exception:
                LOGGER.exception("Error closing store session during cleanup")

    async def _teardown(self, commit: Optional[bool] = None):
        """Close both sessions, committing transactions if needed.

        Both handles are closed even when the commit, or the first close, fails.

        Args:
            commit: Commit both handles first when truthy and this is a transaction

        Raises:
            ProfileError: If committing either store failed

        """
        try:
            if commit and self.is_transaction:
                try:
                    # NOTE(review): the two commits are not atomic - if the Askar
                    # commit fails, the DBStore commit has already been applied.
                    await self._dbstore_handle.commit()
                    await self._askar_handle.commit()
                except (DBError, AskarError) as err:
                    raise ProfileError("Error committing transaction") from err
        finally:
            try:
                if self._dbstore_handle:
                    await self._dbstore_handle.close()
            finally:
                if self._askar_handle:
                    await self._askar_handle.close()
        self._check_duration()

    def _check_duration(self):
        """Check transaction duration for monitoring purposes.

        This method is intentionally empty as duration checking is not
        implemented in the current kanon profile implementation.
        """
        pass

    def __del__(self):
        """Clean up resources."""
        # hasattr guards against __init__ having failed before the attribute was set
        if hasattr(self, "_dbstore_handle") and self._dbstore_handle:
            self._check_duration()
class KanonAnonProfileManager(ProfileManager):
    """Manager for Kanon stores (a DBStore paired with an Aries-Askar store)."""

    async def provision(
        self, context: InjectionContext, config: Optional[Mapping[str, Any]] = None
    ) -> Profile:
        """Provision a new profile.

        Args:
            context: Injection context for the new profile
            config: Store configuration; a truthy ``"test"`` entry is forwarded to
                ``open_store`` as ``in_memory``

        Returns:
            The profile wrapping the newly provisioned store

        """
        # Log only the setting names, never the values: the configuration
        # presumably carries sensitive entries such as the store key.
        LOGGER.debug(
            "KanonProfileManager provisioning store; config keys: %s",
            sorted(config) if config else [],
        )
        return await self._open_profile(context, config, provision=True)

    async def open(
        self, context: InjectionContext, config: Optional[Mapping[str, Any]] = None
    ) -> Profile:
        """Open an instance of an existing profile.

        Args:
            context: Injection context for the profile
            config: Store configuration; a truthy ``"test"`` entry is forwarded to
                ``open_store`` as ``in_memory``

        Returns:
            The profile wrapping the opened store

        """
        # Note: no health checks are run here - if opening fails, exceptions are
        # raised by open_store. The stores are validated when first used.
        return await self._open_profile(context, config, provision=False)

    @staticmethod
    async def _open_profile(
        context: InjectionContext,
        config: Optional[Mapping[str, Any]],
        *,
        provision: bool,
    ) -> Profile:
        """Open (or provision) both stores from one config and wrap them in a profile."""
        # A single config drives both stores; no store_class specialization needed
        store_config = KanonStoreConfig(config)
        # config defaults to None, so guard the lookup instead of assuming a mapping
        in_memory = config.get("test") if config else None
        opened = await store_config.open_store(provision=provision, in_memory=in_memory)
        return KanonAnonCredsProfile(opened, context)

    @classmethod
    async def generate_store_key(cls, seed: Optional[str] = None) -> str:
        """Generate a raw store key.

        Args:
            seed: Optional seed, validated with ``validate_seed`` before use

        """
        return AskarStore.generate_raw_key(validate_seed(seed))