aries_cloudagent.core package

Submodules

aries_cloudagent.core.conductor module

The Conductor.

The conductor coordinates messages received over the network, communicates with the ledger, passes messages to handlers, instantiates concrete implementations of required modules, and stores data in the wallet.

class aries_cloudagent.core.conductor.Conductor(context_builder: aries_cloudagent.config.base_context.ContextBuilder)[source]

Bases: object

Conductor class.

Class responsible for initializing concrete implementations of the required interfaces and routing inbound and outbound message data.

dispatch_complete(message: aries_cloudagent.transport.inbound.message.InboundMessage, completed: aries_cloudagent.utils.task_queue.CompletedTask)[source]

Handle completion of message dispatch.

get_stats() → dict[source]

Get the current stats tracked by the conductor.

handle_not_delivered(context: aries_cloudagent.config.injection_context.InjectionContext, outbound: aries_cloudagent.transport.outbound.message.OutboundMessage)[source]

Handle a message that failed delivery via outbound transports.

handle_not_returned(context: aries_cloudagent.config.injection_context.InjectionContext, outbound: aries_cloudagent.transport.outbound.message.OutboundMessage)[source]

Handle a message that failed delivery via an inbound session.

inbound_message_router(message: aries_cloudagent.transport.inbound.message.InboundMessage, can_respond: bool = False)[source]

Route inbound messages.

Parameters:
  • message – The inbound message instance
  • can_respond – If the session supports return routing

outbound_message_router(context: aries_cloudagent.config.injection_context.InjectionContext, outbound: aries_cloudagent.transport.outbound.message.OutboundMessage, inbound: aries_cloudagent.transport.inbound.message.InboundMessage = None) → None[source]

Route an outbound message.

Parameters:
  • context – The request context
  • outbound – An outbound message to be sent
  • inbound – The inbound message that produced this response, if available

queue_outbound(context: aries_cloudagent.config.injection_context.InjectionContext, outbound: aries_cloudagent.transport.outbound.message.OutboundMessage, inbound: aries_cloudagent.transport.inbound.message.InboundMessage = None)[source]

Queue an outbound message.

Parameters:
  • context – The request context
  • outbound – An outbound message to be sent
  • inbound – The inbound message that produced this response, if available

setup()[source]

Initialize the global request context.

start() → None[source]

Start the agent.

stop(timeout=1.0)[source]

Stop the agent.

webhook_router(topic: str, payload: dict, endpoint: str, max_attempts: int = None)[source]

Route a webhook through the outbound transport manager.

Parameters:
  • topic – The webhook topic
  • payload – The webhook payload
  • endpoint – The endpoint of the webhook target
  • max_attempts – The maximum number of attempts

aries_cloudagent.core.dispatcher module

The Dispatcher.

The dispatcher is responsible for coordinating data flow between handlers, providing lifecycle hook callbacks, storing state for message threads, etc.

class aries_cloudagent.core.dispatcher.Dispatcher(context: aries_cloudagent.config.injection_context.InjectionContext)[source]

Bases: object

Dispatcher class.

Class responsible for dispatching messages to message handlers and responding to other agents.

complete(timeout: float = 0.1)[source]

Wait for pending tasks to complete.

handle_message(inbound_message: aries_cloudagent.transport.inbound.message.InboundMessage, send_outbound: Coroutine[T_co, T_contra, V_co], send_webhook: Coroutine[T_co, T_contra, V_co] = None)[source]

Configure responder and message context and invoke the message handler.

Parameters:
  • inbound_message – The inbound message instance
  • send_outbound – Async function to send outbound messages
  • send_webhook – Async function to dispatch a webhook
Returns:

The response from the handler

log_task(task: aries_cloudagent.utils.task_queue.CompletedTask)[source]

Log a completed task using the stats collector.

make_message(parsed_msg: dict) → aries_cloudagent.messaging.agent_message.AgentMessage[source]

Deserialize a message dict into the appropriate message instance.

Given a dict describing a message, this method returns an instance of the related message class.
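The lookup described here can be sketched without the agent itself. This is a minimal illustration, not the library's implementation: MessageParseError, Ping and REGISTRY below are local stand-ins defined only for this example, and the type URI is a sample value.

```python
class MessageParseError(Exception):
    """Local stand-in for the parse error raised by make_message."""


class Ping:
    """Hypothetical message class used only for this sketch."""

    def __init__(self, comment=None):
        self.comment = comment


# Hypothetical registry mapping @type values to message classes.
REGISTRY = {"https://didcomm.org/trust_ping/1.0/ping": Ping}


def make_message(parsed_msg: dict):
    """Return an instance of the class registered for the message's @type."""
    message_type = parsed_msg.get("@type")
    if not message_type:
        raise MessageParseError("Message does not contain '@type' parameter")
    message_cls = REGISTRY.get(message_type)
    if message_cls is None:
        raise MessageParseError(f"Unrecognized message type {message_type}")
    # Pass the remaining (non-decorator) fields to the class constructor.
    fields = {k: v for k, v in parsed_msg.items() if not k.startswith("@")}
    return message_cls(**fields)
```

Both failure cases raise the same exception type, so a caller can handle a missing @type and an unregistered type in one except clause.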

Parameters:

parsed_msg – The parsed message

Returns:

An instance of the corresponding message class for this message

Raises:
  • MessageParseError – If the message doesn’t specify @type
  • MessageParseError – If there is no message class registered to handle the given type

put_task(coro: Coroutine[T_co, T_contra, V_co], complete: Callable = None, ident: str = None) → aries_cloudagent.utils.task_queue.PendingTask[source]

Run a task in the task queue, potentially blocking other handlers.

queue_message(inbound_message: aries_cloudagent.transport.inbound.message.InboundMessage, send_outbound: Coroutine[T_co, T_contra, V_co], send_webhook: Coroutine[T_co, T_contra, V_co] = None, complete: Callable = None) → aries_cloudagent.utils.task_queue.PendingTask[source]

Add a message to the processing queue for handling.

Parameters:
  • inbound_message – The inbound message instance
  • send_outbound – Async function to send outbound messages
  • send_webhook – Async function to dispatch a webhook
  • complete – Function to call when the handler has completed
Returns:

A pending task instance resolving to the handler task
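The queue-and-callback pattern described here can be sketched with plain asyncio. This is an illustration only: it uses an asyncio.Task in place of the library's PendingTask, and handle, queue_message and main below are hypothetical stand-ins rather than the Dispatcher's own code.

```python
import asyncio


async def handle(message: str) -> str:
    """Hypothetical handler coroutine standing in for message handling."""
    await asyncio.sleep(0)
    return message.upper()


def queue_message(message: str, complete=None) -> asyncio.Task:
    """Schedule the handler and call `complete` with the finished task."""
    task = asyncio.create_task(handle(message))
    if complete:
        task.add_done_callback(complete)
    return task


async def main() -> list:
    finished = []
    task = queue_message("ping", complete=lambda t: finished.append(t.result()))
    await task
    # Done callbacks are run by the event loop; yield once more so any
    # callback still pending has been called before we return.
    await asyncio.sleep(0)
    return finished
```

Running asyncio.run(main()) returns the list of results collected by the completion callback.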

run_task(coro: Coroutine[T_co, T_contra, V_co], complete: Callable = None, ident: str = None) → _asyncio.Task[source]

Run a task in the task queue, potentially blocking other handlers.

setup()[source]

Perform async instance setup.

class aries_cloudagent.core.dispatcher.DispatcherResponder(context: aries_cloudagent.messaging.request_context.RequestContext, inbound_message: aries_cloudagent.transport.inbound.message.InboundMessage, send_outbound: Coroutine[T_co, T_contra, V_co], send_webhook: Coroutine[T_co, T_contra, V_co] = None, **kwargs)[source]

Bases: aries_cloudagent.messaging.responder.BaseResponder

Handle outgoing messages from message handlers.

create_outbound(message: Union[aries_cloudagent.messaging.agent_message.AgentMessage, str, bytes], **kwargs) → aries_cloudagent.transport.outbound.message.OutboundMessage[source]

Create an OutboundMessage from a message body.

Parameters: message – The message payload

send_outbound(message: aries_cloudagent.transport.outbound.message.OutboundMessage)[source]

Send outbound message.

Parameters: message – The OutboundMessage to be sent

send_webhook(topic: str, payload: dict)[source]

Dispatch a webhook.

Parameters:
  • topic – the webhook topic identifier
  • payload – the webhook payload value

aries_cloudagent.core.error module

Common exception classes.

exception aries_cloudagent.core.error.BaseError(*args, error_code: str = None, **kwargs)[source]

Bases: Exception

Generic exception class which other exceptions should inherit from.

error_code = None
message

Accessor for the error message.
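A short sketch of how a subclass might use error_code and message. The BaseError below is a local stand-in that mirrors the documented signature, not the package's class; in particular, treating the first positional argument as the message is an assumption, and LedgerTimeoutError and describe are hypothetical names.

```python
class BaseError(Exception):
    """Local stand-in mirroring the documented signature."""

    error_code = None

    def __init__(self, *args, error_code: str = None, **kwargs):
        # Exception itself accepts no keyword arguments, so only the
        # positional arguments are forwarded in this sketch.
        super().__init__(*args)
        if error_code:
            self.error_code = error_code

    @property
    def message(self) -> str:
        """Return the first positional argument as the error message."""
        return str(self.args[0]).strip() if self.args else ""


class LedgerTimeoutError(BaseError):
    """Hypothetical subclass, not part of the package."""


def describe(err: BaseError) -> str:
    """Format an error as '<code>: <message>' for logging."""
    return f"{err.error_code or 'unknown'}: {err.message}"
```

Because error_code is a class attribute that defaults to None, a subclass could also set a fixed code at class level instead of passing it on every raise.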

exception aries_cloudagent.core.error.ProtocolDefinitionValidationError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Error raised when there is a problem validating a protocol definition.

exception aries_cloudagent.core.error.ProtocolMinorVersionNotSupported(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Minimum minor version protocol error.

Error raised when the protocol is supported but the minimum supported minor version is higher than the minor version given in the @type parameter of the message.

exception aries_cloudagent.core.error.StartupError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Error raised when there is a problem starting the conductor.

aries_cloudagent.core.plugin_registry module

Handle registration of plugin modules for extending functionality.

class aries_cloudagent.core.plugin_registry.PluginRegistry[source]

Bases: object

Plugin registry for indexing application plugins.

init_context(context: aries_cloudagent.config.injection_context.InjectionContext)[source]

Call plugin setup methods on the current context.

load_protocol_version(context: aries_cloudagent.config.injection_context.InjectionContext, mod: module, version_definition: dict = None)[source]

Load a particular protocol version.

load_protocols(context: aries_cloudagent.config.injection_context.InjectionContext, plugin: module)[source]

For modules that don’t implement setup, register protocols manually.

plugin_names

Accessor for a list of all plugin module names.

plugins

Accessor for a list of all plugin modules.

register_admin_routes(app)[source]

Call route registration methods on the current context.

register_package(package_name: str) → Sequence[module][source]

Register all modules (sub-packages) under a given package name.

register_plugin(module_name: str) → module[source]

Register a plugin module.
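Registering a module by name generally comes down to importing it and remembering the result. The sketch below shows that idea with importlib only; PluginIndex is a hypothetical class, and returning None on a failed import is an assumption about error handling, not documented behaviour of PluginRegistry.

```python
from importlib import import_module
from types import ModuleType
from typing import Dict, List, Optional


class PluginIndex:
    """Hypothetical minimal registry keyed by module name."""

    def __init__(self):
        self._plugins: Dict[str, ModuleType] = {}

    def register_plugin(self, module_name: str) -> Optional[ModuleType]:
        """Import module_name once and remember it; return None on failure."""
        if module_name in self._plugins:
            return self._plugins[module_name]
        try:
            mod = import_module(module_name)
        except ImportError:
            return None
        self._plugins[module_name] = mod
        return mod

    @property
    def plugin_names(self) -> List[str]:
        """Return the names of all registered modules, sorted."""
        return sorted(self._plugins)
```

Any importable module works for trying this out, for example PluginIndex().register_plugin("json").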

validate_version(version_list, module_name)[source]

Validate version dict format.

aries_cloudagent.core.protocol_registry module

Handle registration and publication of supported protocols.

class aries_cloudagent.core.protocol_registry.ProtocolRegistry[source]

Bases: object

Protocol registry for indexing message families.

controllers

Accessor for a list of all protocol controller functions.

message_types

Accessor for a list of all message types.

parse_type_string(message_type)[source]

Parse message type string and return dict with info.
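As an illustration of what such parsing might involve, the sketch below splits a message type URI of the form <prefix>/<protocol>/<major>.<minor>/<name>. The function is a standalone stand-in, and the keys of the returned dict are assumptions; the source does not list which keys the real method returns.

```python
def parse_type_string(message_type: str) -> dict:
    """Split a message type URI into protocol, version and message name."""
    # Assumed layout: <prefix>/<protocol>/<major>.<minor>/<message name>
    parts = message_type.split("/")
    major, minor = parts[-2].split(".")
    return {
        "protocol": parts[-3],
        "major_version": int(major),
        "minor_version": int(minor),
        "name": parts[-1],
    }
```

Indexing from the end of the split keeps the sketch independent of the prefix, so both https:// and did: style prefixes are handled the same way.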

prepare_disclosed(context: aries_cloudagent.config.injection_context.InjectionContext, protocols: Sequence[str])[source]

Call controllers and return publicly supported message families and roles.

protocols

Accessor for a list of all message protocols.

protocols_matching_query(query: str) → Sequence[str][source]

Return a list of message protocols matching a query string.
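One plausible matching rule is sketched below: a lone "*" matches everything, a trailing "*" matches by prefix, and anything else must match exactly. These rules are an assumption made for illustration; the source does not state how the query string is interpreted, and this standalone function takes the protocol list as an explicit argument.

```python
from typing import List, Sequence


def protocols_matching_query(protocols: Sequence[str], query: str) -> List[str]:
    """Return the protocols selected by query (assumed wildcard rules)."""
    if query == "*":
        return list(protocols)
    if query.endswith("*"):
        prefix = query[:-1]
        return [p for p in protocols if p.startswith(prefix)]
    return [p for p in protocols if p == query]
```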

register_controllers(*controller_sets, version_definition=None)[source]

Add new controllers.

Parameters: controller_sets – Mappings of message families to coroutines

register_message_types(*typesets, version_definition=None)[source]

Add new supported message types.

Parameters:
  • typesets – Mappings of message types to register
  • version_definition – Optional version definition dict

resolve_message_class(message_type: str) → type[source]

Resolve a message_type to a message class.

Given a message type identifier, this method returns the corresponding registered message class.

Parameters: message_type – Message type to resolve

Returns: The resolved message class
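A registry of this kind can store either classes or dotted import paths and resolve the latter on first use. The sketch below shows that approach under stated assumptions: TYPES, resolve_class and the sample type string are hypothetical, and whether ProtocolRegistry stores dotted paths is not confirmed by the source.

```python
from importlib import import_module
from typing import Dict, Optional, Union

# Hypothetical registry: values are classes or dotted paths to classes.
TYPES: Dict[str, Union[str, type]] = {
    "example/1.0/ordered": "collections.OrderedDict",
}


def resolve_class(path: str) -> type:
    """Import the module part of a dotted path and return the named class."""
    module_name, _, class_name = path.rpartition(".")
    return getattr(import_module(module_name), class_name)


def resolve_message_class(message_type: str) -> Optional[type]:
    """Return the class registered for message_type, or None if unknown."""
    entry = TYPES.get(message_type)
    if isinstance(entry, str):
        # Resolve lazily and cache the class for later lookups.
        entry = TYPES[message_type] = resolve_class(entry)
    return entry
```

Deferring the import until the first lookup means a message module is only loaded when a message of that type is actually seen.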