Source code for aries_cloudagent.askar.store

"""Aries-Askar backend store configuration."""

import json
import logging
import urllib

from aries_askar import AskarError, AskarErrorCode, Store

from ..core.error import ProfileError, ProfileDuplicateError, ProfileNotFoundError
from ..core.profile import Profile
from ..utils.env import storage_path

LOGGER = logging.getLogger(__name__)


[docs]class AskarStoreConfig: """A helper class for handling Askar store configuration.""" DEFAULT_KEY = "" DEFAULT_KEY_DERIVATION = "kdf:argon2i:mod" DEFAULT_STORAGE_TYPE = None KEY_DERIVATION_RAW = "RAW" KEY_DERIVATION_ARGON2I_INT = "kdf:argon2i:int" KEY_DERIVATION_ARGON2I_MOD = "kdf:argon2i:mod" def __init__(self, config: dict = None): """Initialize a `AskarWallet` instance. Args: config: {name, key, seed, did, auto_recreate, auto_remove, storage_type, storage_config, storage_creds} """ if not config: config = {} self.auto_recreate = config.get("auto_recreate", False) self.auto_remove = config.get("auto_remove", False) self.key = config.get("key", self.DEFAULT_KEY) self.key_derivation_method = ( config.get("key_derivation_method") or self.DEFAULT_KEY_DERIVATION ) if ( self.key_derivation_method.lower() == self.KEY_DERIVATION_RAW.lower() and self.key == "" ): raise ProfileError( f"With key derivation method '{self.KEY_DERIVATION_RAW}'," "key should also be provided" ) # self.rekey = config.get("rekey") # self.rekey_derivation_method = config.get("rekey_derivation_method") self.name = config.get("name") or Profile.DEFAULT_NAME self.in_memory = self.name == ":memory:" if self.in_memory: self.name = Profile.DEFAULT_NAME self.storage_config = config.get("storage_config", None) self.storage_creds = config.get("storage_creds", None) storage_type = config.get("storage_type") if not storage_type or storage_type == "default": storage_type = "sqlite" elif storage_type == "postgres_storage": storage_type = "postgres" if storage_type not in ("postgres", "sqlite"): raise ProfileError(f"Unsupported storage type: {storage_type}") if storage_type != "sqlite" and self.in_memory: raise ProfileError("In-memory wallet only supported for SQLite backend") self.storage_type = storage_type
[docs] def get_uri(self, create: bool = False) -> str: """Accessor for the storage URI.""" uri = f"{self.storage_type}://" if self.storage_type == "sqlite": if self.in_memory: uri += ":memory:" else: path = storage_path("wallet", self.name, create=create).as_posix() uri += urllib.parse.quote(f"{path}/sqlite.db") elif self.storage_type == "postgres": if not self.storage_config: raise ProfileError("No 'storage_config' provided for postgres store") if not self.storage_creds: raise ProfileError("No 'storage_creds' provided for postgres store") config = json.loads(self.storage_config) creds = json.loads(self.storage_creds) config_url = config.get("url") if not config_url: raise ProfileError("No 'url' provided for postgres store") if "account" not in creds: raise ProfileError("No 'account' provided for postgres store") if "password" not in creds: raise ProfileError("No 'password' provided for postgres store") account = urllib.parse.quote(creds["account"]) password = urllib.parse.quote(creds["password"]) db_name = urllib.parse.quote(self.name) # FIXME parse the URL, check for parameters, remove postgres:// prefix, etc # config url expected to be in the form "host:port" uri += f"{account}:{password}@{config_url}/{db_name}" params = {} if "connection_timeout" in config: params["connect_timeout"] = config["connection_timeout"] if "max_connections" in config: params["max_connections"] = config["max_connections"] if "min_idle_count" in config: params["min_connections"] = config["min_idle_count"] # FIXME handle 'tls' config parameter if "admin_account" in creds: params["admin_account"] = creds["admin_account"] if "admin_password" in creds: params["admin_password"] = creds["admin_password"] if params: uri += "?" + urllib.parse.urlencode(params) return uri
[docs] async def remove_store(self): """Remove an existing store. Raises: ProfileNotFoundError: If the wallet could not be found ProfileError: If there was another aries_askar error """ try: await Store.remove(self.get_uri()) except AskarError as err: if err.code == AskarErrorCode.NOT_FOUND: raise ProfileNotFoundError( f"Store '{self.name}' not found", ) raise ProfileError("Error removing store") from err
[docs] async def open_store(self, provision: bool = False) -> "AskarOpenStore": """Open a store, removing and/or creating it if so configured. Raises: ProfileNotFoundError: If the store is not found ProfileError: If there is another aries_askar error """ try: if provision or self.in_memory: store = await Store.provision( self.get_uri(create=True), self.key_derivation_method, self.key, recreate=self.auto_recreate, ) else: store = await Store.open( self.get_uri(), self.key_derivation_method, self.key, ) except AskarError as err: if err.code == AskarErrorCode.DUPLICATE: raise ProfileDuplicateError( f"Duplicate store '{self.name}'", ) if err.code == AskarErrorCode.NOT_FOUND: raise ProfileNotFoundError( f"Store '{self.name}' not found", ) raise ProfileError("Error opening store") from err return AskarOpenStore(self, provision, store)
[docs]class AskarOpenStore: """Handle and metadata for an opened Askar store.""" def __init__( self, config: AskarStoreConfig, created, store: Store, ): """Create a new AskarOpenStore instance.""" self.config = config self.created = created self.store = store @property def name(self) -> str: """Accessor for the store name.""" return self.config.name
[docs] async def close(self): """Close previously-opened store, removing it if so configured.""" if self.store: await self.store.close(remove=self.config.auto_remove) self.store = None