Source code for aries_cloudagent.issuer.indy

"""Indy issuer implementation."""

import json
import logging
from typing import Sequence, Tuple

import indy.anoncreds
import indy.blob_storage
from indy.error import AnoncredsRevocationRegistryFullError, IndyError, ErrorCode

from ..messaging.util import encode

from .base import (
    BaseIssuer,
    IssuerError,
    IssuerRevocationRegistryFullError,
    DEFAULT_CRED_DEF_TAG,
    DEFAULT_ISSUANCE_TYPE,
    DEFAULT_SIGNATURE_TYPE,
)
from ..indy import create_tails_reader, create_tails_writer
from ..indy.error import IndyErrorHandler


[docs]class IndyIssuer(BaseIssuer): """Indy issuer class.""" def __init__(self, wallet): """ Initialize an IndyIssuer instance. Args: wallet: IndyWallet instance """ self.logger = logging.getLogger(__name__) self.wallet = wallet
[docs] def make_schema_id( self, origin_did: str, schema_name: str, schema_version: str ) -> str: """Derive the ID for a schema.""" return f"{origin_did}:2:{schema_name}:{schema_version}"
[docs] async def create_and_store_schema( self, origin_did: str, schema_name: str, schema_version: str, attribute_names: Sequence[str], ) -> Tuple[str, str]: """ Create a new credential schema and store it in the wallet. Args: origin_did: the DID issuing the credential definition schema_name: the schema name schema_version: the schema version attribute_names: a sequence of schema attribute names Returns: A tuple of the schema ID and JSON """ with IndyErrorHandler("Error when creating schema", IssuerError): schema_id, schema_json = await indy.anoncreds.issuer_create_schema( origin_did, schema_name, schema_version, json.dumps(attribute_names), ) return (schema_id, schema_json)
[docs] def make_credential_definition_id( self, origin_did: str, schema: dict, signature_type: str = None, tag: str = None ) -> str: """Derive the ID for a credential definition.""" signature_type = signature_type or DEFAULT_SIGNATURE_TYPE tag = tag or DEFAULT_CRED_DEF_TAG return f"{origin_did}:3:{signature_type}:{str(schema['seqNo'])}:{tag}"
[docs] async def credential_definition_in_wallet( self, credential_definition_id: str ) -> bool: """ Check whether a given credential definition ID is present in the wallet. Args: credential_definition_id: The credential definition ID to check """ try: await indy.anoncreds.issuer_create_credential_offer( self.wallet.handle, credential_definition_id ) return True except IndyError as error: if error.error_code not in ( ErrorCode.CommonInvalidStructure, ErrorCode.WalletItemNotFound, ): raise IndyErrorHandler.wrap_error( error, "Error when checking wallet for credential definition", IssuerError, ) from error # recognized error signifies no such cred def in wallet: pass return False
[docs] async def create_and_store_credential_definition( self, origin_did: str, schema: dict, signature_type: str = None, tag: str = None, support_revocation: bool = False, ) -> Tuple[str, str]: """ Create a new credential definition and store it in the wallet. Args: origin_did: the DID issuing the credential definition schema: the schema used as a basis signature_type: the credential definition signature type (default 'CL') tag: the credential definition tag support_revocation: whether to enable revocation for this credential def Returns: A tuple of the credential definition ID and JSON """ with IndyErrorHandler("Error when creating credential definition", IssuerError): ( credential_definition_id, credential_definition_json, ) = await indy.anoncreds.issuer_create_and_store_credential_def( self.wallet.handle, origin_did, json.dumps(schema), tag or DEFAULT_CRED_DEF_TAG, signature_type or DEFAULT_SIGNATURE_TYPE, json.dumps({"support_revocation": support_revocation}), ) return (credential_definition_id, credential_definition_json)
[docs] async def create_credential_offer(self, credential_definition_id: str) -> str: """ Create a credential offer for the given credential definition id. Args: credential_definition_id: The credential definition to create an offer for Returns: The created credential offer """ with IndyErrorHandler("Exception when creating credential offer", IssuerError): credential_offer_json = await indy.anoncreds.issuer_create_credential_offer( self.wallet.handle, credential_definition_id ) return credential_offer_json
[docs] async def create_credential( self, schema: dict, credential_offer: dict, credential_request: dict, credential_values: dict, revoc_reg_id: str = None, tails_file_path: str = None, ) -> Tuple[str, str]: """ Create a credential. Args schema: Schema to create credential for credential_offer: Credential Offer to create credential for credential_request: Credential request to create credential for credential_values: Values to go in credential revoc_reg_id: ID of the revocation registry tails_file_path: Path to the local tails file Returns: A tuple of created credential and revocation id """ encoded_values = {} schema_attributes = schema["attrNames"] for attribute in schema_attributes: # Ensure every attribute present in schema to be set. # Extraneous attribute names are ignored. try: credential_value = credential_values[attribute] except KeyError: raise IssuerError( "Provided credential values are missing a value " + f"for the schema attribute '{attribute}'" ) encoded_values[attribute] = {} encoded_values[attribute]["raw"] = str(credential_value) encoded_values[attribute]["encoded"] = encode(credential_value) tails_reader_handle = ( await create_tails_reader(tails_file_path) if tails_file_path is not None else None ) try: ( credential_json, credential_revocation_id, revoc_reg_delta_json, ) = await indy.anoncreds.issuer_create_credential( self.wallet.handle, json.dumps(credential_offer), json.dumps(credential_request), json.dumps(encoded_values), revoc_reg_id, tails_reader_handle, ) except AnoncredsRevocationRegistryFullError: self.logger.error( f"Revocation registry {revoc_reg_id} is full: cannot create credential" ) raise IssuerRevocationRegistryFullError( f"Revocation registry {revoc_reg_id} full" ) except IndyError as error: raise IndyErrorHandler.wrap_error( error, "Error when issuing credential", IssuerError ) from error return credential_json, credential_revocation_id
[docs] async def revoke_credentials( self, revoc_reg_id: str, tails_file_path: str, cred_revoc_ids: Sequence[str] ) -> str: """ Revoke a set of credentials in a revocation registry. Args: revoc_reg_id: ID of the revocation registry tails_file_path: path to the local tails file cred_revoc_ids: sequences of credential indexes in the revocation registry Returns: the combined revocation delta """ tails_reader_handle = await create_tails_reader(tails_file_path) result_json = None for cred_revoc_id in cred_revoc_ids: with IndyErrorHandler("Exception when revoking credential", IssuerError): # may throw AnoncredsInvalidUserRevocId if using ISSUANCE_ON_DEMAND delta_json = await indy.anoncreds.issuer_revoke_credential( self.wallet.handle, tails_reader_handle, revoc_reg_id, cred_revoc_id ) if result_json: result_json = await self.merge_revocation_registry_deltas( result_json, delta_json ) else: result_json = delta_json return result_json
[docs] async def merge_revocation_registry_deltas( self, fro_delta: str, to_delta: str ) -> str: """ Merge revocation registry deltas. Args: fro_delta: original delta in JSON format to_delta: incoming delta in JSON format Returns: Merged delta in JSON format """ return await indy.anoncreds.issuer_merge_revocation_registry_deltas( fro_delta, to_delta )
[docs] async def create_and_store_revocation_registry( self, origin_did: str, cred_def_id: str, revoc_def_type: str, tag: str, max_cred_num: int, tails_base_path: str, issuance_type: str = None, ) -> Tuple[str, str, str]: """ Create a new revocation registry and store it in the wallet. Args: origin_did: the DID issuing the revocation registry cred_def_id: the identifier of the related credential definition revoc_def_type: the revocation registry type (default CL_ACCUM) tag: the unique revocation registry tag max_cred_num: the number of credentials supported in the registry tails_base_path: where to store the tails file issuance_type: optionally override the issuance type Returns: A tuple of the revocation registry ID, JSON, and entry JSON """ tails_writer = await create_tails_writer(tails_base_path) with IndyErrorHandler( "Exception when creating revocation registry", IssuerError ): ( revoc_reg_id, revoc_reg_def_json, revoc_reg_entry_json, ) = await indy.anoncreds.issuer_create_and_store_revoc_reg( self.wallet.handle, origin_did, revoc_def_type, tag, cred_def_id, json.dumps( { "max_cred_num": max_cred_num, "issuance_type": issuance_type or DEFAULT_ISSUANCE_TYPE, } ), tails_writer, ) return (revoc_reg_id, revoc_reg_def_json, revoc_reg_entry_json)