aries_cloudagent.transport.outbound package

Submodules

aries_cloudagent.transport.outbound.base module

Base outbound transport.

class aries_cloudagent.transport.outbound.base.BaseOutboundTransport(wire_format: Optional[aries_cloudagent.transport.wire_format.BaseWireFormat] = None, root_profile: Optional[aries_cloudagent.core.profile.Profile] = None)[source]

Bases: abc.ABC

Base outbound transport class.

property collector: aries_cloudagent.utils.stats.Collector

Accessor for the stats collector instance.

abstract async handle_message(profile: aries_cloudagent.core.profile.Profile, payload: Union[str, bytes], endpoint: str, metadata: Optional[dict] = None)[source]

Deliver an outbound message to the given endpoint.

Parameters
  • profile – the profile that produced the message

  • payload – message payload in string or byte format

  • endpoint – URI endpoint for delivery

  • metadata – additional metadata associated with the payload

abstract async start()[source]

Start the transport.

abstract async stop()[source]

Shut down the transport.

property wire_format: aries_cloudagent.transport.wire_format.BaseWireFormat

Accessor for a custom wire format for the transport.
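A minimal sketch of how a concrete transport might satisfy this interface. To stay self-contained it defines a stand-in, SketchBaseTransport, that mirrors only the abstract methods documented above rather than importing aries_cloudagent. InMemoryTransport, its sent list, and the "memory" scheme are hypothetical names introduced for illustration.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import List, Optional, Tuple, Union


class SketchBaseTransport(ABC):
    """Stand-in mirroring the documented abstract interface (not the real class)."""

    @abstractmethod
    async def start(self):
        """Start the transport."""

    @abstractmethod
    async def stop(self):
        """Shut down the transport."""

    @abstractmethod
    async def handle_message(
        self,
        profile: object,
        payload: Union[str, bytes],
        endpoint: str,
        metadata: Optional[dict] = None,
    ):
        """Deliver one payload to one endpoint."""


class InMemoryTransport(SketchBaseTransport):
    """Hypothetical transport that records payloads instead of sending them."""

    schemes = ("memory",)  # hypothetical scheme, for illustration only

    def __init__(self):
        self.running = False
        self.sent: List[Tuple[str, bytes]] = []

    async def start(self):
        self.running = True

    async def stop(self):
        self.running = False

    async def handle_message(self, profile, payload, endpoint, metadata=None):
        if not self.running:
            raise RuntimeError("transport has not been started")
        # Normalise str payloads to bytes so the record holds a single type.
        if isinstance(payload, str):
            payload = payload.encode("utf-8")
        self.sent.append((endpoint, payload))


async def demo() -> List[Tuple[str, bytes]]:
    transport = InMemoryTransport()
    await transport.start()
    try:
        await transport.handle_message(None, "hello", "memory://peer")
    finally:
        # Always stop the transport, even if delivery raised.
        await transport.stop()
    return transport.sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because start(), stop(), and handle_message() are all abstract, a subclass that omits any of them cannot be instantiated, which is the main guarantee the base class provides.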

exception aries_cloudagent.transport.outbound.base.OutboundDeliveryError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: aries_cloudagent.transport.outbound.base.OutboundTransportError

Base exception when a message cannot be delivered via an outbound transport.

exception aries_cloudagent.transport.outbound.base.OutboundTransportError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: aries_cloudagent.transport.error.TransportError

Generic outbound transport error.

exception aries_cloudagent.transport.outbound.base.OutboundTransportRegistrationError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: aries_cloudagent.transport.outbound.base.OutboundTransportError

Outbound transport registration error.
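Since OutboundDeliveryError and OutboundTransportRegistrationError both derive from OutboundTransportError, a caller can handle the specific case first and fall back to the generic one. The sketch below uses stand-in classes that mirror only the documented hierarchy and the error_code keyword. How the real classes store error_code is an assumption here, and deliver and classify are hypothetical helpers.

```python
from typing import Optional


class TransportError(Exception):
    """Stand-in for aries_cloudagent.transport.error.TransportError."""

    def __init__(self, *args, error_code: Optional[str] = None, **kwargs):
        super().__init__(*args)
        # Assumption: the code is kept as a plain attribute.
        self.error_code = error_code


class OutboundTransportError(TransportError):
    """Generic outbound transport error (stand-in)."""


class OutboundDeliveryError(OutboundTransportError):
    """Raised when a message cannot be delivered (stand-in)."""


class OutboundTransportRegistrationError(OutboundTransportError):
    """Raised when a transport cannot be registered (stand-in)."""


def deliver(endpoint: str) -> None:
    """Hypothetical delivery step that always fails, to exercise the handlers."""
    raise OutboundDeliveryError(f"no route to {endpoint}", error_code="unreachable")


def classify(endpoint: str) -> str:
    """Report the outcome, most specific exception first."""
    try:
        deliver(endpoint)
    except OutboundDeliveryError as err:
        return f"delivery failed ({err.error_code})"
    except OutboundTransportError:
        return "transport error"
    return "delivered"
```

Ordering the except clauses from most to least specific matters: placing OutboundTransportError first would also catch delivery errors and hide the distinction.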

aries_cloudagent.transport.outbound.http module

HTTP outbound transport.

class aries_cloudagent.transport.outbound.http.HttpTransport(**kwargs)[source]

Bases: aries_cloudagent.transport.outbound.base.BaseOutboundTransport

HTTP outbound transport class.

async handle_message(profile: aries_cloudagent.core.profile.Profile, payload: Union[str, bytes], endpoint: str, metadata: Optional[dict] = None, api_key: Optional[str] = None)[source]

Handle message from queue.

Parameters
  • profile – the profile that produced the message

  • payload – message payload in string or byte format

  • endpoint – URI endpoint for delivery

  • metadata – additional metadata associated with the payload

  • api_key – optional API key to use for the endpoint

is_external = False

schemes = ('http', 'https')

async start()[source]

Start the transport.

async stop()[source]

Stop the transport.

aries_cloudagent.transport.outbound.manager module

aries_cloudagent.transport.outbound.message module

aries_cloudagent.transport.outbound.status module

Enum representing captured send status of outbound messages.

class aries_cloudagent.transport.outbound.status.OutboundSendStatus(value)[source]

Bases: enum.Enum

Send status of outbound messages.

QUEUED_FOR_DELIVERY = 'queued_for_delivery'

SENT_TO_EXTERNAL_QUEUE = 'sent_to_external_queue'

SENT_TO_SESSION = 'sent_to_session'

UNDELIVERABLE = 'undeliverable'

WAITING_FOR_PICKUP = 'waiting_for_pickup'

property topic

Return an event topic associated with a given status.
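A minimal sketch of how a status enum with a derived topic property can work. SketchSendStatus is a stand-in that copies the member values listed above. The TOPIC_PREFIX string is hypothetical, since the real topic format is not given in this section.

```python
from enum import Enum

# Hypothetical prefix; the package's actual topic format is not documented here.
TOPIC_PREFIX = "example::outbound-status::"


class SketchSendStatus(Enum):
    """Stand-in mirroring the documented OutboundSendStatus members."""

    QUEUED_FOR_DELIVERY = "queued_for_delivery"
    SENT_TO_EXTERNAL_QUEUE = "sent_to_external_queue"
    SENT_TO_SESSION = "sent_to_session"
    UNDELIVERABLE = "undeliverable"
    WAITING_FOR_PICKUP = "waiting_for_pickup"

    @property
    def topic(self) -> str:
        """Build an event topic from the member's value."""
        return f"{TOPIC_PREFIX}{self.value}"


# Look a member up by its string value, then read its topic.
status = SketchSendStatus("undeliverable")
undeliverable_topic = status.topic
```

Deriving the topic from the value keeps the two in step: adding a new status member automatically gives it a topic without a separate lookup table.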

aries_cloudagent.transport.outbound.ws module

WebSocket outbound transport.

class aries_cloudagent.transport.outbound.ws.WsTransport(**kwargs)[source]

Bases: aries_cloudagent.transport.outbound.base.BaseOutboundTransport

WebSocket outbound transport class.

async handle_message(profile: aries_cloudagent.core.profile.Profile, payload: Union[str, bytes], endpoint: str, metadata: Optional[dict] = None, api_key: Optional[str] = None)[source]

Handle message from queue.

Parameters
  • profile – the profile that produced the message

  • payload – message payload in string or byte format

  • endpoint – URI endpoint for delivery

  • metadata – additional metadata associated with the payload

  • api_key – optional API key to use for the endpoint

is_external = False

schemes = ('ws', 'wss')

async start()[source]

Start the outbound transport.

async stop()[source]

Stop the outbound transport.
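The schemes attributes on HttpTransport and WsTransport suggest that an endpoint's URI scheme determines which transport handles it. The sketch below illustrates that idea with a plain dictionary built from the documented scheme tuples. pick_transport and SCHEMES are hypothetical names, and this is an illustration rather than the package's own routing logic.

```python
from typing import Dict, Tuple
from urllib.parse import urlparse

# Scheme tuples copied from the class attributes documented above.
SCHEMES: Dict[str, Tuple[str, ...]] = {
    "HttpTransport": ("http", "https"),
    "WsTransport": ("ws", "wss"),
}


def pick_transport(endpoint: str) -> str:
    """Return the name of the transport whose schemes cover the endpoint."""
    scheme = urlparse(endpoint).scheme.lower()
    for name, schemes in SCHEMES.items():
        if scheme in schemes:
            return name
    raise ValueError(f"no transport registered for scheme {scheme!r}")


http_choice = pick_transport("https://agent.example/inbox")
ws_choice = pick_transport("wss://agent.example/ws")
```

An endpoint with an unlisted scheme, such as mailto:, raises ValueError here. A real agent would more likely surface that as one of the outbound transport errors described earlier.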