Source code for aries_cloudagent.transport.outbound.base

"""Base outbound transport."""

from abc import ABC, abstractmethod
import asyncio
from typing import Union

from ...connections.models.connection_target import ConnectionTarget
from ...core.profile import Profile
from ...utils.stats import Collector
from ..error import TransportError
from ..wire_format import BaseWireFormat
from .message import OutboundMessage


[docs]class QueuedOutboundMessage: """Class representing an outbound message pending delivery.""" STATE_NEW = "new" STATE_PENDING = "pending" STATE_ENCODE = "encode" STATE_DELIVER = "deliver" STATE_RETRY = "retry" STATE_DONE = "done" def __init__( self, profile: Profile, message: OutboundMessage, target: ConnectionTarget, transport_id: str, ): """Initialize the queued outbound message.""" self.profile = profile self.endpoint = target and target.endpoint self.error: Exception = None self.message = message self.payload: Union[str, bytes] = None self.retries = None self.retry_at: float = None self.state = self.STATE_NEW self.target = target self.task: asyncio.Task = None self.transport_id: str = transport_id self.metadata: dict = None self.api_key: str = None
[docs]class BaseOutboundTransport(ABC): """Base outbound transport class.""" def __init__( self, wire_format: BaseWireFormat = None, root_profile: Profile = None, ) -> None: """Initialize a `BaseOutboundTransport` instance.""" self._collector = None self._wire_format = wire_format self.root_profile = root_profile @property def collector(self) -> Collector: """Accessor for the stats collector instance.""" return self._collector @collector.setter def collector(self, coll: Collector): """Assign a new stats collector instance.""" self._collector = coll @property def wire_format(self) -> BaseWireFormat: """Accessor for a custom wire format for the transport.""" return self._wire_format @wire_format.setter def wire_format(self, format: BaseWireFormat): """Setter for a custom wire format for the transport.""" self._wire_format = format async def __aenter__(self): """Async context manager enter.""" await self.start() async def __aexit__(self, err_type, err_value, err_t): """Async context manager exit.""" if err_type and err_type != asyncio.CancelledError: self.logger.exception("Exception in outbound transport") await self.stop()
[docs] @abstractmethod async def start(self): """Start the transport."""
[docs] @abstractmethod async def stop(self): """Shut down the transport."""
[docs] @abstractmethod async def handle_message( self, profile: Profile, outbound_message: QueuedOutboundMessage, endpoint: str, metadata: dict = None, ): """Handle message. Args: profile: the profile that produced the message payload: message payload in string or byte format endpoint: URI endpoint for delivery metadata: Additional metadata associated with the payload """
[docs]class OutboundTransportError(TransportError): """Generic outbound transport error."""
[docs]class OutboundTransportRegistrationError(OutboundTransportError): """Outbound transport registration error."""
[docs]class OutboundDeliveryError(OutboundTransportError): """Base exception when a message cannot be delivered via an outbound transport."""