Source code for aries_cloudagent.protocols.present_proof.v1_0.routes

"""Admin routes for presentations."""

import json

from aiohttp import web
from aiohttp_apispec import docs, request_schema, response_schema
from marshmallow import Schema, fields

from ....connections.models.connection_record import ConnectionRecord
from ....holder.base import BaseHolder
from ....messaging.decorators.attach_decorator import AttachDecorator
from ....messaging.valid import (
    INDY_CRED_DEF_ID,
    INDY_DID,
    INDY_PREDICATE,
    INDY_SCHEMA_ID,
    INDY_VERSION,
    INT_EPOCH,
    UUIDFour,
)
from ....storage.error import StorageNotFoundError
from ....indy.util import generate_pr_nonce

from .manager import PresentationManager
from .messages.inner.presentation_preview import (
    PresentationPreview,
    PresentationPreviewSchema,
)
from .messages.presentation_proposal import PresentationProposal
from .messages.presentation_request import PresentationRequest
from .models.presentation_exchange import (
    V10PresentationExchange,
    V10PresentationExchangeSchema,
)

from .message_types import ATTACH_DECO_IDS, PRESENTATION_REQUEST

from ....utils.tracing import trace_event, get_timer, AdminAPIMessageTracingSchema


[docs]class V10PresentationExchangeListSchema(Schema): """Result schema for an Aries#0037 v1.0 presentation exchange query.""" results = fields.List( fields.Nested(V10PresentationExchangeSchema()), description="Aries#0037 v1.0 presentation exchange records", )
[docs]class V10PresentationProposalRequestSchema(AdminAPIMessageTracingSchema): """Request schema for sending a presentation proposal admin message.""" connection_id = fields.UUID( description="Connection identifier", required=True, example=UUIDFour.EXAMPLE ) comment = fields.Str( description="Human-readable comment", required=False, default="" ) presentation_proposal = fields.Nested(PresentationPreviewSchema(), required=True) auto_present = fields.Boolean( description=( "Whether to respond automatically to presentation requests, building " "and presenting requested proof" ), required=False, default=False, )
[docs]class IndyProofReqSpecRestrictionsSchema(Schema): """Schema for restrictions in attr or pred specifier indy proof request.""" credential_definition_id = fields.Str( description="Credential definition identifier", required=True, **INDY_CRED_DEF_ID ) schema_id = fields.String( description="Schema identifier", required=False, **INDY_SCHEMA_ID ) schema_issuer_did = fields.String( description="Schema issuer (origin) DID", required=False, **INDY_DID ) schema_name = fields.String( example="transcript", description="Schema name", required=False ) schema_version = fields.String( description="Schema version", required=False, **INDY_VERSION ) issuer_did = fields.String( description="Credential issuer DID", required=False, **INDY_DID ) cred_def_id = fields.String( description="Credential definition identifier", required=False, **INDY_CRED_DEF_ID )
[docs]class IndyProofReqNonRevoked(Schema): """Non-revocation times specification in indy proof request.""" from_epoch = fields.Int( description="Earliest epoch of interest for non-revocation proof", required=True, **INT_EPOCH ) to_epoch = fields.Int( description="Latest epoch of interest for non-revocation proof", required=True, **INT_EPOCH )
[docs]class IndyProofReqAttrSpecSchema(Schema): """Schema for attribute specification in indy proof request.""" name = fields.String( example="favouriteDrink", description="Attribute name", required=True ) restrictions = fields.List( fields.Nested(IndyProofReqSpecRestrictionsSchema()), description="If present, credential must satisfy one of given restrictions", required=False, ) non_revoked = fields.Nested(IndyProofReqNonRevoked(), required=False)
[docs]class IndyProofReqPredSpecSchema(Schema): """Schema for predicate specification in indy proof request.""" name = fields.String(example="index", description="Attribute name", required=True) p_type = fields.String( description="Predicate type ('<', '<=', '>=', or '>')", required=True, **INDY_PREDICATE ) p_value = fields.Integer(description="Threshold value", required=True) restrictions = fields.List( fields.Nested(IndyProofReqSpecRestrictionsSchema()), description="If present, credential must satisfy one of given restrictions", required=False, ) non_revoked = fields.Nested(IndyProofReqNonRevoked(), required=False)
[docs]class IndyProofRequestSchema(Schema): """Schema for indy proof request.""" nonce = fields.String(description="Nonce", required=False, example="1234567890") name = fields.String( description="Proof request name", required=False, example="Proof request", default="Proof request", ) version = fields.String( description="Proof request version", required=False, default="1.0", **INDY_VERSION ) requested_attributes = fields.Dict( description=("Requested attribute specifications of proof request"), required=True, keys=fields.Str(example="0_attr_uuid"), # marshmallow/apispec v3.0 ignores values=fields.Nested(IndyProofReqAttrSpecSchema()), ) requested_predicates = fields.Dict( description=("Requested predicate specifications of proof request"), required=True, keys=fields.Str(example="0_age_GE_uuid"), # marshmallow/apispec v3.0 ignores values=fields.Nested(IndyProofReqPredSpecSchema()), )
[docs]class V10PresentationRequestRequestSchema(AdminAPIMessageTracingSchema): """Request schema for sending a proof request.""" connection_id = fields.UUID( description="Connection identifier", required=True, example=UUIDFour.EXAMPLE ) proof_request = fields.Nested(IndyProofRequestSchema(), required=True) comment = fields.Str(required=False)
[docs]class IndyRequestedCredsRequestedAttrSchema(Schema): """Schema for requested attributes within indy requested credentials structure.""" cred_id = fields.Str( example="3fa85f64-5717-4562-b3fc-2c963f66afa6", description=( "Wallet credential identifier (typically but not necessarily a UUID)" ), ) revealed = fields.Bool( description="Whether to reveal attribute in proof", default=True )
[docs]class IndyRequestedCredsRequestedPredSchema(Schema): """Schema for requested predicates within indy requested credentials structure.""" cred_id = fields.Str( example="3fa85f64-5717-4562-b3fc-2c963f66afa6", description=( "Wallet credential identifier (typically but not necessarily a UUID)" ), )
[docs]class V10PresentationRequestSchema(AdminAPIMessageTracingSchema): """Request schema for sending a presentation.""" self_attested_attributes = fields.Dict( description=("Self-attested attributes to build into proof"), required=True, keys=fields.Str(example="attr_name"), # marshmallow/apispec v3.0 ignores values=fields.Str( example="self_attested_value", description=( "Self-attested attribute values to use in requested-credentials " "structure for proof construction" ), ), ) requested_attributes = fields.Dict( description=( "Nested object mapping proof request attribute referents to " "requested-attribute specifiers" ), required=True, keys=fields.Str(example="attr_referent"), # marshmallow/apispec v3.0 ignores values=fields.Nested(IndyRequestedCredsRequestedAttrSchema()), ) requested_predicates = fields.Dict( description=( "Nested object mapping proof request predicate referents to " "requested-predicate specifiers" ), required=True, keys=fields.Str(example="pred_referent"), # marshmallow/apispec v3.0 ignores values=fields.Nested(IndyRequestedCredsRequestedPredSchema()), )
[docs]@docs(tags=["present-proof"], summary="Fetch all present-proof exchange records") @response_schema(V10PresentationExchangeListSchema(), 200) async def presentation_exchange_list(request: web.BaseRequest): """ Request handler for searching presentation exchange records. Args: request: aiohttp request object Returns: The presentation exchange list response """ context = request.app["request_context"] tag_filter = {} if "thread_id" in request.query and request.query["thread_id"] != "": tag_filter["thread_id"] = request.query["thread_id"] post_filter = {} for param_name in ("connection_id", "role", "state"): if param_name in request.query and request.query[param_name] != "": post_filter[param_name] = request.query[param_name] records = await V10PresentationExchange.query(context, tag_filter, post_filter) return web.json_response({"results": [record.serialize() for record in records]})
[docs]@docs(tags=["present-proof"], summary="Fetch a single presentation exchange record") @response_schema(V10PresentationExchangeSchema(), 200) async def presentation_exchange_retrieve(request: web.BaseRequest): """ Request handler for fetching a single presentation exchange record. Args: request: aiohttp request object Returns: The presentation exchange record response """ context = request.app["request_context"] presentation_exchange_id = request.match_info["pres_ex_id"] try: record = await V10PresentationExchange.retrieve_by_id( context, presentation_exchange_id ) except StorageNotFoundError: raise web.HTTPNotFound() return web.json_response(record.serialize())
[docs]@docs( tags=["present-proof"], summary="Fetch credentials for a presentation request from wallet", parameters=[ { "name": "start", "in": "query", "schema": {"type": "string"}, "required": False, }, { "name": "count", "in": "query", "schema": {"type": "string"}, "required": False, }, { "name": "extra_query", "in": "query", "schema": {"type": "string"}, "required": False, }, ], ) async def presentation_exchange_credentials_list(request: web.BaseRequest): """ Request handler for searching applicable credential records. Args: request: aiohttp request object Returns: The credential list response """ context = request.app["request_context"] presentation_exchange_id = request.match_info["pres_ex_id"] referents = request.match_info.get("referent") presentation_referents = referents.split(",") if referents else () try: presentation_exchange_record = await V10PresentationExchange.retrieve_by_id( context, presentation_exchange_id ) except StorageNotFoundError: raise web.HTTPNotFound() start = request.query.get("start") count = request.query.get("count") # url encoded json extra_query encoded_extra_query = request.query.get("extra_query") or "{}" extra_query = json.loads(encoded_extra_query) # defaults start = int(start) if isinstance(start, str) else 0 count = int(count) if isinstance(count, str) else 10 holder: BaseHolder = await context.inject(BaseHolder) credentials = await holder.get_credentials_for_presentation_request_by_referent( presentation_exchange_record.presentation_request, presentation_referents, start, count, extra_query, ) presentation_exchange_record.log_state( context, "Retrieved presentation credentials", { "presentation_exchange_id": presentation_exchange_id, "referents": presentation_referents, "extra_query": extra_query, "credentials": credentials, }, ) return web.json_response(credentials)
[docs]@docs(tags=["present-proof"], summary="Sends a presentation proposal") @request_schema(V10PresentationProposalRequestSchema()) @response_schema(V10PresentationExchangeSchema(), 200) async def presentation_exchange_send_proposal(request: web.BaseRequest): """ Request handler for sending a presentation proposal. Args: request: aiohttp request object Returns: The presentation exchange details """ r_time = get_timer() context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] body = await request.json() connection_id = body.get("connection_id") try: connection_record = await ConnectionRecord.retrieve_by_id( context, connection_id ) except StorageNotFoundError: raise web.HTTPBadRequest() if not connection_record.is_ready: raise web.HTTPForbidden() comment = body.get("comment") # Aries#0037 calls it a proposal in the proposal struct but it's of type preview presentation_preview = body.get("presentation_proposal") presentation_proposal_message = PresentationProposal( comment=comment, presentation_proposal=PresentationPreview.deserialize(presentation_preview), ) trace_msg = body.get("trace") presentation_proposal_message.assign_trace_decorator( context.settings, trace_msg, ) auto_present = body.get( "auto_present", context.settings.get("debug.auto_respond_presentation_request") ) presentation_manager = PresentationManager(context) ( presentation_exchange_record ) = await presentation_manager.create_exchange_for_proposal( connection_id=connection_id, presentation_proposal_message=presentation_proposal_message, auto_present=auto_present, ) await outbound_handler(presentation_proposal_message, connection_id=connection_id) trace_event( context.settings, presentation_proposal_message, outcome="presentation_exchange_propose.END", perf_counter=r_time, ) return web.json_response(presentation_exchange_record.serialize())
[docs]@docs( tags=["present-proof"], summary=""" Creates a presentation request not bound to any proposal or existing connection """, ) @request_schema(V10PresentationRequestRequestSchema()) @response_schema(V10PresentationExchangeSchema(), 200) async def presentation_exchange_create_request(request: web.BaseRequest): """ Request handler for creating a free presentation request. The presentation request will not be bound to any proposal or existing connection. Args: request: aiohttp request object Returns: The presentation exchange details """ r_time = get_timer() context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] body = await request.json() comment = body.get("comment") indy_proof_request = body.get("proof_request") if not indy_proof_request.get("nonce"): indy_proof_request["nonce"] = await generate_pr_nonce() presentation_request_message = PresentationRequest( comment=comment, request_presentations_attach=[ AttachDecorator.from_indy_dict( indy_dict=indy_proof_request, ident=ATTACH_DECO_IDS[PRESENTATION_REQUEST], ) ], ) trace_msg = body.get("trace") presentation_request_message.assign_trace_decorator( context.settings, trace_msg, ) presentation_manager = PresentationManager(context) ( presentation_exchange_record ) = await presentation_manager.create_exchange_for_request( connection_id=None, presentation_request_message=presentation_request_message ) await outbound_handler(presentation_request_message, connection_id=None) trace_event( context.settings, presentation_request_message, outcome="presentation_exchange_create_request.END", perf_counter=r_time, ) return web.json_response(presentation_exchange_record.serialize())
[docs]@docs( tags=["present-proof"], summary="Sends a free presentation request not bound to any proposal", ) @request_schema(V10PresentationRequestRequestSchema()) @response_schema(V10PresentationExchangeSchema(), 200) async def presentation_exchange_send_free_request(request: web.BaseRequest): """ Request handler for sending a presentation request free from any proposal. Args: request: aiohttp request object Returns: The presentation exchange details """ r_time = get_timer() context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] body = await request.json() connection_id = body.get("connection_id") try: connection_record = await ConnectionRecord.retrieve_by_id( context, connection_id ) except StorageNotFoundError: raise web.HTTPBadRequest() if not connection_record.is_ready: raise web.HTTPForbidden() comment = body.get("comment") indy_proof_request = body.get("proof_request") if not indy_proof_request.get("nonce"): indy_proof_request["nonce"] = await generate_pr_nonce() presentation_request_message = PresentationRequest( comment=comment, request_presentations_attach=[ AttachDecorator.from_indy_dict( indy_dict=indy_proof_request, ident=ATTACH_DECO_IDS[PRESENTATION_REQUEST], ) ], ) trace_msg = body.get("trace") presentation_request_message.assign_trace_decorator( context.settings, trace_msg, ) presentation_manager = PresentationManager(context) ( presentation_exchange_record ) = await presentation_manager.create_exchange_for_request( connection_id=connection_id, presentation_request_message=presentation_request_message, ) await outbound_handler(presentation_request_message, connection_id=connection_id) trace_event( context.settings, presentation_request_message, outcome="presentation_exchange_send_request.END", perf_counter=r_time, ) return web.json_response(presentation_exchange_record.serialize())
[docs]@docs( tags=["present-proof"], summary="Sends a presentation request in reference to a proposal", ) @request_schema(V10PresentationRequestRequestSchema()) @response_schema(V10PresentationExchangeSchema(), 200) async def presentation_exchange_send_bound_request(request: web.BaseRequest): """ Request handler for sending a presentation request free from any proposal. Args: request: aiohttp request object Returns: The presentation exchange details """ r_time = get_timer() context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] presentation_exchange_id = request.match_info["pres_ex_id"] presentation_exchange_record = await V10PresentationExchange.retrieve_by_id( context, presentation_exchange_id ) assert presentation_exchange_record.state == ( V10PresentationExchange.STATE_PROPOSAL_RECEIVED ) body = await request.json() connection_id = body.get("connection_id") try: connection_record = await ConnectionRecord.retrieve_by_id( context, connection_id ) except StorageNotFoundError: raise web.HTTPBadRequest() if not connection_record.is_ready: raise web.HTTPForbidden() presentation_manager = PresentationManager(context) ( presentation_exchange_record, presentation_request_message, ) = await presentation_manager.create_bound_request(presentation_exchange_record) trace_msg = body.get("trace") presentation_request_message.assign_trace_decorator( context.settings, trace_msg, ) await outbound_handler(presentation_request_message, connection_id=connection_id) trace_event( context.settings, presentation_request_message, outcome="presentation_exchange_send_request.END", perf_counter=r_time, ) return web.json_response(presentation_exchange_record.serialize())
[docs]@docs(tags=["present-proof"], summary="Sends a proof presentation") @request_schema(V10PresentationRequestSchema()) @response_schema(V10PresentationExchangeSchema()) async def presentation_exchange_send_presentation(request: web.BaseRequest): """ Request handler for sending a presentation. Args: request: aiohttp request object Returns: The presentation exchange details """ r_time = get_timer() context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] presentation_exchange_id = request.match_info["pres_ex_id"] presentation_exchange_record = await V10PresentationExchange.retrieve_by_id( context, presentation_exchange_id ) body = await request.json() connection_id = presentation_exchange_record.connection_id try: connection_record = await ConnectionRecord.retrieve_by_id( context, connection_id ) except StorageNotFoundError: raise web.HTTPBadRequest() if not connection_record.is_ready: raise web.HTTPForbidden() assert ( presentation_exchange_record.state ) == V10PresentationExchange.STATE_REQUEST_RECEIVED presentation_manager = PresentationManager(context) ( presentation_exchange_record, presentation_message, ) = await presentation_manager.create_presentation( presentation_exchange_record, { "self_attested_attributes": body.get("self_attested_attributes"), "requested_attributes": body.get("requested_attributes"), "requested_predicates": body.get("requested_predicates"), }, comment=body.get("comment"), ) trace_msg = body.get("trace") presentation_message.assign_trace_decorator( context.settings, trace_msg, ) await outbound_handler(presentation_message, connection_id=connection_id) trace_event( context.settings, presentation_message, outcome="presentation_exchange_send_request.END", perf_counter=r_time, ) return web.json_response(presentation_exchange_record.serialize())
[docs]@docs(tags=["present-proof"], summary="Verify a received presentation") @response_schema(V10PresentationExchangeSchema()) async def presentation_exchange_verify_presentation(request: web.BaseRequest): """ Request handler for verifying a presentation request. Args: request: aiohttp request object Returns: The presentation exchange details """ r_time = get_timer() context = request.app["request_context"] presentation_exchange_id = request.match_info["pres_ex_id"] presentation_exchange_record = await V10PresentationExchange.retrieve_by_id( context, presentation_exchange_id ) connection_id = presentation_exchange_record.connection_id try: connection_record = await ConnectionRecord.retrieve_by_id( context, connection_id ) except StorageNotFoundError: raise web.HTTPBadRequest() if not connection_record.is_ready: raise web.HTTPForbidden() assert ( presentation_exchange_record.state ) == V10PresentationExchange.STATE_PRESENTATION_RECEIVED presentation_manager = PresentationManager(context) presentation_exchange_record = await presentation_manager.verify_presentation( presentation_exchange_record ) trace_event( context.settings, presentation_exchange_record, outcome="presentation_exchange_verify.END", perf_counter=r_time, ) return web.json_response(presentation_exchange_record.serialize())
[docs]@docs(tags=["present-proof"], summary="Remove an existing presentation exchange record") async def presentation_exchange_remove(request: web.BaseRequest): """ Request handler for removing a presentation exchange record. Args: request: aiohttp request object """ context = request.app["request_context"] presentation_exchange_id = request.match_info["pres_ex_id"] try: presentation_exchange_record = await V10PresentationExchange.retrieve_by_id( context, presentation_exchange_id ) except StorageNotFoundError: raise web.HTTPNotFound() await presentation_exchange_record.delete_record(context) return web.json_response({})
[docs]async def register(app: web.Application): """Register routes.""" app.add_routes( [ web.get("/present-proof/records", presentation_exchange_list), web.get( "/present-proof/records/{pres_ex_id}", presentation_exchange_retrieve ), web.get( "/present-proof/records/{pres_ex_id}/credentials", presentation_exchange_credentials_list, ), web.get( "/present-proof/records/{pres_ex_id}/credentials/{referent}", presentation_exchange_credentials_list, ), web.post( "/present-proof/send-proposal", presentation_exchange_send_proposal ), web.post( "/present-proof/create-request", presentation_exchange_create_request ), web.post( "/present-proof/send-request", presentation_exchange_send_free_request ), web.post( "/present-proof/records/{pres_ex_id}/send-request", presentation_exchange_send_bound_request, ), web.post( "/present-proof/records/{pres_ex_id}/send-presentation", presentation_exchange_send_presentation, ), web.post( "/present-proof/records/{pres_ex_id}/verify-presentation", presentation_exchange_verify_presentation, ), web.post( "/present-proof/records/{pres_ex_id}/remove", presentation_exchange_remove, ), ] )