acapy_agent.database_manager.databases.sqlite_normalized package

class acapy_agent.database_manager.databases.sqlite_normalized.ConnectionPool(db_path: str, pool_size: int, busy_timeout: float, encryption_key: str | None = None, journal_mode: str = 'WAL', locking_mode: str = 'NORMAL', synchronous: str = 'FULL', shared_cache: bool = True)[source]

Bases: object

Connection pool manager for SQLite databases.

close()[source]

Close the connection pool.

drain_all_connections()[source]

Drain all connections from the pool.

get_connection(timeout: float = 30.0)[source]

Get a connection from the pool.

return_connection(conn)[source]

Return a connection to the pool.

exception acapy_agent.database_manager.databases.sqlite_normalized.DatabaseError(code: DatabaseErrorCode, message: str, actual_error: str = None)[source]

Bases: Exception

Custom exception class for database-related errors.

class acapy_agent.database_manager.databases.sqlite_normalized.DatabaseErrorCode(*values)[source]

Bases: Enum

Enum for database error codes.

CONNECTION_ERROR = 'CONNECTION_ERROR'
CONNECTION_POOL_EXHAUSTED = 'CONNECTION_POOL_EXHAUSTED'
DATABASE_NOT_ENCRYPTED = 'DATABASE_NOT_ENCRYPTED'
DATABASE_NOT_FOUND = 'DATABASE_NOT_FOUND'
DEFAULT_PROFILE_NOT_FOUND = 'DEFAULT_PROFILE_NOT_FOUND'
DUPLICATE_ITEM_ENTRY_ERROR = 'DUPLICATE_ITEM_ENTRY_ERROR'
PROFILE_ALREADY_EXISTS = 'PROFILE_ALREADY_EXISTS'
PROFILE_NOT_FOUND = 'PROFILE_NOT_FOUND'
PROVISION_ERROR = 'PROVISION_ERROR'
QUERY_ERROR = 'QUERY_ERROR'
RECORD_NOT_FOUND = 'RECORD_NOT_FOUND'
UNSUPPORTED_VERSION = 'UNSUPPORTED_VERSION'

class acapy_agent.database_manager.databases.sqlite_normalized.SqliteConfig(uri: str = 'sqlite://:memory:', busy_timeout: float = None, pool_size: int = None, journal_mode: str = 'WAL', locking_mode: str = 'NORMAL', shared_cache: bool = True, synchronous: str = 'FULL', encryption_key: str | None = None, schema_config: str = 'generic')[source]

Bases: object

Configuration for SQLite database connections.

open(profile: str | None = None, schema_migration: bool | None = None, target_schema_release_number: str | None = None) → Tuple[ConnectionPool, str, str, str][source]

Open database connection and validate configuration.

Parameters:
  • profile – Profile name to use

  • schema_migration – Whether schema migration is requested (ignored for SQLite)

  • target_schema_release_number – Target schema release number

Returns:

Tuple of (connection pool, profile name, db path, release number)

provision(profile: str | None = None, recreate: bool = False, release_number: str = 'release_0') → Tuple[ConnectionPool, str, str, str][source]

Provision the SQLite database.

remove() → bool[source]

Remove the database file.

Returns:

True if the removal succeeded or the database is in-memory

Subpackages

Submodules

acapy_agent.database_manager.databases.sqlite_normalized.backend module

SQLite backend implementation for the database manager.

class acapy_agent.database_manager.databases.sqlite_normalized.backend.SqliteBackend[source]

Bases: DatabaseBackend

SQLite backend implementation for database manager.

open(uri: str, key_method: str | None, pass_key: str | None, profile: str | None, schema_migration: bool | None = None, target_schema_release_number: str | None = None, schema_config: str | None = None, config: dict | None = None)[source]

Open an existing SQLite database instance with optional migration.

provision(uri: str, key_method: str | None, pass_key: str | None, profile: str | None, recreate: bool, release_number: str = 'release_0', schema_config: str = 'generic', config: dict | None = None)[source]

Provision a new SQLite database instance.

Uses specified release number and schema config.

remove(uri: str, release_number: str = 'release_0', config: dict | None = None)[source]

Remove the SQLite database file.

translate_error(exception)[source]

Translate backend-specific exceptions to DBStoreError.

acapy_agent.database_manager.databases.sqlite_normalized.config module

Configuration for SQLite database connections.

class acapy_agent.database_manager.databases.sqlite_normalized.config.SqliteConfig(uri: str = 'sqlite://:memory:', busy_timeout: float = None, pool_size: int = None, journal_mode: str = 'WAL', locking_mode: str = 'NORMAL', shared_cache: bool = True, synchronous: str = 'FULL', encryption_key: str | None = None, schema_config: str = 'generic')[source]

Bases: object

Configuration for SQLite database connections.

open(profile: str | None = None, schema_migration: bool | None = None, target_schema_release_number: str | None = None) → Tuple[ConnectionPool, str, str, str][source]

Open database connection and validate configuration.

Parameters:
  • profile – Profile name to use

  • schema_migration – Whether schema migration is requested (ignored for SQLite)

  • target_schema_release_number – Target schema release number

Returns:

Tuple of (connection pool, profile name, db path, release number)

provision(profile: str | None = None, recreate: bool = False, release_number: str = 'release_0') → Tuple[ConnectionPool, str, str, str][source]

Provision the SQLite database.

remove() → bool[source]

Remove the database file.

Returns:

True if the removal succeeded or the database is in-memory

acapy_agent.database_manager.databases.sqlite_normalized.connection_pool module

Connection pool manager for SQLite databases.

class acapy_agent.database_manager.databases.sqlite_normalized.connection_pool.ConnectionPool(db_path: str, pool_size: int, busy_timeout: float, encryption_key: str | None = None, journal_mode: str = 'WAL', locking_mode: str = 'NORMAL', synchronous: str = 'FULL', shared_cache: bool = True)[source]

Bases: object

Connection pool manager for SQLite databases.

close()[source]

Close the connection pool.

drain_all_connections()[source]

Drain all connections from the pool.

get_connection(timeout: float = 30.0)[source]

Get a connection from the pool.

return_connection(conn)[source]

Return a connection to the pool.

acapy_agent.database_manager.databases.sqlite_normalized.database module

SQLite normalized database implementation.

class acapy_agent.database_manager.databases.sqlite_normalized.database.SqliteDatabase(pool: ConnectionPool, default_profile: str, path: str, release_number: str = 'release_0')[source]

Bases: AbstractDatabaseStore

SQLite database implementation for normalized storage.

close(remove: bool = False)[source]

Close the database and optionally remove the file.

Parameters:

remove – Whether to remove the database file

async create_profile(name: str = None) → str[source]

Create a new profile in the database.

Parameters:

name – Profile name to create

Returns:

The created profile name

Return type:

str

async get_profile_name() → str[source]

Get the default profile name.

Returns:

Default profile name

Return type:

str

async rekey(key_method: str = None, pass_key: str = None)[source]

Rekey the database with new encryption.

Parameters:
  • key_method – Key method to use

  • pass_key – Password key for encryption

async remove_profile(name: str) → bool[source]

Remove a profile from the database.

Parameters:

name – Profile name to remove

Returns:

True if removed successfully

Return type:

bool

scan(profile: str | None, category: str, tag_filter: str | dict = None, offset: int = None, limit: int = None, order_by: str | None = None, descending: bool = False) → Generator[Entry, None, None][source]

Scan entries in the database with filtering and pagination.

Parameters:
  • profile – Profile name to scan

  • category – Category to scan

  • tag_filter – Tag filter criteria

  • offset – Offset for pagination

  • limit – Limit for pagination

  • order_by – Column to order by

  • descending – Whether to sort descending

Yields:

Entry – Database entries matching criteria

scan_keyset(profile: str | None, category: str, tag_filter: str | dict = None, last_id: int | None = None, limit: int = None, order_by: str | None = None, descending: bool = False) → Generator[Entry, None, None][source]

Scan entries using keyset pagination.

Parameters:
  • profile – Profile name to scan

  • category – Category to scan

  • tag_filter – Tag filter criteria

  • last_id – Last ID for cursor-based pagination

  • limit – Limit for pagination

  • order_by – Column to order by

  • descending – Whether to sort descending

Yields:

Entry – Database entries

session(profile: str = None, release_number: str = 'release_0')[source]

Create a context manager for database session.

Parameters:
  • profile – Profile name to use

  • release_number – Release number for schema versioning

Returns:

Database session context manager

Return type:

SqliteSession

async start_monitoring()[source]

Start monitoring active database sessions.

transaction(profile: str = None, release_number: str = 'release_0')[source]

Create a transaction context manager.

Parameters:
  • profile – Profile name to use

  • release_number – Release number for schema versioning

Returns:

Database transaction context manager

Return type:

SqliteSession

acapy_agent.database_manager.databases.sqlite_normalized.database.enc_name(name: str) → str[source]

Encode name for database storage.

Parameters:

name – Name to encode

Returns:

Encoded name

acapy_agent.database_manager.databases.sqlite_normalized.database.enc_value(value: str) → str[source]

Encode value for database storage.

Parameters:

value – Value to encode

Returns:

Encoded value

acapy_agent.database_manager.databases.sqlite_normalized.session module

SQLite database session implementation.

class acapy_agent.database_manager.databases.sqlite_normalized.session.SqliteSession(database: SqliteDatabase, profile: str, is_txn: bool, release_number: str = 'release_0_1')[source]

Bases: AbstractDatabaseSession

SQLite database session implementation.

async close()[source]

Close session.

async commit()[source]

Commit transaction.

async count(category: str, tag_filter: str | dict = None) → int[source]

Count entries in a category.

async fetch(category: str, name: str, tag_filter: str | dict = None, for_update: bool = False) → Entry | None[source]

Fetch a single entry.

async fetch_all(category: str, tag_filter: str | dict = None, limit: int = None, for_update: bool = False, order_by: str | None = None, descending: bool = False) → Sequence[Entry][source]

Fetch all entries matching criteria.

async insert(category: str, name: str, value: str | bytes = None, tags: dict = None, expiry_ms: int = None)[source]

Insert an entry.

async remove(category: str, name: str)[source]

Remove a single entry.

async remove_all(category: str, tag_filter: str | dict = None) → int[source]

Remove all entries matching criteria.

async replace(category: str, name: str, value: str | bytes = None, tags: dict = None, expiry_ms: int = None)[source]

Replace an entry.

async rollback()[source]

Rollback transaction.

translate_error(error: Exception) → DBStoreError[source]

Translate database-specific errors to DBStoreError.