Source code for aries_cloudagent.protocols.issue_credential.v1_0.manager

"""Classes to manage credentials."""

import json
import logging
from typing import Mapping, Text, Sequence, Tuple

from .messages.credential_ack import CredentialAck
from .messages.credential_issue import CredentialIssue
from .messages.credential_offer import CredentialOffer
from .messages.credential_proposal import CredentialProposal
from .messages.credential_request import CredentialRequest
from .messages.inner.credential_preview import CredentialPreview
from .models.credential_exchange import V10CredentialExchange
from ....cache.base import BaseCache
from ....config.injection_context import InjectionContext
from ....core.error import BaseError
from ....holder.base import BaseHolder, HolderError
from ....issuer.base import BaseIssuer
from ....issuer.indy import IssuerRevocationRegistryFullError
from ....ledger.base import BaseLedger
from ....messaging.credential_definitions.util import (
    CRED_DEF_TAGS,
    CRED_DEF_SENT_RECORD_TYPE,
)
from ....revocation.indy import IndyRevocation
from ....revocation.models.revocation_registry import RevocationRegistry
from ....revocation.models.issuer_rev_reg_record import IssuerRevRegRecord
from ....storage.base import BaseStorage
from ....storage.error import StorageNotFoundError


[docs]class CredentialManagerError(BaseError): """Credential error."""
[docs]class CredentialManager: """Class for managing credentials.""" def __init__(self, context: InjectionContext): """ Initialize a CredentialManager. Args: context: The context for this credential """ self._context = context self._logger = logging.getLogger(__name__) @property def context(self) -> InjectionContext: """ Accessor for the current request context. Returns: The request context for this connection """ return self._context async def _match_sent_cred_def_id(self, tag_query: Mapping[str, str]) -> str: """Return most recent matching id of cred def that agent sent to ledger.""" storage: BaseStorage = await self.context.inject(BaseStorage) found = await storage.search_records( type_filter=CRED_DEF_SENT_RECORD_TYPE, tag_query=tag_query ).fetch_all() if not found: raise CredentialManagerError( f"Issuer has no operable cred def for proposal spec {tag_query}" ) return max(found, key=lambda r: int(r.tags["epoch"])).tags["cred_def_id"]
[docs] async def prepare_send( self, connection_id: str, credential_proposal: CredentialProposal, auto_remove: bool = None, ) -> Tuple[V10CredentialExchange, CredentialOffer]: """ Set up a new credential exchange for an automated send. Args: connection_id: Connection to create offer for credential_proposal: The credential proposal with preview on attribute values to use if auto_issue is enabled auto_remove: Flag to automatically remove the record on completion Returns: A tuple of the new credential exchange record and credential offer message """ if auto_remove is None: auto_remove = not self.context.settings.get("preserve_exchange_records") credential_exchange = V10CredentialExchange( auto_issue=True, auto_remove=auto_remove, connection_id=connection_id, initiator=V10CredentialExchange.INITIATOR_SELF, role=V10CredentialExchange.ROLE_ISSUER, credential_proposal_dict=credential_proposal.serialize(), trace=(credential_proposal._trace is not None), ) (credential_exchange, credential_offer) = await self.create_offer( credential_exchange_record=credential_exchange, comment="create automated credential exchange", ) return credential_exchange, credential_offer
[docs] async def create_proposal( self, connection_id: str, *, auto_offer: bool = None, auto_remove: bool = None, comment: str = None, credential_preview: CredentialPreview = None, schema_id: str = None, schema_issuer_did: str = None, schema_name: str = None, schema_version: str = None, cred_def_id: str = None, issuer_did: str = None, trace: bool = False, ) -> V10CredentialExchange: """ Create a credential proposal. Args: connection_id: Connection to create proposal for auto_offer: Should this proposal request automatically be handled to offer a credential auto_remove: Should the record be automatically removed on completion comment: Optional human-readable comment to include in proposal credential_preview: The credential preview to use to create the credential proposal schema_id: Schema id for credential proposal schema_issuer_did: Schema issuer DID for credential proposal schema_name: Schema name for credential proposal schema_version: Schema version for credential proposal cred_def_id: Credential definition id for credential proposal issuer_did: Issuer DID for credential proposal Returns: Resulting credential exchange record including credential proposal """ credential_proposal_message = CredentialProposal( comment=comment, credential_proposal=credential_preview, schema_id=schema_id, schema_issuer_did=schema_issuer_did, schema_name=schema_name, schema_version=schema_version, cred_def_id=cred_def_id, issuer_did=issuer_did, ) credential_proposal_message.assign_trace_decorator(self.context.settings, trace) if auto_remove is None: auto_remove = not self.context.settings.get("preserve_exchange_records") credential_exchange_record = V10CredentialExchange( connection_id=connection_id, thread_id=credential_proposal_message._thread_id, initiator=V10CredentialExchange.INITIATOR_SELF, role=V10CredentialExchange.ROLE_HOLDER, state=V10CredentialExchange.STATE_PROPOSAL_SENT, credential_proposal_dict=credential_proposal_message.serialize(), auto_offer=auto_offer, auto_remove=auto_remove, trace=trace, ) await credential_exchange_record.save( self.context, reason="create credential proposal" ) return credential_exchange_record
[docs] async def receive_proposal(self) -> V10CredentialExchange: """ Receive a credential proposal from message in context on manager creation. Returns: The resulting credential exchange record, created """ # go to cred def via ledger to get authoritative schema id credential_proposal_message = self.context.message connection_id = self.context.connection_record.connection_id # at this point, cred def and schema still open to potential negotiation credential_exchange_record = V10CredentialExchange( connection_id=connection_id, thread_id=credential_proposal_message._thread_id, initiator=V10CredentialExchange.INITIATOR_EXTERNAL, role=V10CredentialExchange.ROLE_ISSUER, state=V10CredentialExchange.STATE_PROPOSAL_RECEIVED, credential_proposal_dict=credential_proposal_message.serialize(), auto_offer=self.context.settings.get( "debug.auto_respond_credential_proposal" ), auto_issue=self.context.settings.get( "debug.auto_respond_credential_request" ), trace=(credential_proposal_message._trace is not None), ) await credential_exchange_record.save( self.context, reason="receive credential proposal" ) return credential_exchange_record
[docs] async def create_offer( self, credential_exchange_record: V10CredentialExchange, comment: str = None ) -> Tuple[V10CredentialExchange, CredentialOffer]: """ Create a credential offer, update credential exchange record. Args: credential_exchange_record: Credential exchange to create offer for comment: optional human-readable comment to set in offer message Returns: A tuple (credential exchange record, credential offer message) """ if credential_exchange_record.credential_proposal_dict: credential_proposal_message = CredentialProposal.deserialize( credential_exchange_record.credential_proposal_dict ) credential_proposal_message.assign_trace_decorator( self.context.settings, credential_exchange_record.trace ) cred_def_id = await self._match_sent_cred_def_id( { t: getattr(credential_proposal_message, t) for t in CRED_DEF_TAGS if getattr(credential_proposal_message, t) } ) cred_preview = credential_proposal_message.credential_proposal else: cred_def_id = credential_exchange_record.credential_definition_id cred_preview = None async def _create(cred_def_id): issuer: BaseIssuer = await self.context.inject(BaseIssuer) offer_json = await issuer.create_credential_offer(cred_def_id) return json.loads(offer_json) credential_offer = None cache_key = f"credential_offer::{cred_def_id}" cache: BaseCache = await self.context.inject(BaseCache, required=False) if cache: async with cache.acquire(cache_key) as entry: if entry.result: credential_offer = entry.result else: credential_offer = await _create(cred_def_id) await entry.set_result(credential_offer, 3600) if not credential_offer: credential_offer = await _create(cred_def_id) credential_offer_message = CredentialOffer( comment=comment, credential_preview=cred_preview, offers_attach=[CredentialOffer.wrap_indy_offer(credential_offer)], ) credential_offer_message._thread = { "thid": credential_exchange_record.thread_id } credential_offer_message.assign_trace_decorator( self.context.settings, credential_exchange_record.trace ) credential_exchange_record.thread_id = credential_offer_message._thread_id credential_exchange_record.schema_id = credential_offer["schema_id"] credential_exchange_record.credential_definition_id = credential_offer[ "cred_def_id" ] credential_exchange_record.state = V10CredentialExchange.STATE_OFFER_SENT credential_exchange_record.credential_offer = credential_offer await credential_exchange_record.save( self.context, reason="create credential offer" ) return (credential_exchange_record, credential_offer_message)
[docs] async def receive_offer(self) -> V10CredentialExchange: """ Receive a credential offer. Returns: The credential exchange record, updated """ credential_offer_message: CredentialOffer = self.context.message connection_id = self.context.connection_record.connection_id credential_preview = credential_offer_message.credential_preview indy_offer = credential_offer_message.indy_offer(0) schema_id = indy_offer["schema_id"] cred_def_id = indy_offer["cred_def_id"] if credential_preview: credential_proposal_dict = CredentialProposal( comment=credential_offer_message.comment, credential_proposal=credential_preview, schema_id=schema_id, cred_def_id=cred_def_id, ).serialize() else: credential_proposal_dict = None # Get credential exchange record (holder sent proposal first) # or create it (issuer sent offer first) try: ( credential_exchange_record ) = await V10CredentialExchange.retrieve_by_connection_and_thread( self.context, connection_id, credential_offer_message._thread_id ) credential_exchange_record.credential_proposal_dict = ( credential_proposal_dict ) except StorageNotFoundError: # issuer sent this offer free of any proposal credential_exchange_record = V10CredentialExchange( connection_id=connection_id, thread_id=credential_offer_message._thread_id, initiator=V10CredentialExchange.INITIATOR_EXTERNAL, role=V10CredentialExchange.ROLE_HOLDER, credential_proposal_dict=credential_proposal_dict, trace=(credential_offer_message._trace is not None), ) credential_exchange_record.credential_offer = indy_offer credential_exchange_record.state = V10CredentialExchange.STATE_OFFER_RECEIVED credential_exchange_record.schema_id = schema_id credential_exchange_record.credential_definition_id = cred_def_id await credential_exchange_record.save( self.context, reason="receive credential offer" ) return credential_exchange_record
[docs] async def create_request( self, credential_exchange_record: V10CredentialExchange, holder_did: str ) -> Tuple[V10CredentialExchange, CredentialRequest]: """ Create a credential request. Args: credential_exchange_record: Credential exchange record for which to create request holder_did: holder DID Returns: A tuple (credential exchange record, credential request message) """ credential_definition_id = credential_exchange_record.credential_definition_id credential_offer = credential_exchange_record.credential_offer async def _create(): ledger: BaseLedger = await self.context.inject(BaseLedger) async with ledger: credential_definition = await ledger.get_credential_definition( credential_definition_id ) holder: BaseHolder = await self.context.inject(BaseHolder) request_json, metadata_json = await holder.create_credential_request( credential_offer, credential_definition, holder_did ) return { "request": json.loads(request_json), "metadata": json.loads(metadata_json), } if credential_exchange_record.credential_request: self._logger.warning( "create_request called multiple times for v1.0 credential exchange: %s", credential_exchange_record.credential_exchange_id, ) else: if "nonce" not in credential_offer: raise CredentialManagerError("Missing nonce in credential offer") nonce = credential_offer["nonce"] cache_key = ( f"credential_request::{credential_definition_id}::{holder_did}::{nonce}" ) cred_req_result = None cache: BaseCache = await self.context.inject(BaseCache, required=False) if cache: async with cache.acquire(cache_key) as entry: if entry.result: cred_req_result = entry.result else: cred_req_result = await _create() await entry.set_result(cred_req_result, 3600) if not cred_req_result: cred_req_result = await _create() ( credential_exchange_record.credential_request, credential_exchange_record.credential_request_metadata, ) = (cred_req_result["request"], cred_req_result["metadata"]) credential_request_message = CredentialRequest( requests_attach=[ CredentialRequest.wrap_indy_cred_req( credential_exchange_record.credential_request ) ] ) credential_request_message._thread = { "thid": credential_exchange_record.thread_id } credential_request_message.assign_trace_decorator( self.context.settings, credential_exchange_record.trace ) credential_exchange_record.state = V10CredentialExchange.STATE_REQUEST_SENT await credential_exchange_record.save( self.context, reason="create credential request" ) return credential_exchange_record, credential_request_message
[docs] async def receive_request(self): """ Receive a credential request. Args: credential_request_message: Credential request to receive Returns: credential exchange record, retrieved and updated """ credential_request_message = self.context.message assert len(credential_request_message.requests_attach or []) == 1 credential_request = credential_request_message.indy_cred_req(0) ( credential_exchange_record ) = await V10CredentialExchange.retrieve_by_connection_and_thread( self.context, self.context.connection_record.connection_id, credential_request_message._thread_id, ) credential_exchange_record.credential_request = credential_request credential_exchange_record.state = V10CredentialExchange.STATE_REQUEST_RECEIVED await credential_exchange_record.save( self.context, reason="receive credential request" ) return credential_exchange_record
[docs] async def issue_credential( self, credential_exchange_record: V10CredentialExchange, *, comment: str = None, credential_values: dict, ) -> Tuple[V10CredentialExchange, CredentialIssue]: """ Issue a credential. Args: credential_exchange_record: The credential exchange record for which to issue a credential comment: optional human-readable comment pertaining to credential issue credential_values: dict of credential attribute {name: value} pairs Returns: Tuple: (Updated credential exchange record, credential message) """ schema_id = credential_exchange_record.schema_id registry = None if credential_exchange_record.credential: self._logger.warning( "issue_credential called multiple times for " + "v1.0 credential exchange: %s", credential_exchange_record.credential_exchange_id, ) else: credential_offer = credential_exchange_record.credential_offer credential_request = credential_exchange_record.credential_request ledger: BaseLedger = await self.context.inject(BaseLedger) async with ledger: schema = await ledger.get_schema(schema_id) credential_definition = await ledger.get_credential_definition( credential_exchange_record.credential_definition_id ) if credential_definition["value"].get("revocation"): issuer_rev_regs = await IssuerRevRegRecord.query_by_cred_def_id( self.context, credential_exchange_record.credential_definition_id, state=IssuerRevRegRecord.STATE_ACTIVE, ) if not issuer_rev_regs: raise CredentialManagerError( "Cred def id {} has no active revocation registry".format( credential_exchange_record.credential_definition_id ) ) registry = await issuer_rev_regs[0].get_registry() credential_exchange_record.revoc_reg_id = issuer_rev_regs[ 0 ].revoc_reg_id tails_path = registry.tails_local_path else: tails_path = None issuer: BaseIssuer = await self.context.inject(BaseIssuer) try: ( credential_json, credential_exchange_record.revocation_id, ) = await issuer.create_credential( schema, credential_offer, credential_request, credential_values, credential_exchange_record.revoc_reg_id, tails_path, ) if registry and registry.max_creds == int( credential_exchange_record.revocation_id # monotonic "1"-based ): await issuer_rev_regs[0].mark_full(self.context) except IssuerRevocationRegistryFullError: await issuer_rev_regs[0].mark_full(self.context) raise credential_exchange_record.credential = json.loads(credential_json) credential_exchange_record.state = V10CredentialExchange.STATE_ISSUED await credential_exchange_record.save(self.context, reason="issue credential") credential_message = CredentialIssue( comment=comment, credentials_attach=[ CredentialIssue.wrap_indy_credential( credential_exchange_record.credential ) ], ) credential_message._thread = {"thid": credential_exchange_record.thread_id} credential_message.assign_trace_decorator( self.context.settings, credential_exchange_record.trace ) return (credential_exchange_record, credential_message)
[docs] async def receive_credential(self) -> V10CredentialExchange: """ Receive a credential from an issuer. Hold in storage potentially to be processed by controller before storing. Returns: Credential exchange record, retrieved and updated """ credential_message = self.context.message assert len(credential_message.credentials_attach or []) == 1 raw_credential = credential_message.indy_credential(0) ( credential_exchange_record ) = await V10CredentialExchange.retrieve_by_connection_and_thread( self.context, self.context.connection_record.connection_id, credential_message._thread_id, ) credential_exchange_record.raw_credential = raw_credential credential_exchange_record.state = ( V10CredentialExchange.STATE_CREDENTIAL_RECEIVED ) await credential_exchange_record.save(self.context, reason="receive credential") return credential_exchange_record
[docs] async def store_credential( self, credential_exchange_record: V10CredentialExchange, credential_id: str = None, ) -> Tuple[V10CredentialExchange, CredentialAck]: """ Store a credential in holder wallet; send ack to issuer. Args: credential_exchange_record: credential exchange record with credential to store and ack credential_id: optional credential identifier to override default on storage Returns: Tuple: (Updated credential exchange record, credential ack message) """ raw_credential = credential_exchange_record.raw_credential revoc_reg_def = None ledger: BaseLedger = await self.context.inject(BaseLedger) async with ledger: credential_definition = await ledger.get_credential_definition( raw_credential["cred_def_id"] ) if ( "rev_reg_id" in raw_credential and raw_credential["rev_reg_id"] is not None ): revoc_reg_def = await ledger.get_revoc_reg_def( raw_credential["rev_reg_id"] ) holder: BaseHolder = await self.context.inject(BaseHolder) if ( credential_exchange_record.credential_proposal_dict and "credential_proposal" in credential_exchange_record.credential_proposal_dict ): mime_types = CredentialPreview.deserialize( credential_exchange_record.credential_proposal_dict[ "credential_proposal" ] ).mime_types() else: mime_types = None if revoc_reg_def: revoc_reg = RevocationRegistry.from_definition(revoc_reg_def, True) if not revoc_reg.has_local_tails_file(self.context): await revoc_reg.retrieve_tails(self.context) try: credential_id = await holder.store_credential( credential_definition, raw_credential, credential_exchange_record.credential_request_metadata, mime_types, credential_id=credential_id, rev_reg_def=revoc_reg_def, ) except HolderError as e: self._logger.error(f"Error storing credential. {e.error_code}: {e.message}") raise e credential_json = await holder.get_credential(credential_id) credential = json.loads(credential_json) credential_exchange_record.state = V10CredentialExchange.STATE_ACKED credential_exchange_record.credential_id = credential_id credential_exchange_record.credential = credential credential_exchange_record.revoc_reg_id = credential.get("rev_reg_id", None) credential_exchange_record.revocation_id = credential.get("cred_rev_id", None) await credential_exchange_record.save(self.context, reason="store credential") credential_ack_message = CredentialAck() credential_ack_message.assign_thread_id( credential_exchange_record.thread_id, credential_exchange_record.parent_thread_id, ) credential_ack_message.assign_trace_decorator( self.context.settings, credential_exchange_record.trace ) if credential_exchange_record.auto_remove: # Delete the exchange record since we're done with it await credential_exchange_record.delete_record(self.context) return (credential_exchange_record, credential_ack_message)
[docs] async def receive_credential_ack(self) -> V10CredentialExchange: """ Receive credential ack from holder. Returns: credential exchange record, retrieved and updated """ credential_ack_message = self.context.message ( credential_exchange_record ) = await V10CredentialExchange.retrieve_by_connection_and_thread( self.context, self.context.connection_record.connection_id, credential_ack_message._thread_id, ) credential_exchange_record.state = V10CredentialExchange.STATE_ACKED await credential_exchange_record.save(self.context, reason="credential acked") if credential_exchange_record.auto_remove: # We're done with the exchange so delete await credential_exchange_record.delete_record(self.context) return credential_exchange_record
[docs] async def revoke_credential( self, rev_reg_id: str, cred_rev_id: str, publish: bool = False ): """ Revoke a previously-issued credential. Optionally, publish the corresponding revocation registry delta to the ledger. Args: rev_reg_id: revocation registry id cred_rev_id: credential revocation id publish: whether to publish the resulting revocation registry delta """ issuer: BaseIssuer = await self.context.inject(BaseIssuer) revoc = IndyRevocation(self.context) registry_record = await revoc.get_issuer_rev_reg_record(rev_reg_id) if not registry_record: raise CredentialManagerError( f"No revocation registry record found for id {rev_reg_id}" ) if publish: # create entry and send to ledger delta = json.loads( await issuer.revoke_credentials( rev_reg_id, registry_record.tails_local_path, [cred_rev_id] ) ) if delta: registry_record.revoc_reg_entry = delta await registry_record.publish_registry_entry(self.context) else: await registry_record.mark_pending(self.context, cred_rev_id)
[docs] async def publish_pending_revocations(self) -> Mapping[Text, Sequence[Text]]: """ Publish pending revocations to the ledger. Returns: mapping from each revocation registry id to its cred rev ids published. """ result = {} issuer: BaseIssuer = await self.context.inject(BaseIssuer) registry_records = await IssuerRevRegRecord.query_by_pending(self.context) for registry_record in registry_records: revoke_idxs = list(registry_record.pending_pub) if revoke_idxs: delta_json = await issuer.revoke_credentials( registry_record.revoc_reg_id, registry_record.tails_local_path, revoke_idxs, ) registry_record.revoc_reg_entry = json.loads(delta_json) await registry_record.publish_registry_entry(self.context) result[registry_record.revoc_reg_id] = revoke_idxs await registry_record.clear_pending(self.context) return result