Source code for aries_cloudagent.transport.pack_format
"""Standard packed message format classes."""
import json
import logging
from typing import Sequence, Tuple, Union
from ..config.base import InjectorError
from ..config.injection_context import InjectionContext
# FIXME: We shouldn't rely on a hardcoded message version here.
from ..protocols.routing.v1_0.messages.forward import Forward
from ..messaging.util import time_now
from ..utils.task_queue import TaskQueue
from ..wallet.base import BaseWallet
from ..wallet.error import WalletError
from .error import MessageParseError, MessageEncodeError
from .inbound.receipt import MessageReceipt
from .wire_format import BaseWireFormat
LOGGER = logging.getLogger(__name__)
[docs]class PackWireFormat(BaseWireFormat):
"""Standard DIDComm message parser and serializer."""
def __init__(self):
"""Initialize the pack wire format instance."""
super().__init__()
self.task_queue: TaskQueue = None
[docs] async def parse_message(
self, context: InjectionContext, message_body: Union[str, bytes],
) -> Tuple[dict, MessageReceipt]:
"""
Deserialize an incoming message and further populate the request context.
Args:
context: The injection context for settings and services
message_body: The body of the message
Returns:
A tuple of the parsed message and a message receipt instance
Raises:
MessageParseError: If the JSON parsing failed
MessageParseError: If a wallet is required but can't be located
"""
receipt = MessageReceipt()
receipt.in_time = time_now()
receipt.raw_message = message_body
message_dict = None
message_json = message_body
if not message_json:
raise MessageParseError("Message body is empty")
try:
message_dict = json.loads(message_json)
except ValueError:
raise MessageParseError("Message JSON parsing failed")
if not isinstance(message_dict, dict):
raise MessageParseError("Message JSON result is not an object")
# packed messages are detected by the absence of @type
if "@type" not in message_dict:
try:
unpack = self.unpack(context, message_body, receipt)
message_json = await (
self.task_queue and self.task_queue.run(unpack) or unpack
)
except MessageParseError:
LOGGER.debug("Message unpack failed, falling back to JSON")
else:
receipt.raw_message = message_json
try:
message_dict = json.loads(message_json)
except ValueError:
raise MessageParseError("Message JSON parsing failed")
if not isinstance(message_dict, dict):
raise MessageParseError("Message JSON result is not an object")
# parse thread ID
thread_dec = message_dict.get("~thread")
receipt.thread_id = (
thread_dec and thread_dec.get("thid") or message_dict.get("@id")
)
# handle transport decorator
transport_dec = message_dict.get("~transport")
if transport_dec:
receipt.direct_response_mode = transport_dec.get("return_route")
LOGGER.debug(f"Expanded message: {message_dict}")
return message_dict, receipt
[docs] async def unpack(
self,
context: InjectionContext,
message_body: Union[str, bytes],
receipt: MessageReceipt,
):
"""Look up the wallet instance and perform the message unpack."""
try:
wallet: BaseWallet = await context.inject(BaseWallet)
except InjectorError:
raise MessageParseError("Wallet not defined in request context")
try:
unpacked = await wallet.unpack_message(message_body)
(message_json, receipt.sender_verkey, receipt.recipient_verkey,) = unpacked
return message_json
except WalletError as e:
raise MessageParseError("Message unpack failed") from e
[docs] async def encode_message(
self,
context: InjectionContext,
message_json: Union[str, bytes],
recipient_keys: Sequence[str],
routing_keys: Sequence[str],
sender_key: str,
) -> Union[str, bytes]:
"""
Encode an outgoing message for transport.
Args:
context: The injection context for settings and services
message_json: The message body to serialize
recipient_keys: A sequence of recipient verkeys
routing_keys: A sequence of routing verkeys
sender_key: The verification key of the sending agent
Returns:
The encoded message
Raises:
MessageEncodeError: If the message could not be encoded
"""
if sender_key and recipient_keys:
pack = self.pack(
context, message_json, recipient_keys, routing_keys, sender_key
)
message = await (self.task_queue and self.task_queue.run(pack) or pack)
else:
message = message_json
return message
[docs] async def pack(
self,
context: InjectionContext,
message_json: Union[str, bytes],
recipient_keys: Sequence[str],
routing_keys: Sequence[str],
sender_key: str,
):
"""Look up the wallet instance and perform the message pack."""
if not sender_key or not recipient_keys:
raise MessageEncodeError("Cannot pack message without associated keys")
wallet: BaseWallet = await context.inject(BaseWallet, required=False)
if not wallet:
raise MessageEncodeError("No wallet instance")
try:
message = await wallet.pack_message(
message_json, recipient_keys, sender_key
)
except WalletError as e:
raise MessageEncodeError("Message pack failed") from e
if routing_keys:
recip_keys = recipient_keys
for router_key in routing_keys:
message = json.loads(message.decode("utf-8"))
fwd_msg = Forward(to=recip_keys[0], msg=message)
# Forwards are anon packed
recip_keys = [router_key]
try:
message = await wallet.pack_message(fwd_msg.to_json(), recip_keys)
except WalletError as e:
raise MessageEncodeError("Forward message pack failed") from e
return message