Source code for acapy_agent.indy.credx.holder_kanon

"""Indy holder implementation."""

import asyncio
import inspect
import json
import logging
import re
from typing import Dict, Optional, Sequence, Tuple

from indy_credx import (
    Credential,
    CredentialRequest,
    CredentialRevocationState,
    CredxError,
    LinkSecret,
    Presentation,
    PresentCredentials,
)
from uuid_utils import uuid4

from ...core.profile import Profile, ProfileSession
from ...database_manager.db_errors import DBCode, DBError
from ...ledger.base import BaseLedger
from ...wallet.error import WalletNotFoundError
from ..holder import IndyHolder, IndyHolderError

LOGGER = logging.getLogger(__name__)

CATEGORY_CREDENTIAL = "credential"
CATEGORY_LINK_SECRET = "master_secret"

ERR_FETCH_LINK_SECRET = "Error fetching link secret"
ERR_LOAD_LINK_SECRET = "Error loading link secret"
ERR_CREATE_LINK_SECRET = "Error creating link secret"
ERR_SAVE_LINK_SECRET = "Error saving link secret"
ERR_CREATE_CRED_REQ = "Error creating credential request"
ERR_PROCESS_RECEIVED_CRED = "Error processing received credential"
ERR_PARSING_SCHEMA_ID = "Error parsing credential schema ID: {}"
ERR_PARSING_CRED_DEF_ID = "Error parsing credential definition ID: {}"
ERR_STORING_CREDENTIAL = "Error storing credential"
ERR_RETRIEVING_CREDENTIALS = "Error retrieving credentials"
ERR_LOADING_STORED_CREDENTIAL = "Error loading stored credential"
ERR_UNKNOWN_PRESENTATION_REQ_REF = "Unknown presentation request referent: {}"
ERR_RETRIEVING_CREDENTIAL = "Error retrieving credential"
ERR_LOADING_REQUESTED_CREDENTIAL = "Error loading requested credential"
ERR_RETRIEVING_CRED_MIME_TYPES = "Error retrieving credential mime types"
ERR_CREATE_PRESENTATION = "Error creating presentation"
ERR_CREATE_REV_STATE = "Error creating revocation state"


def _make_cred_info(cred_id, cred: Credential):
    cred_info = cred.to_dict()  # not secure!
    rev_info = cred_info["signature"]["r_credential"]
    return {
        "referent": cred_id,
        "schema_id": cred_info["schema_id"],
        "cred_def_id": cred_info["cred_def_id"],
        "rev_reg_id": cred_info["rev_reg_id"],
        "cred_rev_id": str(rev_info["i"]) if rev_info else None,
        "attrs": {name: val["raw"] for (name, val) in cred_info["values"].items()},
    }


def _normalize_attr_name(name: str) -> str:
    return name.replace(" ", "")


[docs] class IndyCredxHolder(IndyHolder): """Indy-credx holder class.""" LINK_SECRET_ID = "default" def __init__(self, profile: Profile): """Initialize an IndyCredxHolder instance. Args: profile: The active profile instance """ self._profile = profile @property def profile(self) -> Profile: """Accessor for the profile instance.""" return self._profile async def _fetch_link_secret_record(self, session: ProfileSession): """Fetch link secret record from storage.""" try: fetch_method = session.handle.fetch if inspect.iscoroutinefunction(fetch_method): return await fetch_method( CATEGORY_LINK_SECRET, IndyCredxHolder.LINK_SECRET_ID ) return fetch_method(CATEGORY_LINK_SECRET, IndyCredxHolder.LINK_SECRET_ID) except DBError as err: LOGGER.error("%s", ERR_FETCH_LINK_SECRET) raise IndyHolderError(ERR_FETCH_LINK_SECRET) from err def _load_existing_link_secret(self, record) -> LinkSecret: """Load existing link secret from record.""" try: LOGGER.debug("Loading LinkSecret") secret = LinkSecret.load(record.raw_value) LOGGER.debug("Loaded existing link secret.") return secret except CredxError as err: LOGGER.info("Attempt fallback method after error loading link secret") return self._load_link_secret_fallback(record, err) def _load_link_secret_fallback(self, record, original_err) -> LinkSecret: """Attempt fallback method to load link secret.""" try: ms_string = record.value.decode("ascii") link_secret_dict = {"value": {"ms": ms_string}} secret = LinkSecret.load(link_secret_dict) LOGGER.debug("Loaded LinkSecret from AnonCreds secret.") return secret except CredxError: LOGGER.error("%s", ERR_LOAD_LINK_SECRET) raise IndyHolderError(ERR_LOAD_LINK_SECRET) from original_err async def _create_and_save_link_secret(self, session: ProfileSession) -> LinkSecret: """Create and save a new link secret.""" secret = self._create_new_link_secret() try: insert_method = session.handle.insert if inspect.iscoroutinefunction(insert_method): await insert_method( CATEGORY_LINK_SECRET, IndyCredxHolder.LINK_SECRET_ID, secret.to_json_buffer(), ) else: insert_method( CATEGORY_LINK_SECRET, IndyCredxHolder.LINK_SECRET_ID, secret.to_json_buffer(), ) LOGGER.debug("Saved new link secret.") return secret except DBError as err: if self._is_duplicate_error(err): return None # Retry needed LOGGER.error("%s", ERR_SAVE_LINK_SECRET) raise IndyHolderError(ERR_SAVE_LINK_SECRET) from err def _create_new_link_secret(self) -> LinkSecret: """Create a new link secret.""" try: secret = LinkSecret.create() LOGGER.debug("Created new link secret.") return secret except CredxError as err: LOGGER.error("%s", ERR_CREATE_LINK_SECRET) raise IndyHolderError(ERR_CREATE_LINK_SECRET) from err def _is_duplicate_error(self, err) -> bool: """Check if error is a duplicate record error.""" try: return err.code in DBCode.DUPLICATE except Exception: return False
[docs] async def create_credential_request( self, credential_offer: dict, credential_definition: dict, holder_did: str ) -> Tuple[str, str]: """Create a credential request for the given credential offer. Args: credential_offer: The credential offer to create request for credential_definition: The credential definition to create an offer for holder_did: the DID of the agent making the request Returns: A tuple of the credential request and credential request metadata """ try: secret = await self.get_link_secret() ( cred_req, cred_req_metadata, ) = await asyncio.get_event_loop().run_in_executor( None, CredentialRequest.create, holder_did, credential_definition, secret, IndyCredxHolder.LINK_SECRET_ID, credential_offer, ) except CredxError as err: raise IndyHolderError(ERR_CREATE_CRED_REQ) from err cred_req_json, cred_req_metadata_json = ( cred_req.to_json(), cred_req_metadata.to_json(), ) LOGGER.debug( "Created credential request. " "credential_request_json=%s credential_request_metadata_json=%s", cred_req_json, cred_req_metadata_json, ) return cred_req_json, cred_req_metadata_json
def _parse_and_validate_ids(self, cred_recvd) -> tuple[tuple, tuple]: """Parse and validate schema and credential definition IDs. Returns: Tuple of (schema_id_parts, cdef_id_parts) """ schema_id = cred_recvd.schema_id # Handle both qualified (did:sov:V4SG:2:schema:1.0) # and unqualified (V4SG:2:schema:1.0) schema IDs schema_id_parts = re.match( r"^([^:]+(?::[^:]+:[^:]+)?):2:([^:]+):([^:]+)$", schema_id ) if not schema_id_parts: raise IndyHolderError(ERR_PARSING_SCHEMA_ID.format(schema_id)) cred_def_id = cred_recvd.cred_def_id cdef_id_parts = re.match( r"^([^:]+(?::[^:]+:[^:]+)?):3:CL:([^:]+):([^:]+)$", cred_def_id ) if not cdef_id_parts: raise IndyHolderError(ERR_PARSING_CRED_DEF_ID.format(cred_def_id)) return schema_id_parts, cdef_id_parts def _normalize_did(self, did: str) -> str: """Normalize DID to unqualified format for consistent storage.""" return did[8:] if did.startswith("did:sov:") else did def _build_credential_tags( self, cred_recvd, schema_id_parts: tuple, cdef_id_parts: tuple, credential_data: dict, credential_attr_mime_types: Optional[dict], ) -> tuple[dict, dict]: """Build tags and mime_types for credential storage. Returns: Tuple of (tags, mime_types) """ schema_issuer_did = self._normalize_did(schema_id_parts[1]) issuer_did = self._normalize_did(cdef_id_parts[1]) tags = { "schema_id": cred_recvd.schema_id, "schema_issuer_did": schema_issuer_did, "schema_name": schema_id_parts[2], "schema_version": schema_id_parts[3], "issuer_did": issuer_did, "cred_def_id": cred_recvd.cred_def_id, "rev_reg_id": cred_recvd.rev_reg_id or "None", } mime_types = {} for k, attr_value in credential_data["values"].items(): attr_name = _normalize_attr_name(k) tags[f"attr::{attr_name}::value"] = attr_value["raw"] if credential_attr_mime_types and k in credential_attr_mime_types: mime_types[k] = credential_attr_mime_types[k] return tags, mime_types async def _insert_credential_record( self, txn: ProfileSession, credential_id: str, cred_recvd, tags: dict ) -> None: """Insert credential record into storage.""" insert_method = txn.handle.insert if inspect.iscoroutinefunction(insert_method): await insert_method( CATEGORY_CREDENTIAL, credential_id, cred_recvd.to_json_buffer(), tags=tags, ) else: insert_method( CATEGORY_CREDENTIAL, credential_id, cred_recvd.to_json_buffer(), tags=tags, ) async def _insert_mime_types_record( self, txn: ProfileSession, credential_id: str, mime_types: dict ) -> None: """Insert MIME types record if needed.""" if not mime_types: return insert_method = txn.handle.insert if inspect.iscoroutinefunction(insert_method): await insert_method( IndyHolder.RECORD_TYPE_MIME_TYPES, credential_id, value_json=mime_types, ) else: insert_method( IndyHolder.RECORD_TYPE_MIME_TYPES, credential_id, value_json=mime_types, ) async def _commit_transaction(self, txn) -> None: """Commit transaction if commit method exists.""" commit_method = getattr(txn, "commit", None) if not commit_method: return if inspect.iscoroutinefunction(commit_method): await commit_method() else: commit_method()
[docs] async def store_credential( self, credential_definition: dict, credential_data: dict, credential_request_metadata: dict, credential_attr_mime_types: Optional[dict] = None, credential_id: Optional[str] = None, rev_reg_def: Optional[dict] = None, ) -> str: """Store a credential in the wallet. Args: credential_definition: Credential definition for this credential credential_data: Credential data generated by the issuer credential_request_metadata: credential request metadata generated by the issuer credential_attr_mime_types: dict mapping attribute names to (optional) MIME types to store as non-secret record, if specified credential_id: optionally override the stored credential id rev_reg_def: revocation registry definition in json Returns: the ID of the stored credential """ try: secret = await self.get_link_secret() cred = Credential.load(credential_data) cred_recvd = await asyncio.get_event_loop().run_in_executor( None, cred.process, credential_request_metadata, secret, credential_definition, rev_reg_def, ) except CredxError as err: raise IndyHolderError(ERR_PROCESS_RECEIVED_CRED) from err schema_id_parts, cdef_id_parts = self._parse_and_validate_ids(cred_recvd) credential_id = credential_id or str(uuid4()) tags, mime_types = self._build_credential_tags( cred_recvd, schema_id_parts, cdef_id_parts, credential_data, credential_attr_mime_types, ) try: async with self._profile.transaction() as txn: await self._insert_credential_record(txn, credential_id, cred_recvd, tags) await self._insert_mime_types_record(txn, credential_id, mime_types) await self._commit_transaction(txn) except DBError as err: raise IndyHolderError(ERR_STORING_CREDENTIAL) from err return credential_id
[docs] async def get_credentials(self, *, offset: int, limit: int, wql: dict): """Get credentials stored in the wallet. Args: offset: Starting index limit: Number of records to return wql: wql query dict """ result = [] try: rows = self._profile.store.scan( category=CATEGORY_CREDENTIAL, tag_filter=wql, offset=offset, limit=limit, profile=self._profile.settings.get("wallet.askar_profile"), ) async for row in rows: cred = Credential.load(row.raw_value) result.append(_make_cred_info(row.name, cred)) except DBError as err: raise IndyHolderError(ERR_RETRIEVING_CREDENTIALS) from err except CredxError as err: raise IndyHolderError(ERR_LOADING_STORED_CREDENTIAL) from err return result
[docs] async def get_credentials_for_presentation_request_by_referent( self, presentation_request: dict, referents: Sequence[str], *, offset: int, limit: int, extra_query: Optional[dict] = None, ): """Get credentials stored in the wallet. Args: presentation_request: Valid presentation request from issuer referents: Presentation request referents to use to search for creds offset: Starting index limit: Maximum number of records to return extra_query: wql query dict """ extra_query = extra_query or {} referents = self._get_effective_referents(presentation_request, referents) creds = {} for reft in referents: await self._process_referent( presentation_request, reft, creds, extra_query, offset, limit ) self._finalize_credential_referents(creds) return list(creds.values())
def _get_effective_referents( self, presentation_request: dict, referents: Sequence[str] ) -> Sequence[str]: """Get effective referents for the presentation request.""" if not referents: return ( *presentation_request["requested_attributes"], *presentation_request["requested_predicates"], ) return referents async def _process_referent( self, presentation_request: dict, reft: str, creds: dict, extra_query: dict, offset: int, limit: int, ): """Process a single referent to find matching credentials.""" names, restr = self._extract_referent_info(presentation_request, reft) tag_filter = self._build_tag_filter(names, restr, extra_query) rows = self._profile.store.scan( category=CATEGORY_CREDENTIAL, tag_filter=tag_filter, offset=offset, limit=limit, profile=self._profile.settings.get("wallet.askar_profile"), ) async for row in rows: self._add_credential_to_results(row, reft, creds, presentation_request) def _extract_referent_info( self, presentation_request: dict, reft: str ) -> tuple[set, dict]: """Extract names and restrictions from a referent.""" names = set() if reft in presentation_request["requested_attributes"]: attr = presentation_request["requested_attributes"][reft] names = self._extract_attribute_names(attr) restr = attr.get("restrictions") elif reft in presentation_request["requested_predicates"]: pred = presentation_request["requested_predicates"][reft] if "name" in pred: names.add(_normalize_attr_name(pred["name"])) restr = pred.get("restrictions") else: raise IndyHolderError(ERR_UNKNOWN_PRESENTATION_REQ_REF.format(reft)) return names, restr def _extract_attribute_names(self, attr: dict) -> set: """Extract attribute names from attribute specification.""" names = set() if "name" in attr: names.add(_normalize_attr_name(attr["name"])) elif "names" in attr: names.update(_normalize_attr_name(name) for name in attr["names"]) return names def _build_tag_filter(self, names: set, restr: dict, extra_query: dict) -> dict: """Build tag filter for credential search.""" tag_filter = {"$exist": [f"attr::{name}::value" for name in names]} filters_to_combine = [tag_filter] if restr: filters_to_combine.extend(restr if isinstance(restr, list) else [restr]) if extra_query: filters_to_combine.append(extra_query) return {"$and": filters_to_combine} if len(filters_to_combine) > 1 else tag_filter def _add_credential_to_results( self, row, reft: str, creds: dict, presentation_request: dict ): """Add credential to results or update existing entry.""" if row.name in creds: creds[row.name]["presentation_referents"].add(reft) else: cred_info = _make_cred_info(row.name, Credential.load(row.raw_value)) creds[row.name] = { "cred_info": cred_info, "interval": presentation_request.get("non_revoked"), "presentation_referents": {reft}, } def _finalize_credential_referents(self, creds: dict): """Convert presentation referents sets to lists.""" for cred in creds.values(): cred["presentation_referents"] = list(cred["presentation_referents"])
[docs] async def get_credential(self, credential_id: str) -> str: """Get a credential stored in the wallet. Args: credential_id: Credential id to retrieve """ get_cred_method = self._get_credential if inspect.iscoroutinefunction(get_cred_method): cred = await get_cred_method(credential_id) else: cred = get_cred_method(credential_id) return json.dumps(_make_cred_info(credential_id, cred))
async def _get_credential(self, credential_id: str) -> Credential: """Get an unencoded Credential instance from the store.""" try: async with self._profile.session() as session: fetch_method = session.handle.fetch if inspect.iscoroutinefunction(fetch_method): cred = await fetch_method(CATEGORY_CREDENTIAL, credential_id) else: cred = fetch_method(CATEGORY_CREDENTIAL, credential_id) except DBError as err: raise IndyHolderError(ERR_RETRIEVING_CREDENTIAL) from err if not cred: raise WalletNotFoundError( f"Credential {credential_id} not found in wallet {self.profile.name}" ) try: return Credential.load(cred.raw_value) except CredxError as err: raise IndyHolderError(ERR_LOADING_REQUESTED_CREDENTIAL) from err
[docs] async def credential_revoked( self, ledger: BaseLedger, credential_id: str, timestamp_from: Optional[int] = None, timestamp_to: Optional[int] = None, ) -> bool: """Check ledger for revocation status of credential by cred id. Args: ledger (BaseLedger): The ledger to check for revocation status. credential_id (str): The ID of the credential to check. timestamp_from (int, optional): The starting sequence number of the revocation registry delta. Defaults to None. timestamp_to (int, optional): The ending sequence number of the revocation registry delta. Defaults to None. Returns: bool: True if the credential is revoked, False otherwise. """ get_cred_method = self._get_credential if inspect.iscoroutinefunction(get_cred_method): cred = await get_cred_method(credential_id) else: cred = get_cred_method(credential_id) rev_reg_id = cred.rev_reg_id if rev_reg_id: cred_rev_id = cred.rev_reg_index get_delta = ledger.get_revoc_reg_delta if inspect.iscoroutinefunction(get_delta): (rev_reg_delta, _) = await get_delta( rev_reg_id, timestamp_from, timestamp_to, ) else: (rev_reg_delta, _) = get_delta( rev_reg_id, timestamp_from, timestamp_to, ) return cred_rev_id in rev_reg_delta["value"].get("revoked", []) else: return False
[docs] async def delete_credential(self, credential_id: str): """Remove a credential stored in the wallet. Args: credential_id: Credential id to remove """ try: async with self._profile.session() as session: remove_method = session.handle.remove if inspect.iscoroutinefunction(remove_method): await remove_method(CATEGORY_CREDENTIAL, credential_id) await remove_method(IndyHolder.RECORD_TYPE_MIME_TYPES, credential_id) else: remove_method(CATEGORY_CREDENTIAL, credential_id) remove_method(IndyHolder.RECORD_TYPE_MIME_TYPES, credential_id) except DBError as err: # Ignore not-found deletes; re-raise others try: if err.code in DBCode.NOT_FOUND: pass else: raise IndyHolderError("Error deleting credential") from err except Exception: # If err lacks a code, treat as unexpected raise IndyHolderError("Error deleting credential") from err
[docs] async def get_mime_type( self, credential_id: str, attr: Optional[str] = None ) -> dict | str: """Get MIME type per attribute (or for all attributes). Args: credential_id: credential id attr: attribute of interest or omit for all Returns: Attribute MIME type or dict mapping attribute names to MIME types attr_meta_json = all_meta.tags.get(attr) """ try: async with self._profile.session() as session: fetch_method = session.handle.fetch if inspect.iscoroutinefunction(fetch_method): mime_types_record = await fetch_method( IndyHolder.RECORD_TYPE_MIME_TYPES, credential_id, ) else: mime_types_record = fetch_method( IndyHolder.RECORD_TYPE_MIME_TYPES, credential_id, ) except DBError as err: raise IndyHolderError(ERR_RETRIEVING_CRED_MIME_TYPES) from err values = mime_types_record and mime_types_record.value_json if values: return values.get(attr) if attr else values
[docs] async def create_presentation( self, presentation_request: dict, requested_credentials: dict, schemas: dict, credential_definitions: dict, rev_states: Optional[dict] = None, ) -> str: """Get credentials stored in the wallet. Args: presentation_request: Valid indy format presentation request requested_credentials: Indy format requested credentials schemas: Indy formatted schemas JSON credential_definitions: Indy formatted credential definitions JSON rev_states: Indy format revocation states JSON """ creds: Dict[str, Credential] = {} present_creds = PresentCredentials() await self._process_requested_attributes( requested_credentials, creds, present_creds, rev_states ) await self._process_requested_predicates( requested_credentials, creds, present_creds, rev_states ) return await self._create_final_presentation( presentation_request, requested_credentials, present_creds, schemas, credential_definitions, )
async def _process_requested_attributes( self, requested_credentials: dict, creds: dict, present_creds, rev_states: dict ): """Process requested attributes for presentation.""" req_attrs = requested_credentials.get("requested_attributes") or {} for reft, detail in req_attrs.items(): cred_id = detail["cred_id"] if cred_id not in creds: creds[cred_id] = await self._get_credential(cred_id) timestamp, rev_state = self._get_rev_state(cred_id, detail, creds, rev_states) present_creds.add_attributes( creds[cred_id], reft, reveal=detail["revealed"], timestamp=timestamp, rev_state=rev_state, ) async def _process_requested_predicates( self, requested_credentials: dict, creds: dict, present_creds, rev_states: dict ): """Process requested predicates for presentation.""" req_preds = requested_credentials.get("requested_predicates") or {} for reft, detail in req_preds.items(): cred_id = detail["cred_id"] if cred_id not in creds: creds[cred_id] = await self._get_credential(cred_id) timestamp, rev_state = self._get_rev_state(cred_id, detail, creds, rev_states) present_creds.add_predicates( creds[cred_id], reft, timestamp=timestamp, rev_state=rev_state, ) def _get_rev_state( self, cred_id: str, detail: dict, creds: dict, rev_states: dict ) -> tuple: """Get revocation state for a credential.""" cred = creds[cred_id] rev_reg_id = cred.rev_reg_id timestamp = detail.get("timestamp") if rev_reg_id else None rev_state = None if timestamp: self._validate_rev_states(rev_states, rev_reg_id, cred_id) rev_state = rev_states[rev_reg_id].get(timestamp) if not rev_state: raise IndyHolderError( f"No revocation states provided for credential '{cred_id}' " f"with rev_reg_id '{rev_reg_id}' at timestamp {timestamp}" ) return timestamp, rev_state def _validate_rev_states(self, rev_states: dict, rev_reg_id: str, cred_id: str): """Validate that revocation states are available.""" if not rev_states or rev_reg_id not in rev_states: raise IndyHolderError( f"No revocation states provided for credential '{cred_id}' " f"with rev_reg_id '{rev_reg_id}'" ) async def _create_final_presentation( self, presentation_request: dict, requested_credentials: dict, present_creds, schemas: dict, credential_definitions: dict, ) -> str: """Create the final presentation.""" self_attest = requested_credentials.get("self_attested_attributes") or {} try: get_ls = self.get_link_secret if inspect.iscoroutinefunction(get_ls): secret = await get_ls() else: secret = get_ls() presentation = await asyncio.get_event_loop().run_in_executor( None, Presentation.create, presentation_request, present_creds, self_attest, secret, schemas.values(), credential_definitions.values(), ) except CredxError as err: raise IndyHolderError(ERR_CREATE_PRESENTATION) from err return presentation.to_json()
[docs] async def create_revocation_state( self, cred_rev_id: str, rev_reg_def: dict, rev_reg_delta: dict, timestamp: int, tails_file_path: str, ) -> str: """Create current revocation state for a received credential. This method creates the current revocation state for a received credential. It takes the credential revocation ID, revocation registry definition, revocation delta, delta timestamp, and tails file path as input parameters. Args: cred_rev_id (str): The credential revocation ID in the revocation registry. rev_reg_def (dict): The revocation registry definition. rev_reg_delta (dict): The revocation delta. timestamp (int): The delta timestamp. tails_file_path (str): The path to the tails file. Returns: str: The revocation state. Raises: IndyHolderError: If there is an error creating the revocation state. """ try: rev_state = await asyncio.get_event_loop().run_in_executor( None, CredentialRevocationState.create, rev_reg_def, rev_reg_delta, int(cred_rev_id), timestamp, tails_file_path, ) except CredxError as err: raise IndyHolderError(ERR_CREATE_REV_STATE) from err return rev_state.to_json()