Source code for aries_cloudagent.protocols.didexchange.v1_0.routes

"""Connection handling admin routes."""

import json

from aiohttp import web
from aiohttp_apispec import (
    docs,
    match_info_schema,
    querystring_schema,
    request_schema,
    response_schema,
)

from marshmallow import fields

from ....admin.request_context import AdminRequestContext
from ....connections.models.conn_record import ConnRecord, ConnRecordSchema
from ....messaging.models.base import BaseModelError
from ....messaging.models.openapi import OpenAPISchema
from ....messaging.valid import ENDPOINT, GENERIC_DID, UUIDFour, UUID4
from ....storage.error import StorageError, StorageNotFoundError
from ....wallet.error import WalletError

from .manager import DIDXManager, DIDXManagerError
from .message_types import SPEC_URI
from .messages.request import DIDXRequest, DIDXRequestSchema


[docs]class DIDXAcceptInvitationQueryStringSchema(OpenAPISchema): """Parameters and validators for accept invitation request query string.""" my_endpoint = fields.Str(description="My URL endpoint", required=False, **ENDPOINT) my_label = fields.Str( description="Label for connection request", required=False, example="Broker" )
[docs]class DIDXCreateRequestImplicitQueryStringSchema(OpenAPISchema): """Parameters and validators for create-request-implicit request query string.""" their_public_did = fields.Str( required=True, allow_none=False, description="Qualified public DID to which to request connection", **GENERIC_DID, ) my_endpoint = fields.Str(description="My URL endpoint", required=False, **ENDPOINT) my_label = fields.Str( description="Label for connection request", required=False, example="Broker" ) mediation_id = fields.Str( required=False, description="Identifier for active mediation record to be used", **UUID4, ) use_public_did = fields.Boolean( required=False, description="Use public DID for this connection", )
[docs]class DIDXReceiveRequestImplicitQueryStringSchema(OpenAPISchema): """Parameters and validators for receive-request-implicit request query string.""" alias = fields.Str( description="Alias for connection", required=False, example="Barry", ) my_endpoint = fields.Str(description="My URL endpoint", required=False, **ENDPOINT) auto_accept = fields.Boolean( description="Auto-accept connection (defaults to configuration)", required=False, ) mediation_id = fields.Str( required=False, description="Identifier for active mediation record to be used", **UUID4, )
[docs]class DIDXAcceptRequestQueryStringSchema(OpenAPISchema): """Parameters and validators for accept-request request query string.""" my_endpoint = fields.Str(description="My URL endpoint", required=False, **ENDPOINT) mediation_id = fields.Str( required=False, description="Identifier for active mediation record to be used", **UUID4, )
[docs]class DIDXConnIdMatchInfoSchema(OpenAPISchema): """Path parameters and validators for request taking connection id.""" conn_id = fields.Str( description="Connection identifier", required=True, example=UUIDFour.EXAMPLE )
[docs]class DIDXConnIdRefIdMatchInfoSchema(OpenAPISchema): """Path parameters and validators for request taking connection and ref ids.""" conn_id = fields.Str( description="Connection identifier", required=True, example=UUIDFour.EXAMPLE ) ref_id = fields.Str( description="Inbound connection identifier", required=True, example=UUIDFour.EXAMPLE, )
[docs]@docs( tags=["did-exchange"], summary="Accept a stored connection invitation", ) @match_info_schema(DIDXConnIdMatchInfoSchema()) @querystring_schema(DIDXAcceptInvitationQueryStringSchema()) @response_schema(ConnRecordSchema(), 200, description="") async def didx_accept_invitation(request: web.BaseRequest): """ Request handler for accepting a stored connection invitation. Args: request: aiohttp request object Returns: The resulting connection record details """ context: AdminRequestContext = request["context"] outbound_handler = request["outbound_message_router"] connection_id = request.match_info["conn_id"] session = await context.session() my_label = request.query.get("my_label") or None my_endpoint = request.query.get("my_endpoint") or None mediation_id = request.query.get("mediation_id") or None didx_mgr = DIDXManager(session) try: conn_rec = await ConnRecord.retrieve_by_id(session, connection_id) request = await didx_mgr.create_request( conn_rec=conn_rec, my_label=my_label, my_endpoint=my_endpoint, mediation_id=mediation_id, ) result = conn_rec.serialize() except StorageNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except (StorageError, WalletError, DIDXManagerError, BaseModelError) as err: raise web.HTTPBadRequest(reason=err.roll_up) from err await outbound_handler(request, connection_id=conn_rec.connection_id) return web.json_response(result)
[docs]@docs( tags=["did-exchange"], summary="Create and send a request against public DID's implicit invitation", ) @querystring_schema(DIDXCreateRequestImplicitQueryStringSchema()) @response_schema(ConnRecordSchema(), 200, description="") async def didx_create_request_implicit(request: web.BaseRequest): """ Request handler for creating and sending a request to an implicit invitation. Args: request: aiohttp request object Returns: The resulting connection record details """ context: AdminRequestContext = request["context"] session = await context.session() their_public_did = request.query.get("their_public_did") my_label = request.query.get("my_label") or None my_endpoint = request.query.get("my_endpoint") or None mediation_id = request.query.get("mediation_id") or None use_public_did = json.loads(request.query.get("use_public_did", "null")) didx_mgr = DIDXManager(session) try: request = await didx_mgr.create_request_implicit( their_public_did=their_public_did, my_label=my_label, my_endpoint=my_endpoint, mediation_id=mediation_id, use_public_did=use_public_did, ) except StorageNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except (StorageError, WalletError, DIDXManagerError, BaseModelError) as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response(request.serialize())
[docs]@docs( tags=["did-exchange"], summary="Receive request against public DID's implicit invitation", ) @querystring_schema(DIDXReceiveRequestImplicitQueryStringSchema()) @request_schema(DIDXRequestSchema()) @response_schema(ConnRecordSchema(), 200, description="") async def didx_receive_request_implicit(request: web.BaseRequest): """ Request handler for receiving a request against public DID's implicit invitation. Args: request: aiohttp request object Returns: The resulting connection record details """ context: AdminRequestContext = request["context"] session = await context.session() body = await request.json() alias = request.query.get("alias") my_endpoint = request.query.get("my_endpoint") auto_accept = json.loads(request.query.get("auto_accept", "null")) mediation_id = request.query.get("mediation_id") or None didx_mgr = DIDXManager(session) try: request = DIDXRequest.deserialize(body) conn_rec = await didx_mgr.receive_request( request=request, recipient_did=request._thread.pthid.split(":")[-1], alias=alias, my_endpoint=my_endpoint, auto_accept_implicit=auto_accept, mediation_id=mediation_id, ) result = conn_rec.serialize() except StorageNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except (StorageError, WalletError, DIDXManagerError, BaseModelError) as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response(result)
[docs]@docs( tags=["did-exchange"], summary="Accept a stored connection request", ) @match_info_schema(DIDXConnIdMatchInfoSchema()) @querystring_schema(DIDXAcceptRequestQueryStringSchema()) @response_schema(ConnRecordSchema(), 200, description="") async def didx_accept_request(request: web.BaseRequest): """ Request handler for accepting a stored connection request. Args: request: aiohttp request object Returns: The resulting connection record details """ context: AdminRequestContext = request["context"] outbound_handler = request["outbound_message_router"] connection_id = request.match_info["conn_id"] session = await context.session() my_endpoint = request.query.get("my_endpoint") or None mediation_id = request.query.get("mediation_id") or None didx_mgr = DIDXManager(session) try: conn_rec = await ConnRecord.retrieve_by_id(session, connection_id) response = await didx_mgr.create_response( conn_rec=conn_rec, my_endpoint=my_endpoint, mediation_id=mediation_id, ) result = conn_rec.serialize() except StorageNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except (StorageError, WalletError, DIDXManagerError, BaseModelError) as err: raise web.HTTPBadRequest(reason=err.roll_up) from err await outbound_handler(response, connection_id=conn_rec.connection_id) return web.json_response(result)
[docs]async def register(app: web.Application): """Register routes.""" app.add_routes( [ web.post( "/didexchange/{conn_id}/accept-invitation", didx_accept_invitation, ), web.post("/didexchange/create-request", didx_create_request_implicit), web.post("/didexchange/receive-request", didx_receive_request_implicit), web.post("/didexchange/{conn_id}/accept-request", didx_accept_request), ] )
[docs]def post_process_routes(app: web.Application): """Amend swagger API.""" # Add top-level tags description if "tags" not in app._state["swagger_dict"]: app._state["swagger_dict"]["tags"] = [] app._state["swagger_dict"]["tags"].append( { "name": "did-exchange", "description": "Connection management via DID exchange", "externalDocs": {"description": "Specification", "url": SPEC_URI}, } )