"""Manage Aries-Askar profile interaction."""
import asyncio
import logging
import time
# import traceback
from typing import Any, Mapping
from weakref import ref
from aries_askar import AskarError, Session, Store
from ..cache.base import BaseCache
from ..config.injection_context import InjectionContext
from ..config.provider import ClassProvider
from ..core.error import ProfileError
from ..core.profile import Profile, ProfileManager, ProfileSession
from ..indy.holder import IndyHolder
from ..indy.issuer import IndyIssuer
from ..ledger.base import BaseLedger
from ..ledger.indy_vdr import IndyVdrLedger, IndyVdrLedgerPool
from ..storage.base import BaseStorage, BaseStorageSearch
from ..storage.vc_holder.base import VCHolder
from ..utils.multi_ledger import get_write_ledger_config_for_profile
from ..wallet.base import BaseWallet
from ..wallet.crypto import validate_seed
from .store import AskarStoreConfig, AskarOpenStore
LOGGER = logging.getLogger(__name__)
[docs]class AskarAnoncredsProfile(Profile):
"""Provide access to Aries-Askar profile interaction methods."""
BACKEND_NAME = "askar-anoncreds"
TEST_PROFILE_NAME = "test-profile"
def __init__(
self,
opened: AskarOpenStore,
context: InjectionContext = None,
*,
profile_id: str = None
):
"""Create a new AskarProfile instance."""
super().__init__(context=context, name=opened.name, created=opened.created)
self.opened = opened
self.ledger_pool: IndyVdrLedgerPool = None
self.profile_id = profile_id
self.init_ledger_pool()
self.bind_providers()
@property
def name(self) -> str:
"""Accessor for the profile name."""
return self.opened.name
@property
def store(self) -> Store:
"""Accessor for the opened Store instance."""
return self.opened.store
[docs] async def remove(self):
"""Remove the profile."""
if self.profile_id:
await self.store.remove_profile(self.profile_id)
[docs] def init_ledger_pool(self):
"""Initialize the ledger pool."""
if self.settings.get("ledger.disabled"):
LOGGER.info("Ledger support is disabled")
return
if self.settings.get("ledger.genesis_transactions"):
pool_name = self.settings.get("ledger.pool_name", "default")
keepalive = int(self.settings.get("ledger.keepalive", 5))
read_only = bool(self.settings.get("ledger.read_only", False))
socks_proxy = self.settings.get("ledger.socks_proxy")
if read_only:
LOGGER.error("Note: setting ledger to read-only mode")
genesis_transactions = self.settings.get("ledger.genesis_transactions")
cache = self.context.injector.inject_or(BaseCache)
self.ledger_pool = IndyVdrLedgerPool(
pool_name,
keepalive=keepalive,
cache=cache,
genesis_transactions=genesis_transactions,
read_only=read_only,
socks_proxy=socks_proxy,
)
[docs] def bind_providers(self):
"""Initialize the profile-level instance providers."""
injector = self._context.injector
injector.bind_provider(
BaseStorageSearch,
ClassProvider(
"aries_cloudagent.storage.askar.AskarStorageSearch", ref(self)
),
)
injector.bind_provider(
VCHolder,
ClassProvider(
"aries_cloudagent.storage.vc_holder.askar.AskarVCHolder",
ref(self),
),
)
injector.bind_provider(
IndyHolder,
ClassProvider(
"aries_cloudagent.indy.credx.holder.IndyCredxHolder",
ref(self),
),
)
injector.bind_provider(
IndyIssuer,
ClassProvider(
"aries_cloudagent.indy.credx.issuer.IndyCredxIssuer", ref(self)
),
)
if (
self.settings.get("ledger.ledger_config_list")
and len(self.settings.get("ledger.ledger_config_list")) >= 1
):
write_ledger_config = get_write_ledger_config_for_profile(
settings=self.settings
)
cache = self.context.injector.inject_or(BaseCache)
injector.bind_provider(
BaseLedger,
ClassProvider(
IndyVdrLedger,
IndyVdrLedgerPool(
write_ledger_config.get("pool_name")
or write_ledger_config.get("id"),
keepalive=write_ledger_config.get("keepalive"),
cache=cache,
genesis_transactions=write_ledger_config.get(
"genesis_transactions"
),
read_only=write_ledger_config.get("read_only"),
socks_proxy=write_ledger_config.get("socks_proxy"),
),
ref(self),
),
)
self.settings["ledger.write_ledger"] = write_ledger_config.get("id")
if (
"endorser_alias" in write_ledger_config
and "endorser_did" in write_ledger_config
):
self.settings["endorser.endorser_alias"] = write_ledger_config.get(
"endorser_alias"
)
self.settings["endorser.endorser_public_did"] = write_ledger_config.get(
"endorser_did"
)
elif self.ledger_pool:
injector.bind_provider(
BaseLedger, ClassProvider(IndyVdrLedger, self.ledger_pool, ref(self))
)
[docs] def session(
self, context: InjectionContext = None
) -> "AskarAnoncredsProfileSession":
"""Start a new interactive session with no transaction support requested."""
return AskarAnoncredsProfileSession(self, False, context=context)
[docs] def transaction(
self, context: InjectionContext = None
) -> "AskarAnoncredsProfileSession":
"""Start a new interactive session with commit and rollback support.
If the current backend does not support transactions, then commit
and rollback operations of the session will not have any effect.
"""
return AskarAnoncredsProfileSession(self, True, context=context)
[docs] async def close(self):
"""Close the profile instance."""
if self.opened:
await self.opened.close()
self.opened = None
[docs]class AskarAnoncredsProfileSession(ProfileSession):
"""An active connection to the profile management backend."""
def __init__(
self,
profile: AskarAnoncredsProfile,
is_txn: bool,
*,
context: InjectionContext = None,
settings: Mapping[str, Any] = None
):
"""Create a new AskarAnoncredsProfileSession instance."""
super().__init__(profile=profile, context=context, settings=settings)
if is_txn:
self._opener = self.profile.store.transaction(profile.profile_id)
else:
self._opener = self.profile.store.session(profile.profile_id)
self._handle: Session = None
self._acquire_start: float = None
self._acquire_end: float = None
@property
def handle(self) -> Session:
"""Accessor for the Session instance."""
return self._handle
@property
def store(self) -> Store:
"""Accessor for the Store instance."""
return self._handle and self._handle.store
@property
def is_transaction(self) -> bool:
"""Check if the session supports commit and rollback operations."""
if self._handle:
return self._handle.is_transaction
return self._opener.is_transaction
async def _setup(self):
"""Create the session or transaction connection, if needed."""
self._acquire_start = time.perf_counter()
try:
self._handle = await asyncio.wait_for(self._opener, 10)
except AskarError as err:
raise ProfileError("Error opening store session") from err
self._acquire_end = time.perf_counter()
self._opener = None
injector = self._context.injector
injector.bind_provider(
BaseWallet,
ClassProvider("aries_cloudagent.wallet.askar.AskarWallet", ref(self)),
)
injector.bind_provider(
BaseStorage,
ClassProvider("aries_cloudagent.storage.askar.AskarStorage", ref(self)),
)
async def _teardown(self, commit: bool = None):
"""Dispose of the session or transaction connection."""
if commit:
try:
await self._handle.commit()
except AskarError as err:
raise ProfileError("Error committing transaction") from err
if self._handle:
await self._handle.close()
self._handle = None
self._check_duration()
def _check_duration(self):
# LOGGER.error(
# "release session after %.2f, acquire took %.2f",
# end - self._acquire_end,
# self._acquire_end - self._acquire_start,
# )
# end = time.perf_counter()
# if end - self._acquire_end > 1.0:
# LOGGER.error("Long session")
# traceback.print_stack(limit=5)
pass
def __del__(self):
"""Delete magic method."""
if self._handle:
self._check_duration()
[docs]class AskarAnonProfileManager(ProfileManager):
"""Manager for Aries-Askar stores."""
[docs] async def provision(
self, context: InjectionContext, config: Mapping[str, Any] = None
) -> Profile:
"""Provision a new instance of a profile."""
store_config = AskarStoreConfig(config)
opened = await store_config.open_store(provision=True)
return AskarAnoncredsProfile(opened, context)
[docs] async def open(
self, context: InjectionContext, config: Mapping[str, Any] = None
) -> Profile:
"""Open an instance of an existing profile."""
store_config = AskarStoreConfig(config)
opened = await store_config.open_store(provision=False)
return AskarAnoncredsProfile(opened, context)
[docs] @classmethod
async def generate_store_key(self, seed: str = None) -> str:
"""Generate a raw store key."""
return Store.generate_raw_key(validate_seed(seed))