"""A message responder.
The responder is provided to message handlers to enable them to send a new message
in response to the message being handled.
"""
import asyncio
import json
from abc import ABC, abstractmethod
from typing import List, Optional, Sequence, Tuple, Union
from ..cache.base import BaseCache
from ..connections.models.conn_record import ConnRecord
from ..connections.models.connection_target import ConnectionTarget
from ..core.error import BaseError
from ..core.profile import Profile
from ..transport.outbound.message import OutboundMessage
from ..transport.outbound.status import OutboundSendStatus
from .base_message import BaseMessage
SKIP_ACTIVE_CONN_CHECK_MSG_TYPES = [
"didexchange/1.0/request",
"didexchange/1.0/response",
"didexchange/1.0/problem_report",
"connections/1.0/invitation",
"connections/1.0/request",
"connections/1.0/response",
"connections/1.0/problem_report",
]
[docs]class ResponderError(BaseError):
"""Responder error."""
[docs]class BaseResponder(ABC):
"""Interface for message handlers to send responses."""
def __init__(
self,
*,
connection_id: str = None,
reply_session_id: str = None,
reply_to_verkey: str = None,
):
"""Initialize a base responder."""
self.connection_id = connection_id
self.reply_session_id = reply_session_id
self.reply_to_verkey = reply_to_verkey
[docs] async def create_outbound(
self,
message: Union[BaseMessage, str, bytes],
*,
connection_id: str = None,
reply_session_id: str = None,
reply_thread_id: str = None,
reply_to_verkey: str = None,
reply_from_verkey: str = None,
target: ConnectionTarget = None,
target_list: Sequence[ConnectionTarget] = None,
to_session_only: bool = False,
) -> OutboundMessage:
"""Create an OutboundMessage from a message payload."""
if isinstance(message, BaseMessage):
# TODO DIDComm version selection
serialized = message.serialize()
# TODO serialized format selection?
payload = json.dumps(serialized)
enc_payload = None
if not reply_thread_id:
reply_thread_id = message._thread_id
else:
payload = None
enc_payload = message
return OutboundMessage(
connection_id=connection_id,
enc_payload=enc_payload,
payload=payload,
reply_session_id=reply_session_id,
reply_thread_id=reply_thread_id,
reply_to_verkey=reply_to_verkey,
reply_from_verkey=reply_from_verkey,
target=target,
target_list=target_list,
to_session_only=to_session_only,
)
[docs] async def send(
self, message: Union[BaseMessage, str, bytes], **kwargs
) -> OutboundSendStatus:
"""Convert a message to an OutboundMessage and send it."""
outbound = await self.create_outbound(message, **kwargs)
if isinstance(message, BaseMessage):
msg_type = message._message_type
msg_id = message._id
else:
msg_dict = json.loads(message)
msg_type = msg_dict.get("@type")
msg_id = msg_dict.get("@id")
return await self.send_outbound(
message=outbound,
message_type=msg_type,
message_id=msg_id,
)
[docs] async def send_reply(
self,
message: Union[BaseMessage, str, bytes],
*,
connection_id: str = None,
target: ConnectionTarget = None,
target_list: Sequence[ConnectionTarget] = None,
) -> OutboundSendStatus:
"""Send a reply to an incoming message.
Args:
message: the `BaseMessage`, or pre-packed str or bytes to reply with
connection_id: optionally override the target connection ID
target: optionally specify a `ConnectionTarget` to send to
Raises:
ResponderError: If there is no active connection
"""
outbound = await self.create_outbound(
message,
connection_id=connection_id or self.connection_id,
reply_session_id=self.reply_session_id,
reply_to_verkey=self.reply_to_verkey,
target=target,
target_list=target_list,
)
if isinstance(message, BaseMessage):
msg_type = message._message_type
msg_id = message._id
else:
msg_dict = json.loads(message)
msg_type = msg_dict.get("@type")
msg_id = msg_dict.get("@id")
return await self.send_outbound(
message=outbound, message_type=msg_type, message_id=msg_id
)
[docs] async def conn_rec_active_state_check(
self, profile: Profile, connection_id: str, timeout: int = 7
) -> bool:
"""Check if the connection record is ready for sending outbound message."""
async def _wait_for_state() -> Tuple[bool, Optional[str]]:
while True:
async with profile.session() as session:
conn_record = await ConnRecord.retrieve_by_id(
session, connection_id
)
if conn_record.is_ready:
# if ConnRecord.State.get(conn_record.state) in (
# ConnRecord.State.COMPLETED,
# ):
return (True, conn_record.state)
await asyncio.sleep(1)
try:
cache_key = f"conn_rec_state::{connection_id}"
connection_state = None
cache = profile.inject_or(BaseCache)
if cache:
connection_state = await cache.get(cache_key)
if connection_state and ConnRecord.State.get(connection_state) in (
ConnRecord.State.COMPLETED,
ConnRecord.State.RESPONSE,
):
return True
check_flag, connection_state = await asyncio.wait_for(
_wait_for_state(), timeout
)
if cache and connection_state:
await cache.set(cache_key, connection_state)
return check_flag
except asyncio.TimeoutError:
return False
[docs] @abstractmethod
async def send_outbound(
self, message: OutboundMessage, **kwargs
) -> OutboundSendStatus:
"""Send an outbound message.
Args:
message: The `OutboundMessage` to be sent
"""
[docs] @abstractmethod
async def send_webhook(self, topic: str, payload: dict):
"""Dispatch a webhook. DEPRECATED: use the event bus instead.
Args:
topic: the webhook topic identifier
payload: the webhook payload value
"""
[docs]class MockResponder(BaseResponder):
"""Mock responder implementation for use by tests."""
def __init__(self):
"""Initialize the mock responder."""
self.messages: List[
Tuple[Union[BaseMessage, str, bytes, OutboundMessage], Optional[dict]]
] = []
[docs] async def send(
self, message: Union[BaseMessage, str, bytes], **kwargs
) -> OutboundSendStatus:
"""Convert a message to an OutboundMessage and send it."""
self.messages.append((message, kwargs))
return OutboundSendStatus.QUEUED_FOR_DELIVERY
[docs] async def send_reply(
self, message: Union[BaseMessage, str, bytes], **kwargs
) -> OutboundSendStatus:
"""Send a reply to an incoming message."""
self.messages.append((message, kwargs))
return OutboundSendStatus.QUEUED_FOR_DELIVERY
[docs] async def send_outbound(
self, message: OutboundMessage, **kwargs
) -> OutboundSendStatus:
"""Send an outbound message."""
self.messages.append((message, None))
return OutboundSendStatus.QUEUED_FOR_DELIVERY
[docs] async def send_webhook(self, topic: str, payload: dict):
"""Send an outbound message."""
raise Exception(
"responder.send_webhook is deprecated; please use the event bus instead."
)