aries_cloudagent.transport.outbound package
Submodules
aries_cloudagent.transport.outbound.base module
Base outbound transport.
- class aries_cloudagent.transport.outbound.base.BaseOutboundTransport(wire_format: Optional[BaseWireFormat] = None, root_profile: Optional[Profile] = None)[source]
Bases:
ABC
Base outbound transport class.
- abstract async handle_message(profile: Profile, outbound_message: QueuedOutboundMessage, endpoint: str, metadata: Optional[dict] = None)[source]
Handle message.
- Parameters:
profile – the profile that produced the message
outbound_message – the outbound message to handle
endpoint – URI endpoint for delivery
metadata – Additional metadata associated with the payload
- property wire_format: BaseWireFormat
Accessor for a custom wire format for the transport.
- exception aries_cloudagent.transport.outbound.base.OutboundDeliveryError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
OutboundTransportError
Base exception when a message cannot be delivered via an outbound transport.
- exception aries_cloudagent.transport.outbound.base.OutboundTransportError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
TransportError
Generic outbound transport error.
- exception aries_cloudagent.transport.outbound.base.OutboundTransportRegistrationError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
OutboundTransportError
Outbound transport registration error.
- class aries_cloudagent.transport.outbound.base.QueuedOutboundMessage(profile: Profile, message: OutboundMessage, target: ConnectionTarget, transport_id: str)[source]
Bases:
object
Class representing an outbound message pending delivery.
- STATE_DELIVER = 'deliver'
- STATE_DONE = 'done'
- STATE_ENCODE = 'encode'
- STATE_NEW = 'new'
- STATE_PENDING = 'pending'
- STATE_RETRY = 'retry'
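The abstract `handle_message` contract on `BaseOutboundTransport` can be illustrated with a minimal sketch. It does not import `aries_cloudagent`: the `BaseOutboundTransport` below is a stand-in that only mirrors the documented constructor, `wire_format` property, and abstract method. `RecordingTransport` and the `record` scheme are hypothetical names chosen for illustration.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, List, Optional, Tuple


class BaseOutboundTransport(ABC):
    """Stand-in mirroring the documented interface; NOT the real class."""

    def __init__(
        self,
        wire_format: Optional[Any] = None,
        root_profile: Optional[Any] = None,
    ) -> None:
        self._wire_format = wire_format
        self.root_profile = root_profile

    @property
    def wire_format(self) -> Optional[Any]:
        """Accessor for a custom wire format for the transport."""
        return self._wire_format

    @abstractmethod
    async def handle_message(
        self,
        profile: Any,
        outbound_message: Any,
        endpoint: str,
        metadata: Optional[dict] = None,
    ) -> None:
        """Deliver one message to the given endpoint."""


class RecordingTransport(BaseOutboundTransport):
    """Hypothetical transport that records deliveries instead of sending them."""

    schemes = ("record",)  # hypothetical URI scheme, for illustration only

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.delivered: List[Tuple[str, Any]] = []

    async def handle_message(self, profile, outbound_message, endpoint, metadata=None):
        # A real transport would perform network I/O here.
        self.delivered.append((endpoint, outbound_message))


transport = RecordingTransport()
asyncio.run(transport.handle_message(None, "hello", "record://inbox"))
```

Because `handle_message` is abstract, instantiating the base class directly raises `TypeError`; only subclasses that implement it can be created.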
aries_cloudagent.transport.outbound.http module
HTTP outbound transport.
- class aries_cloudagent.transport.outbound.http.HttpTransport(**kwargs)[source]
Bases:
BaseOutboundTransport
HTTP outbound transport class.
- async handle_message(profile: Profile, payload: Union[str, bytes], endpoint: str, metadata: Optional[dict] = None, api_key: Optional[str] = None)[source]
Handle message from queue.
- Parameters:
profile – the profile that produced the message
payload – message payload in string or byte format
endpoint – URI endpoint for delivery
metadata – Additional metadata associated with the payload
api_key – API key for the endpoint
- is_external = False
- schemes = ('http', 'https')
aries_cloudagent.transport.outbound.manager module
Outbound transport manager.
- class aries_cloudagent.transport.outbound.manager.OutboundTransportManager(profile: Profile, handle_not_delivered: Optional[Callable] = None)[source]
Bases:
object
Outbound transport manager class.
- MAX_RETRY_COUNT = 4
- deliver_queued_message(queued: QueuedOutboundMessage) → Task [source]
Kick off delivery of a queued message.
- async encode_outbound_message(profile: Profile, outbound: OutboundMessage, target: ConnectionTarget)[source]
Encode outbound message for the target.
- Parameters:
profile – The active profile for the request
outbound – The outbound message to deliver
target – The outbound message target
- encode_queued_message(queued: QueuedOutboundMessage) → Task [source]
Kick off encoding of a queued message.
- async enqueue_message(profile: Profile, outbound: OutboundMessage)[source]
Add an outbound message to the queue.
- Parameters:
profile – The active profile for the request
outbound – The outbound message to deliver
- enqueue_webhook(topic: str, payload: dict, endpoint: str, max_attempts: Optional[int] = None, metadata: Optional[dict] = None)[source]
Add a webhook to the queue.
- Parameters:
topic – The webhook topic
payload – The webhook payload
endpoint – The webhook endpoint
max_attempts – Override the maximum number of attempts
metadata – Additional metadata associated with the payload
- Raises:
OutboundDeliveryError – if the associated transport is not running
- finished_deliver(queued: QueuedOutboundMessage, completed: CompletedTask)[source]
Handle completion of queued message delivery.
- finished_encode(queued: QueuedOutboundMessage, completed: CompletedTask)[source]
Handle completion of queued message encoding.
- get_registered_transport_for_scheme(scheme: str) → str [source]
Find the registered transport ID for a given scheme.
- get_running_transport_for_endpoint(endpoint: str)[source]
Find the running transport ID to use for a given endpoint.
- get_running_transport_for_scheme(scheme: str) → str [source]
Find the running transport ID for a given scheme.
- get_transport_instance(transport_id: str) → BaseOutboundTransport [source]
Get an instance of a running transport by ID.
- async perform_encode(queued: QueuedOutboundMessage, wire_format: Optional[BaseWireFormat] = None)[source]
Perform message encoding.
- process_queued() → Task [source]
Start the process to deliver queued messages if necessary.
Returns: the current queue processing task or None
- register(module_name: str) → str [source]
Register a new outbound transport by module path.
- Parameters:
module_name – Module name to register
- Raises:
OutboundTransportRegistrationError – If the imported class cannot be located
OutboundTransportRegistrationError – If the imported class does not specify a schemes attribute
OutboundTransportRegistrationError – If the scheme has already been registered
- register_class(transport_class: Type[BaseOutboundTransport], transport_id: Optional[str] = None) → str [source]
Register a new outbound transport class.
- Parameters:
transport_class (Type[BaseOutboundTransport]) – The transport class to register.
transport_id (str, optional) – The ID of the transport. If not provided, the qualified name of the transport class will be used as the ID.
- Returns:
The ID of the registered transport.
- Return type:
str
- Raises:
OutboundTransportRegistrationError – If the imported class does not specify a schemes attribute.
OutboundTransportRegistrationError – If the scheme has already been registered.
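The registration rules described for `register_class`, together with the scheme-based lookups, can be modelled in a few lines. This is a simplified sketch under stated assumptions, not the manager's actual implementation: `SchemeRegistry`, `RegistrationError`, `transport_id_for_endpoint`, and `FakeHttpTransport` are hypothetical stand-ins, and only the documented behaviours are reproduced (a missing `schemes` attribute is rejected, a duplicate scheme is rejected, and the class's qualified name is the default ID).

```python
from typing import Dict, Optional
from urllib.parse import urlparse


class RegistrationError(Exception):
    """Stand-in for OutboundTransportRegistrationError."""


class SchemeRegistry:
    """Hypothetical, simplified model of scheme-based transport registration."""

    def __init__(self) -> None:
        self.registered: Dict[str, type] = {}    # transport ID -> class
        self.scheme_to_id: Dict[str, str] = {}   # URI scheme -> transport ID

    def register_class(
        self, transport_class: type, transport_id: Optional[str] = None
    ) -> str:
        schemes = getattr(transport_class, "schemes", None)
        if not schemes:
            raise RegistrationError(
                f"{transport_class.__qualname__} does not specify schemes"
            )
        # Check every scheme before mutating state, so a failed
        # registration leaves the registry unchanged.
        for scheme in schemes:
            if scheme in self.scheme_to_id:
                raise RegistrationError(f"Scheme already registered: {scheme}")
        # Default the ID to the class's qualified name, as documented.
        transport_id = transport_id or transport_class.__qualname__
        self.registered[transport_id] = transport_class
        for scheme in schemes:
            self.scheme_to_id[scheme] = transport_id
        return transport_id

    def transport_id_for_endpoint(self, endpoint: str) -> Optional[str]:
        # The endpoint's URI scheme selects the transport.
        return self.scheme_to_id.get(urlparse(endpoint).scheme)


class FakeHttpTransport:
    """Placeholder class carrying the same schemes as HttpTransport."""

    schemes = ("http", "https")


registry = SchemeRegistry()
http_id = registry.register_class(FakeHttpTransport)
```

Here `registry.transport_id_for_endpoint("https://example.com/agent")` resolves to the ID returned by `register_class`, while an endpoint with an unregistered scheme such as `ws://` yields `None`.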
aries_cloudagent.transport.outbound.message module
Outbound message representation.
- class aries_cloudagent.transport.outbound.message.OutboundMessage(*, connection_id: Optional[str] = None, enc_payload: Union[str, bytes] = None, endpoint: Optional[str] = None, payload: Union[str, bytes], reply_session_id: Optional[str] = None, reply_thread_id: Optional[str] = None, reply_to_verkey: Optional[str] = None, reply_from_verkey: Optional[str] = None, target: Optional[ConnectionTarget] = None, target_list: Sequence[ConnectionTarget] = None, to_session_only: bool = False)[source]
Bases:
object
Represents an outgoing message.
aries_cloudagent.transport.outbound.status module
Enum representing captured send status of outbound messages.
- class aries_cloudagent.transport.outbound.status.OutboundSendStatus(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
Enum
Send status of outbound messages.
- QUEUED_FOR_DELIVERY = 'queued_for_delivery'
- SENT_TO_EXTERNAL_QUEUE = 'sent_to_external_queue'
- SENT_TO_SESSION = 'sent_to_session'
- UNDELIVERABLE = 'undeliverable'
- WAITING_FOR_PICKUP = 'waiting_for_pickup'
- property topic
Return an event topic associated with a given status.
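One way an enum can expose a per-member `topic` property is sketched below. The member values match those listed above, but `SendStatus` is a stand-in class and `TOPIC_PREFIX` is a hypothetical value; the reference does not state how the real topic string is built.

```python
from enum import Enum

# Hypothetical prefix, chosen for illustration only.
TOPIC_PREFIX = "example::outbound-message::"


class SendStatus(Enum):
    """Stand-in for OutboundSendStatus with the documented member values."""

    SENT_TO_SESSION = "sent_to_session"
    SENT_TO_EXTERNAL_QUEUE = "sent_to_external_queue"
    QUEUED_FOR_DELIVERY = "queued_for_delivery"
    WAITING_FOR_PICKUP = "waiting_for_pickup"
    UNDELIVERABLE = "undeliverable"

    @property
    def topic(self) -> str:
        """Derive an event topic from the member's value (assumed scheme)."""
        return f"{TOPIC_PREFIX}{self.value}"


status = SendStatus.QUEUED_FOR_DELIVERY
```

Members can also be looked up by value, for example `SendStatus("undeliverable")`, which is convenient when a status arrives as a plain string.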
aries_cloudagent.transport.outbound.ws module
WebSocket outbound transport.
- class aries_cloudagent.transport.outbound.ws.WsTransport(**kwargs)[source]
Bases:
BaseOutboundTransport
WebSocket outbound transport class.
- async handle_message(profile: Profile, payload: Union[str, bytes], endpoint: str, metadata: Optional[dict] = None, api_key: Optional[str] = None)[source]
Handle message from queue.
- Parameters:
profile – the profile that produced the message
payload – message payload in string or byte format
endpoint – URI endpoint for delivery
metadata – Additional metadata associated with the payload
api_key – API key for the endpoint
- is_external = False
- schemes = ('ws', 'wss')