Source code for aries_cloudagent.messaging.trustping.routes

"""Trust ping admin routes."""

from aiohttp import web
from aiohttp_apispec import docs

from ..connections.models.connection_record import ConnectionRecord
from .messages.ping import Ping
from ...storage.error import StorageNotFoundError


[docs]@docs(tags=["trustping"], summary="Send a trust ping to a connection") async def connections_send_ping(request: web.BaseRequest): """ Request handler for sending a trust ping to a connection. Args: request: aiohttp request object """ context = request.app["request_context"] connection_id = request.match_info["id"] outbound_handler = request.app["outbound_message_router"] try: connection = await ConnectionRecord.retrieve_by_id(context, connection_id) except StorageNotFoundError: raise web.HTTPNotFound() if connection.is_ready: msg = Ping() await outbound_handler(msg, connection_id=connection_id) await connection.log_activity(context, "ping", connection.DIRECTION_SENT) return web.json_response({})
[docs]async def register(app: web.Application): """Register routes.""" app.add_routes([web.post("/connections/{id}/send-ping", connections_send_ping)])