Source code for aries_cloudagent.transport.wire_format

"""Abstract wire format classes."""

import json
import logging

from abc import abstractmethod
from typing import Sequence, Tuple, Union

from ..config.injection_context import InjectionContext
from ..messaging.util import time_now

from .inbound.receipt import MessageReceipt
from .error import MessageParseError

LOGGER = logging.getLogger(__name__)


[docs]class BaseWireFormat: """Abstract messaging wire format.""" def __init__(self): """Initialize the base wire format instance."""
[docs] @abstractmethod async def parse_message( self, context: InjectionContext, message_body: Union[str, bytes], ) -> Tuple[dict, MessageReceipt]: """ Deserialize an incoming message and further populate the request context. Args: context: The injection context for settings and services message_body: The body of the message Returns: A tuple of the parsed message and a message receipt instance Raises: MessageParseError: If the message can't be parsed """
[docs] @abstractmethod async def encode_message( self, context: InjectionContext, message_json: Union[str, bytes], recipient_keys: Sequence[str], routing_keys: Sequence[str], sender_key: str, ) -> Union[str, bytes]: """ Encode an outgoing message for transport. Args: context: The injection context for settings and services message_json: The message body to serialize recipient_keys: A sequence of recipient verkeys routing_keys: A sequence of routing verkeys sender_key: The verification key of the sending agent Returns: The encoded message Raises: MessageEncodeError: If the message could not be encoded """
[docs]class JsonWireFormat(BaseWireFormat): """Unencrypted wire format."""
[docs] @abstractmethod async def parse_message( self, context: InjectionContext, message_body: Union[str, bytes], ) -> Tuple[dict, MessageReceipt]: """ Deserialize an incoming message and further populate the request context. Args: context: The injection context for settings and services message_body: The body of the message Returns: A tuple of the parsed message and a message receipt instance Raises: MessageParseError: If the JSON parsing failed """ receipt = MessageReceipt() receipt.in_time = time_now() receipt.raw_message = message_body message_dict = None message_json = message_body if not message_json: raise MessageParseError("Message body is empty") try: message_dict = json.loads(message_json) except ValueError: raise MessageParseError("Message JSON parsing failed") if not isinstance(message_dict, dict): raise MessageParseError("Message JSON result is not an object") # parse thread ID thread_dec = message_dict.get("~thread") receipt.thread_id = ( thread_dec and thread_dec.get("thid") or message_dict.get("@id") ) # handle transport decorator transport_dec = message_dict.get("~transport") if transport_dec: receipt.direct_response_mode = transport_dec.get("return_route") LOGGER.debug(f"Expanded message: {message_dict}") return message_dict, receipt
[docs] @abstractmethod async def encode_message( self, context: InjectionContext, message_json: Union[str, bytes], recipient_keys: Sequence[str], routing_keys: Sequence[str], sender_key: str, ) -> Union[str, bytes]: """ Encode an outgoing message for transport. Args: context: The injection context for settings and services message_json: The message body to serialize recipient_keys: A sequence of recipient verkeys routing_keys: A sequence of routing verkeys sender_key: The verification key of the sending agent Returns: The encoded message Raises: MessageEncodeError: If the message could not be encoded """ return message_json