Source code for acapy_agent.kanon.store_kanon

"""Module docstring."""

import json
import logging
import os
import urllib
from typing import Optional

import base58
from aries_askar import AskarError, AskarErrorCode, Store

from ..askar.store import ERR_NO_STORAGE_CONFIG, ERR_NO_STORAGE_CREDS
from ..core.error import ProfileDuplicateError, ProfileError, ProfileNotFoundError
from ..core.profile import Profile
from ..database_manager.db_errors import DBCode, DBError
from ..database_manager.dbstore import (
    DBStore,
    DBStoreError,
    DBStoreErrorCode,
)
from ..utils.env import storage_path

LOGGER = logging.getLogger(__name__)

ERR_STORAGE_TYPE_UNSUPPORTED = "Unsupported storage type: {}"
ERR_DBSTORE_STORAGE_TYPE_UNSUPPORTED = "Unsupported dbstore storage type: {}"
ERR_JSON_INVALID = "{} must be valid JSON: {}"
ERR_NO_URL = "No 'url' provided for postgres store"
ERR_NO_ACCOUNT = "No 'account' provided for postgres store"
ERR_NO_PASSWORD = "No 'password' provided for postgres store"
ERR_REMOVE_STORE = "Error removing {} store: {}"


[docs] class KanonStoreConfig: """A helper class for handling Kanon store configuration.""" DEFAULT_KEY = "" DEFAULT_KEY_DERIVATION = "kdf:argon2i:mod" DEFAULT_STORAGE_TYPE = None DEFAULT_SCHEMA_CONFIG = "normalize" # Current schema release number refers to the schema version currently # supported by this ACA-Py release. # If the schema version in the database is lower than the current # release version, # then during store opening, the system will halt and prompt the user to # perform an upgrade. CURRENT_SCHEMA_RELEASE_NUMBER = "release_0_1" KEY_DERIVATION_RAW = "RAW" KEY_DERIVATION_ARGON2I_INT = "kdf:argon2i:int" KEY_DERIVATION_ARGON2I_MOD = "kdf:argon2i:mod" # { # "url": "postgresql://localhost:5432/mydb", # "min_connections": 4, # "max_connections": 10, # "connect_timeout_ms": 30000, # "max_idle": 5.0, # "max_lifetime": 3600.0, # "tls": { # "sslmode": "verify-full", # "sslcert": "/path/to/client-cert.pem", # "sslkey": "/path/to/client-key.pem", # "sslrootcert": "/path/to/root-cert.pem" # } # } def __init__(self, config: Optional[dict] = None, store_class: str = "dbstore"): """Initialize a `KanonStoreConfig` instance.""" if not config: config = {} self.store_class = store_class self._init_basic_config(config) self._init_askar_config(config) self._init_dbstore_config(config) def _init_basic_config(self, config: dict): """Initialize basic configuration settings.""" self.auto_recreate = config.get("auto_recreate", False) self.auto_remove = config.get("auto_remove", False) self.key = config.get("key", self.DEFAULT_KEY) self.key_is_encoded = config.get("key_is_encoded", False) self.key_derivation_method = ( config.get("key_derivation_method") or self.DEFAULT_KEY_DERIVATION ) self.rekey = config.get("rekey") self.rekey_derivation_method = ( config.get("rekey_derivation_method") or self.DEFAULT_KEY_DERIVATION ) self.name = config.get("name") or Profile.DEFAULT_NAME def _init_askar_config(self, config: dict): """Initialize Askar-specific configuration.""" self.storage_config = config.get("storage_config", None) self.storage_creds = config.get("storage_creds", None) storage_type = config.get("storage_type") if not storage_type or storage_type == "default": storage_type = "sqlite" elif storage_type == "postgres_storage": storage_type = "postgres" if storage_type not in ("postgres", "sqlite"): raise ProfileError(ERR_STORAGE_TYPE_UNSUPPORTED.format(storage_type)) self.storage_type = storage_type def _init_dbstore_config(self, config: dict): """Initialize DBStore-specific configuration.""" self.dbstore_key = config.get("dbstore_key") LOGGER.debug("dbstore_key: %s", self.dbstore_key) self.dbstore_rekey = config.get("dbstore_rekey") LOGGER.debug("dbstore_rekey: %s", self.dbstore_rekey) self._validate_dbstore_storage_config(config) self._init_dbstore_schema_config(config) self._init_dbstore_storage_type(config) def _validate_dbstore_storage_config(self, config: dict): """Validate DBStore storage configuration.""" self.dbstore_storage_config = config.get("dbstore_storage_config", None) if self.dbstore_storage_config: try: config_dict = json.loads(self.dbstore_storage_config) required_keys = ["url"] for key in required_keys: if key not in config_dict: raise ProfileError( f"Missing required key '{key}' in dbstore_storage_config" ) self._validate_tls_config(config_dict) except json.JSONDecodeError as e: LOGGER.error( "Invalid JSON in dbstore_storage_config: %s", self.dbstore_storage_config, ) raise ProfileError( ERR_JSON_INVALID.format("dbstore_storage_config", str(e)) ) LOGGER.debug("dbstore_storage_config: %s", self.dbstore_storage_config) self.dbstore_storage_creds = config.get("dbstore_storage_creds", None) LOGGER.debug("dbstore_storage_creds: %s", self.dbstore_storage_creds) def _validate_tls_config(self, config_dict: dict): """Validate TLS configuration settings.""" if "tls" in config_dict and isinstance(config_dict["tls"], dict): tls_config = config_dict["tls"] valid_sslmodes = [ "disable", "allow", "prefer", "require", "verify-ca", "verify-full", ] if "sslmode" in tls_config and tls_config["sslmode"] not in valid_sslmodes: raise ProfileError("Invalid sslmode in tls configuration") def _init_dbstore_schema_config(self, config: dict): """Initialize DBStore schema configuration.""" self.dbstore_schema_config = config.get( "dbstore_schema_config", self.DEFAULT_SCHEMA_CONFIG ) LOGGER.debug("dbstore_schema_config: %s", self.dbstore_schema_config) self.dbstore_schema_migration = config.get("dbstore_schema_migration", None) LOGGER.debug("dbstore_schema_migration: %s", self.dbstore_schema_migration) def _init_dbstore_storage_type(self, config: dict): """Initialize and validate DBStore storage type.""" dbstore_storage_type = config.get("dbstore_storage_type") if not dbstore_storage_type or dbstore_storage_type == "default": dbstore_storage_type = "sqlite" elif dbstore_storage_type == "postgres_storage": dbstore_storage_type = "postgres" if dbstore_storage_type not in ("postgres", "sqlite"): raise ProfileError( ERR_DBSTORE_STORAGE_TYPE_UNSUPPORTED.format(dbstore_storage_type) ) self.dbstore_storage_type = dbstore_storage_type LOGGER.debug("dbstore_storage_type: %s", self.dbstore_storage_type) if self.dbstore_storage_type == "postgres" and self.dbstore_storage_creds: try: json.loads(self.dbstore_storage_creds) except json.JSONDecodeError as e: LOGGER.error( "Invalid JSON in dbstore_storage_creds: %s", self.dbstore_storage_creds, ) raise ProfileError( ERR_JSON_INVALID.format("dbstore_storage_creds", str(e)) )
[docs] @staticmethod def validate_base58_key(key: str): """Validate base58 key.""" try: decoded = base58.b58decode(key) print(f"Decoded length: {len(decoded)}") except ValueError as e: print(f"Decode error: {e}")
[docs] def get_dbstore_uri( self, create: bool = False, in_memory: Optional[bool] = False ) -> str: """Get DBStore URI.""" LOGGER.debug( "DBStore URI: dbstore_storage_type=%s, create=%s, in_memory=%s", self.dbstore_storage_type, create, in_memory, ) uri = f"{self.dbstore_storage_type}://" if self.dbstore_storage_type == "sqlite": return self._build_sqlite_dbstore_uri(uri, create, in_memory) elif self.dbstore_storage_type == "postgres": return self._build_postgres_dbstore_uri(uri) return uri
def _build_sqlite_dbstore_uri( self, base_uri: str, create: bool, in_memory: Optional[bool] ) -> str: """Build SQLite DBStore URI.""" if in_memory: uri = base_uri + ":memory:" LOGGER.debug("Generated SQLite in-memory URI: %s", uri) return uri base_path = storage_path("wallet", self.name, create=create).as_posix() db_file = "sqlite_dbstore.db" path = f"{base_path}/dbstore" os.makedirs(path, exist_ok=True) uri = base_uri + urllib.parse.quote(f"{path}/{db_file}") LOGGER.debug("Generated SQLite file URI: %s", uri) return uri def _build_postgres_dbstore_uri(self, base_uri: str) -> str: """Build PostgreSQL DBStore URI.""" self._validate_postgres_dbstore_config() config = json.loads(self.dbstore_storage_config) creds = json.loads(self.dbstore_storage_creds) LOGGER.debug("Parsed dbstore_storage_config (keys): %s", list(config.keys())) LOGGER.debug("Parsed dbstore_storage_creds (keys): %s", list(creds.keys())) config_url = self._validate_postgres_dbstore_url(config) account, password = self._validate_postgres_dbstore_creds(creds) db_name = urllib.parse.quote(self.name + "_dbstore") uri = base_uri + f"{account}:{password}@{config_url}/{db_name}" params = self._build_postgres_dbstore_params(config, creds) if params: uri += "?" + urllib.parse.urlencode(params) # Log redacted version for security redacted_uri = base_uri + f"{account}:***@{config_url}/{db_name}" if params: redacted_uri += "?" + urllib.parse.urlencode(params) LOGGER.debug("Generated PostgreSQL URI: %s", redacted_uri) return uri def _validate_postgres_dbstore_config(self): """Validate PostgreSQL DBStore configuration.""" if not self.dbstore_storage_config: LOGGER.error(ERR_NO_STORAGE_CONFIG) raise ProfileError(ERR_NO_STORAGE_CONFIG) if not self.dbstore_storage_creds: LOGGER.error(ERR_NO_STORAGE_CREDS) raise ProfileError(ERR_NO_STORAGE_CREDS) def _validate_postgres_dbstore_url(self, config: dict) -> str: """Validate and return PostgreSQL DBStore URL.""" config_url = config.get("url") if not config_url: LOGGER.error(ERR_NO_URL) raise ProfileError(ERR_NO_URL) return config_url def _validate_postgres_dbstore_creds(self, creds: dict) -> tuple[str, str]: """Validate and return PostgreSQL DBStore credentials.""" if "account" not in creds: LOGGER.error(ERR_NO_ACCOUNT) raise ProfileError(ERR_NO_ACCOUNT) if "password" not in creds: LOGGER.error(ERR_NO_PASSWORD) raise ProfileError(ERR_NO_PASSWORD) account = urllib.parse.quote(creds["account"]) password = urllib.parse.quote(creds["password"]) return account, password def _build_postgres_dbstore_params(self, config: dict, creds: dict) -> dict: """Build PostgreSQL DBStore connection parameters.""" params = {} if "connection_timeout" in config: params["connect_timeout"] = config["connection_timeout"] self._add_tls_params(config, params) self._add_admin_params(creds, params) return params def _add_tls_params(self, config: dict, params: dict): """Add TLS parameters to connection params.""" if "tls" in config: tls_config = config["tls"] if isinstance(tls_config, dict): tls_fields = ["sslmode", "sslcert", "sslkey", "sslrootcert"] for field in tls_fields: if field in tls_config: params[field] = tls_config[field] def _add_admin_params(self, creds: dict, params: dict): """Add admin parameters to connection params.""" admin_fields = ["admin_account", "admin_password"] for field in admin_fields: if field in creds: params[field] = creds[field]
[docs] def get_askar_uri( self, create: bool = False, in_memory: Optional[bool] = False ) -> str: """Get Askar URI.""" uri = f"{self.storage_type}://" if self.storage_type == "sqlite": return self._build_sqlite_askar_uri(uri, create, in_memory) elif self.storage_type == "postgres": return self._build_postgres_askar_uri(uri) return uri
def _build_sqlite_askar_uri( self, base_uri: str, create: bool, in_memory: Optional[bool] ) -> str: """Build SQLite Askar URI.""" if in_memory: return base_uri + ":memory:" base_path = storage_path("wallet", self.name, create=create).as_posix() db_file = "sqlite_kms.db" path = f"{base_path}/askar" os.makedirs(path, exist_ok=True) return base_uri + urllib.parse.quote(f"{path}/{db_file}") def _build_postgres_askar_uri(self, base_uri: str) -> str: """Build PostgreSQL Askar URI.""" self._validate_postgres_askar_config() config = json.loads(self.storage_config) creds = json.loads(self.storage_creds) config_url = self._validate_postgres_askar_url(config) account, password = self._validate_postgres_askar_creds(creds) db_name = urllib.parse.quote(self.name) uri = base_uri + f"{account}:{password}@{config_url}/{db_name}" params = self._build_postgres_askar_params(config, creds) if params: uri += "?" + urllib.parse.urlencode(params) return uri def _validate_postgres_askar_config(self): """Validate PostgreSQL Askar configuration.""" if not self.storage_config: raise ProfileError(ERR_NO_STORAGE_CONFIG) if not self.storage_creds: raise ProfileError(ERR_NO_STORAGE_CREDS) def _validate_postgres_askar_url(self, config: dict) -> str: """Validate and return PostgreSQL Askar URL.""" config_url = config.get("url") if not config_url: raise ProfileError(ERR_NO_URL) return config_url def _validate_postgres_askar_creds(self, creds: dict) -> tuple[str, str]: """Validate and return PostgreSQL Askar credentials.""" if "account" not in creds: raise ProfileError(ERR_NO_ACCOUNT) if "password" not in creds: raise ProfileError(ERR_NO_PASSWORD) account = urllib.parse.quote(creds["account"]) password = urllib.parse.quote(creds["password"]) return account, password def _build_postgres_askar_params(self, config: dict, creds: dict) -> dict: """Build PostgreSQL Askar connection parameters.""" params = {} if "connection_timeout" in config: params["connect_timeout"] = config["connection_timeout"] if "max_connections" in config: params["max_connections"] = config["max_connections"] if "min_idle_count" in config: params["min_connections"] = config["min_idle_count"] admin_fields = ["admin_account", "admin_password"] for field in admin_fields: if field in creds: params[field] = creds[field] return params # ---------- helpers to reduce cognitive complexity ---------- def _build_sqlite_uri(self, base: str, subdir: str, filename: str) -> str: base_path = storage_path("wallet", self.name, create=base == "dbstore").as_posix() path = f"{base_path}/{subdir}" os.makedirs(path, exist_ok=True) return urllib.parse.quote(f"{path}/{filename}") async def _open_or_provision_dbstore( self, db_uri: str, provision: bool, config: dict, ): if provision: release_number = ( "release_0" if self.dbstore_schema_config == "generic" else self.CURRENT_SCHEMA_RELEASE_NUMBER ) return await DBStore.provision( db_uri, self.key_derivation_method, self.dbstore_key, profile=self.name, recreate=self.auto_recreate, release_number=release_number, schema_config=self.dbstore_schema_config, config=config, ) target_release = self.CURRENT_SCHEMA_RELEASE_NUMBER return await DBStore.open( db_uri, self.key_derivation_method, self.dbstore_key, profile=self.name, schema_migration=self.dbstore_schema_migration, target_schema_release_number=target_release, config=config, ) async def _open_or_provision_askar(self, askar_uri: str, provision: bool): if provision: return await Store.provision( askar_uri, self.key_derivation_method, self.key, profile=self.name, recreate=self.auto_recreate, ) return await Store.open( askar_uri, self.key_derivation_method, self.key, profile=self.name, )
[docs] async def remove_store(self): """Remove store.""" try: if self.store_class == "askar": await Store.remove(self.get_askar_uri()) else: config = ( json.loads(self.dbstore_storage_config) if self.dbstore_storage_config else {} ) await DBStore.remove(self.get_dbstore_uri(), config=config) except DBError as err: if err.code in DBCode.NOT_FOUND: raise ProfileNotFoundError(f"Store '{self.name}' not found") raise ProfileError( ERR_REMOVE_STORE.format(self.store_class, str(err)) ) from err
def _handle_askar_open_error(self, err: AskarError, retry: bool = False): if err.code == AskarErrorCode.DUPLICATE: raise ProfileDuplicateError(f"Duplicate store '{self.name}'") if err.code == AskarErrorCode.NOT_FOUND: raise ProfileNotFoundError(f"Store '{self.name}' not found") if retry and self.rekey: return raise ProfileError("Error opening Askar store") from err
[docs] async def open_store( self, provision: bool = False, in_memory: Optional[bool] = False ) -> "KanonOpenStore": """Open or provision both DBStore and Askar Store with separate error handling.""" db_uri = self.get_dbstore_uri(create=provision, in_memory=in_memory) askar_uri = self.get_askar_uri(create=provision, in_memory=in_memory) config = ( json.loads(self.dbstore_storage_config) if self.dbstore_storage_config else {} ) db_store = await self._open_dbstore_with_error_handling(db_uri, provision, config) askar_store = await self._open_askar_with_error_handling(askar_uri, provision) await self._handle_store_rekeying(db_store, askar_store) return KanonOpenStore(self, provision, db_store, askar_store)
async def _open_dbstore_with_error_handling( self, db_uri: str, provision: bool, config: dict ): """Open DBStore with proper error handling.""" try: return await self._open_or_provision_dbstore(db_uri, provision, config) except DBStoreError as err: if err.code == DBStoreErrorCode.NOT_FOUND: raise ProfileNotFoundError(f"DBStore '{self.name}' not found") elif err.code == DBStoreErrorCode.DUPLICATE: raise ProfileDuplicateError(f"Duplicate DBStore '{self.name}'") raise ProfileError("Error opening DBStore") from err async def _open_askar_with_error_handling(self, askar_uri: str, provision: bool): """Open Askar store with proper error handling and retry logic.""" try: return await self._open_or_provision_askar(askar_uri, provision) except AskarError as err: self._handle_askar_open_error(err, retry=True) if self.rekey: return await self._retry_askar_open_with_rekey(askar_uri) return None async def _retry_askar_open_with_rekey(self, askar_uri: str): """Retry opening Askar store with rekey.""" try: askar_store = await Store.open( askar_uri, self.key_derivation_method, self.DEFAULT_KEY, profile=self.name, ) await askar_store.rekey(self.rekey_derivation_method, self.rekey) return askar_store except AskarError as retry_err: raise ProfileError("Error opening Askar store after retry") from retry_err async def _handle_store_rekeying(self, db_store, askar_store): """Handle rekeying for both stores if required.""" if db_store and self.dbstore_rekey: try: await db_store.rekey(self.rekey_derivation_method, self.dbstore_rekey) except DBStoreError as err: raise ProfileError("Error rekeying DBStore") from err if askar_store and self.rekey: try: await askar_store.rekey(self.rekey_derivation_method, self.rekey) except AskarError as err: raise ProfileError("Error rekeying Askar store") from err
[docs] class KanonOpenStore: """Kanon open store.""" def __init__( self, config: KanonStoreConfig, created, db_store: DBStore, askar_store: Store ): """Initialize KanonOpenStore with configuration and stores.""" self.config = config self.created = created self.db_store = db_store self.askar_store = askar_store @property def name(self) -> str: """Get store name.""" return self.config.name
[docs] async def close(self): """Close store.""" if self.db_store: await self.db_store.close(remove=self.config.auto_remove) if self.askar_store: await self.askar_store.close(remove=self.config.auto_remove)