Source code for aries_cloudagent.messaging.credentials.manager

"""Classes to manage credentials."""

import asyncio
import json
import logging
import time

from ...config.injection_context import InjectionContext
from ...error import BaseError
from ...cache.base import BaseCache
from ...holder.base import BaseHolder
from ...issuer.base import BaseIssuer
from ...ledger.base import BaseLedger
from ...storage.error import StorageNotFoundError

from ..connections.models.connection_record import ConnectionRecord

from .messages.credential_issue import CredentialIssue
from .messages.credential_stored import CredentialStored
from .messages.credential_request import CredentialRequest
from .messages.credential_offer import CredentialOffer
from .models.credential_exchange import CredentialExchange


[docs]class CredentialManagerError(BaseError): """Credential error."""
[docs]class CredentialManager: """Class for managing credentials.""" def __init__(self, context: InjectionContext): """ Initialize a CredentialManager. Args: context: The context for this credential """ self._context = context self._logger = logging.getLogger(__name__) @property def context(self) -> InjectionContext: """ Accessor for the current injection context. Returns: The injection context for this credential manager """ return self._context
[docs] async def cache_credential_exchange( self, credential_exchange_record: CredentialExchange ): """Cache a credential exchange to avoid redundant credential requests.""" cache: BaseCache = await self.context.inject(BaseCache) await cache.set( "credential_exchange::" + f"{credential_exchange_record.credential_definition_id}::" + f"{credential_exchange_record.connection_id}", credential_exchange_record.credential_exchange_id, 600, )
[docs] async def prepare_send( self, credential_definition_id: str, connection_id: str, credential_values: dict ) -> CredentialExchange: """ Set up a new credential exchange for an automated send. Args: credential_definition_id: Credential definition id for offer connection_id: Connection to create offer for credential_values: The credential values to use if auto_issue is enabled Returns: A new `CredentialExchange` record """ cache: BaseCache = await self._context.inject(BaseCache) # This cache is populated in credential_request_handler.py # Do we have a source (parent) credential exchange for which # we can re-use the credential request/offer? source_credential_exchange_id = await cache.get( "credential_exchange::" + f"{credential_definition_id}::" + f"{connection_id}" ) source_credential_exchange = None if source_credential_exchange_id: # The cached credential exchange ID may not have an associated credential # request yet. Wait up to 30 seconds for that to be populated, then # move on and replace it as the cached credential exchange lookup_start = time.perf_counter() while True: source_credential_exchange = await CredentialExchange.retrieve_by_id( self._context, source_credential_exchange_id ) if source_credential_exchange.credential_request: break if lookup_start + 30 < time.perf_counter(): source_credential_exchange = None break await asyncio.sleep(0.3) if source_credential_exchange: # Since we have the source exchange cache, we can re-use the schema_id, # credential_offer, and credential_request to save a roundtrip credential_exchange = CredentialExchange( auto_issue=True, connection_id=connection_id, initiator=CredentialExchange.INITIATOR_SELF, state=CredentialExchange.STATE_REQUEST_RECEIVED, credential_definition_id=credential_definition_id, schema_id=source_credential_exchange.schema_id, credential_offer=source_credential_exchange.credential_offer, credential_request=source_credential_exchange.credential_request, credential_values=credential_values, # We use the source credential exchange's thread id as the parent # thread id. This thread is a branch of that parent so that the other # agent can use the parent thread id to look up its corresponding # source credential exchange object as needed parent_thread_id=source_credential_exchange.thread_id, ) await credential_exchange.save( self.context, reason="Create automated credential exchange from cached request", ) else: # If the cache is empty, we must use the normal credential flow while # also instructing the agent to automatically issue the credential # once it receives the credential request credential_exchange = await self.create_offer( credential_definition_id, connection_id, True, credential_values ) # Mark this credential exchange as the current cached one for this cred def await self.cache_credential_exchange(credential_exchange) return credential_exchange
[docs] async def perform_send( self, credential_exchange: CredentialExchange, outbound_handler ): """Send the first message in a credential exchange.""" if credential_exchange.credential_request: (credential_exchange, credential_message) = await self.issue_credential( credential_exchange ) await outbound_handler( credential_message, connection_id=credential_exchange.connection_id ) else: credential_exchange, credential_offer_message = await self.offer_credential( credential_exchange ) await outbound_handler( credential_offer_message, connection_id=credential_exchange.connection_id, )
[docs] async def create_offer( self, credential_definition_id: str, connection_id: str, auto_issue: bool = None, credential_values: dict = None, ): """ Create a new credential exchange representing an offer. Args: credential_definition_id: Credential definition id for offer connection_id: Connection to create offer for Returns: A new credential exchange record """ issuer: BaseIssuer = await self.context.inject(BaseIssuer) credential_offer = await issuer.create_credential_offer( credential_definition_id ) credential_exchange = CredentialExchange( auto_issue=auto_issue, connection_id=connection_id, initiator=CredentialExchange.INITIATOR_SELF, state=CredentialExchange.STATE_OFFER_SENT, credential_definition_id=credential_definition_id, schema_id=credential_offer["schema_id"], credential_offer=credential_offer, credential_values=credential_values, ) await credential_exchange.save(self.context, reason="Create credential offer") return credential_exchange
[docs] async def offer_credential(self, credential_exchange: CredentialExchange): """ Offer a credential. Args: credential_exchange_record: The credential exchange we are creating the credential offer for Returns: Tuple: (Updated credential exchange record, credential offer message) """ credential_offer_message = CredentialOffer( offer_json=json.dumps(credential_exchange.credential_offer) ) credential_exchange.thread_id = credential_offer_message._thread_id credential_exchange.state = CredentialExchange.STATE_OFFER_SENT await credential_exchange.save(self.context, reason="Send credential offer") return credential_exchange, credential_offer_message
[docs] async def receive_offer( self, credential_offer_message: CredentialOffer, connection_id: str ): """ Receive a credential offer. Args: credential_offer: Credential offer to receive connection_id: Connection to receive offer on Returns: The credential_exchange_record """ credential_offer = json.loads(credential_offer_message.offer_json) credential_exchange = CredentialExchange( connection_id=connection_id, thread_id=credential_offer_message._thread_id, initiator=CredentialExchange.INITIATOR_EXTERNAL, state=CredentialExchange.STATE_OFFER_RECEIVED, credential_definition_id=credential_offer["cred_def_id"], schema_id=credential_offer["schema_id"], credential_offer=credential_offer, ) await credential_exchange.save(self.context, reason="Receive credential offer") return credential_exchange
[docs] async def create_request( self, credential_exchange_record: CredentialExchange, connection_record: ConnectionRecord, ): """ Create a credential request. Args: credential_exchange_record: Credential exchange to create request for connection_record: Connection to create the request for Return: A tuple (credential_exchange_record, credential_request_message) """ credential_definition_id = credential_exchange_record.credential_definition_id credential_offer = credential_exchange_record.credential_offer did = connection_record.my_did if credential_exchange_record.credential_request: self._logger.warning( "create_request called multiple times for credential exchange: %s", credential_exchange_record.credential_exchange_id, ) else: ledger: BaseLedger = await self.context.inject(BaseLedger) async with ledger: credential_definition = await ledger.get_credential_definition( credential_definition_id ) holder: BaseHolder = await self.context.inject(BaseHolder) ( credential_exchange_record.credential_request, credential_exchange_record.credential_request_metadata, ) = await holder.create_credential_request( credential_offer, credential_definition, did ) credential_request_message = CredentialRequest( request=json.dumps(credential_exchange_record.credential_request) ) credential_request_message.assign_thread_id( credential_exchange_record.thread_id ) credential_exchange_record.state = CredentialExchange.STATE_REQUEST_SENT await credential_exchange_record.save( self.context, reason="Create credential request" ) return credential_exchange_record, credential_request_message
[docs] async def receive_request(self, credential_request_message: CredentialRequest): """ Receive a credential request. Args: credential_request_message: Credential request to receive """ credential_request = json.loads(credential_request_message.request) credential_exchange_record = await CredentialExchange.retrieve_by_tag_filter( self.context, tag_filter={"thread_id": credential_request_message._thread_id}, ) credential_exchange_record.credential_request = credential_request credential_exchange_record.state = CredentialExchange.STATE_REQUEST_RECEIVED await credential_exchange_record.save( self.context, reason="Receive credential request" ) return credential_exchange_record
[docs] async def issue_credential(self, credential_exchange_record: CredentialExchange): """ Issue a credential. Args: credential_exchange_record: The credential exchange we are issuing a credential for Returns: Tuple: (Updated credential exchange record, credential message obj) """ schema_id = credential_exchange_record.schema_id if credential_exchange_record.credential: self._logger.warning( "issue_credential called multiple times for credential exchange: %s", credential_exchange_record.credential_exchange_id, ) else: credential_offer = credential_exchange_record.credential_offer credential_request = credential_exchange_record.credential_request credential_values = credential_exchange_record.credential_values ledger: BaseLedger = await self.context.inject(BaseLedger) async with ledger: schema = await ledger.get_schema(schema_id) issuer: BaseIssuer = await self.context.inject(BaseIssuer) ( credential_exchange_record.credential, credential_revocation_id, ) = await issuer.create_credential( schema, credential_offer, credential_request, credential_values ) credential_exchange_record.state = CredentialExchange.STATE_ISSUED credential_message = CredentialIssue( issue=json.dumps(credential_exchange_record.credential) ) # If we have a thread id on the exchange object, # then we not re-using a credential request and we can # re-assign the thread_id to this message to continue the thread if credential_exchange_record.thread_id: credential_message.assign_thread_id( thid=credential_exchange_record.thread_id ) # If we have no thread_id on the exchange object, but we DO have # a parent_thread_id, then we are re-using the credential # request and we need to assign the parent_thread_id to this message # and also keep the generated message thread_id. We also save the # message's thread_id to our exchange object so we can correlate # the thread to an exchange object in `credential_stored()` elif credential_exchange_record.parent_thread_id: new_thread_id = credential_message._thread_id credential_message.assign_thread_id( thid=new_thread_id, pthid=credential_exchange_record.parent_thread_id ) credential_exchange_record.thread_id = new_thread_id else: raise CredentialManagerError( "The credential exchange object must have a parent thread id" + " OR thread id in order to issue a credential." ) await credential_exchange_record.save(self.context, reason="Issue credential") return credential_exchange_record, credential_message
[docs] async def receive_credential(self, credential_message: CredentialIssue): """ Receive a credential a credential from an issuer. Hold in storage to be potentially processed by controller before storing. Args: credential_message: credential to store """ raw_credential = json.loads(credential_message.issue) try: ( credential_exchange_record ) = await CredentialExchange.retrieve_by_tag_filter( self.context, tag_filter={"thread_id": credential_message._thread_id} ) except StorageNotFoundError: if not credential_message._thread or not credential_message._thread.pthid: raise # If the thread_id does not return any results, we check the # parent thread id to see if this exchange is nested and is # re-using information from parent. In this case, we need the parent # exchange state object to retrieve and re-use the # credential_request_metadata ( credential_exchange_record ) = await CredentialExchange.retrieve_by_tag_filter( self.context, tag_filter={"thread_id": credential_message._thread.pthid} ) credential_exchange_record._id = None credential_exchange_record.thread_id = credential_message._thread_id credential_exchange_record.credential_id = None credential_exchange_record.credential = None credential_exchange_record.raw_credential = raw_credential credential_exchange_record.state = CredentialExchange.STATE_CREDENTIAL_RECEIVED await credential_exchange_record.save(self.context, reason="Receive credential") return credential_exchange_record
[docs] async def store_credential(self, credential_exchange_record: CredentialExchange): """ Store a credential in the wallet. Args: credential_message: credential to store """ raw_credential = credential_exchange_record.raw_credential ledger: BaseLedger = await self.context.inject(BaseLedger) async with ledger: credential_definition = await ledger.get_credential_definition( raw_credential["cred_def_id"] ) holder: BaseHolder = await self.context.inject(BaseHolder) credential_id = await holder.store_credential( credential_definition, raw_credential, credential_exchange_record.credential_request_metadata, ) credential = await holder.get_credential(credential_id) credential_exchange_record.state = CredentialExchange.STATE_STORED credential_exchange_record.credential_id = credential_id credential_exchange_record.credential = credential await credential_exchange_record.save(self.context, reason="Store credential") credential_stored_message = CredentialStored() credential_stored_message.assign_thread_id( credential_exchange_record.thread_id, credential_exchange_record.parent_thread_id, ) return credential_exchange_record, credential_stored_message
[docs] async def credential_stored(self, credential_stored_message: CredentialStored): """ Receive confirmation that holder stored credential. Args: credential_message: credential to store """ credential_exchange_record = await CredentialExchange.retrieve_by_tag_filter( self.context, tag_filter={"thread_id": credential_stored_message._thread_id} ) credential_exchange_record.state = CredentialExchange.STATE_STORED await credential_exchange_record.save(self.context, reason="Credential stored") return credential_exchange_record