Source code for aries_cloudagent.dispatcher

"""
The Dispatcher.

The dispatcher is responsible for coordinating data flow between handlers, providing
lifecycle hook callbacks storing state for message threads, etc.
"""

import asyncio
import logging
from typing import Coroutine, Union

from .admin.base_server import BaseAdminServer
from .config.injection_context import InjectionContext
from .messaging.agent_message import AgentMessage
from .messaging.connections.models.connection_record import ConnectionRecord
from .messaging.error import MessageParseError
from .messaging.message_delivery import MessageDelivery
from .messaging.models.base import BaseModelError
from .messaging.outbound_message import OutboundMessage
from .messaging.problem_report.message import ProblemReport
from .messaging.protocol_registry import ProtocolRegistry
from .messaging.request_context import RequestContext
from .messaging.responder import BaseResponder
from .messaging.serializer import MessageSerializer
from .messaging.util import datetime_now
from .stats import Collector


[docs]class Dispatcher: """ Dispatcher class. Class responsible for dispatching messages to message handlers and responding to other agents. """ def __init__(self, context: InjectionContext): """Initialize an instance of Dispatcher.""" self.context = context self.logger = logging.getLogger(__name__)
[docs] async def dispatch( self, parsed_msg: dict, delivery: MessageDelivery, connection: ConnectionRecord, send: Coroutine, ) -> asyncio.Future: """ Configure responder and dispatch message context to message handler. Args: parsed_msg: The parsed message body delivery: The incoming message delivery metadata connection: The related connection record, if any send: Function to send outbound messages Returns: The response from the handler """ error_result = None try: message = await self.make_message(parsed_msg) except MessageParseError as e: self.logger.error( f"Message parsing failed: {str(e)}, sending problem report" ) error_result = ProblemReport(explain_ltxt=str(e)) if delivery.thread_id: error_result.assign_thread_id(delivery.thread_id) message = None context = RequestContext(base_context=self.context) context.message = message context.message_delivery = delivery context.connection_ready = connection and connection.is_ready context.connection_record = connection responder = DispatcherResponder( send, context, connection_id=connection and connection.connection_id, reply_socket_id=delivery.socket_id, reply_to_verkey=delivery.sender_verkey, ) if error_result: return asyncio.ensure_future(responder.send_reply(error_result)) context.injector.bind_instance(BaseResponder, responder) handler_cls = context.message.Handler handler_obj = handler_cls() collector: Collector = await context.inject(Collector, required=False) if collector: collector.wrap(handler_obj, "handle", ["any-message-handler"]) handler = asyncio.ensure_future(handler_obj.handle(context, responder)) return handler
[docs] async def make_message(self, parsed_msg: dict) -> AgentMessage: """ Deserialize a message dict into the appropriate message instance. Given a dict describing a message, this method returns an instance of the related message class. Args: parsed_msg: The parsed message Returns: An instance of the corresponding message class for this message Raises: MessageParseError: If the message doesn't specify @type MessageParseError: If there is no message class registered to handle the given type """ registry: ProtocolRegistry = await self.context.inject(ProtocolRegistry) serializer: MessageSerializer = await self.context.inject(MessageSerializer) # throws a MessageParseError on failure message_type = serializer.extract_message_type(parsed_msg) message_cls = registry.resolve_message_class(message_type) if not message_cls: raise MessageParseError(f"Unrecognized message type {message_type}") try: instance = message_cls.deserialize(parsed_msg) except BaseModelError as e: raise MessageParseError(f"Error deserializing message: {e}") from e return instance
[docs]class DispatcherResponder(BaseResponder): """Handle outgoing messages from message handlers.""" def __init__(self, send: Coroutine, context: RequestContext, **kwargs): """ Initialize an instance of `DispatcherResponder`. Args: send: Function to send outbound message context: The request context of the incoming message """ super().__init__(**kwargs) self._context = context self._send = send
[docs] async def create_outbound( self, message: Union[AgentMessage, str, bytes], **kwargs ) -> OutboundMessage: """Create an OutboundMessage from a message body.""" if isinstance(message, AgentMessage) and self._context.settings.get( "timing.enabled" ): # Inject the timing decorator in_time = ( self._context.message_delivery and self._context.message_delivery.in_time ) if not message._decorators.get("timing"): message._decorators["timing"] = { "in_time": in_time, "out_time": datetime_now(), } return await super().create_outbound(message, **kwargs)
[docs] async def send_outbound(self, message: OutboundMessage): """ Send outbound message. Args: message: The `OutboundMessage` to be sent """ await self._send(message)
[docs] async def send_webhook(self, topic: str, payload: dict): """ Dispatch a webhook. Args: topic: the webhook topic identifier payload: the webhook payload value """ asyncio.ensure_future(self._dispatch_webhook(topic, payload))
async def _dispatch_webhook(self, topic: str, payload: dict): """Perform dispatch of a webhook.""" server = await self._context.inject(BaseAdminServer, required=False) if server: await server.send_webhook(topic, payload)