Source code for aries_cloudagent.holder.indy

"""Indy holder implementation."""

import json
import logging

from collections import OrderedDict
from typing import Sequence

import indy.anoncreds
from indy.error import ErrorCode, IndyError

from ..wallet.error import WalletNotFoundError

from .base import BaseHolder


class IndyHolder(BaseHolder):
    """Indy holder class."""

    def __init__(self, wallet):
        """
        Initialize an IndyHolder instance.

        Args:
            wallet: IndyWallet instance

        """
        self.logger = logging.getLogger(__name__)
        self.wallet = wallet

    async def create_credential_request(
        self, credential_offer, credential_definition, did
    ):
        """
        Create a credential request for the given credential offer.

        Args:
            credential_offer: The credential offer to create request for
            credential_definition: The credential definition the offer refers to
            did: The DID handed to indy when creating the request

        Returns:
            A tuple of (credential request, credential request metadata),
            both decoded from the JSON strings returned by indy

        """
        (
            credential_request_json,
            credential_request_metadata_json,
        ) = await indy.anoncreds.prover_create_credential_req(
            self.wallet.handle,
            did,
            json.dumps(credential_offer),
            json.dumps(credential_definition),
            self.wallet.master_secret_id,
        )

        self.logger.debug(
            "Created credential request. "
            + f"credential_request_json={credential_request_json} "
            + f"credential_request_metadata_json={credential_request_metadata_json}"
        )

        credential_request = json.loads(credential_request_json)
        credential_request_metadata = json.loads(credential_request_metadata_json)

        return credential_request, credential_request_metadata

    async def store_credential(
        self, credential_definition, credential_data, credential_request_metadata
    ):
        """
        Store a credential in the wallet.

        Args:
            credential_definition: Credential definition for this credential
            credential_data: Credential data generated by the issuer
            credential_request_metadata: Metadata produced alongside the
                credential request (see `create_credential_request`)

        Returns:
            The credential id reported by indy for the stored credential

        """
        credential_id = await indy.anoncreds.prover_store_credential(
            self.wallet.handle,
            None,  # Always let indy set the id for now
            json.dumps(credential_request_metadata),
            json.dumps(credential_data),
            json.dumps(credential_definition),
            None,  # We don't support revocation yet
        )

        return credential_id

    async def get_credentials(self, start: int, count: int, wql: dict):
        """
        Get credentials stored in the wallet.

        Args:
            start: Starting index
            count: Number of records to return
            wql: wql query dict

        Returns:
            The credentials decoded from the JSON returned by indy

        """
        search_handle, _ = await indy.anoncreds.prover_search_credentials(
            self.wallet.handle, json.dumps(wql)
        )

        try:
            # We need to move the database cursor position manually...
            if start > 0:
                # TODO: move cursor in chunks to avoid exploding memory
                await indy.anoncreds.prover_fetch_credentials(search_handle, start)

            credentials_json = await indy.anoncreds.prover_fetch_credentials(
                search_handle, count
            )
        finally:
            # Always close, even when a fetch fails, so the handle is not leaked
            await indy.anoncreds.prover_close_credentials_search(search_handle)

        return json.loads(credentials_json)

    async def get_credentials_for_presentation_request_by_referent(
        self,
        presentation_request: dict,
        referents: Sequence[str],
        start: int,
        count: int,
        extra_query: dict = None,
    ):
        """
        Get credentials stored in the wallet that match a presentation request.

        Args:
            presentation_request: Valid indy format presentation request
            referents: Presentation request referents to use to search for creds;
                when empty, every requested attribute and predicate referent
                of the request is used
            start: Starting index (applied to each referent's result set)
            count: Maximum number of records to return
            extra_query: wql query dict (defaults to an empty query)

        Returns:
            A tuple of at most `count` credential dicts, each carrying a
            "presentation_referents" list naming the referents it matched

        """
        search_handle = await indy.anoncreds.prover_search_credentials_for_proof_req(
            self.wallet.handle,
            json.dumps(presentation_request),
            # None stands in for "no extra query"; avoids a shared mutable default
            json.dumps(extra_query or {}),
        )

        creds_dict = OrderedDict()

        try:
            # Resolved inside the try so a malformed request cannot leak the handle
            if not referents:
                referents = (
                    *presentation_request["requested_attributes"],
                    *presentation_request["requested_predicates"],
                )

            for reft in referents:
                # We need to move the database cursor position manually...
                if start > 0:
                    # TODO: move cursors in chunks to avoid exploding memory
                    await indy.anoncreds.prover_fetch_credentials_for_proof_req(
                        search_handle, reft, start
                    )
                credentials_json = (
                    await indy.anoncreds.prover_fetch_credentials_for_proof_req(
                        search_handle, reft, count
                    )
                )
                credentials = json.loads(credentials_json)
                for cred in credentials:
                    cred_id = cred["cred_info"]["referent"]
                    if cred_id not in creds_dict:
                        # A set de-duplicates referents while results are merged
                        cred["presentation_referents"] = {reft}
                        creds_dict[cred_id] = cred
                    else:
                        creds_dict[cred_id]["presentation_referents"].add(reft)

        finally:
            # Always close
            await indy.anoncreds.prover_close_credentials_search_for_proof_req(
                search_handle
            )

        # Sets are not JSON serializable; hand callers plain lists
        for cred in creds_dict.values():
            cred["presentation_referents"] = list(cred["presentation_referents"])

        return tuple(creds_dict.values())[:count]

    async def get_credential(self, credential_id: str):
        """
        Get a credential stored in the wallet.

        Args:
            credential_id: Credential id to retrieve

        Returns:
            The credential decoded from the JSON returned by indy

        Raises:
            WalletNotFoundError: If indy reports the item is not in the wallet
            IndyError: For any other indy failure

        """
        try:
            credential_json = await indy.anoncreds.prover_get_credential(
                self.wallet.handle, credential_id
            )
        except IndyError as e:
            if e.error_code == ErrorCode.WalletItemNotFound:
                raise WalletNotFoundError(
                    "Credential not found in the wallet: {}".format(credential_id)
                ) from e
            raise

        return json.loads(credential_json)

    async def delete_credential(self, credential_id: str):
        """
        Remove a credential stored in the wallet.

        Args:
            credential_id: Credential id to remove

        Raises:
            WalletNotFoundError: If indy reports the item is not in the wallet
            IndyError: For any other indy failure

        """
        try:
            await indy.anoncreds.prover_delete_credential(
                self.wallet.handle, credential_id
            )
        except IndyError as e:
            if e.error_code == ErrorCode.WalletItemNotFound:
                raise WalletNotFoundError(
                    "Credential not found in the wallet: {}".format(credential_id)
                ) from e
            raise

    async def create_presentation(
        self,
        presentation_request: dict,
        requested_credentials: dict,
        schemas: dict,
        credential_definitions: dict,
    ):
        """
        Create a presentation (indy proof) for a presentation request.

        Args:
            presentation_request: Valid indy format presentation request
            requested_credentials: Indy format requested_credentials
            schemas: Indy formatted schemas_json
            credential_definitions: Indy formatted credential definitions json

        Returns:
            The presentation decoded from the JSON returned by indy

        """
        presentation_json = await indy.anoncreds.prover_create_proof(
            self.wallet.handle,
            json.dumps(presentation_request),
            json.dumps(requested_credentials),
            self.wallet.master_secret_id,
            json.dumps(schemas),
            json.dumps(credential_definitions),
            json.dumps({}),  # We don't support revocation currently.
        )

        return json.loads(presentation_json)