aries_cloudagent package

Aries Cloud Agent.

Subpackages

Submodules

aries_cloudagent.classloader module

The classloader provides utilities to dynamically load classes and modules.

class aries_cloudagent.classloader.ClassLoader(base_path, super_class)[source]

Bases: object

Class used to load classes from modules dynamically.

load(module_path, load_relative=False)[source]

Load module by module path.

Parameters:
  • module_path – Dotted path to module
  • load_relative – Whether the method should check the configured base path for a relative import
Returns:

The loaded class

Raises:
classmethod load_class(class_name: str, default_module: str = None)[source]

Resolve a complete class path (e.g. typing.Dict) to the class itself.

Parameters:
  • class_name – Class name
  • default_module – (Default value = None)
Returns:

The resolved class

Raises:
classmethod load_module(mod_path: str, default_module: str = None)[source]

Resolve a complete class path (e.g. typing.Dict) to the class itself.

Parameters:
  • mod_path – Dotted path to resolve
  • default_module – (Default value = None)
Returns:

The resolved class

Raises:
exception aries_cloudagent.classloader.ClassNotFoundError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.error.BaseError

Class not found error.

exception aries_cloudagent.classloader.ModuleLoadError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.error.BaseError

Module load error.
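
The dotted-path resolution described for load_class can be illustrated with the standard library alone. The following is a minimal sketch, not the ClassLoader implementation: the function name resolve_class and the use of ValueError and LookupError are assumptions made for this example, where the real module raises ClassNotFoundError and ModuleLoadError.

```python
import importlib


def resolve_class(class_path: str, default_module: str = None):
    """Resolve a dotted path such as "collections.OrderedDict" to the class object.

    Hypothetical helper for illustration; not part of aries_cloudagent.
    """
    if "." in class_path:
        # Split "package.module.ClassName" into the module path and the class name
        module_path, class_name = class_path.rsplit(".", 1)
    elif default_module:
        # A bare class name is looked up in the fallback module
        module_path, class_name = default_module, class_path
    else:
        raise ValueError(f"Cannot resolve {class_path!r} without a module path")

    # import_module raises ImportError if the module cannot be loaded
    module = importlib.import_module(module_path)
    try:
        return getattr(module, class_name)
    except AttributeError:
        raise LookupError(
            f"{class_name!r} not found in module {module_path!r}"
        ) from None


ordered_dict_cls = resolve_class("collections.OrderedDict")
same_cls = resolve_class("OrderedDict", default_module="collections")
```

Both calls return the same class object; the second shows how a default_module argument lets callers pass a bare class name.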

aries_cloudagent.conductor module

The Conductor.

The conductor is responsible for coordinating messages that are received over the network, communicating with the ledger, passing messages to handlers, instantiating concrete implementations of required modules, and storing data in the wallet.

class aries_cloudagent.conductor.Conductor(context_builder: aries_cloudagent.config.base_context.ContextBuilder)[source]

Bases: object

Conductor class.

Class responsible for initializing concrete implementations of our required interfaces and routing inbound and outbound message data.

inbound_message_router(message_body: Union[str, bytes], transport_type: str = None, socket_id: str = None, single_response: _asyncio.Future = None) → _asyncio.Future[source]

Route inbound messages.

Parameters:
  • message_body – Body of the incoming message
  • transport_type – Type of transport this message came from
  • socket_id – The identifier of the incoming socket connection
  • single_response – A future to contain the first direct response message
outbound_message_router(message: aries_cloudagent.messaging.outbound_message.OutboundMessage, context: aries_cloudagent.config.injection_context.InjectionContext = None) → None[source]

Route an outbound message.

Parameters:
  • message – An outbound message to be sent
  • context – Optional request context
prepare_outbound_message(message: aries_cloudagent.messaging.outbound_message.OutboundMessage, context: aries_cloudagent.config.injection_context.InjectionContext = None)[source]

Prepare a response message for transmission.

Parameters:
  • message – An outbound message to be sent
  • context – Optional request context
register_socket(*, handler: Coroutine[T_co, T_contra, V_co] = None, single_response: _asyncio.Future = None) → aries_cloudagent.messaging.socket.SocketRef[source]

Register a new duplex connection.

setup()[source]

Initialize the global request context.

start() → None[source]

Start the agent.

stop(timeout=0.1)[source]

Stop the agent.

aries_cloudagent.defaults module

Sane defaults for known message definitions.

aries_cloudagent.defaults.default_protocol_registry() → aries_cloudagent.messaging.protocol_registry.ProtocolRegistry[source]

Protocol registry for default message types.

aries_cloudagent.dispatcher module

The Dispatcher.

The dispatcher is responsible for coordinating data flow between handlers, providing lifecycle hook callbacks, storing state for message threads, etc.

class aries_cloudagent.dispatcher.Dispatcher(context: aries_cloudagent.config.injection_context.InjectionContext)[source]

Bases: object

Dispatcher class.

Class responsible for dispatching messages to message handlers and responding to other agents.

dispatch(parsed_msg: dict, delivery: aries_cloudagent.messaging.message_delivery.MessageDelivery, connection: aries_cloudagent.messaging.connections.models.connection_record.ConnectionRecord, send: Coroutine[T_co, T_contra, V_co]) → _asyncio.Future[source]

Configure responder and dispatch message context to message handler.

Parameters:
  • parsed_msg – The parsed message body
  • delivery – The incoming message delivery metadata
  • connection – The related connection record, if any
  • send – Function to send outbound messages
Returns:

The response from the handler

make_message(parsed_msg: dict) → aries_cloudagent.messaging.agent_message.AgentMessage[source]

Deserialize a message dict into the appropriate message instance.

Given a dict describing a message, this method returns an instance of the related message class.

Parameters:

parsed_msg – The parsed message

Returns:

An instance of the corresponding message class for this message

Raises:
  • MessageParseError – If the message doesn’t specify @type
  • MessageParseError – If there is no message class registered to handle the given type
class aries_cloudagent.dispatcher.DispatcherResponder(send: Coroutine[T_co, T_contra, V_co], context: aries_cloudagent.messaging.request_context.RequestContext, **kwargs)[source]

Bases: aries_cloudagent.messaging.responder.BaseResponder

Handle outgoing messages from message handlers.

create_outbound(message: Union[aries_cloudagent.messaging.agent_message.AgentMessage, str, bytes], **kwargs) → aries_cloudagent.messaging.outbound_message.OutboundMessage[source]

Create an OutboundMessage from a message body.

send_outbound(message: aries_cloudagent.messaging.outbound_message.OutboundMessage)[source]

Send outbound message.

Parameters:
  • message – The OutboundMessage to be sent
send_webhook(topic: str, payload: dict)[source]

Dispatch a webhook.

Parameters:
  • topic – the webhook topic identifier
  • payload – the webhook payload value
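
The behaviour documented for make_message (look up the class for a message's @type, and fail if the type is missing or unregistered) can be sketched as a simple registry lookup. Everything below is an assumption for illustration: the Ping class, the REGISTRY dict, the "example/ping/1.0" type string and the stand-in MessageParseError are not taken from aries_cloudagent, which resolves classes through its own protocol registry.

```python
class MessageParseError(Exception):
    """Stand-in error class defined for this sketch only."""


class Ping:
    """Hypothetical message class used only for illustration."""

    def __init__(self, comment: str = None):
        self.comment = comment

    @classmethod
    def deserialize(cls, data: dict) -> "Ping":
        # Build an instance from the parsed message body
        return cls(comment=data.get("comment"))


# Hypothetical registry mapping @type values to message classes
REGISTRY = {"example/ping/1.0": Ping}


def make_message(parsed_msg: dict):
    """Return an instance of the class registered for the message's @type."""
    message_type = parsed_msg.get("@type")
    if not message_type:
        # First documented failure: the message does not specify @type
        raise MessageParseError("Message does not contain an '@type' value")

    message_cls = REGISTRY.get(message_type)
    if message_cls is None:
        # Second documented failure: no class is registered for the type
        raise MessageParseError(f"Unrecognized message type {message_type}")

    return message_cls.deserialize(parsed_msg)


msg = make_message({"@type": "example/ping/1.0", "comment": "hello"})
```

The two raise statements correspond to the two MessageParseError conditions listed for make_message.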

aries_cloudagent.error module

Common exception classes.

exception aries_cloudagent.error.BaseError(*args, error_code: str = None, **kwargs)[source]

Bases: Exception

Generic exception class which other exceptions should inherit from.

error_code = None
message

Accessor for the error message.

exception aries_cloudagent.error.StartupError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.error.BaseError

Error raised when there is a problem starting the conductor.
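
The pattern suggested by the signatures above (a class-level error_code default, an optional per-instance override, and a message accessor) might look like the following sketch. The class names SketchBaseError and SketchStartupError, the error code string, and the whitespace stripping in message are assumptions for this example and are not confirmed by the listing.

```python
class SketchBaseError(Exception):
    """Illustrative base error; not the aries_cloudagent implementation."""

    error_code = None  # class-level default, overridden per instance if given

    def __init__(self, *args, error_code: str = None):
        super().__init__(*args)
        if error_code:
            self.error_code = error_code

    @property
    def message(self) -> str:
        """Return the first positional argument as the error message."""
        return str(self.args[0]).strip() if self.args else ""


class SketchStartupError(SketchBaseError):
    """Illustrative subclass for startup failures."""


try:
    raise SketchStartupError("  ledger unreachable ", error_code="startup-failed")
except SketchBaseError as err:
    # Catching the base class also catches every subclass
    caught = (err.message, err.error_code)
```

Deriving every exception from one base class lets callers handle all agent errors with a single except clause while still reading a machine-friendly code.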

aries_cloudagent.postgres module

Utility for loading Postgres wallet plug-in.

aries_cloudagent.postgres.file_ext()[source]

Determine file extension based on platform.

aries_cloudagent.postgres.load_postgres_plugin()[source]

Load postgres dll and configure postgres wallet.
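
Choosing a shared-library extension by platform, as file_ext is described as doing, can be sketched with the standard platform module. The function name shared_library_ext and the exact return values (given here without a leading dot) are assumptions; the listing does not state what the real function returns.

```python
import platform


def shared_library_ext() -> str:
    """Return the usual shared-library extension for the current platform.

    Hypothetical helper for illustration; not the aries_cloudagent function.
    """
    system = platform.system().lower()
    if system == "windows":
        return "dll"
    if system == "darwin":
        return "dylib"
    # Linux and most other Unix-like systems use .so
    return "so"


extension = shared_library_ext()
```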

aries_cloudagent.stats module

Classes for tracking performance and timing.

class aries_cloudagent.stats.Collector(enabled: bool = True)[source]

Bases: object

Collector for a set of statistics.

enabled

Accessor for the collector’s enabled property.

extract(groups: Sequence[str] = None) → dict[source]

Extract statistics for a specific set of groups.

log(name: str, duration: float)[source]

Log an entry in the statistics if the collector is enabled.

mark(*names)[source]

Make a custom decorator function for adding to the set of groups.

reset()[source]

Reset the collector’s statistics.

results

Accessor for the current set of collected statistics.

timer(*groups)[source]

Create a new timer attached to this collector.

wrap(obj, prop_name: Union[str, Sequence[str]], groups: Sequence[str] = None, *, ignore_missing: bool = False)[source]

Wrap a method on a class or class instance.

wrap_coro(fn, groups: Sequence[str])[source]

Wrap a coroutine instance to collect timing statistics on execution.

wrap_fn(fn, groups: Sequence[str])[source]

Wrap a function instance to collect timing statistics on execution.

class aries_cloudagent.stats.Stats[source]

Bases: object

A collection of statistics.

extract(names: Sequence[str] = None) → dict[source]

Summarize the stats in a dictionary.

log(name: str, duration: float)[source]

Log an entry in the stats.

class aries_cloudagent.stats.Timer(collector: aries_cloudagent.stats.Collector, groups: Sequence[str])[source]

Bases: object

Timer instance for a running task.

classmethod now()[source]

Fetch a standard timer value.
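
The idea behind Collector.wrap_fn, log and extract (time a call, record the duration under one or more group names, then summarize) can be sketched as follows. This is a simplified stand-in, not the library code: the TimingCollector class and the shape of the summary dictionary are assumptions made for this example.

```python
import time
from collections import defaultdict
from functools import wraps


class TimingCollector:
    """Illustrative collector; not the aries_cloudagent.stats implementation."""

    def __init__(self, enabled: bool = True):
        self.enabled = enabled
        self._durations = defaultdict(list)

    def log(self, name: str, duration: float):
        # Record an entry only while the collector is enabled
        if self.enabled:
            self._durations[name].append(duration)

    def wrap_fn(self, fn, groups):
        """Wrap a function so each call is timed and logged under every group."""

        @wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                # Log the elapsed time even if the wrapped call raises
                elapsed = time.perf_counter() - start
                for group in groups:
                    self.log(group, elapsed)

        return wrapper

    def extract(self, groups=None) -> dict:
        """Summarize the call count and total time for the requested groups."""
        names = groups if groups is not None else list(self._durations)
        return {
            name: {
                "count": len(self._durations[name]),
                "total": sum(self._durations[name]),
            }
            for name in names
            if name in self._durations
        }


def add(a, b):
    return a + b


collector = TimingCollector()
timed_add = collector.wrap_fn(add, ["math", "all"])
total = timed_add(1, 2)
summary = collector.extract(["math"])
```

Logging inside a finally block means failed calls are still counted, which is usually what you want when measuring where time is spent.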

aries_cloudagent.task_processor module

Classes for managing a limited set of concurrent tasks.

class aries_cloudagent.task_processor.PendingTask(ident, fn: Callable[[PendingTask], Awaitable[T_co]], retries: int = None, retry_delay: float = None)[source]

Bases: object

Class for tracking pending tasks.

cancel()[source]

Cancel the running task.

done()[source]

Check if the task is done.

exception()[source]

Get the exception raised by the task, if any.

result()[source]

Get the result of the task.

class aries_cloudagent.task_processor.TaskProcessor(*, max_pending: int = 10)[source]

Bases: object

Class for managing a limited set of concurrent tasks.

done()[source]

Check if the processor has any pending tasks.

ready()[source]

Check if the processor is ready.

run_retry(fn: Callable[[aries_cloudagent.task_processor.PendingTask], Awaitable[T_co]], *, ident=None, retries: int = 5, retry_delay: float = 10.0, when_ready: bool = True) → aries_cloudagent.task_processor.PendingTask[source]

Process a task and track the result.

run_task(task: Awaitable[T_co], *, ident=None, when_ready: bool = True) → aries_cloudagent.task_processor.PendingTask[source]

Run a single coroutine with no retries.

wait_done()[source]

Wait for all pending tasks to complete.

wait_ready()[source]

Wait for the processor to be ready for more tasks.

aries_cloudagent.task_processor.delay_task(delay: float, task: Awaitable[T_co])[source]

Wait a given amount of time before executing an awaitable.
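
Running a task with bounded concurrency, retries and a delay between attempts, in the spirit of run_retry and delay_task, can be sketched with asyncio. This is an approximation under stated assumptions: run_with_retries and its limiter argument are hypothetical, and the callback here receives an attempt number, whereas the listing shows the real callback receiving a PendingTask.

```python
import asyncio


async def delay_task(delay: float, task):
    """Wait `delay` seconds, then await the given awaitable."""
    await asyncio.sleep(delay)
    return await task


async def run_with_retries(fn, *, retries: int = 5, retry_delay: float = 10.0,
                           limiter: asyncio.Semaphore):
    """Call `fn(attempt)` until it succeeds or the retries are exhausted.

    Hypothetical helper for illustration; not the TaskProcessor implementation.
    """
    # The semaphore caps how many tasks may run at the same time
    async with limiter:
        attempt = 0
        while True:
            try:
                return await fn(attempt)
            except Exception:
                if attempt >= retries:
                    raise  # out of retries: surface the last error
                attempt += 1
                await asyncio.sleep(retry_delay)


async def demo():
    limiter = asyncio.Semaphore(2)
    calls = []

    async def flaky(attempt: int) -> str:
        # Fails on the first two attempts, then succeeds
        calls.append(attempt)
        if attempt < 2:
            raise RuntimeError("transient failure")
        return "ok"

    result = await run_with_retries(
        flaky, retries=3, retry_delay=0.01, limiter=limiter
    )
    delayed = await delay_task(0.01, asyncio.sleep(0, result="later"))
    return result, calls, delayed


result, calls, delayed = asyncio.run(demo())
```

A semaphore is only one way to limit pending work; the max_pending parameter of TaskProcessor suggests a similar cap, but how the library enforces it is not shown in this listing.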

aries_cloudagent.version module

Library version information.