Source code for acapy_agent.database_manager.category_registry

"""Category registry for database managers."""

import logging
from typing import Tuple

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.WARNING)


RELEASE_ORDER = ["release_0", "release_0_1", "release_0_2"]


[docs] def load_schema(category: str, version: str) -> dict: """Load schema from the versioned file.""" try: module_path = f"acapy_agent.database_manager.schemas.{category}_v{version}" module = __import__(module_path, fromlist=[category]) schemas = getattr(module, "SCHEMAS", {}) columns = getattr(module, "COLUMNS", []) drop_schemas = getattr(module, "DROP_SCHEMAS", {}) return {"schemas": schemas, "columns": columns, "drop_schemas": drop_schemas} except ImportError as e: LOGGER.error( "Failed to load schema for category=%s, version=%s: %s", category, version, str(e), ) return {"schemas": {}, "columns": [], "drop_schemas": {}} except Exception as e: LOGGER.error( "Unexpected error loading schema for category=%s, version=%s: %s", category, version, str(e), ) return {"schemas": {}, "columns": [], "drop_schemas": {}}
[docs] def load_release(release_number: str) -> dict: """Load the release configuration from its module.""" try: module_path = f"acapy_agent.database_manager.releases.{release_number}" module = __import__(module_path, fromlist=[release_number]) release = getattr(module, "RELEASE", {}) return release except ImportError as e: LOGGER.error("Failed to load release module %s: %s", release_number, str(e)) raise ValueError(f"Release module {release_number} not found") except Exception as e: LOGGER.error( "Unexpected error loading release module %s: %s", release_number, str(e) ) raise ValueError( f"Unexpected error loading release module {release_number}: {str(e)}" )
[docs] def get_release(release_number: str, db_type: str = "sqlite") -> Tuple[dict, dict, dict]: """Retrieve handlers and schemas for a given release number and database type.""" if release_number not in RELEASE_ORDER: LOGGER.error( "Invalid release number: %s, expected one of %s", release_number, RELEASE_ORDER, ) raise ValueError(f"Release number {release_number} not found") release = load_release(release_number) handlers = {} schemas = {} drop_schemas = {} if release_number == "release_0": default_handler = release["default"]["handlers"].get(db_type) if not default_handler: LOGGER.error( "Database type %s not supported for default handler in release %s", db_type, release_number, ) raise ValueError( f"Database type {db_type} not supported for default handler " f"in release {release_number}" ) for category in release: handlers[category] = ( default_handler() if callable(default_handler) else default_handler ) schemas[category] = None drop_schemas[category] = None else: for category, info in release.items(): if category == "default" and not info["schemas"]: handlers[category] = info["handlers"].get(db_type) schemas[category] = None drop_schemas[category] = None else: if db_type not in info["handlers"]: LOGGER.error( "Database type %s not supported for category %s in release %s", db_type, category, release_number, ) raise ValueError( f"Database type {db_type} not supported for category " f"{category} in release {release_number}" ) handlers[category] = info["handlers"][db_type] schema_list = info["schemas"].get(db_type) if info["schemas"] else None schemas[category] = {db_type: schema_list} if schema_list else None drop_list = ( info["drop_schemas"].get(db_type) if info.get("drop_schemas") else None ) drop_schemas[category] = {db_type: drop_list} if drop_list else None if schemas[category] is None and info["schemas"]: LOGGER.warning( "No schema found for category=%s, db_type=%s, " "despite schemas being defined: %s", category, db_type, info["schemas"], ) return handlers, schemas, drop_schemas