aries_cloudagent.transport.outbound package

Submodules

aries_cloudagent.transport.outbound.base module

Base outbound transport.

class aries_cloudagent.transport.outbound.base.BaseOutboundTransport(wire_format: Optional[BaseWireFormat] = None, root_profile: Optional[Profile] = None)

Bases: ABC

Base outbound transport class.

property collector: Collector

Accessor for the stats collector instance.

abstract async handle_message(profile: Profile, outbound_message: QueuedOutboundMessage, endpoint: str, metadata: Optional[dict] = None)

Deliver a queued outbound message to the given endpoint.

Parameters:
  • profile – the profile that produced the message

  • outbound_message – the outbound message to handle

  • endpoint – URI endpoint for delivery

  • metadata – Additional metadata associated with the payload

abstract async start()

Start the transport.

abstract async stop()

Shut down the transport.

property wire_format: BaseWireFormat

Accessor for a custom wire format for the transport.
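As a rough illustration of the interface above, the sketch below implements the three abstract methods on a stand-in base class so that it runs without the package installed. StandInOutboundTransport, RecordingTransport, and the "record" scheme are hypothetical names invented for this example; a real transport would subclass BaseOutboundTransport and receive a Profile and a QueuedOutboundMessage.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Optional


class StandInOutboundTransport(ABC):
    """Hypothetical stand-in mirroring the abstract methods documented above."""

    @abstractmethod
    async def start(self):
        """Start the transport."""

    @abstractmethod
    async def stop(self):
        """Shut down the transport."""

    @abstractmethod
    async def handle_message(
        self, profile, outbound_message, endpoint: str, metadata: Optional[dict] = None
    ):
        """Deliver one message to the endpoint."""


class RecordingTransport(StandInOutboundTransport):
    """Hypothetical transport that records deliveries instead of sending them."""

    schemes = ("record",)  # assumed scheme, for illustration only

    def __init__(self):
        self.running = False
        self.delivered = []

    async def start(self):
        self.running = True

    async def stop(self):
        self.running = False

    async def handle_message(self, profile, outbound_message, endpoint, metadata=None):
        if not self.running:
            raise RuntimeError("transport is not running")
        # Keep what would have been sent so a caller can inspect it later.
        self.delivered.append((endpoint, outbound_message, metadata or {}))


async def demo():
    transport = RecordingTransport()
    await transport.start()
    await transport.handle_message(None, "hello", "record://inbox")
    await transport.stop()
    return transport.delivered
```

Calling asyncio.run(demo()) returns the list of recorded deliveries. Instantiating the stand-in base class directly raises TypeError, because its abstract methods are not implemented.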

exception aries_cloudagent.transport.outbound.base.OutboundDeliveryError(*args, error_code: Optional[str] = None, **kwargs)

Bases: OutboundTransportError

Base exception when a message cannot be delivered via an outbound transport.

exception aries_cloudagent.transport.outbound.base.OutboundTransportError(*args, error_code: Optional[str] = None, **kwargs)

Bases: TransportError

Generic outbound transport error.

exception aries_cloudagent.transport.outbound.base.OutboundTransportRegistrationError(*args, error_code: Optional[str] = None, **kwargs)

Bases: OutboundTransportError

Outbound transport registration error.
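The three exceptions above form a small hierarchy under TransportError, so a handler for OutboundTransportError also catches delivery and registration failures. The sketch below reproduces that hierarchy with local stand-in classes (same names, but defined here rather than imported) and shows one way to order the except clauses. The describe_failure helper and the error code "E1" are hypothetical.

```python
from typing import Optional


class TransportError(Exception):
    """Local stand-in for the documented base class."""

    def __init__(self, *args, error_code: Optional[str] = None):
        super().__init__(*args)
        self.error_code = error_code


class OutboundTransportError(TransportError):
    """Generic outbound transport error (stand-in)."""


class OutboundDeliveryError(OutboundTransportError):
    """Message could not be delivered (stand-in)."""


class OutboundTransportRegistrationError(OutboundTransportError):
    """Transport registration failed (stand-in)."""


def describe_failure(exc: Exception) -> str:
    """Hypothetical helper: most specific handlers first, generic one last."""
    try:
        raise exc
    except OutboundDeliveryError as err:
        return f"delivery failed ({err.error_code})"
    except OutboundTransportRegistrationError:
        return "registration failed"
    except OutboundTransportError:
        return "other outbound transport failure"
```

Listing the generic OutboundTransportError handler first would shadow the two more specific ones, which is why it comes last here.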

class aries_cloudagent.transport.outbound.base.QueuedOutboundMessage(profile: Profile, message: OutboundMessage, target: ConnectionTarget, transport_id: str)

Bases: object

Class representing an outbound message pending delivery.

STATE_DELIVER = 'deliver'
STATE_DONE = 'done'
STATE_ENCODE = 'encode'
STATE_NEW = 'new'
STATE_PENDING = 'pending'
STATE_RETRY = 'retry'

aries_cloudagent.transport.outbound.http module

HTTP outbound transport.

class aries_cloudagent.transport.outbound.http.HttpTransport(**kwargs)

Bases: BaseOutboundTransport

HTTP outbound transport class.

async handle_message(profile: Profile, payload: Union[str, bytes], endpoint: str, metadata: Optional[dict] = None, api_key: Optional[str] = None)

Handle message from queue.

Parameters:
  • profile – the profile that produced the message

  • payload – message payload in string or byte format

  • endpoint – URI endpoint for delivery

  • metadata – Additional metadata associated with the payload

  • api_key – API key for the endpoint

is_external = False

schemes = ('http', 'https')

async start()

Start the transport.

async stop()

Stop the transport.

aries_cloudagent.transport.outbound.manager module

Outbound transport manager.

class aries_cloudagent.transport.outbound.manager.OutboundTransportManager(profile: Profile, handle_not_delivered: Optional[Callable] = None)

Bases: object

Outbound transport manager class.

MAX_RETRY_COUNT = 4

deliver_queued_message(queued: QueuedOutboundMessage) → Task

Kick off delivery of a queued message.

async encode_outbound_message(profile: Profile, outbound: OutboundMessage, target: ConnectionTarget)

Encode outbound message for the target.

Parameters:
  • profile – The active profile for the request

  • outbound – The outbound message to deliver

  • target – The outbound message target

encode_queued_message(queued: QueuedOutboundMessage) → Task

Kick off encoding of a queued message.

async enqueue_message(profile: Profile, outbound: OutboundMessage)

Add an outbound message to the queue.

Parameters:
  • profile – The active profile for the request

  • outbound – The outbound message to deliver

enqueue_webhook(topic: str, payload: dict, endpoint: str, max_attempts: Optional[int] = None, metadata: Optional[dict] = None)

Add a webhook to the queue.

Parameters:
  • topic – The webhook topic

  • payload – The webhook payload

  • endpoint – The webhook endpoint

  • max_attempts – Override the maximum number of attempts

  • metadata – Additional metadata associated with the payload

Raises:

OutboundDeliveryError – if the associated transport is not running
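The max_attempts parameter suggests a bounded retry loop around webhook delivery. The following is a minimal sketch of that general idea, not the manager's actual logic: deliver_with_attempts, the send callable, and DEFAULT_MAX_ATTEMPTS are all hypothetical, and a real implementation would likely add a delay between attempts.

```python
import asyncio
from typing import Awaitable, Callable, Optional

DEFAULT_MAX_ATTEMPTS = 5  # hypothetical default, not taken from the library


async def deliver_with_attempts(
    send: Callable[[dict], Awaitable[None]],
    payload: dict,
    max_attempts: Optional[int] = None,
) -> int:
    """Try to send the payload, giving up after a bounded number of attempts.

    Returns the 1-based number of the attempt that succeeded.
    """
    attempts = max_attempts if max_attempts is not None else DEFAULT_MAX_ATTEMPTS
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            await send(payload)
            return attempt
        except ConnectionError as err:
            # Remember the failure and fall through to the next attempt.
            last_error = err
    raise RuntimeError(f"gave up after {attempts} attempts") from last_error
```

Passing max_attempts overrides the default for a single call, which mirrors how the parameter is described above.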

finished_deliver(queued: QueuedOutboundMessage, completed: CompletedTask)

Handle completion of queued message delivery.

finished_encode(queued: QueuedOutboundMessage, completed: CompletedTask)

Handle completion of queued message encoding.

async flush()

Wait for any queued messages to be delivered.

get_external_running_transport() → str

Find the external running transport ID.

get_registered_transport_for_scheme(scheme: str) → str

Find the registered transport ID for a given scheme.

get_running_transport_for_endpoint(endpoint: str)

Find the running transport ID to use for a given endpoint.

get_running_transport_for_scheme(scheme: str) → str

Find the running transport ID for a given scheme.
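The lookup methods above resolve a transport ID from a URI scheme or a full endpoint. One plausible way to relate the two, sketched here under assumptions, is to parse the scheme out of the endpoint and match it against each transport's schemes tuple. The RUNNING mapping, its transport IDs, and both helper functions are hypothetical; only the scheme tuples come from the HttpTransport and WsTransport entries in this reference.

```python
from typing import Dict, Optional, Tuple
from urllib.parse import urlsplit

# Hypothetical registry of running transports: transport ID -> schemes served.
RUNNING: Dict[str, Tuple[str, ...]] = {
    "http_transport": ("http", "https"),
    "ws_transport": ("ws", "wss"),
}


def transport_for_scheme(scheme: str) -> Optional[str]:
    """Return the ID of the first transport that lists this scheme, if any."""
    for transport_id, schemes in RUNNING.items():
        if scheme in schemes:
            return transport_id
    return None


def transport_for_endpoint(endpoint: str) -> Optional[str]:
    """Extract the URI scheme from the endpoint and look up its transport."""
    scheme = urlsplit(endpoint).scheme.lower()
    return transport_for_scheme(scheme) if scheme else None
```

In this sketch an unknown scheme yields None; the library's own methods may instead raise an error in that situation.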

get_transport_instance(transport_id: str) → BaseOutboundTransport

Get an instance of a running transport by ID.

async perform_encode(queued: QueuedOutboundMessage, wire_format: Optional[BaseWireFormat] = None)

Perform message encoding.

process_queued() → Task

Start the process to deliver queued messages if necessary.

Returns: the current queue processing task or None

register(module_name: str) → str

Register a new outbound transport by module path.

Parameters:

module_name – Module name to register

Raises:
  • OutboundTransportRegistrationError – If the imported class cannot be located

  • OutboundTransportRegistrationError – If the imported class does not specify a schemes attribute

  • OutboundTransportRegistrationError – If the scheme has already been registered

register_class(transport_class: Type[BaseOutboundTransport], transport_id: Optional[str] = None) → str

Register a new outbound transport class.

Parameters:
  • transport_class (Type[BaseOutboundTransport]) – The transport class to register.

  • transport_id (str, optional) – The ID of the transport. If not provided, the qualified name of the transport class will be used as the ID.

Returns:

The ID of the registered transport.

Return type:

str

Raises:
  • OutboundTransportRegistrationError – If the imported class does not specify a schemes attribute.

  • OutboundTransportRegistrationError – If the scheme has already been registered.
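The registration rules above (a schemes attribute is required, each scheme may be registered only once, and the ID defaults to the class's qualified name) can be sketched as follows. TransportRegistry, RegistrationError, and the example classes are hypothetical stand-ins written for this illustration, not the manager's implementation.

```python
from typing import Dict, Optional


class RegistrationError(Exception):
    """Stand-in for OutboundTransportRegistrationError."""


class TransportRegistry:
    """Hypothetical registry applying the documented registration rules."""

    def __init__(self):
        self.by_id: Dict[str, type] = {}
        self.by_scheme: Dict[str, str] = {}

    def register_class(self, transport_class: type, transport_id: Optional[str] = None) -> str:
        schemes = getattr(transport_class, "schemes", None)
        if not schemes:
            raise RegistrationError(
                f"{transport_class.__qualname__} does not specify a schemes attribute"
            )
        if transport_id is None:
            # Default ID: the qualified name of the transport class.
            transport_id = transport_class.__qualname__
        # Check every scheme before mutating state, so a failure leaves
        # the registry unchanged.
        for scheme in schemes:
            if scheme in self.by_scheme:
                raise RegistrationError(f"scheme already registered: {scheme}")
        self.by_id[transport_id] = transport_class
        for scheme in schemes:
            self.by_scheme[scheme] = transport_id
        return transport_id


class ExampleHttp:
    schemes = ("http", "https")


class ExampleNoSchemes:
    pass
```

Registering ExampleHttp twice, or registering ExampleNoSchemes at all, raises RegistrationError in this sketch.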

async setup()

Perform setup operations.

async start()

Start all transports and feed messages from the queue.

async start_transport(transport_id: str)

Start a registered transport.

async stop(wait: bool = True)

Stop all running transports.

aries_cloudagent.transport.outbound.message module

Outbound message representation.

class aries_cloudagent.transport.outbound.message.OutboundMessage(*, connection_id: Optional[str] = None, enc_payload: Union[str, bytes] = None, endpoint: Optional[str] = None, payload: Union[str, bytes], reply_session_id: Optional[str] = None, reply_thread_id: Optional[str] = None, reply_to_verkey: Optional[str] = None, reply_from_verkey: Optional[str] = None, target: Optional[ConnectionTarget] = None, target_list: Sequence[ConnectionTarget] = None, to_session_only: bool = False)

Bases: object

Represents an outgoing message.

aries_cloudagent.transport.outbound.status module

Enum representing captured send status of outbound messages.

class aries_cloudagent.transport.outbound.status.OutboundSendStatus(value)

Bases: Enum

Send status of outbound messages.

QUEUED_FOR_DELIVERY = 'queued_for_delivery'
SENT_TO_EXTERNAL_QUEUE = 'sent_to_external_queue'
SENT_TO_SESSION = 'sent_to_session'
UNDELIVERABLE = 'undeliverable'
WAITING_FOR_PICKUP = 'waiting_for_pickup'

property topic

Return an event topic associated with a given status.
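To illustrate how an enum member can expose a derived event topic, the sketch below defines a stand-in enum with the same member values and a topic property. SendStatus and TOPIC_PREFIX are hypothetical; the prefix in particular is an assumed placeholder and is not the string the library uses.

```python
from enum import Enum

TOPIC_PREFIX = "example::outbound::"  # hypothetical prefix, for illustration only


class SendStatus(Enum):
    """Stand-in for OutboundSendStatus with the member values listed above."""

    QUEUED_FOR_DELIVERY = "queued_for_delivery"
    SENT_TO_EXTERNAL_QUEUE = "sent_to_external_queue"
    SENT_TO_SESSION = "sent_to_session"
    UNDELIVERABLE = "undeliverable"
    WAITING_FOR_PICKUP = "waiting_for_pickup"

    @property
    def topic(self) -> str:
        """Build an event topic from the member's value."""
        return f"{TOPIC_PREFIX}{self.value}"
```

Members can be looked up by value, for example SendStatus("sent_to_session"), and the topic is then read as an attribute rather than called as a method.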

aries_cloudagent.transport.outbound.ws module

WebSocket outbound transport.

class aries_cloudagent.transport.outbound.ws.WsTransport(**kwargs)

Bases: BaseOutboundTransport

WebSocket outbound transport class.

async handle_message(profile: Profile, payload: Union[str, bytes], endpoint: str, metadata: Optional[dict] = None, api_key: Optional[str] = None)

Handle message from queue.

Parameters:
  • profile – the profile that produced the message

  • payload – message payload in string or byte format

  • endpoint – URI endpoint for delivery

  • metadata – Additional metadata associated with the payload

  • api_key – API key for the endpoint

is_external = False

schemes = ('ws', 'wss')

async start()

Start the outbound transport.

async stop()

Stop the outbound transport.