Source code for aries_cloudagent.protocols.issue_credential.v2_0.manager

"""V2.0 issue-credential protocol manager."""

import logging

from typing import Mapping, Tuple

from ....core.error import BaseError
from ....core.profile import Profile
from ....messaging.responder import BaseResponder
from ....storage.error import StorageError, StorageNotFoundError

from .messages.cred_ack import V20CredAck
from .messages.cred_format import V20CredFormat
from .messages.cred_issue import V20CredIssue
from .messages.cred_offer import V20CredOffer
from .messages.cred_problem_report import V20CredProblemReport, ProblemReportReason
from .messages.cred_proposal import V20CredProposal
from .messages.cred_request import V20CredRequest
from .messages.inner.cred_preview import V20CredPreview
from .models.cred_ex_record import V20CredExRecord

LOGGER = logging.getLogger(__name__)


[docs]class V20CredManagerError(BaseError): """Credential manager error under issue-credential protocol v2.0."""
[docs]class V20CredManager: """Class for managing credentials.""" def __init__(self, profile: Profile): """ Initialize a V20CredManager. Args: profile: The profile instance for this credential manager """ self._profile = profile @property def profile(self) -> Profile: """ Accessor for the current profile instance. Returns: The profile instance for this credential manager """ return self._profile
[docs] async def prepare_send( self, connection_id: str, cred_proposal: V20CredProposal, auto_remove: bool = None, ) -> Tuple[V20CredExRecord, V20CredOffer]: """ Set up a new credential exchange record for an automated send. Args: connection_id: connection for which to create offer cred_proposal: credential proposal with preview auto_remove: flag to remove the record automatically on completion Returns: A tuple of the new credential exchange record and credential offer message """ if auto_remove is None: auto_remove = not self._profile.settings.get("preserve_exchange_records") cred_ex_record = V20CredExRecord( connection_id=connection_id, initiator=V20CredExRecord.INITIATOR_SELF, role=V20CredExRecord.ROLE_ISSUER, cred_proposal=cred_proposal, auto_issue=True, auto_remove=auto_remove, trace=(cred_proposal._trace is not None), ) (cred_ex_record, cred_offer) = await self.create_offer( cred_ex_record=cred_ex_record, counter_proposal=None, comment="create automated v2.0 credential exchange record", ) return (cred_ex_record, cred_offer)
[docs] async def create_proposal( self, connection_id: str, *, auto_remove: bool = None, comment: str = None, cred_preview: V20CredPreview, fmt2filter: Mapping[V20CredFormat.Format, Mapping[str, str]], trace: bool = False, ) -> V20CredExRecord: """ Create a credential proposal. Args: connection_id: connection for which to create proposal auto_remove: whether to remove record automatically on completion comment: optional human-readable comment to include in proposal cred_preview: credential preview to use to create credential proposal fmt2filter: mapping between format and filter trace: whether to trace the operation Returns: Resulting credential exchange record including credential proposal """ if auto_remove is None: auto_remove = not self._profile.settings.get("preserve_exchange_records") cred_ex_record = V20CredExRecord( connection_id=connection_id, initiator=V20CredExRecord.INITIATOR_SELF, role=V20CredExRecord.ROLE_HOLDER, state=V20CredExRecord.STATE_PROPOSAL_SENT, auto_remove=auto_remove, trace=trace, ) # Format specific create_proposal handler formats = [ await fmt.handler(self._profile).create_proposal(cred_ex_record, filter) for (fmt, filter) in fmt2filter.items() ] cred_proposal_message = V20CredProposal( comment=comment, credential_preview=cred_preview, formats=[format for (format, _) in formats], filters_attach=[attach for (_, attach) in formats], ) cred_ex_record.thread_id = cred_proposal_message._thread_id cred_ex_record.cred_proposal = cred_proposal_message cred_proposal_message.assign_trace_decorator(self._profile.settings, trace) async with self._profile.session() as session: await cred_ex_record.save( session, reason="create v2.0 credential proposal", ) return cred_ex_record
[docs] async def receive_proposal( self, cred_proposal_message: V20CredProposal, connection_id: str, ) -> V20CredExRecord: """ Receive a credential proposal. Returns: The resulting credential exchange record, created """ # at this point, cred def and schema still open to potential negotiation cred_ex_record = V20CredExRecord( connection_id=connection_id, thread_id=cred_proposal_message._thread_id, initiator=V20CredExRecord.INITIATOR_EXTERNAL, role=V20CredExRecord.ROLE_ISSUER, auto_offer=self._profile.settings.get( "debug.auto_respond_credential_proposal" ), auto_issue=self._profile.settings.get( "debug.auto_respond_credential_request" ), auto_remove=not self._profile.settings.get("preserve_exchange_records"), trace=(cred_proposal_message._trace is not None), ) # Format specific receive_proposal handlers for format in cred_proposal_message.formats: await V20CredFormat.Format.get(format.format).handler( self.profile ).receive_proposal(cred_ex_record, cred_proposal_message) cred_ex_record.cred_proposal = cred_proposal_message cred_ex_record.state = V20CredExRecord.STATE_PROPOSAL_RECEIVED async with self._profile.session() as session: await cred_ex_record.save( session, reason="receive v2.0 credential proposal", ) return cred_ex_record
[docs] async def create_offer( self, cred_ex_record: V20CredExRecord, counter_proposal: V20CredProposal = None, replacement_id: str = None, comment: str = None, ) -> Tuple[V20CredExRecord, V20CredOffer]: """ Create credential offer, update credential exchange record. Args: cred_ex_record: credential exchange record for which to create offer replacement_id: identifier to help coordinate credential replacement comment: optional human-readable comment to set in offer message Returns: A tuple (credential exchange record, credential offer message) """ cred_proposal_message = ( counter_proposal if counter_proposal else cred_ex_record.cred_proposal ) cred_proposal_message.assign_trace_decorator( self._profile.settings, cred_ex_record.trace ) formats = [] # Format specific create_offer handler for format in cred_proposal_message.formats: cred_format = V20CredFormat.Format.get(format.format) if cred_format: formats.append( await cred_format.handler(self.profile).create_offer(cred_ex_record) ) if len(formats) == 0: raise V20CredManagerError( "Unable to create credential offer. No supported formats" ) cred_offer_message = V20CredOffer( replacement_id=replacement_id, comment=comment, credential_preview=cred_proposal_message.credential_preview, formats=[format for (format, _) in formats], offers_attach=[attach for (_, attach) in formats], ) cred_offer_message._thread = {"thid": cred_ex_record.thread_id} cred_offer_message.assign_trace_decorator( self._profile.settings, cred_ex_record.trace ) cred_ex_record.thread_id = cred_offer_message._thread_id cred_ex_record.state = V20CredExRecord.STATE_OFFER_SENT cred_ex_record.cred_proposal = ( # any counter replaces original cred_proposal_message ) cred_ex_record.cred_offer = cred_offer_message async with self._profile.session() as session: await cred_ex_record.save(session, reason="create v2.0 credential offer") return (cred_ex_record, cred_offer_message)
[docs] async def receive_offer( self, cred_offer_message: V20CredOffer, connection_id: str, ) -> V20CredExRecord: """ Receive a credential offer. Args: cred_offer_message: credential offer message connection_id: connection identifier Returns: The credential exchange record, updated """ async with self._profile.session() as session: # Get credential exchange record (holder sent proposal first) # or create it (issuer sent offer first) try: cred_ex_record = await ( V20CredExRecord.retrieve_by_conn_and_thread( session, connection_id, cred_offer_message._thread_id ) ) except StorageNotFoundError: # issuer sent this offer free of any proposal cred_ex_record = V20CredExRecord( connection_id=connection_id, thread_id=cred_offer_message._thread_id, initiator=V20CredExRecord.INITIATOR_EXTERNAL, role=V20CredExRecord.ROLE_HOLDER, auto_remove=not self._profile.settings.get( "preserve_exchange_records" ), trace=(cred_offer_message._trace is not None), ) # Format specific receive_offer handler for format in cred_offer_message.formats: cred_format = V20CredFormat.Format.get(format.format) if cred_format: await cred_format.handler(self.profile).receive_offer( cred_ex_record, cred_offer_message ) cred_ex_record.cred_offer = cred_offer_message cred_ex_record.state = V20CredExRecord.STATE_OFFER_RECEIVED await cred_ex_record.save(session, reason="receive v2.0 credential offer") return cred_ex_record
[docs] async def create_request( self, cred_ex_record: V20CredExRecord, holder_did: str, comment: str = None ) -> Tuple[V20CredExRecord, V20CredRequest]: """ Create a credential request. Args: cred_ex_record: credential exchange record for which to create request holder_did: holder DID comment: optional human-readable comment to set in request message Returns: A tuple (credential exchange record, credential request message) """ if cred_ex_record.cred_request: raise V20CredManagerError( "create_request() called multiple times for " f"v2.0 credential exchange {cred_ex_record.cred_ex_id}" ) # react to credential offer, use offer formats if cred_ex_record.state: if cred_ex_record.state != V20CredExRecord.STATE_OFFER_RECEIVED: raise V20CredManagerError( f"Credential exchange {cred_ex_record.cred_ex_id} " f"in {cred_ex_record.state} state " f"(must be {V20CredExRecord.STATE_OFFER_RECEIVED})" ) cred_offer = cred_ex_record.cred_offer input_formats = cred_offer.formats # start with request (not allowed for indy -> checked in indy format handler) # use proposal formats else: cred_proposal = cred_ex_record.cred_proposal input_formats = cred_proposal.formats request_formats = [] # Format specific create_request handler for format in input_formats: cred_format = V20CredFormat.Format.get(format.format) if cred_format: request_formats.append( await cred_format.handler(self.profile).create_request( cred_ex_record, {"holder_did": holder_did} ) ) if len(request_formats) == 0: raise V20CredManagerError( "Unable to create credential request. No supported formats" ) cred_request_message = V20CredRequest( comment=comment, formats=[format for (format, _) in request_formats], requests_attach=[attach for (_, attach) in request_formats], ) cred_request_message._thread = {"thid": cred_ex_record.thread_id} cred_request_message.assign_trace_decorator( self._profile.settings, cred_ex_record.trace ) cred_ex_record.thread_id = cred_request_message._thread_id cred_ex_record.state = V20CredExRecord.STATE_REQUEST_SENT cred_ex_record.cred_request = cred_request_message async with self._profile.session() as session: await cred_ex_record.save(session, reason="create v2.0 credential request") return (cred_ex_record, cred_request_message)
[docs] async def receive_request( self, cred_request_message: V20CredRequest, connection_id: str ) -> V20CredExRecord: """ Receive a credential request. Args: cred_request_message: credential request to receive connection_id: connection identifier Returns: credential exchange record, updated """ async with self._profile.session() as session: try: cred_ex_record = await ( V20CredExRecord.retrieve_by_conn_and_thread( session, connection_id, cred_request_message._thread_id ) ) except StorageNotFoundError: # holder sent this request free of any offer cred_ex_record = V20CredExRecord( connection_id=connection_id, thread_id=cred_request_message._thread_id, initiator=V20CredExRecord.INITIATOR_EXTERNAL, role=V20CredExRecord.ROLE_ISSUER, auto_remove=not self._profile.settings.get( "preserve_exchange_records" ), trace=(cred_request_message._trace is not None), auto_issue=self._profile.settings.get( "debug.auto_respond_credential_request" ), ) for format in cred_request_message.formats: cred_format = V20CredFormat.Format.get(format.format) if cred_format: await cred_format.handler(self.profile).receive_request( cred_ex_record, cred_request_message ) cred_ex_record.cred_request = cred_request_message cred_ex_record.state = V20CredExRecord.STATE_REQUEST_RECEIVED await cred_ex_record.save(session, reason="receive v2.0 credential request") return cred_ex_record
[docs] async def issue_credential( self, cred_ex_record: V20CredExRecord, *, comment: str = None, ) -> Tuple[V20CredExRecord, V20CredIssue]: """ Issue a credential. Args: cred_ex_record: credential exchange record for which to issue credential comment: optional human-readable comment pertaining to credential issue Returns: Tuple: (Updated credential exchange record, credential issue message) """ if cred_ex_record.state != V20CredExRecord.STATE_REQUEST_RECEIVED: raise V20CredManagerError( f"Credential exchange {cred_ex_record.cred_ex_id} " f"in {cred_ex_record.state} state " f"(must be {V20CredExRecord.STATE_REQUEST_RECEIVED})" ) if cred_ex_record.cred_issue: raise V20CredManagerError( "issue_credential() called multiple times for " f"cred ex record {cred_ex_record.cred_ex_id}" ) replacement_id = None input_formats = cred_ex_record.cred_request.formats if cred_ex_record.cred_offer: cred_offer_message = cred_ex_record.cred_offer replacement_id = cred_offer_message.replacement_id # Format specific issue_credential handler issue_formats = [] for format in input_formats: cred_format = V20CredFormat.Format.get(format.format) if cred_format: issue_formats.append( await cred_format.handler(self.profile).issue_credential( cred_ex_record ) ) if len(issue_formats) == 0: raise V20CredManagerError( "Unable to issue credential. No supported formats" ) cred_issue_message = V20CredIssue( replacement_id=replacement_id, comment=comment, formats=[format for (format, _) in issue_formats], credentials_attach=[attach for (_, attach) in issue_formats], ) cred_ex_record.state = V20CredExRecord.STATE_ISSUED cred_ex_record.cred_issue = cred_issue_message async with self._profile.session() as session: # FIXME - re-fetch record to check state, apply transactional update await cred_ex_record.save(session, reason="v2.0 issue credential") cred_issue_message._thread = {"thid": cred_ex_record.thread_id} cred_issue_message.assign_trace_decorator( self._profile.settings, cred_ex_record.trace ) return (cred_ex_record, cred_issue_message)
[docs] async def receive_credential( self, cred_issue_message: V20CredIssue, connection_id: str ) -> V20CredExRecord: """ Receive a credential issue message from an issuer. Hold cred in storage potentially to be processed by controller before storing. Returns: Credential exchange record, retrieved and updated """ assert cred_issue_message.credentials_attach # FIXME use transaction, fetch for_update async with self._profile.session() as session: cred_ex_record = await ( V20CredExRecord.retrieve_by_conn_and_thread( session, connection_id, cred_issue_message._thread_id, ) ) cred_request_message = cred_ex_record.cred_request req_formats = [ V20CredFormat.Format.get(fmt.format) for fmt in cred_request_message.formats if V20CredFormat.Format.get(fmt.format) ] issue_formats = [ V20CredFormat.Format.get(fmt.format) for fmt in cred_issue_message.formats if V20CredFormat.Format.get(fmt.format) ] handled_formats = [] # check that we didn't receive any formats not present in the request if set(issue_formats) - set(req_formats): raise V20CredManagerError( "Received issue credential format(s) not present in credential " f"request: {set(issue_formats) - set(req_formats)}" ) for issue_format in issue_formats: await issue_format.handler(self.profile).receive_credential( cred_ex_record, cred_issue_message ) handled_formats.append(issue_format) if len(handled_formats) == 0: raise V20CredManagerError("No supported credential formats received.") cred_ex_record.cred_issue = cred_issue_message cred_ex_record.state = V20CredExRecord.STATE_CREDENTIAL_RECEIVED await cred_ex_record.save(session, reason="receive v2.0 credential issue") return cred_ex_record
[docs] async def store_credential( self, cred_ex_record: V20CredExRecord, cred_id: str = None ) -> Tuple[V20CredExRecord, V20CredAck]: """ Store a credential in holder wallet; send ack to issuer. Args: cred_ex_record: credential exchange record with credential to store and ack cred_id: optional credential identifier to override default on storage Returns: Updated credential exchange record """ if cred_ex_record.state != (V20CredExRecord.STATE_CREDENTIAL_RECEIVED): raise V20CredManagerError( f"Credential exchange {cred_ex_record.cred_ex_id} " f"in {cred_ex_record.state} state " f"(must be {V20CredExRecord.STATE_CREDENTIAL_RECEIVED})" ) # Format specific store_credential handler for format in cred_ex_record.cred_issue.formats: cred_format = V20CredFormat.Format.get(format.format) if cred_format: await cred_format.handler(self.profile).store_credential( cred_ex_record, cred_id ) # TODO: if storing multiple credentials we can't reuse the same id cred_id = None return cred_ex_record
[docs] async def send_cred_ack( self, cred_ex_record: V20CredExRecord, ): """ Create, send, and return ack message for input cred ex record. Delete cred ex record if set to auto-remove. Returns: Tuple: cred ex record, cred ack message for tracing """ cred_ack_message = V20CredAck() cred_ack_message.assign_thread_id( cred_ex_record.thread_id, cred_ex_record.parent_thread_id ) cred_ack_message.assign_trace_decorator( self._profile.settings, cred_ex_record.trace ) cred_ex_record.state = V20CredExRecord.STATE_DONE try: async with self._profile.session() as session: # FIXME - re-fetch record to check state, apply transactional update await cred_ex_record.save(session, reason="store credential v2.0") if cred_ex_record.auto_remove: await cred_ex_record.delete_record(session) # all done: delete except StorageError as err: LOGGER.exception(err) # holder still owes an ack: carry on responder = self._profile.inject(BaseResponder, required=False) if responder: await responder.send_reply( cred_ack_message, connection_id=cred_ex_record.connection_id, ) else: LOGGER.warning( "Configuration has no BaseResponder: cannot ack credential on %s", cred_ex_record.thread_id, ) return cred_ex_record, cred_ack_message
[docs] async def receive_credential_ack( self, cred_ack_message: V20CredAck, connection_id: str ) -> V20CredExRecord: """ Receive credential ack from holder. Args: cred_ack_message: credential ack message to receive connection_id: connection identifier Returns: credential exchange record, retrieved and updated """ # FIXME use transaction, fetch for_update async with self._profile.session() as session: cred_ex_record = await ( V20CredExRecord.retrieve_by_conn_and_thread( session, connection_id, cred_ack_message._thread_id, ) ) cred_ex_record.state = V20CredExRecord.STATE_DONE await cred_ex_record.save(session, reason="receive credential ack v2.0") if cred_ex_record.auto_remove: await self.delete_cred_ex_record(cred_ex_record.cred_ex_id) return cred_ex_record
[docs] async def delete_cred_ex_record(self, cred_ex_id: str) -> None: """Delete credential exchange record and associated detail records.""" async with self._profile.session() as session: for fmt in V20CredFormat.Format: # details first: do not strand any orphans for record in await fmt.detail.query_by_cred_ex_id( session, cred_ex_id, ): await record.delete_record(session) cred_ex_record = await V20CredExRecord.retrieve_by_id(session, cred_ex_id) await cred_ex_record.delete_record(session)
[docs] async def receive_problem_report( self, message: V20CredProblemReport, connection_id: str ): """ Receive problem report. Returns: credential exchange record, retrieved and updated """ # FIXME use transaction, fetch for_update async with self._profile.session() as session: cred_ex_record = await ( V20CredExRecord.retrieve_by_conn_and_thread( session, connection_id, message._thread_id, ) ) cred_ex_record.state = None code = message.description.get( "code", ProblemReportReason.ISSUANCE_ABANDONED.value, ) cred_ex_record.error_msg = f"{code}: {message.description.get('en', code)}" await cred_ex_record.save(session, reason="received problem report") return cred_ex_record