Source code for aries_cloudagent.holder.indy

"""Indy holder implementation."""

import json
import logging

from collections import OrderedDict
from typing import Sequence, Tuple, Union

import indy.anoncreds
from indy.error import ErrorCode, IndyError

from ..indy import create_tails_reader
from ..indy.error import IndyErrorHandler
from ..storage.indy import IndyStorage
from ..storage.error import StorageError, StorageNotFoundError
from ..storage.record import StorageRecord

from ..wallet.error import WalletNotFoundError

from .base import BaseHolder, HolderError


[docs]class IndyHolder(BaseHolder): """Indy holder class.""" RECORD_TYPE_MIME_TYPES = "attribute-mime-types" def __init__(self, wallet): """ Initialize an IndyHolder instance. Args: wallet: IndyWallet instance """ self.logger = logging.getLogger(__name__) self.wallet = wallet
[docs] async def create_credential_request( self, credential_offer: dict, credential_definition: dict, holder_did: str ) -> Tuple[str, str]: """ Create a credential request for the given credential offer. Args: credential_offer: The credential offer to create request for credential_definition: The credential definition to create an offer for holder_did: the DID of the agent making the request Returns: A tuple of the credential request and credential request metadata """ with IndyErrorHandler("Error when creating credential request", HolderError): ( credential_request_json, credential_request_metadata_json, ) = await indy.anoncreds.prover_create_credential_req( self.wallet.handle, holder_did, json.dumps(credential_offer), json.dumps(credential_definition), self.wallet.master_secret_id, ) self.logger.debug( "Created credential request. " "credential_request_json=%s credential_request_metadata_json=%s", credential_request_json, credential_request_metadata_json, ) return credential_request_json, credential_request_metadata_json
[docs] async def store_credential( self, credential_definition: dict, credential_data: dict, credential_request_metadata: dict, credential_attr_mime_types=None, credential_id: str = None, rev_reg_def: dict = None, ) -> str: """ Store a credential in the wallet. Args: credential_definition: Credential definition for this credential credential_data: Credential data generated by the issuer credential_request_metadata: credential request metadata generated by the issuer credential_attr_mime_types: dict mapping attribute names to (optional) MIME types to store as non-secret record, if specified credential_id: optionally override the stored credential id rev_reg_def: revocation registry definition in json Returns: the ID of the stored credential """ with IndyErrorHandler("Error when storing credential in wallet", HolderError): credential_id = await indy.anoncreds.prover_store_credential( wallet_handle=self.wallet.handle, cred_id=credential_id, cred_req_metadata_json=json.dumps(credential_request_metadata), cred_json=json.dumps(credential_data), cred_def_json=json.dumps(credential_definition), rev_reg_def_json=json.dumps(rev_reg_def) if rev_reg_def else None, ) if credential_attr_mime_types: mime_types = { attr: credential_attr_mime_types.get(attr) for attr in credential_data["values"] if attr in credential_attr_mime_types } if mime_types: record = StorageRecord( type=IndyHolder.RECORD_TYPE_MIME_TYPES, value=credential_id, tags=mime_types, id=f"{IndyHolder.RECORD_TYPE_MIME_TYPES}::{credential_id}", ) indy_stor = IndyStorage(self.wallet) await indy_stor.add_record(record) return credential_id
[docs] async def get_credentials(self, start: int, count: int, wql: dict): """ Get credentials stored in the wallet. Args: start: Starting index count: Number of records to return wql: wql query dict """ with IndyErrorHandler( "Error when constructing wallet credential query", HolderError ): ( search_handle, record_count, ) = await indy.anoncreds.prover_search_credentials( self.wallet.handle, json.dumps(wql) ) # We need to move the database cursor position manually... if start > 0: # TODO: move cursor in chunks to avoid exploding memory await indy.anoncreds.prover_fetch_credentials(search_handle, start) credentials_json = await indy.anoncreds.prover_fetch_credentials( search_handle, count ) await indy.anoncreds.prover_close_credentials_search(search_handle) credentials = json.loads(credentials_json) return credentials
[docs] async def get_credentials_for_presentation_request_by_referent( self, presentation_request: dict, referents: Sequence[str], start: int, count: int, extra_query: dict = {}, ): """ Get credentials stored in the wallet. Args: presentation_request: Valid presentation request from issuer referents: Presentation request referents to use to search for creds start: Starting index count: Maximum number of records to return extra_query: wql query dict """ with IndyErrorHandler( "Error when constructing wallet credential query", HolderError ): search_handle = await ( indy.anoncreds.prover_search_credentials_for_proof_req( self.wallet.handle, json.dumps(presentation_request), json.dumps(extra_query), ) ) if not referents: referents = ( *presentation_request["requested_attributes"], *presentation_request["requested_predicates"], ) creds_dict = OrderedDict() try: for reft in referents: # We need to move the database cursor position manually... if start > 0: # TODO: move cursors in chunks to avoid exploding memory await indy.anoncreds.prover_fetch_credentials_for_proof_req( search_handle, reft, start ) ( credentials_json ) = await indy.anoncreds.prover_fetch_credentials_for_proof_req( search_handle, reft, count ) credentials = json.loads(credentials_json) for cred in credentials: cred_id = cred["cred_info"]["referent"] if cred_id not in creds_dict: cred["presentation_referents"] = {reft} creds_dict[cred_id] = cred else: creds_dict[cred_id]["presentation_referents"].add(reft) finally: # Always close await indy.anoncreds.prover_close_credentials_search_for_proof_req( search_handle ) for cred in creds_dict.values(): cred["presentation_referents"] = list(cred["presentation_referents"]) return tuple(creds_dict.values())[:count]
[docs] async def get_credential(self, credential_id: str) -> str: """ Get a credential stored in the wallet. Args: credential_id: Credential id to retrieve """ try: credential_json = await indy.anoncreds.prover_get_credential( self.wallet.handle, credential_id ) except IndyError as e: if e.error_code == ErrorCode.WalletItemNotFound: raise WalletNotFoundError( "Credential not found in the wallet: {}".format(credential_id) ) else: raise IndyErrorHandler.wrap_error( e, "Error when fetching credential", HolderError ) from e return credential_json
[docs] async def delete_credential(self, credential_id: str): """ Remove a credential stored in the wallet. Args: credential_id: Credential id to remove """ try: indy_stor = IndyStorage(self.wallet) mime_types_record = await indy_stor.get_record( IndyHolder.RECORD_TYPE_MIME_TYPES, f"{IndyHolder.RECORD_TYPE_MIME_TYPES}::{credential_id}", ) await indy_stor.delete_record(mime_types_record) except StorageNotFoundError: pass # MIME types record not present: carry on try: await indy.anoncreds.prover_delete_credential( self.wallet.handle, credential_id ) except IndyError as e: if e.error_code == ErrorCode.WalletItemNotFound: raise WalletNotFoundError( "Credential not found in the wallet: {}".format(credential_id) ) else: raise IndyErrorHandler.wrap_error( e, "Error when deleting credential", HolderError ) from e
[docs] async def get_mime_type( self, credential_id: str, attr: str = None ) -> Union[dict, str]: """ Get MIME type per attribute (or for all attributes). Args: credential_id: credential id attr: attribute of interest or omit for all Returns: Attribute MIME type or dict mapping attribute names to MIME types attr_meta_json = all_meta.tags.get(attr) """ try: mime_types_record = await IndyStorage(self.wallet).get_record( IndyHolder.RECORD_TYPE_MIME_TYPES, f"{IndyHolder.RECORD_TYPE_MIME_TYPES}::{credential_id}", ) except StorageError: return None # no MIME types: not an error return mime_types_record.tags.get(attr) if attr else mime_types_record.tags
[docs] async def create_presentation( self, presentation_request: dict, requested_credentials: dict, schemas: dict, credential_definitions: dict, rev_states: dict = None, ) -> str: """ Get credentials stored in the wallet. Args: presentation_request: Valid indy format presentation request requested_credentials: Indy format requested credentials schemas: Indy formatted schemas JSON credential_definitions: Indy formatted credential definitions JSON rev_states: Indy format revocation states JSON """ with IndyErrorHandler("Error when constructing proof", HolderError): presentation_json = await indy.anoncreds.prover_create_proof( self.wallet.handle, json.dumps(presentation_request), json.dumps(requested_credentials), self.wallet.master_secret_id, json.dumps(schemas), json.dumps(credential_definitions), json.dumps(rev_states) if rev_states else "{}", ) return presentation_json
[docs] async def create_revocation_state( self, cred_rev_id: str, rev_reg_def: dict, rev_reg_delta: dict, timestamp: int, tails_file_path: str, ) -> str: """ Create current revocation state for a received credential. Args: cred_rev_id: credential revocation id in revocation registry rev_reg_def: revocation registry definition rev_reg_delta: revocation delta timestamp: delta timestamp Returns: the revocation state """ with IndyErrorHandler("Error when constructing revocation state", HolderError): tails_file_reader = await create_tails_reader(tails_file_path) rev_state_json = await indy.anoncreds.create_revocation_state( tails_file_reader, rev_reg_def_json=json.dumps(rev_reg_def), cred_rev_id=cred_rev_id, rev_reg_delta_json=json.dumps(rev_reg_delta), timestamp=timestamp, ) return rev_state_json