Source code for aries_cloudagent.protocols.presentations.manager

"""Classes to manage presentations."""

import json
import logging
from uuid import uuid4

from ...config.injection_context import InjectionContext
from ...core.error import BaseError
from ...holder.base import BaseHolder
from ...ledger.base import BaseLedger
from ...verifier.base import BaseVerifier

from .models.presentation_exchange import PresentationExchange
from .messages.presentation_request import PresentationRequest
from .messages.credential_presentation import CredentialPresentation


[docs]class PresentationManagerError(BaseError): """Presentation error."""
[docs]class PresentationManager: """Class for managing presentations.""" def __init__(self, context: InjectionContext): """ Initialize a PresentationManager. Args: context: The context for this credential """ self._context = context self._logger = logging.getLogger(__name__) @property def context(self) -> InjectionContext: """ Accessor for the current request context. Returns: The injection context for this presentation manager """ return self._context
[docs] async def create_request( self, name: str, version: str, requested_attributes: list, requested_predicates: list, connection_id: str, ): """Create a proof request.""" presentation_request = { "name": name, "version": version, "nonce": str(uuid4().int), "requested_attributes": {}, "requested_predicates": {}, } for requested_attribute in requested_attributes: presentation_request["requested_attributes"][ str(uuid4()) ] = requested_attribute for requested_predicates in requested_predicates: presentation_request["requested_predicates"][ str(uuid4()) ] = requested_predicates presentation_request_message = PresentationRequest( request=json.dumps(presentation_request) ) presentation_exchange = PresentationExchange( connection_id=connection_id, initiator=PresentationExchange.INITIATOR_SELF, state=PresentationExchange.STATE_REQUEST_SENT, presentation_request=presentation_request, thread_id=presentation_request_message._thread_id, ) await presentation_exchange.save( self.context, reason="Create presentation request" ) return presentation_exchange, presentation_request_message
[docs] async def receive_request( self, presentation_request_message: PresentationRequest, connection_id: str ): """ Receive a presentation request. Args: presentation_request_message: Presentation message to receive """ presentation_exchange = PresentationExchange( connection_id=connection_id, thread_id=presentation_request_message._thread_id, initiator=PresentationExchange.INITIATOR_EXTERNAL, state=PresentationExchange.STATE_REQUEST_RECEIVED, presentation_request=json.loads(presentation_request_message.request), ) await presentation_exchange.save( self.context, reason="Receive presentation request" ) return presentation_exchange
[docs] async def create_presentation( self, presentation_exchange_record: PresentationExchange, requested_credentials: dict, ): """ Receive a presentation request. Args: presentation_exchange_record: Record to update requested_credentials: Indy formatted requested_credentials i.e., :: { "self_attested_attributes": { "j233ffbc-bd35-49b1-934f-51e083106f6d": "value" }, "requested_attributes": { "6253ffbb-bd35-49b3-934f-46e083106f6c": { "cred_id": "5bfa40b7-062b-4ae0-a251-a86c87922c0e", "revealed": true } }, "requested_predicates": { "bfc8a97d-60d3-4f21-b998-85eeabe5c8c0": { "cred_id": "5bfa40b7-062b-4ae0-a251-a86c87922c0e" } } } """ # Get all credential ids for this presentation credential_ids = [] requested_attributes = requested_credentials["requested_attributes"] for presentation_referent in requested_attributes: credential_id = requested_attributes[presentation_referent]["cred_id"] credential_ids.append(credential_id) requested_predicates = requested_credentials["requested_predicates"] for presentation_referent in requested_predicates: credential_id = requested_predicates[presentation_referent]["cred_id"] credential_ids.append(credential_id) # Get all schema and credential definition ids in use # TODO: Cache this!!! schema_ids = [] credential_definition_ids = [] holder: BaseHolder = await self.context.inject(BaseHolder) for credential_id in credential_ids: credential = await holder.get_credential(credential_id) schema_id = credential["schema_id"] credential_definition_id = credential["cred_def_id"] schema_ids.append(schema_id) credential_definition_ids.append(credential_definition_id) schemas = {} credential_definitions = {} ledger: BaseLedger = await self.context.inject(BaseLedger) async with ledger: # Build schemas for anoncreds for schema_id in schema_ids: schema = await ledger.get_schema(schema_id) schemas[schema_id] = schema # Build credential_definitions for anoncreds for credential_definition_id in credential_definition_ids: (credential_definition) = await ledger.get_credential_definition( credential_definition_id ) credential_definitions[credential_definition_id] = credential_definition holder: BaseHolder = await self.context.inject(BaseHolder) presentation = await holder.create_presentation( presentation_exchange_record.presentation_request, requested_credentials, schemas, credential_definitions, ) presentation_message = CredentialPresentation( presentation=json.dumps(presentation) ) # TODO: Find a more elegant way to do this presentation_message._thread = {"thid": presentation_exchange_record.thread_id} # save presentation exchange state presentation_exchange_record.state = ( PresentationExchange.STATE_PRESENTATION_SENT ) presentation_exchange_record.presentation = presentation await presentation_exchange_record.save( self.context, reason="Create presentation", log_params={"requested_credentials": requested_credentials}, ) return presentation_exchange_record, presentation_message
[docs] async def receive_presentation(self, presentation: dict, thread_id: str): """Receive a presentation request.""" ( presentation_exchange_record ) = await PresentationExchange.retrieve_by_tag_filter( self.context, {"thread_id": thread_id}, {"initiator": "self"} ) presentation_exchange_record.presentation = presentation presentation_exchange_record.state = ( PresentationExchange.STATE_PRESENTATION_RECEIVED ) await presentation_exchange_record.save( self.context, reason="Receive presentation" ) return presentation_exchange_record
[docs] async def verify_presentation( self, presentation_exchange_record: PresentationExchange ): """Verify a presentation request.""" presentation_request = presentation_exchange_record.presentation_request presentation = presentation_exchange_record.presentation schema_ids = [] credential_definition_ids = [] identifiers = presentation["identifiers"] for identifier in identifiers: schema_ids.append(identifier["schema_id"]) credential_definition_ids.append(identifier["cred_def_id"]) schemas = {} credential_definitions = {} ledger: BaseLedger = await self.context.inject(BaseLedger) async with ledger: # Build schemas for anoncreds for schema_id in schema_ids: schema = await ledger.get_schema(schema_id) schemas[schema_id] = schema # Build credential_definitions for anoncreds for credential_definition_id in credential_definition_ids: (credential_definition) = await ledger.get_credential_definition( credential_definition_id ) credential_definitions[credential_definition_id] = credential_definition verifier: BaseVerifier = await self.context.inject(BaseVerifier) verified = await verifier.verify_presentation( presentation_request, presentation, schemas, credential_definitions ) presentation_exchange_record.verified = "true" if verified else "false" presentation_exchange_record.state = PresentationExchange.STATE_VERIFIED await presentation_exchange_record.save( self.context, reason="Verify presentation" ) return presentation_exchange_record