Source code for aries_cloudagent.protocols.didexchange.v1_0.routes

"""Connection handling admin routes."""

import json

from aiohttp import web
from aiohttp_apispec import (
    docs,
    match_info_schema,
    querystring_schema,
    request_schema,
    response_schema,
)

from marshmallow import fields

from ....admin.request_context import AdminRequestContext
from ....connections.models.conn_record import ConnRecord, ConnRecordSchema
from ....messaging.models.base import BaseModelError
from ....messaging.models.openapi import OpenAPISchema
from ....messaging.valid import (
    ENDPOINT_EXAMPLE,
    ENDPOINT_VALIDATE,
    GENERIC_DID_EXAMPLE,
    GENERIC_DID_VALIDATE,
    UUID4_EXAMPLE,
    UUID4_VALIDATE,
)
from ....storage.error import StorageError, StorageNotFoundError
from ....wallet.error import WalletError
from .manager import DIDXManager, DIDXManagerError
from .message_types import SPEC_URI
from .messages.request import DIDXRequest, DIDXRequestSchema


[docs]class DIDXAcceptInvitationQueryStringSchema(OpenAPISchema): """Parameters and validators for accept invitation request query string.""" my_endpoint = fields.Str( required=False, validate=ENDPOINT_VALIDATE, metadata={"description": "My URL endpoint", "example": ENDPOINT_EXAMPLE}, ) my_label = fields.Str( required=False, metadata={"description": "Label for connection request", "example": "Broker"}, )
[docs]class DIDXCreateRequestImplicitQueryStringSchema(OpenAPISchema): """Parameters and validators for create-request-implicit request query string.""" their_public_did = fields.Str( required=True, allow_none=False, validate=GENERIC_DID_VALIDATE, metadata={ "description": "Qualified public DID to which to request connection", "example": GENERIC_DID_EXAMPLE, }, ) alias = fields.Str( required=False, metadata={"description": "Alias for connection", "example": "Barry"}, ) my_endpoint = fields.Str( required=False, validate=ENDPOINT_VALIDATE, metadata={"description": "My URL endpoint", "example": ENDPOINT_EXAMPLE}, ) my_label = fields.Str( required=False, metadata={"description": "Label for connection request", "example": "Broker"}, ) mediation_id = fields.Str( required=False, validate=UUID4_VALIDATE, metadata={ "description": "Identifier for active mediation record to be used", "example": UUID4_EXAMPLE, }, ) use_public_did = fields.Boolean( required=False, metadata={"description": "Use public DID for this connection"} ) goal_code = fields.Str( required=False, metadata={ "description": ( "A self-attested code the receiver may want to display to the user or" " use in automatically deciding what to do with the out-of-band message" ), "example": "issue-vc", }, ) goal = fields.Str( required=False, metadata={ "description": ( "A self-attested string that the receiver may want to display to the" " user about the context-specific goal of the out-of-band message" ), "example": "To issue a Faber College Graduate credential", }, )
[docs]class DIDXReceiveRequestImplicitQueryStringSchema(OpenAPISchema): """Parameters and validators for receive-request-implicit request query string.""" alias = fields.Str( required=False, metadata={"description": "Alias for connection", "example": "Barry"}, ) my_endpoint = fields.Str( required=False, validate=ENDPOINT_VALIDATE, metadata={"description": "My URL endpoint", "example": ENDPOINT_EXAMPLE}, ) auto_accept = fields.Boolean( required=False, metadata={"description": "Auto-accept connection (defaults to configuration)"}, ) mediation_id = fields.Str( required=False, validate=UUID4_VALIDATE, metadata={ "description": "Identifier for active mediation record to be used", "example": UUID4_EXAMPLE, }, )
[docs]class DIDXAcceptRequestQueryStringSchema(OpenAPISchema): """Parameters and validators for accept-request request query string.""" my_endpoint = fields.Str( required=False, validate=ENDPOINT_VALIDATE, metadata={"description": "My URL endpoint", "example": ENDPOINT_EXAMPLE}, ) mediation_id = fields.Str( required=False, validate=UUID4_VALIDATE, metadata={ "description": "Identifier for active mediation record to be used", "example": UUID4_EXAMPLE, }, ) use_public_did = fields.Boolean( required=False, metadata={"description": "Use public DID for this connection"} )
[docs]class DIDXConnIdMatchInfoSchema(OpenAPISchema): """Path parameters and validators for request taking connection id.""" conn_id = fields.Str( required=True, metadata={"description": "Connection identifier", "example": UUID4_EXAMPLE}, )
[docs]class DIDXConnIdRefIdMatchInfoSchema(OpenAPISchema): """Path parameters and validators for request taking connection and ref ids.""" conn_id = fields.Str( required=True, metadata={"description": "Connection identifier", "example": UUID4_EXAMPLE}, ) ref_id = fields.Str( required=True, metadata={ "description": "Inbound connection identifier", "example": UUID4_EXAMPLE, }, )
[docs]class DIDXRejectRequestSchema(OpenAPISchema): """Parameters and validators for reject-request request string.""" reason = fields.Str( description="Reason for rejecting the DID Exchange", required=False, example="Request rejected", )
@docs( tags=["did-exchange"], summary="Accept a stored connection invitation", ) @match_info_schema(DIDXConnIdMatchInfoSchema()) @querystring_schema(DIDXAcceptInvitationQueryStringSchema()) @response_schema(ConnRecordSchema(), 200, description="") async def didx_accept_invitation(request: web.BaseRequest): """Request handler for accepting a stored connection invitation. Args: request: aiohttp request object Returns: The resulting connection record details """ context: AdminRequestContext = request["context"] outbound_handler = request["outbound_message_router"] connection_id = request.match_info["conn_id"] my_label = request.query.get("my_label") or None my_endpoint = request.query.get("my_endpoint") or None mediation_id = request.query.get("mediation_id") or None profile = context.profile didx_mgr = DIDXManager(profile) try: async with profile.session() as session: conn_rec = await ConnRecord.retrieve_by_id(session, connection_id) request = await didx_mgr.create_request( conn_rec=conn_rec, my_label=my_label, my_endpoint=my_endpoint, mediation_id=mediation_id, ) result = conn_rec.serialize() except StorageNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except (StorageError, WalletError, DIDXManagerError, BaseModelError) as err: raise web.HTTPBadRequest(reason=err.roll_up) from err await outbound_handler(request, connection_id=conn_rec.connection_id) return web.json_response(result) @docs( tags=["did-exchange"], summary="Create and send a request against public DID's implicit invitation", ) @querystring_schema(DIDXCreateRequestImplicitQueryStringSchema()) @response_schema(ConnRecordSchema(), 200, description="") async def didx_create_request_implicit(request: web.BaseRequest): """Request handler for creating and sending a request to an implicit invitation. Args: request: aiohttp request object Returns: The resulting connection record details """ context: AdminRequestContext = request["context"] their_public_did = request.query.get("their_public_did") my_label = request.query.get("my_label") or None my_endpoint = request.query.get("my_endpoint") or None mediation_id = request.query.get("mediation_id") or None alias = request.query.get("alias") or None use_public_did = json.loads(request.query.get("use_public_did", "null")) goal_code = request.query.get("goal_code") or None goal = request.query.get("goal") or None profile = context.profile didx_mgr = DIDXManager(profile) try: request = await didx_mgr.create_request_implicit( their_public_did=their_public_did, my_label=my_label, my_endpoint=my_endpoint, mediation_id=mediation_id, use_public_did=use_public_did, alias=alias, goal_code=goal_code, goal=goal, ) except StorageNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except (StorageError, WalletError, DIDXManagerError, BaseModelError) as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response(request.serialize()) @docs( tags=["did-exchange"], summary="Receive request against public DID's implicit invitation", ) @querystring_schema(DIDXReceiveRequestImplicitQueryStringSchema()) @request_schema(DIDXRequestSchema()) @response_schema(ConnRecordSchema(), 200, description="") async def didx_receive_request_implicit(request: web.BaseRequest): """Request handler for receiving a request against public DID's implicit invitation. Args: request: aiohttp request object Returns: The resulting connection record details """ context: AdminRequestContext = request["context"] body = await request.json() alias = request.query.get("alias") my_endpoint = request.query.get("my_endpoint") auto_accept = json.loads(request.query.get("auto_accept", "null")) mediation_id = request.query.get("mediation_id") or None profile = context.profile didx_mgr = DIDXManager(profile) try: request = DIDXRequest.deserialize(body) conn_rec = await didx_mgr.receive_request( request=request, recipient_did=request._thread.pthid.split(":")[-1], alias=alias, my_endpoint=my_endpoint, auto_accept_implicit=auto_accept, mediation_id=mediation_id, ) result = conn_rec.serialize() except StorageNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except (StorageError, WalletError, DIDXManagerError, BaseModelError) as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response(result) @docs( tags=["did-exchange"], summary="Accept a stored connection request", ) @match_info_schema(DIDXConnIdMatchInfoSchema()) @querystring_schema(DIDXAcceptRequestQueryStringSchema()) @response_schema(ConnRecordSchema(), 200, description="") async def didx_accept_request(request: web.BaseRequest): """Request handler for accepting a stored connection request. Args: request: aiohttp request object Returns: The resulting connection record details """ context: AdminRequestContext = request["context"] outbound_handler = request["outbound_message_router"] connection_id = request.match_info["conn_id"] my_endpoint = request.query.get("my_endpoint") or None mediation_id = request.query.get("mediation_id") or None use_public_did = json.loads(request.query.get("use_public_did", "null")) profile = context.profile didx_mgr = DIDXManager(profile) try: async with profile.session() as session: conn_rec = await ConnRecord.retrieve_by_id(session, connection_id) response = await didx_mgr.create_response( conn_rec=conn_rec, my_endpoint=my_endpoint, mediation_id=mediation_id, use_public_did=use_public_did, ) result = conn_rec.serialize() except StorageNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except (StorageError, WalletError, DIDXManagerError, BaseModelError) as err: raise web.HTTPBadRequest(reason=err.roll_up) from err await outbound_handler(response, connection_id=conn_rec.connection_id) return web.json_response(result) @docs( tags=["did-exchange"], summary="Abandon or reject a DID Exchange", ) @match_info_schema(DIDXConnIdMatchInfoSchema()) @request_schema(DIDXRejectRequestSchema()) @response_schema(ConnRecordSchema(), 200, description="") async def didx_reject(request: web.BaseRequest): """Abandon or reject a DID Exchange.""" context: AdminRequestContext = request["context"] outbound_handler = request["outbound_message_router"] connection_id = request.match_info["conn_id"] body = await request.json() reason = body.get("reason") profile = context.profile didx_mgr = DIDXManager(profile) try: async with profile.session() as session: conn_rec = await ConnRecord.retrieve_by_id(session, connection_id) report = await didx_mgr.reject(conn_rec=conn_rec, reason=reason) except StorageNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except (StorageError, WalletError, DIDXManagerError, BaseModelError) as err: raise web.HTTPBadRequest(reason=err.roll_up) from err await outbound_handler(report, connection_id=conn_rec.connection_id) result = conn_rec.serialize() return web.json_response(result)
[docs]async def register(app: web.Application): """Register routes.""" app.add_routes( [ web.post( "/didexchange/{conn_id}/accept-invitation", didx_accept_invitation, ), web.post("/didexchange/create-request", didx_create_request_implicit), web.post("/didexchange/receive-request", didx_receive_request_implicit), web.post("/didexchange/{conn_id}/accept-request", didx_accept_request), web.post("/didexchange/{conn_id}/reject", didx_reject), ] )
[docs]def post_process_routes(app: web.Application): """Amend swagger API.""" # Add top-level tags description if "tags" not in app._state["swagger_dict"]: app._state["swagger_dict"]["tags"] = [] app._state["swagger_dict"]["tags"].append( { "name": "did-exchange", "description": "Connection management via DID exchange", "externalDocs": {"description": "Specification", "url": SPEC_URI}, } )