"""
A message responder.
The responder is provided to message handlers to enable them to send a new message
in response to the message being handled.
"""
from abc import ABC, abstractmethod
from typing import Sequence, Union
from ..core.error import BaseError
from ..connections.models.connection_target import ConnectionTarget
from ..transport.outbound.message import OutboundMessage
from .agent_message import AgentMessage
[docs]class ResponderError(BaseError):
"""Responder error."""
[docs]class BaseResponder(ABC):
"""Interface for message handlers to send responses."""
def __init__(
self,
*,
connection_id: str = None,
reply_session_id: str = None,
reply_to_verkey: str = None,
):
"""Initialize a base responder."""
self.connection_id = connection_id
self.reply_session_id = reply_session_id
self.reply_to_verkey = reply_to_verkey
[docs] async def create_outbound(
self,
message: Union[AgentMessage, str, bytes],
*,
connection_id: str = None,
reply_session_id: str = None,
reply_thread_id: str = None,
reply_to_verkey: str = None,
target: ConnectionTarget = None,
target_list: Sequence[ConnectionTarget] = None,
) -> OutboundMessage:
"""Create an OutboundMessage from a message payload."""
if isinstance(message, AgentMessage):
payload = message.to_json()
enc_payload = None
if not reply_thread_id:
reply_thread_id = message._thread_id
else:
payload = None
enc_payload = message
return OutboundMessage(
connection_id=connection_id,
enc_payload=enc_payload,
payload=payload,
reply_session_id=reply_session_id,
reply_thread_id=reply_thread_id,
reply_to_verkey=reply_to_verkey,
target=target,
target_list=target_list,
)
[docs] async def send(self, message: Union[AgentMessage, str, bytes], **kwargs):
"""Convert a message to an OutboundMessage and send it."""
outbound = await self.create_outbound(message, **kwargs)
await self.send_outbound(outbound)
[docs] async def send_reply(
self,
message: Union[AgentMessage, str, bytes],
*,
connection_id: str = None,
target: ConnectionTarget = None,
target_list: Sequence[ConnectionTarget] = None,
):
"""
Send a reply to an incoming message.
Args:
message: the `AgentMessage`, or pre-packed str or bytes to reply with
connection_id: optionally override the target connection ID
target: optionally specify a `ConnectionTarget` to send to
Raises:
ResponderError: If there is no active connection
"""
outbound = await self.create_outbound(
message,
connection_id=connection_id or self.connection_id,
reply_session_id=self.reply_session_id,
reply_to_verkey=self.reply_to_verkey,
target=target,
target_list=target_list,
)
await self.send_outbound(outbound)
[docs] @abstractmethod
async def send_outbound(self, message: OutboundMessage):
"""
Send an outbound message.
Args:
message: The `OutboundMessage` to be sent
"""
[docs] @abstractmethod
async def send_webhook(self, topic: str, payload: dict):
"""
Dispatch a webhook.
Args:
topic: the webhook topic identifier
payload: the webhook payload value
"""
[docs]class MockResponder(BaseResponder):
"""Mock responder implementation for use by tests."""
def __init__(self):
"""Initialize the mock responder."""
self.messages = []
self.webhooks = []
[docs] async def send(self, message: Union[AgentMessage, str, bytes], **kwargs):
"""Convert a message to an OutboundMessage and send it."""
self.messages.append((message, kwargs))
[docs] async def send_reply(self, message: Union[AgentMessage, str, bytes], **kwargs):
"""Send a reply to an incoming message."""
self.messages.append((message, kwargs))
[docs] async def send_outbound(self, message: OutboundMessage):
"""Send an outbound message."""
self.messages.append((message, None))
[docs] async def send_webhook(self, topic: str, payload: dict):
"""Send an outbound message."""
self.webhooks.append((topic, payload))