aries_cloudagent.transport.outbound package

Submodules

aries_cloudagent.transport.outbound.base module

Base outbound transport.

class aries_cloudagent.transport.outbound.base.BaseOutboundTransport(wire_format: aries_cloudagent.transport.wire_format.BaseWireFormat = None)[source]

Bases: abc.ABC

Base outbound transport class.

collector

Accessor for the stats collector instance.

handle_message(context: aries_cloudagent.config.injection_context.InjectionContext, payload: Union[str, bytes], endpoint: str)[source]

Handle message from queue.

Parameters:
  • context – the context that produced the message
  • payload – message payload in string or byte format
  • endpoint – URI endpoint for delivery
start()[source]

Start the transport.

stop()[source]

Shut down the transport.

wire_format

Accessor for a custom wire format for the transport.

exception aries_cloudagent.transport.outbound.base.OutboundDeliveryError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.transport.outbound.base.OutboundTransportError

Base exception when a message cannot be delivered via an outbound transport.

exception aries_cloudagent.transport.outbound.base.OutboundTransportError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.transport.error.TransportError

Generic outbound transport error.

exception aries_cloudagent.transport.outbound.base.OutboundTransportRegistrationError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.transport.outbound.base.OutboundTransportError

Outbound transport registration error.

aries_cloudagent.transport.outbound.http module

Http outbound transport.

class aries_cloudagent.transport.outbound.http.HttpTransport[source]

Bases: aries_cloudagent.transport.outbound.base.BaseOutboundTransport

Http outbound transport class.

handle_message(context: aries_cloudagent.config.injection_context.InjectionContext, payload: Union[str, bytes], endpoint: str)[source]

Handle message from queue.

Parameters:
  • context – the context that produced the message
  • payload – message payload in string or byte format
  • endpoint – URI endpoint for delivery
schemes = ('http', 'https')
start()[source]

Start the transport.

stop()[source]

Stop the transport.

aries_cloudagent.transport.outbound.manager module

Outbound transport manager.

class aries_cloudagent.transport.outbound.manager.OutboundTransportManager(context: aries_cloudagent.config.injection_context.InjectionContext, handle_not_delivered: Callable = None)[source]

Bases: object

Outbound transport manager class.

deliver_queued_message(queued: aries_cloudagent.transport.outbound.manager.QueuedOutboundMessage) → _asyncio.Task[source]

Kick off delivery of a queued message.

encode_queued_message(queued: aries_cloudagent.transport.outbound.manager.QueuedOutboundMessage) → _asyncio.Task[source]

Kick off encoding of a queued message.

enqueue_message(context: aries_cloudagent.config.injection_context.InjectionContext, outbound: aries_cloudagent.transport.outbound.message.OutboundMessage)[source]

Add an outbound message to the queue.

Parameters:
  • context – The context of the request
  • outbound – The outbound message to deliver
enqueue_webhook(topic: str, payload: dict, endpoint: str, max_attempts: int = None)[source]

Add a webhook to the queue.

Parameters:
  • topic – The webhook topic
  • payload – The webhook payload
  • endpoint – The webhook endpoint
  • max_attempts – Override the maximum number of attempts
Raises:

OutboundDeliveryError – if the associated transport is not running

finished_deliver(queued: aries_cloudagent.transport.outbound.manager.QueuedOutboundMessage, completed: aries_cloudagent.utils.task_queue.CompletedTask)[source]

Handle completion of queued message delivery.

finished_encode(queued: aries_cloudagent.transport.outbound.manager.QueuedOutboundMessage, completed: aries_cloudagent.utils.task_queue.CompletedTask)[source]

Handle completion of queued message encoding.

flush()[source]

Wait for any queued messages to be delivered.

get_registered_transport_for_scheme(scheme: str) → str[source]

Find the registered transport ID for a given scheme.

get_running_transport_for_endpoint(endpoint: str)[source]

Find the running transport ID to use for a given endpoint.

get_running_transport_for_scheme(scheme: str) → str[source]

Find the running transport ID for a given scheme.

get_transport_instance(transport_id: str) → aries_cloudagent.transport.outbound.base.BaseOutboundTransport[source]

Get an instance of a running transport by ID.

perform_encode(queued: aries_cloudagent.transport.outbound.manager.QueuedOutboundMessage)[source]

Perform message encoding.

process_queued() → _asyncio.Task[source]

Start the process to deliver queued messages if necessary.

Returns: the current queue processing task or None

register(module: str) → str[source]

Register a new outbound transport by module path.

Parameters:

module – Module name to register

Raises:
  • OutboundTransportRegistrationError – If the imported class cannot be located
  • OutboundTransportRegistrationError – If the imported class does not specify a schemes attribute
  • OutboundTransportRegistrationError – If the scheme has already been registered
register_class(transport_class: Type[aries_cloudagent.transport.outbound.base.BaseOutboundTransport], transport_id: str = None) → str[source]

Register a new outbound transport class.

Parameters:

transport_class – Transport class to register

Raises:
  • OutboundTransportRegistrationError – If the imported class does not specify a schemes attribute
  • OutboundTransportRegistrationError – If the scheme has already been registered
setup()[source]

Perform setup operations.

start()[source]

Start all transports and feed messages from the queue.

start_transport(transport_id: str)[source]

Start a registered transport.

stop(wait: bool = True)[source]

Stop all running transports.

class aries_cloudagent.transport.outbound.manager.QueuedOutboundMessage(context: aries_cloudagent.config.injection_context.InjectionContext, message: aries_cloudagent.transport.outbound.message.OutboundMessage, target: aries_cloudagent.connections.models.connection_target.ConnectionTarget, transport_id: str)[source]

Bases: object

Class representing an outbound message pending delivery.

STATE_DELIVER = 'deliver'
STATE_DONE = 'done'
STATE_ENCODE = 'encode'
STATE_NEW = 'new'
STATE_PENDING = 'pending'
STATE_RETRY = 'retry'

aries_cloudagent.transport.outbound.message module

Outbound message representation.

class aries_cloudagent.transport.outbound.message.OutboundMessage(*, connection_id: str = None, enc_payload: Union[str, bytes] = None, endpoint: str = None, payload: Union[str, bytes], reply_session_id: str = None, reply_thread_id: str = None, reply_to_verkey: str = None, reply_from_verkey: str = None, target: aries_cloudagent.connections.models.connection_target.ConnectionTarget = None, target_list: Sequence[aries_cloudagent.connections.models.connection_target.ConnectionTarget] = None, to_session_only: bool = False)[source]

Bases: object

Represents an outgoing message.

aries_cloudagent.transport.outbound.ws module

Websockets outbound transport.

class aries_cloudagent.transport.outbound.ws.WsTransport[source]

Bases: aries_cloudagent.transport.outbound.base.BaseOutboundTransport

Websockets outbound transport class.

handle_message(context: aries_cloudagent.config.injection_context.InjectionContext, payload: Union[str, bytes], endpoint: str)[source]

Handle message from queue.

Parameters:
  • context – the context that produced the message
  • payload – message payload in string or byte format
  • endpoint – URI endpoint for delivery
schemes = ('ws', 'wss')
start()[source]

Start the outbound transport.

stop()[source]

Stop the outbound transport.