Source code for acapy_agent.indy.credx.issuer

"""Indy issuer implementation."""

import asyncio
import logging
from typing import TYPE_CHECKING, Optional, Sequence, Tuple

from aries_askar import AskarError
from indy_credx import (
    Credential,
    CredentialDefinition,
    CredentialOffer,
    CredentialRevocationConfig,
    CredxError,
    RevocationRegistry,
    RevocationRegistryDefinition,
    RevocationRegistryDefinitionPrivate,
    RevocationRegistryDelta,
    Schema,
)

from ...utils.general import strip_did_prefix
from ..constants import (
    CATEGORY_CRED_DEF,
    CATEGORY_CRED_DEF_KEY_PROOF,
    CATEGORY_CRED_DEF_PRIVATE,
    CATEGORY_REV_REG,
    CATEGORY_REV_REG_DEF,
    CATEGORY_REV_REG_DEF_PRIVATE,
    CATEGORY_REV_REG_INFO,
    CATEGORY_SCHEMA,
)
from ..issuer import (
    DEFAULT_CRED_DEF_TAG,
    DEFAULT_SIGNATURE_TYPE,
    IndyIssuer,
    IndyIssuerError,
    IndyIssuerRevocationRegistryFullError,
)

if TYPE_CHECKING:
    from ...askar.profile import AskarProfile

LOGGER = logging.getLogger(__name__)


[docs] class IndyCredxIssuer(IndyIssuer): """Indy-Credx issuer class.""" def __init__(self, profile: "AskarProfile"): """Initialize an IndyCredxIssuer instance. Args: profile: The active profile instance """ self._profile = profile @property def profile(self) -> "AskarProfile": """Accessor for the profile instance.""" return self._profile
[docs] async def create_schema( self, origin_did: str, schema_name: str, schema_version: str, attribute_names: Sequence[str], ) -> Tuple[str, str]: """Create a new credential schema and store it in the wallet. Args: origin_did: the DID issuing the credential definition schema_name: the schema name schema_version: the schema version attribute_names: a sequence of schema attribute names Returns: A tuple of the schema ID and JSON """ try: schema = Schema.create( strip_did_prefix(origin_did), schema_name, schema_version, attribute_names, ) schema_id = schema.id schema_json = schema.to_json() async with self._profile.session() as session: await session.handle.insert(CATEGORY_SCHEMA, schema_id, schema_json) except CredxError as err: raise IndyIssuerError("Error creating schema") from err except AskarError as err: raise IndyIssuerError("Error storing schema") from err return (schema_id, schema_json)
[docs] async def credential_definition_in_wallet( self, credential_definition_id: str ) -> bool: """Check whether a given credential definition ID is present in the wallet. Args: credential_definition_id: The credential definition ID to check """ try: async with self._profile.session() as session: return ( await session.handle.fetch( CATEGORY_CRED_DEF_PRIVATE, credential_definition_id ) ) is not None except AskarError as err: raise IndyIssuerError("Error checking for credential definition") from err
[docs] async def create_and_store_credential_definition( self, origin_did: str, schema: dict, signature_type: Optional[str] = None, tag: Optional[str] = None, support_revocation: bool = False, ) -> Tuple[str, str]: """Create a new credential definition and store it in the wallet. Args: origin_did (str): The DID issuing the credential definition. schema (dict): The schema to create a credential definition for. signature_type (str, optional): The credential definition signature type (default 'CL'). tag (str, optional): The credential definition tag. support_revocation (bool, optional): Whether to enable revocation for this credential definition. Returns: Tuple[str, str]: A tuple of the credential definition ID and JSON. Raises: IndyIssuerError: If there is an error creating or storing the credential definition. """ try: ( cred_def, cred_def_private, key_proof, ) = await asyncio.get_event_loop().run_in_executor( None, lambda: CredentialDefinition.create( strip_did_prefix(origin_did), schema, signature_type or DEFAULT_SIGNATURE_TYPE, tag or DEFAULT_CRED_DEF_TAG, support_revocation=support_revocation, ), ) cred_def_id = cred_def.id cred_def_json = cred_def.to_json() except CredxError as err: raise IndyIssuerError("Error creating credential definition") from err try: async with self._profile.transaction() as txn: await txn.handle.insert( CATEGORY_CRED_DEF, cred_def_id, cred_def_json, # Note: Indy-SDK uses a separate SchemaId record for this tags={"schema_id": schema["id"]}, ) await txn.handle.insert( CATEGORY_CRED_DEF_PRIVATE, cred_def_id, cred_def_private.to_json_buffer(), ) await txn.handle.insert( CATEGORY_CRED_DEF_KEY_PROOF, cred_def_id, key_proof.to_json_buffer() ) await txn.commit() except AskarError as err: raise IndyIssuerError("Error storing credential definition") from err return (cred_def_id, cred_def_json)
[docs] async def create_credential_offer(self, credential_definition_id: str) -> str: """Create a credential offer for the given credential definition id. Args: credential_definition_id: The credential definition to create an offer for Returns: The new credential offer """ try: async with self._profile.session() as session: cred_def = await session.handle.fetch( CATEGORY_CRED_DEF, credential_definition_id ) key_proof = await session.handle.fetch( CATEGORY_CRED_DEF_KEY_PROOF, credential_definition_id ) except AskarError as err: raise IndyIssuerError("Error retrieving credential definition") from err if not cred_def or not key_proof: raise IndyIssuerError("Credential definition not found for credential offer") try: # The tag holds the full name of the schema, # as opposed to just the sequence number schema_id = cred_def.tags.get("schema_id") cred_def = CredentialDefinition.load(cred_def.raw_value) credential_offer = CredentialOffer.create( schema_id or cred_def.schema_id, cred_def, key_proof.raw_value, ) except CredxError as err: raise IndyIssuerError("Error creating credential offer") from err return credential_offer.to_json()
[docs] async def create_credential( self, schema: dict, credential_offer: dict, credential_request: dict, credential_values: dict, revoc_reg_id: Optional[str] = None, tails_file_path: Optional[str] = None, ) -> Tuple[str, str]: """Create a credential. Args: schema: Schema to create credential for credential_offer: Credential Offer to create credential for credential_request: Credential request to create credential for credential_values: Values to go in credential revoc_reg_id: ID of the revocation registry tails_file_path: The location of the tails file Returns: A tuple of created credential and revocation id """ credential_definition_id = credential_offer["cred_def_id"] try: async with self._profile.session() as session: cred_def = await session.handle.fetch( CATEGORY_CRED_DEF, credential_definition_id ) cred_def_private = await session.handle.fetch( CATEGORY_CRED_DEF_PRIVATE, credential_definition_id ) except AskarError as err: raise IndyIssuerError("Error retrieving credential definition") from err if not cred_def or not cred_def_private: raise IndyIssuerError( "Credential definition not found for credential issuance" ) raw_values = {} schema_attributes = schema["attrNames"] for attribute in schema_attributes: # Ensure every attribute present in schema to be set. # Extraneous attribute names are ignored. try: credential_value = credential_values[attribute] except KeyError: raise IndyIssuerError( "Provided credential values are missing a value " f"for the schema attribute '{attribute}'" ) raw_values[attribute] = str(credential_value) if revoc_reg_id: try: async with self._profile.transaction() as txn: rev_reg = await txn.handle.fetch(CATEGORY_REV_REG, revoc_reg_id) rev_reg_info = await txn.handle.fetch( CATEGORY_REV_REG_INFO, revoc_reg_id, for_update=True ) rev_reg_def = await txn.handle.fetch( CATEGORY_REV_REG_DEF, revoc_reg_id ) rev_key = await txn.handle.fetch( CATEGORY_REV_REG_DEF_PRIVATE, revoc_reg_id ) if not rev_reg: raise IndyIssuerError("Revocation registry not found") if not rev_reg_info: raise IndyIssuerError("Revocation registry metadata not found") if not rev_reg_def: raise IndyIssuerError("Revocation registry definition not found") if not rev_key: raise IndyIssuerError( "Revocation registry definition private data not found" ) # NOTE: we increment the index ahead of time to keep the # transaction short. The revocation registry itself will NOT # be updated because we always use ISSUANCE_BY_DEFAULT. # If something goes wrong later, the index will be skipped. # FIXME - double check issuance type in case of upgraded wallet? rev_info = rev_reg_info.value_json rev_reg_index = rev_info["curr_id"] + 1 try: rev_reg_def = RevocationRegistryDefinition.load( rev_reg_def.raw_value ) except CredxError as err: raise IndyIssuerError( "Error loading revocation registry definition" ) from err if rev_reg_index > rev_reg_def.max_cred_num: raise IndyIssuerRevocationRegistryFullError( "Revocation registry is full" ) rev_info["curr_id"] = rev_reg_index await txn.handle.replace( CATEGORY_REV_REG_INFO, revoc_reg_id, value_json=rev_info ) await txn.commit() except AskarError as err: raise IndyIssuerError("Error updating revocation registry index") from err revoc = CredentialRevocationConfig( rev_reg_def, rev_key.raw_value, rev_reg.raw_value, rev_reg_index, rev_info.get("used_ids") or [], ) credential_revocation_id = str(rev_reg_index) else: revoc = None credential_revocation_id = None # This is for compatibility with an anoncreds holder if not credential_request.get("prover_did"): credential_request["prover_did"] = credential_request["entropy"] del credential_request["entropy"] try: ( credential, _upd_rev_reg, _delta, ) = await asyncio.get_event_loop().run_in_executor( None, Credential.create, cred_def.raw_value, cred_def_private.raw_value, credential_offer, credential_request, raw_values, None, revoc, ) except CredxError as err: raise IndyIssuerError("Error creating credential") from err return credential.to_json(), credential_revocation_id
[docs] async def revoke_credentials( self, cred_def_id: str, revoc_reg_id: str, tails_file_path: str, cred_revoc_ids: Sequence[str], ) -> Tuple[str, Sequence[str]]: """Revoke a set of credentials in a revocation registry. Args: cred_def_id: ID of the credential definition revoc_reg_id: ID of the revocation registry tails_file_path: path to the local tails file cred_revoc_ids: sequences of credential indexes in the revocation registry Returns: Tuple with the combined revocation delta, list of cred rev ids not revoked """ delta = None failed_crids = set() max_attempt = 5 attempt = 0 while True: attempt += 1 if attempt >= max_attempt: raise IndyIssuerError("Repeated conflict attempting to update registry") try: async with self._profile.session() as session: cred_def = await session.handle.fetch(CATEGORY_CRED_DEF, cred_def_id) rev_reg_def = await session.handle.fetch( CATEGORY_REV_REG_DEF, revoc_reg_id ) rev_reg_def_private = await session.handle.fetch( CATEGORY_REV_REG_DEF_PRIVATE, revoc_reg_id ) rev_reg = await session.handle.fetch(CATEGORY_REV_REG, revoc_reg_id) rev_reg_info = await session.handle.fetch( CATEGORY_REV_REG_INFO, revoc_reg_id ) if not cred_def: raise IndyIssuerError("Credential definition not found") if not rev_reg_def: raise IndyIssuerError("Revocation registry definition not found") if not rev_reg_def_private: raise IndyIssuerError( "Revocation registry definition private key not found" ) if not rev_reg: raise IndyIssuerError("Revocation registry not found") if not rev_reg_info: raise IndyIssuerError("Revocation registry metadata not found") except AskarError as err: raise IndyIssuerError("Error retrieving revocation registry") from err try: cred_def = CredentialDefinition.load(cred_def.raw_value) except CredxError as err: raise IndyIssuerError("Error loading credential definition") from err try: rev_reg_def = RevocationRegistryDefinition.load(rev_reg_def.raw_value) except CredxError as err: raise IndyIssuerError( "Error loading revocation registry definition" ) from err try: rev_reg_def_private = RevocationRegistryDefinitionPrivate.load( rev_reg_def_private.raw_value ) except CredxError as err: raise IndyIssuerError( "Error loading revocation registry private key" ) from err try: rev_reg = RevocationRegistry.load(rev_reg.raw_value) except CredxError as err: raise IndyIssuerError("Error loading revocation registry") from err rev_crids = set() failed_crids = set() max_cred_num = rev_reg_def.max_cred_num rev_info = rev_reg_info.value_json used_ids = set(rev_info.get("used_ids") or []) for rev_id in cred_revoc_ids: rev_id = int(rev_id) if rev_id < 1 or rev_id > max_cred_num: LOGGER.error( "Skipping requested credential revocation" "on rev reg id %s, cred rev id=%s not in range", revoc_reg_id, rev_id, ) failed_crids.add(rev_id) elif rev_id > rev_info["curr_id"]: LOGGER.warning( "Skipping requested credential revocation" "on rev reg id %s, cred rev id=%s not yet issued", revoc_reg_id, rev_id, ) failed_crids.add(rev_id) elif rev_id in used_ids: LOGGER.warning( "Skipping requested credential revocation" "on rev reg id %s, cred rev id=%s already revoked", revoc_reg_id, rev_id, ) failed_crids.add(rev_id) else: rev_crids.add(rev_id) if not rev_crids: break try: delta = await asyncio.get_event_loop().run_in_executor( None, lambda: rev_reg.update( cred_def, rev_reg_def, rev_reg_def_private, issued=None, revoked=list(rev_crids), # revoked ), ) except CredxError as err: raise IndyIssuerError("Error updating revocation registry") from err try: async with self._profile.transaction() as txn: rev_reg_upd = await txn.handle.fetch( CATEGORY_REV_REG, revoc_reg_id, for_update=True ) rev_info_upd = await txn.handle.fetch( CATEGORY_REV_REG_INFO, revoc_reg_id, for_update=True ) if not rev_reg_upd or not rev_reg_info: LOGGER.warning( "Revocation registry missing, skipping update: {}", revoc_reg_id, ) delta = None break rev_info_upd = rev_info_upd.value_json if rev_info_upd != rev_info: # handle concurrent update to the registry by retrying continue await txn.handle.replace( CATEGORY_REV_REG, revoc_reg_id, rev_reg.to_json_buffer() ) used_ids.update(rev_crids) rev_info_upd["used_ids"] = sorted(used_ids) await txn.handle.replace( CATEGORY_REV_REG_INFO, revoc_reg_id, value_json=rev_info_upd ) await txn.commit() except AskarError as err: raise IndyIssuerError("Error saving revocation registry") from err break return ( delta and delta.to_json(), [str(rev_id) for rev_id in sorted(failed_crids)], )
[docs] async def merge_revocation_registry_deltas( self, fro_delta: str, to_delta: str ) -> str: """Merge revocation registry deltas. Args: fro_delta: original delta in JSON format to_delta: incoming delta in JSON format Returns: Merged delta in JSON format """ def update(d1, d2): try: delta = RevocationRegistryDelta.load(d1) delta.update_with(d2) return delta.to_json() except CredxError as err: raise IndyIssuerError("Error merging revocation registry deltas") from err return await asyncio.get_event_loop().run_in_executor( None, update, fro_delta, to_delta )
[docs] async def create_and_store_revocation_registry( self, origin_did: str, cred_def_id: str, revoc_def_type: str, tag: str, max_cred_num: int, tails_base_path: str, ) -> Tuple[str, str, str]: """Create a new revocation registry and store it in the wallet. Args: origin_did: the DID issuing the revocation registry cred_def_id: the identifier of the related credential definition revoc_def_type: the revocation registry type (default CL_ACCUM) tag: the unique revocation registry tag max_cred_num: the number of credentials supported in the registry tails_base_path: where to store the tails file issuance_type: optionally override the issuance type Returns: A tuple of the revocation registry ID, JSON, and entry JSON """ try: async with self._profile.session() as session: cred_def = await session.handle.fetch(CATEGORY_CRED_DEF, cred_def_id) except AskarError as err: raise IndyIssuerError("Error retrieving credential definition") from err if not cred_def: raise IndyIssuerError( "Credential definition not found for revocation registry" ) try: ( rev_reg_def, rev_reg_def_private, rev_reg, _rev_reg_delta, ) = await asyncio.get_event_loop().run_in_executor( None, lambda: RevocationRegistryDefinition.create( strip_did_prefix(origin_did), cred_def.raw_value, tag, revoc_def_type, max_cred_num, tails_dir_path=tails_base_path, ), ) except CredxError as err: raise IndyIssuerError("Error creating revocation registry") from err rev_reg_def_id = rev_reg_def.id rev_reg_def_json = rev_reg_def.to_json() rev_reg_json = rev_reg.to_json() try: async with self._profile.transaction() as txn: await txn.handle.insert(CATEGORY_REV_REG, rev_reg_def_id, rev_reg_json) await txn.handle.insert( CATEGORY_REV_REG_INFO, rev_reg_def_id, value_json={"curr_id": 0, "used_ids": []}, ) await txn.handle.insert( CATEGORY_REV_REG_DEF, rev_reg_def_id, rev_reg_def_json ) await txn.handle.insert( CATEGORY_REV_REG_DEF_PRIVATE, rev_reg_def_id, rev_reg_def_private.to_json_buffer(), ) await txn.commit() except AskarError as err: raise IndyIssuerError("Error saving new revocation registry") from err return ( rev_reg_def_id, rev_reg_def_json, rev_reg_json, )