Source code for aries_cloudagent.messaging.basicmessage.routes

"""Basic message admin routes."""

from aiohttp import web
from aiohttp_apispec import docs, request_schema

from marshmallow import fields, Schema

from ...storage.error import StorageNotFoundError

from ..connections.manager import ConnectionManager
from ..connections.models.connection_record import ConnectionRecord

from .messages.basicmessage import BasicMessage


[docs]class SendMessageSchema(Schema): """Request schema for sending a message.""" content = fields.Str()
[docs]@docs(tags=["basicmessage"], summary="Send a basic message to a connection") @request_schema(SendMessageSchema()) async def connections_send_message(request: web.BaseRequest): """ Request handler for sending a basic message to a connection. Args: request: aiohttp request object """ context = request.app["request_context"] connection_id = request.match_info["id"] outbound_handler = request.app["outbound_message_router"] params = await request.json() try: connection = await ConnectionRecord.retrieve_by_id(context, connection_id) except StorageNotFoundError: raise web.HTTPNotFound() if connection.is_ready: msg = BasicMessage(content=params["content"]) await outbound_handler(msg, connection_id=connection_id) conn_mgr = ConnectionManager(context) await conn_mgr.log_activity( connection, "message", connection.DIRECTION_SENT, {"content": params["content"]}, ) return web.json_response({})
[docs]@docs(tags=["basicmessage"], summary="Expire a copyable basicmessage") async def connections_expire_message(request: web.BaseRequest): """ Request handler for sending a basic message to a connection. Args: request: aiohttp request object """ context = request.app["request_context"] connection_id = request.match_info["id"] try: connection = await ConnectionRecord.retrieve_by_id(context, connection_id) except StorageNotFoundError: raise web.HTTPNotFound() activity_id = request.match_info["activity_id"] activity = await connection.retrieve_activity(context, activity_id) meta = activity.get("meta") or {} if meta.get("copy_invite"): meta["copied"] = 1 await connection.update_activity_meta(context, activity_id, meta) return web.json_response({})
[docs]async def register(app: web.Application): """Register routes.""" app.add_routes( [web.post("/connections/{id}/send-message", connections_send_message)] ) app.add_routes( [ web.post( "/connections/{id}/expire-message/{activity_id}", connections_expire_message, ) ] )