aries_cloudagent.core package

Submodules

aries_cloudagent.core.conductor module

The Conductor.

The conductor is responsible for coordinating messages that are received over the network, communicating with the ledger, passing messages to handlers, instantiating concrete implementations of required modules and storing data in the wallet.

class aries_cloudagent.core.conductor.Conductor(context_builder: aries_cloudagent.config.base_context.ContextBuilder)[source]

Bases: object

Conductor class.

Class responsible for initializing concrete implementations of our require interfaces and routing inbound and outbound message data.

context

Accessor for the injection context.

dispatch_complete(message: aries_cloudagent.transport.inbound.message.InboundMessage, completed: aries_cloudagent.utils.task_queue.CompletedTask)[source]

Handle completion of message dispatch.

get_stats() → dict[source]

Get the current stats tracked by the conductor.

handle_not_delivered(profile: aries_cloudagent.core.profile.Profile, outbound: aries_cloudagent.transport.outbound.message.OutboundMessage) → aries_cloudagent.transport.outbound.status.OutboundSendStatus[source]

Handle a message that failed delivery via outbound transports.

handle_not_returned(profile: aries_cloudagent.core.profile.Profile, outbound: aries_cloudagent.transport.outbound.message.OutboundMessage)[source]

Handle a message that failed delivery via an inbound session.

inbound_message_router(profile: aries_cloudagent.core.profile.Profile, message: aries_cloudagent.transport.inbound.message.InboundMessage, can_respond: bool = False)[source]

Route inbound messages.

Parameters:
  • context – The context associated with the inbound message
  • message – The inbound message instance
  • can_respond – If the session supports return routing
outbound_message_router(profile: aries_cloudagent.core.profile.Profile, outbound: aries_cloudagent.transport.outbound.message.OutboundMessage, inbound: aries_cloudagent.transport.inbound.message.InboundMessage = None) → aries_cloudagent.transport.outbound.status.OutboundSendStatus[source]

Route an outbound message.

Parameters:
  • profile – The active profile for the request
  • message – An outbound message to be sent
  • inbound – The inbound message that produced this response, if available
queue_outbound(profile: aries_cloudagent.core.profile.Profile, outbound: aries_cloudagent.transport.outbound.message.OutboundMessage, inbound: aries_cloudagent.transport.inbound.message.InboundMessage = None) → aries_cloudagent.transport.outbound.status.OutboundSendStatus[source]

Queue an outbound message for transport.

Parameters:
  • profile – The active profile
  • message – An outbound message to be sent
  • inbound – The inbound message that produced this response, if available
setup()[source]

Initialize the global request context.

start() → None[source]

Start the agent.

stop(timeout=1.0)[source]

Stop the agent.

webhook_router(topic: str, payload: dict, endpoint: str, max_attempts: int = None, metadata: dict = None)[source]

Route a webhook through the outbound transport manager.

Parameters:
  • topic – The webhook topic
  • payload – The webhook payload
  • endpoint – The endpoint of the webhook target
  • max_attempts – The maximum number of attempts
  • metadata – Additional metadata associated with the payload

aries_cloudagent.core.dispatcher module

The Dispatcher.

The dispatcher is responsible for coordinating data flow between handlers, providing lifecycle hook callbacks storing state for message threads, etc.

class aries_cloudagent.core.dispatcher.Dispatcher(profile: aries_cloudagent.core.profile.Profile)[source]

Bases: object

Dispatcher class.

Class responsible for dispatching messages to message handlers and responding to other agents.

complete(timeout: float = 0.1)[source]

Wait for pending tasks to complete.

handle_message(profile: aries_cloudagent.core.profile.Profile, inbound_message: aries_cloudagent.transport.inbound.message.InboundMessage, send_outbound: Coroutine[T_co, T_contra, V_co])[source]

Configure responder and message context and invoke the message handler.

Parameters:
  • profile – The profile associated with the inbound message
  • inbound_message – The inbound message instance
  • send_outbound – Async function to send outbound messages
Returns:

The response from the handler

log_task(task: aries_cloudagent.utils.task_queue.CompletedTask)[source]

Log a completed task using the stats collector.

make_message(parsed_msg: dict) → aries_cloudagent.messaging.base_message.BaseMessage[source]

Deserialize a message dict into the appropriate message instance.

Given a dict describing a message, this method returns an instance of the related message class.

Parameters:

parsed_msg – The parsed message

Returns:

An instance of the corresponding message class for this message

Raises:
  • MessageParseError – If the message doesn’t specify @type
  • MessageParseError – If there is no message class registered to handle
  • the given type
put_task(coro: Coroutine[T_co, T_contra, V_co], complete: Callable = None, ident: str = None) → aries_cloudagent.utils.task_queue.PendingTask[source]

Run a task in the task queue, potentially blocking other handlers.

queue_message(profile: aries_cloudagent.core.profile.Profile, inbound_message: aries_cloudagent.transport.inbound.message.InboundMessage, send_outbound: Coroutine[T_co, T_contra, V_co], complete: Callable = None) → aries_cloudagent.utils.task_queue.PendingTask[source]

Add a message to the processing queue for handling.

Parameters:
  • profile – The profile associated with the inbound message
  • inbound_message – The inbound message instance
  • send_outbound – Async function to send outbound messages
  • complete – Function to call when the handler has completed
Returns:

A pending task instance resolving to the handler task

run_task(coro: Coroutine[T_co, T_contra, V_co], complete: Callable = None, ident: str = None) → _asyncio.Task[source]

Run a task in the task queue, potentially blocking other handlers.

setup()[source]

Perform async instance setup.

class aries_cloudagent.core.dispatcher.DispatcherResponder(context: aries_cloudagent.messaging.request_context.RequestContext, inbound_message: aries_cloudagent.transport.inbound.message.InboundMessage, send_outbound: Coroutine[T_co, T_contra, V_co], **kwargs)[source]

Bases: aries_cloudagent.messaging.responder.BaseResponder

Handle outgoing messages from message handlers.

create_outbound(message: Union[aries_cloudagent.messaging.agent_message.AgentMessage, aries_cloudagent.messaging.base_message.BaseMessage, str, bytes], **kwargs) → aries_cloudagent.transport.outbound.message.OutboundMessage[source]

Create an OutboundMessage from a message body.

Parameters:message – The message payload
send_outbound(message: aries_cloudagent.transport.outbound.message.OutboundMessage) → aries_cloudagent.transport.outbound.status.OutboundSendStatus[source]

Send outbound message.

Parameters:message – The OutboundMessage to be sent
send_webhook(topic: str, payload: dict)[source]

Dispatch a webhook. DEPRECATED: use the event bus instead.

Parameters:
  • topic – the webhook topic identifier
  • payload – the webhook payload value
exception aries_cloudagent.core.dispatcher.ProblemReportParseError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.messaging.error.MessageParseError

Error to raise on failure to parse problem-report message.

aries_cloudagent.core.error module

Common exception classes.

exception aries_cloudagent.core.error.BaseError(*args, error_code: str = None, **kwargs)[source]

Bases: Exception

Generic exception class which other exceptions should inherit from.

message

Accessor for the error message.

roll_up

Accessor for nested error messages rolled into one line.

For display: aiohttp.web errors truncate after newline.

exception aries_cloudagent.core.error.ProfileDuplicateError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.ProfileError

Profile with the given name already exists.

exception aries_cloudagent.core.error.ProfileError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Base error for profile operations.

exception aries_cloudagent.core.error.ProfileNotFoundError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.ProfileError

Requested profile was not found.

exception aries_cloudagent.core.error.ProfileSessionInactiveError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.ProfileError

Error raised when a profile session is not currently active.

exception aries_cloudagent.core.error.ProtocolDefinitionValidationError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Error raised when there is a problem validating a protocol definition.

exception aries_cloudagent.core.error.ProtocolMinorVersionNotSupported(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Minimum minor version protocol error.

Error raised when protocol support exists but minimum minor version is higher than in @type parameter.

exception aries_cloudagent.core.error.StartupError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Error raised when there is a problem starting the conductor.

aries_cloudagent.core.in_memory module

Manage in-memory profile interaction.

class aries_cloudagent.core.in_memory.InMemoryProfile(*, context: aries_cloudagent.config.injection_context.InjectionContext = None, name: str = None)[source]

Bases: aries_cloudagent.core.profile.Profile

Provide access to in-memory profile management.

Used primarily for testing.

BACKEND_NAME = 'in_memory'
TEST_PROFILE_NAME = 'test-profile'
bind_providers()[source]

Initialize the profile-level instance providers.

session(context: aries_cloudagent.config.injection_context.InjectionContext = None) → aries_cloudagent.core.profile.ProfileSession[source]

Start a new interactive session with no transaction support requested.

classmethod test_profile(settings: Mapping[str, Any] = None, bind: Mapping[Type[CT_co], Any] = None) → aries_cloudagent.core.in_memory.InMemoryProfile[source]

Used in tests to create a standard InMemoryProfile.

classmethod test_session(settings: Mapping[str, Any] = None, bind: Mapping[Type[CT_co], Any] = None) → aries_cloudagent.core.in_memory.InMemoryProfileSession[source]

Used in tests to quickly create InMemoryProfileSession.

transaction(context: aries_cloudagent.config.injection_context.InjectionContext = None) → aries_cloudagent.core.profile.ProfileSession[source]

Start a new interactive session with commit and rollback support.

If the current backend does not support transactions, then commit and rollback operations of the session will not have any effect.

class aries_cloudagent.core.in_memory.InMemoryProfileManager[source]

Bases: aries_cloudagent.core.profile.ProfileManager

Manager for producing in-memory wallet/storage implementation.

open(context: aries_cloudagent.config.injection_context.InjectionContext, config: Mapping[str, Any] = None) → aries_cloudagent.core.profile.Profile[source]

Open an instance of an existing profile.

provision(context: aries_cloudagent.config.injection_context.InjectionContext, config: Mapping[str, Any] = None) → aries_cloudagent.core.profile.Profile[source]

Provision a new instance of a profile.

class aries_cloudagent.core.in_memory.InMemoryProfileSession(profile: aries_cloudagent.core.profile.Profile, *, context: aries_cloudagent.config.injection_context.InjectionContext = None, settings: Mapping[str, Any] = None)[source]

Bases: aries_cloudagent.core.profile.ProfileSession

An active connection to the profile management backend.

storage

Get the BaseStorage implementation (helper specific to in-memory profile).

wallet

Get the BaseWallet implementation (helper specific to in-memory profile).

aries_cloudagent.core.plugin_registry module

Handle registration of plugin modules for extending functionality.

class aries_cloudagent.core.plugin_registry.PluginRegistry[source]

Bases: object

Plugin registry for indexing application plugins.

init_context(context: aries_cloudagent.config.injection_context.InjectionContext)[source]

Call plugin setup methods on the current context.

load_protocol_version(context: aries_cloudagent.config.injection_context.InjectionContext, mod: module, version_definition: dict = None)[source]

Load a particular protocol version.

load_protocols(context: aries_cloudagent.config.injection_context.InjectionContext, plugin: module)[source]

For modules that don’t implement setup, register protocols manually.

plugin_names

Accessor for a list of all plugin modules.

plugins

Accessor for a list of all plugin modules.

post_process_routes(app)[source]

Call route binary file response OpenAPI fixups if applicable.

register_admin_routes(app)[source]

Call route registration methods on the current context.

register_package(package_name: str) → Sequence[module][source]

Register all modules (sub-packages) under a given package name.

register_plugin(module_name: str) → module[source]

Register a plugin module.

validate_version(version_list, module_name)[source]

Validate version dict format.

aries_cloudagent.core.profile module

Classes for managing profile information within a request context.

class aries_cloudagent.core.profile.Profile(*, context: aries_cloudagent.config.injection_context.InjectionContext = None, name: str = None, created: bool = False)[source]

Bases: abc.ABC

Base abstraction for handling identity-related state.

BACKEND_NAME = None
DEFAULT_NAME = 'default'
backend

Accessor for the backend implementation name.

close()[source]

Close the profile instance.

context

Accessor for the injection context.

created

Accessor for the created flag indicating a new profile.

inject(base_cls: Type[InjectType], settings: Mapping[str, object] = None, *, required: bool = True) → Optional[InjectType][source]

Get the provided instance of a given class identifier.

Parameters:
  • cls – The base class to retrieve an instance of
  • settings – An optional mapping providing configuration to the provider
Returns:

An instance of the base class, or None

name

Accessor for the profile name.

notify(topic: str, payload: Any)[source]

Signal an event.

remove()[source]

Remove the profile.

session(context: aries_cloudagent.config.injection_context.InjectionContext = None) → aries_cloudagent.core.profile.ProfileSession[source]

Start a new interactive session with no transaction support requested.

settings

Accessor for scope-specific settings.

transaction(context: aries_cloudagent.config.injection_context.InjectionContext = None) → aries_cloudagent.core.profile.ProfileSession[source]

Start a new interactive session with commit and rollback support.

If the current backend does not support transactions, then commit and rollback operations of the session will not have any effect.

class aries_cloudagent.core.profile.ProfileManager[source]

Bases: abc.ABC

Handle provision and open for profile instances.

open(context: aries_cloudagent.config.injection_context.InjectionContext, config: Mapping[str, Any] = None) → aries_cloudagent.core.profile.Profile[source]

Open an instance of an existing profile.

provision(context: aries_cloudagent.config.injection_context.InjectionContext, config: Mapping[str, Any] = None) → aries_cloudagent.core.profile.Profile[source]

Provision a new instance of a profile.

class aries_cloudagent.core.profile.ProfileManagerProvider[source]

Bases: aries_cloudagent.config.base.BaseProvider

The standard profile manager provider which keys off the selected wallet type.

MANAGER_TYPES = {'askar': 'aries_cloudagent.askar.profile.AskarProfileManager', 'in_memory': 'aries_cloudagent.core.in_memory.InMemoryProfileManager', 'indy': 'aries_cloudagent.indy.sdk.profile.IndySdkProfileManager'}
provide(settings: aries_cloudagent.config.base.BaseSettings, injector: aries_cloudagent.config.base.BaseInjector)[source]

Create the profile manager instance.

class aries_cloudagent.core.profile.ProfileSession(profile: aries_cloudagent.core.profile.Profile, *, context: aries_cloudagent.config.injection_context.InjectionContext = None, settings: Mapping[str, Any] = None)[source]

Bases: abc.ABC

An active connection to the profile management backend.

active

Accessor for the session active state.

commit()[source]

Commit any updates performed within the transaction.

If the current session is not a transaction, then nothing is performed.

context

Accessor for the associated injection context.

inject(base_cls: Type[InjectType], settings: Mapping[str, object] = None, *, required: bool = True) → Optional[InjectType][source]

Get the provided instance of a given class identifier.

Parameters:
  • cls – The base class to retrieve an instance of
  • settings – An optional mapping providing configuration to the provider
Returns:

An instance of the base class, or None

is_transaction

Check if the session supports commit and rollback operations.

profile

Accessor for the associated profile instance.

rollback()[source]

Roll back any updates performed within the transaction.

If the current session is not a transaction, then nothing is performed.

settings

Accessor for scope-specific settings.

aries_cloudagent.core.protocol_registry module

Handle registration and publication of supported protocols.

class aries_cloudagent.core.protocol_registry.ProtocolRegistry[source]

Bases: object

Protocol registry for indexing message families.

controllers

Accessor for a list of all protocol controller functions.

message_types

Accessor for a list of all message types.

parse_type_string(message_type)[source]

Parse message type string and return dict with info.

prepare_disclosed(context: aries_cloudagent.config.injection_context.InjectionContext, protocols: Sequence[str])[source]

Call controllers and return publicly supported message families and roles.

protocols

Accessor for a list of all message protocols.

protocols_matching_query(query: str) → Sequence[str][source]

Return a list of message protocols matching a query string.

register_controllers(*controller_sets, version_definition=None)[source]

Add new controllers.

Parameters:controller_sets – Mappings of message families to coroutines
register_message_types(*typesets, version_definition=None)[source]

Add new supported message types.

Parameters:
  • typesets – Mappings of message types to register
  • version_definition – Optional version definition dict
resolve_message_class(message_type: str) → type[source]

Resolve a message_type to a message class.

Given a message type identifier, this method returns the corresponding registered message class.

Parameters:message_type – Message type to resolve
Returns:The resolved message class