Source code for aries_cloudagent.protocols.trustping.v1_0.routes

"""Trust ping admin routes."""

from aiohttp import web
from aiohttp_apispec import docs, match_info_schema, request_schema, response_schema

from marshmallow import fields

from ....admin.request_context import AdminRequestContext
from ....connections.models.conn_record import ConnRecord
from ....messaging.models.openapi import OpenAPISchema
from ....messaging.valid import UUIDFour
from ....storage.error import StorageNotFoundError

from .message_types import SPEC_URI
from .messages.ping import Ping


[docs]class PingRequestSchema(OpenAPISchema): """Request schema for performing a ping.""" comment = fields.Str( description="Comment for the ping message", required=False, allow_none=True )
[docs]class PingRequestResponseSchema(OpenAPISchema): """Request schema for performing a ping.""" thread_id = fields.Str(required=False, description="Thread ID of the ping message")
[docs]class PingConnIdMatchInfoSchema(OpenAPISchema): """Path parameters and validators for request taking connection id.""" conn_id = fields.Str( description="Connection identifier", required=True, example=UUIDFour.EXAMPLE )
[docs]@docs(tags=["trustping"], summary="Send a trust ping to a connection") @match_info_schema(PingConnIdMatchInfoSchema()) @request_schema(PingRequestSchema()) @response_schema(PingRequestResponseSchema(), 200, description="") async def connections_send_ping(request: web.BaseRequest): """ Request handler for sending a trust ping to a connection. Args: request: aiohttp request object """ context: AdminRequestContext = request["context"] connection_id = request.match_info["conn_id"] outbound_handler = request["outbound_message_router"] body = await request.json() comment = body.get("comment") try: async with context.session() as session: connection = await ConnRecord.retrieve_by_id(session, connection_id) except StorageNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err if not connection.is_ready: raise web.HTTPBadRequest(reason=f"Connection {connection_id} not ready") msg = Ping(comment=comment) await outbound_handler(msg, connection_id=connection_id) return web.json_response({"thread_id": msg._thread_id})
[docs]async def register(app: web.Application): """Register routes.""" app.add_routes( [web.post("/connections/{conn_id}/send-ping", connections_send_ping)] )
[docs]def post_process_routes(app: web.Application): """Amend swagger API.""" # Add top-level tags description if "tags" not in app._state["swagger_dict"]: app._state["swagger_dict"]["tags"] = [] app._state["swagger_dict"]["tags"].append( { "name": "trustping", "description": "Trust-ping over connection", "externalDocs": {"description": "Specification", "url": SPEC_URI}, } )