Source code for aries_cloudagent.protocols.issue_credential.v1_0.routes

"""Credential exchange admin routes."""

import json

from aiohttp import web
from aiohttp_apispec import docs, request_schema, response_schema
from json.decoder import JSONDecodeError
from marshmallow import fields, Schema

from ....connections.models.connection_record import ConnectionRecord
from ....holder.base import BaseHolder
from ....issuer.indy import IssuerRevocationRegistryFullError
from ....messaging.credential_definitions.util import CRED_DEF_TAGS
from ....messaging.valid import (
    INDY_CRED_DEF_ID,
    INDY_DID,
    INDY_REV_REG_ID,
    INDY_SCHEMA_ID,
    INDY_VERSION,
    UUIDFour,
)
from ....storage.error import StorageNotFoundError

# FIXME: We shouldn't rely on a hardcoded message version here.
from ...problem_report.v1_0.message import ProblemReport

from .manager import CredentialManager
from .messages.credential_proposal import CredentialProposal
from .messages.inner.credential_preview import (
    CredentialPreview,
    CredentialPreviewSchema,
)
from .models.credential_exchange import (
    V10CredentialExchange,
    V10CredentialExchangeSchema,
)

from ....utils.tracing import trace_event, get_timer, AdminAPIMessageTracingSchema


[docs]class V10AttributeMimeTypesResultSchema(Schema): """Result schema for credential attribute MIME types by credential definition."""
[docs]class V10CredentialExchangeListResultSchema(Schema): """Result schema for Aries#0036 v1.0 credential exchange query.""" results = fields.List( fields.Nested(V10CredentialExchangeSchema), description="Aries#0036 v1.0 credential exchange records", )
[docs]class V10CredentialStoreRequestSchema(Schema): """Request schema for sending a credential store admin message.""" credential_id = fields.Str(required=False)
[docs]class V10CredentialProposalRequestSchemaBase(AdminAPIMessageTracingSchema): """Base class for request schema for sending credential proposal admin message.""" connection_id = fields.UUID( description="Connection identifier", required=True, example=UUIDFour.EXAMPLE, # typically but not necessarily a UUID4 ) cred_def_id = fields.Str( description="Credential definition identifier", required=False, **INDY_CRED_DEF_ID, ) schema_id = fields.Str( description="Schema identifier", required=False, **INDY_SCHEMA_ID ) schema_issuer_did = fields.Str( description="Schema issuer DID", required=False, **INDY_DID ) schema_name = fields.Str( description="Schema name", required=False, example="preferences" ) schema_version = fields.Str( description="Schema version", required=False, **INDY_VERSION ) issuer_did = fields.Str( description="Credential issuer DID", required=False, **INDY_DID ) auto_remove = fields.Bool( description=( "Whether to remove the credential exchange record on completion " "(overrides --preserve-exchange-records configuration setting)" ), required=False, ) comment = fields.Str(description="Human-readable comment", required=False)
[docs]class V10CredentialProposalRequestOptSchema(V10CredentialProposalRequestSchemaBase): """Request schema for sending credential proposal on optional proposal preview.""" credential_proposal = fields.Nested(CredentialPreviewSchema, required=False)
[docs]class V10CredentialProposalRequestMandSchema(V10CredentialProposalRequestSchemaBase): """Request schema for sending credential proposal on mandatory proposal preview.""" credential_proposal = fields.Nested(CredentialPreviewSchema, required=True)
[docs]class V10CredentialOfferRequestSchema(AdminAPIMessageTracingSchema): """Request schema for sending credential offer admin message.""" connection_id = fields.UUID( description="Connection identifier", required=True, example=UUIDFour.EXAMPLE, # typically but not necessarily a UUID4 ) cred_def_id = fields.Str( description="Credential definition identifier", required=True, **INDY_CRED_DEF_ID, ) auto_issue = fields.Bool( description=( "Whether to respond automatically to credential requests, creating " "and issuing requested credentials" ), required=False, ) auto_remove = fields.Bool( description=( "Whether to remove the credential exchange record on completion " "(overrides --preserve-exchange-records configuration setting)" ), required=False, default=True, ) comment = fields.Str(description="Human-readable comment", required=False) credential_preview = fields.Nested(CredentialPreviewSchema, required=True)
[docs]class V10CredentialIssueRequestSchema(Schema): """Request schema for sending credential issue admin message.""" comment = fields.Str(description="Human-readable comment", required=False) credential_preview = fields.Nested(CredentialPreviewSchema, required=True)
[docs]class V10CredentialProblemReportRequestSchema(Schema): """Request schema for sending problem report.""" explain_ltxt = fields.Str(required=True)
[docs]class V10PublishRevocationsResultSchema(Schema): """Result schema for revocation publication API call.""" results = fields.Dict( keys=fields.Str(example=INDY_REV_REG_ID["example"]), # marshmallow 3.0 ignores values=fields.List( fields.Str(description="Credential revocation identifier", example="23") ), description="Credential revocation ids published by revocation registry id", )
[docs]@docs(tags=["issue-credential"], summary="Get attribute MIME types from wallet") @response_schema(V10AttributeMimeTypesResultSchema(), 200) async def attribute_mime_types_get(request: web.BaseRequest): """ Request handler for getting credential attribute MIME types. Args: request: aiohttp request object Returns: The MIME types response """ context = request.app["request_context"] credential_id = request.match_info["credential_id"] holder: BaseHolder = await context.inject(BaseHolder) return web.json_response(await holder.get_mime_type(credential_id))
[docs]@docs(tags=["issue-credential"], summary="Fetch all credential exchange records") @response_schema(V10CredentialExchangeListResultSchema(), 200) async def credential_exchange_list(request: web.BaseRequest): """ Request handler for searching connection records. Args: request: aiohttp request object Returns: The connection list response """ context = request.app["request_context"] tag_filter = {} if "thread_id" in request.query and request.query["thread_id"] != "": tag_filter["thread_id"] = request.query["thread_id"] post_filter = {} for param_name in ("connection_id", "role", "state"): if param_name in request.query and request.query[param_name] != "": post_filter[param_name] = request.query[param_name] records = await V10CredentialExchange.query(context, tag_filter, post_filter) return web.json_response({"results": [record.serialize() for record in records]})
[docs]@docs(tags=["issue-credential"], summary="Fetch a single credential exchange record") @response_schema(V10CredentialExchangeSchema(), 200) async def credential_exchange_retrieve(request: web.BaseRequest): """ Request handler for fetching single connection record. Args: request: aiohttp request object Returns: The credential exchange record """ context = request.app["request_context"] credential_exchange_id = request.match_info["cred_ex_id"] try: record = await V10CredentialExchange.retrieve_by_id( context, credential_exchange_id ) except StorageNotFoundError: raise web.HTTPNotFound() return web.json_response(record.serialize())
[docs]@docs( tags=["issue-credential"], summary="Send holder a credential, automating entire flow", ) @request_schema(V10CredentialProposalRequestMandSchema()) @response_schema(V10CredentialExchangeSchema(), 200) async def credential_exchange_send(request: web.BaseRequest): """ Request handler for sending credential from issuer to holder from attr values. If both issuer and holder are configured for automatic responses, the operation ultimately results in credential issue; otherwise, the result waits on the first response not automated; the credential exchange record retains state regardless. Args: request: aiohttp request object Returns: The credential exchange record """ r_time = get_timer() context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] body = await request.json() comment = body.get("comment") connection_id = body.get("connection_id") preview_spec = body.get("credential_proposal") if not preview_spec: raise web.HTTPBadRequest(reason="credential_proposal must be provided") auto_remove = body.get("auto_remove") trace_msg = body.get("trace") preview = CredentialPreview.deserialize(preview_spec) try: connection_record = await ConnectionRecord.retrieve_by_id( context, connection_id ) except StorageNotFoundError: raise web.HTTPBadRequest() if not connection_record.is_ready: raise web.HTTPForbidden() credential_proposal = CredentialProposal( comment=comment, credential_proposal=preview, **{t: body.get(t) for t in CRED_DEF_TAGS if body.get(t)}, ) credential_proposal.assign_trace_decorator( context.settings, trace_msg, ) trace_event( context.settings, credential_proposal, outcome="credential_exchange_send.START", ) credential_manager = CredentialManager(context) ( credential_exchange_record, credential_offer_message, ) = await credential_manager.prepare_send( connection_id, credential_proposal=credential_proposal, auto_remove=auto_remove, ) await outbound_handler( credential_offer_message, connection_id=credential_exchange_record.connection_id ) trace_event( context.settings, credential_offer_message, outcome="credential_exchange_send.END", perf_counter=r_time, ) return web.json_response(credential_exchange_record.serialize())
[docs]@docs(tags=["issue-credential"], summary="Send issuer a credential proposal") @request_schema(V10CredentialProposalRequestOptSchema()) @response_schema(V10CredentialExchangeSchema(), 200) async def credential_exchange_send_proposal(request: web.BaseRequest): """ Request handler for sending credential proposal. Args: request: aiohttp request object Returns: The credential exchange record """ r_time = get_timer() context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] body = await request.json() connection_id = body.get("connection_id") comment = body.get("comment") preview_spec = body.get("credential_proposal") preview = CredentialPreview.deserialize(preview_spec) if preview_spec else None auto_remove = body.get("auto_remove") trace_msg = body.get("trace") try: connection_record = await ConnectionRecord.retrieve_by_id( context, connection_id ) except StorageNotFoundError: raise web.HTTPBadRequest() if not connection_record.is_ready: raise web.HTTPForbidden() credential_manager = CredentialManager(context) credential_exchange_record = await credential_manager.create_proposal( connection_id, comment=comment, credential_preview=preview, auto_remove=auto_remove, trace=trace_msg, **{t: body.get(t) for t in CRED_DEF_TAGS if body.get(t)}, ) credential_proposal = CredentialProposal.deserialize( credential_exchange_record.credential_proposal_dict ) await outbound_handler( credential_proposal, connection_id=connection_id, ) trace_event( context.settings, credential_proposal, outcome="credential_exchange_send_proposal.END", perf_counter=r_time, ) return web.json_response(credential_exchange_record.serialize())
[docs]@docs( tags=["issue-credential"], summary="Send holder a credential offer, independent of any proposal with preview", ) @request_schema(V10CredentialOfferRequestSchema()) @response_schema(V10CredentialExchangeSchema(), 200) async def credential_exchange_send_free_offer(request: web.BaseRequest): """ Request handler for sending free credential offer. An issuer initiates a such a credential offer, free from any holder-initiated corresponding credential proposal with preview. Args: request: aiohttp request object Returns: The credential exchange record """ r_time = get_timer() context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] body = await request.json() connection_id = body.get("connection_id") cred_def_id = body.get("cred_def_id") auto_issue = body.get( "auto_issue", context.settings.get("debug.auto_respond_credential_request") ) auto_remove = body.get("auto_remove") comment = body.get("comment") preview_spec = body.get("credential_preview") trace_msg = body.get("trace") if not cred_def_id: raise web.HTTPBadRequest(reason="cred_def_id is required") if auto_issue and not preview_spec: raise web.HTTPBadRequest( reason=("If auto_issue is set then credential_preview must be provided") ) try: connection_record = await ConnectionRecord.retrieve_by_id( context, connection_id ) except StorageNotFoundError: raise web.HTTPBadRequest() if not connection_record.is_ready: raise web.HTTPForbidden() if preview_spec: credential_preview = CredentialPreview.deserialize(preview_spec) credential_proposal = CredentialProposal( comment=comment, credential_proposal=credential_preview, cred_def_id=cred_def_id, ) credential_proposal.assign_trace_decorator( context.settings, trace_msg, ) credential_proposal_dict = credential_proposal.serialize() else: credential_proposal_dict = None credential_exchange_record = V10CredentialExchange( connection_id=connection_id, initiator=V10CredentialExchange.INITIATOR_SELF, credential_definition_id=cred_def_id, credential_proposal_dict=credential_proposal_dict, auto_issue=auto_issue, auto_remove=auto_remove, trace=trace_msg, ) credential_manager = CredentialManager(context) ( credential_exchange_record, credential_offer_message, ) = await credential_manager.create_offer( credential_exchange_record, comment=comment ) await outbound_handler(credential_offer_message, connection_id=connection_id) trace_event( context.settings, credential_offer_message, outcome="credential_exchange_send_free_offer.END", perf_counter=r_time, ) return web.json_response(credential_exchange_record.serialize())
[docs]@docs( tags=["issue-credential"], summary="Send holder a credential offer in reference to a proposal with preview", ) @response_schema(V10CredentialExchangeSchema(), 200) async def credential_exchange_send_bound_offer(request: web.BaseRequest): """ Request handler for sending bound credential offer. A holder initiates this sequence with a credential proposal; this message responds with an offer bound to the proposal. Args: request: aiohttp request object Returns: The credential exchange record """ r_time = get_timer() context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] credential_exchange_id = request.match_info["cred_ex_id"] credential_exchange_record = await V10CredentialExchange.retrieve_by_id( context, credential_exchange_id ) assert credential_exchange_record.state == ( V10CredentialExchange.STATE_PROPOSAL_RECEIVED ) connection_id = credential_exchange_record.connection_id try: connection_record = await ConnectionRecord.retrieve_by_id( context, connection_id ) except StorageNotFoundError: raise web.HTTPBadRequest() if not connection_record.is_ready: raise web.HTTPForbidden() credential_manager = CredentialManager(context) ( credential_exchange_record, credential_offer_message, ) = await credential_manager.create_offer(credential_exchange_record, comment=None) await outbound_handler(credential_offer_message, connection_id=connection_id) trace_event( context.settings, credential_offer_message, outcome="credential_exchange_send_bound_offer.END", perf_counter=r_time, ) return web.json_response(credential_exchange_record.serialize())
[docs]@docs(tags=["issue-credential"], summary="Send issuer a credential request") @response_schema(V10CredentialExchangeSchema(), 200) async def credential_exchange_send_request(request: web.BaseRequest): """ Request handler for sending credential request. Args: request: aiohttp request object Returns: The credential exchange record """ r_time = get_timer() context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] credential_exchange_id = request.match_info["cred_ex_id"] credential_exchange_record = await V10CredentialExchange.retrieve_by_id( context, credential_exchange_id ) connection_id = credential_exchange_record.connection_id assert credential_exchange_record.state == ( V10CredentialExchange.STATE_OFFER_RECEIVED ) try: connection_record = await ConnectionRecord.retrieve_by_id( context, connection_id ) except StorageNotFoundError: raise web.HTTPBadRequest() if not connection_record.is_ready: raise web.HTTPForbidden() credential_manager = CredentialManager(context) ( credential_exchange_record, credential_request_message, ) = await credential_manager.create_request( credential_exchange_record, connection_record.my_did ) await outbound_handler(credential_request_message, connection_id=connection_id) trace_event( context.settings, credential_request_message, outcome="credential_exchange_send_request.END", perf_counter=r_time, ) return web.json_response(credential_exchange_record.serialize())
[docs]@docs(tags=["issue-credential"], summary="Send holder a credential") @request_schema(V10CredentialIssueRequestSchema()) @response_schema(V10CredentialExchangeSchema(), 200) async def credential_exchange_issue(request: web.BaseRequest): """ Request handler for sending credential. Args: request: aiohttp request object Returns: The credential exchange record """ r_time = get_timer() context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] body = await request.json() comment = body.get("comment") preview_spec = body.get("credential_preview") if not preview_spec: raise web.HTTPBadRequest(reason="credential_preview must be provided") credential_exchange_id = request.match_info["cred_ex_id"] cred_exch_record = await V10CredentialExchange.retrieve_by_id( context, credential_exchange_id ) connection_id = cred_exch_record.connection_id assert cred_exch_record.state == V10CredentialExchange.STATE_REQUEST_RECEIVED try: connection_record = await ConnectionRecord.retrieve_by_id( context, connection_id ) except StorageNotFoundError: raise web.HTTPBadRequest() if not connection_record.is_ready: raise web.HTTPForbidden() credential_preview = CredentialPreview.deserialize(preview_spec) credential_manager = CredentialManager(context) try: ( cred_exch_record, credential_issue_message, ) = await credential_manager.issue_credential( cred_exch_record, comment=comment, credential_values=credential_preview.attr_dict(decode=False), ) except IssuerRevocationRegistryFullError: raise web.HTTPBadRequest(reason="Revocation registry is full") await outbound_handler(credential_issue_message, connection_id=connection_id) trace_event( context.settings, credential_issue_message, outcome="credential_exchange_issue.END", perf_counter=r_time, ) return web.json_response(cred_exch_record.serialize())
[docs]@docs(tags=["issue-credential"], summary="Store a received credential") @request_schema(V10CredentialStoreRequestSchema()) @response_schema(V10CredentialExchangeSchema(), 200) async def credential_exchange_store(request: web.BaseRequest): """ Request handler for storing credential. Args: request: aiohttp request object Returns: The credential exchange record """ r_time = get_timer() context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] try: body = await request.json() or {} credential_id = body.get("credential_id") except JSONDecodeError: credential_id = None credential_exchange_id = request.match_info["cred_ex_id"] credential_exchange_record = await V10CredentialExchange.retrieve_by_id( context, credential_exchange_id ) connection_id = credential_exchange_record.connection_id assert credential_exchange_record.state == ( V10CredentialExchange.STATE_CREDENTIAL_RECEIVED ) try: connection_record = await ConnectionRecord.retrieve_by_id( context, connection_id ) except StorageNotFoundError: raise web.HTTPBadRequest() if not connection_record.is_ready: raise web.HTTPForbidden() credential_manager = CredentialManager(context) ( credential_exchange_record, credential_stored_message, ) = await credential_manager.store_credential( credential_exchange_record, credential_id ) await outbound_handler(credential_stored_message, connection_id=connection_id) trace_event( context.settings, credential_stored_message, outcome="credential_exchange_store.END", perf_counter=r_time, ) return web.json_response(credential_exchange_record.serialize())
[docs]@docs( tags=["issue-credential"], parameters=[ { "name": "rev_reg_id", "in": "query", "description": "revocation registry id", "required": True, }, { "name": "cred_rev_id", "in": "query", "description": "credential revocation id", "required": True, }, { "name": "publish", "in": "query", "description": ( "(true) publish revocation to ledger immediately, or " "(false) mark it pending" ), "schema": {"type": "boolean"}, "required": False, }, ], summary="Revoke an issued credential", ) async def credential_exchange_revoke(request: web.BaseRequest): """ Request handler for storing a credential request. Args: request: aiohttp request object Returns: The credential request details. """ context = request.app["request_context"] rev_reg_id = request.query.get("rev_reg_id") cred_rev_id = request.query.get("cred_rev_id") publish = bool(json.loads(request.query.get("publish", json.dumps(False)))) credential_manager = CredentialManager(context) try: await credential_manager.revoke_credential(rev_reg_id, cred_rev_id, publish) except StorageNotFoundError: raise web.HTTPBadRequest() return web.json_response({})
[docs]@docs(tags=["issue-credential"], summary="Publish pending revocations to ledger") @response_schema(V10PublishRevocationsResultSchema(), 200) async def credential_exchange_publish_revocations(request: web.BaseRequest): """ Request handler for publishing pending revocations to the ledger. Args: request: aiohttp request object Returns: Credential revocation ids published as revoked by revocation registry id. """ context = request.app["request_context"] credential_manager = CredentialManager(context) return web.json_response( {"results": await credential_manager.publish_pending_revocations()} )
[docs]@docs( tags=["issue-credential"], summary="Remove an existing credential exchange record" ) async def credential_exchange_remove(request: web.BaseRequest): """ Request handler for removing a credential exchange record. Args: request: aiohttp request object """ context = request.app["request_context"] credential_exchange_id = request.match_info["cred_ex_id"] try: credential_exchange_record = await V10CredentialExchange.retrieve_by_id( context, credential_exchange_id ) except StorageNotFoundError: raise web.HTTPNotFound() await credential_exchange_record.delete_record(context) return web.json_response({})
[docs]@docs( tags=["issue-credential"], summary="Send a problem report for credential exchange" ) @request_schema(V10CredentialProblemReportRequestSchema()) async def credential_exchange_problem_report(request: web.BaseRequest): """ Request handler for sending problem report. Args: request: aiohttp request object """ r_time = get_timer() context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] credential_exchange_id = request.match_info["cred_ex_id"] body = await request.json() try: credential_exchange_record = await V10CredentialExchange.retrieve_by_id( context, credential_exchange_id ) except StorageNotFoundError: raise web.HTTPNotFound() error_result = ProblemReport(explain_ltxt=body["explain_ltxt"]) error_result.assign_thread_id(credential_exchange_record.thread_id) await outbound_handler( error_result, connection_id=credential_exchange_record.connection_id ) trace_event( context.settings, error_result, outcome="credential_exchange_problem_report.END", perf_counter=r_time, ) return web.json_response({})
[docs]async def register(app: web.Application): """Register routes.""" app.add_routes( [ web.get( "/issue-credential/mime-types/{credential_id}", attribute_mime_types_get ), web.get("/issue-credential/records", credential_exchange_list), web.get( "/issue-credential/records/{cred_ex_id}", credential_exchange_retrieve ), web.post("/issue-credential/send", credential_exchange_send), web.post( "/issue-credential/send-proposal", credential_exchange_send_proposal ), web.post( "/issue-credential/send-offer", credential_exchange_send_free_offer ), web.post( "/issue-credential/records/{cred_ex_id}/send-offer", credential_exchange_send_bound_offer, ), web.post( "/issue-credential/records/{cred_ex_id}/send-request", credential_exchange_send_request, ), web.post( "/issue-credential/records/{cred_ex_id}/issue", credential_exchange_issue, ), web.post( "/issue-credential/records/{cred_ex_id}/store", credential_exchange_store, ), web.post("/issue-credential/revoke", credential_exchange_revoke,), web.post( "/issue-credential/publish-revocations", credential_exchange_publish_revocations, ), web.post( "/issue-credential/records/{cred_ex_id}/remove", credential_exchange_remove, ), web.post( "/issue-credential/records/{cred_ex_id}/problem-report", credential_exchange_problem_report, ), ] )