aries_cloudagent.core package

Subpackages

Submodules

aries_cloudagent.core.conductor module

aries_cloudagent.core.dispatcher module

The Dispatcher.

The dispatcher is responsible for coordinating data flow between handlers, providing lifecycle hook callbacks storing state for message threads, etc.

class aries_cloudagent.core.dispatcher.Dispatcher(profile: Profile)[source]

Bases: object

Dispatcher class.

Class responsible for dispatching messages to message handlers and responding to other agents.

async complete(timeout: float = 0.1)[source]

Wait for pending tasks to complete.

async handle_message(profile: Profile, inbound_message: InboundMessage, send_outbound: Coroutine)[source]

Configure responder and message context and invoke the message handler.

Parameters
  • profile – The profile associated with the inbound message

  • inbound_message – The inbound message instance

  • send_outbound – Async function to send outbound messages

# Raises: # MessageParseError: If the message type version is not supported

Returns

The response from the handler

log_task(task: CompletedTask)[source]

Log a completed task using the stats collector.

async make_message(profile: Profile, parsed_msg: dict) Tuple[BaseMessage, Optional[str]][source]

Deserialize a message dict into the appropriate message instance.

Given a dict describing a message, this method returns an instance of the related message class.

Parameters
  • parsed_msg – The parsed message

  • profile – Profile

Returns

An instance of the corresponding message class for this message

Raises
  • MessageParseError – If the message doesn’t specify @type

  • MessageParseError – If there is no message class registered to handle

  • the given type

put_task(coro: Coroutine, complete: Optional[Callable] = None, ident: Optional[str] = None) PendingTask[source]

Run a task in the task queue, potentially blocking other handlers.

queue_message(profile: Profile, inbound_message: InboundMessage, send_outbound: Coroutine, complete: Optional[Callable] = None) PendingTask[source]

Add a message to the processing queue for handling.

Parameters
  • profile – The profile associated with the inbound message

  • inbound_message – The inbound message instance

  • send_outbound – Async function to send outbound messages

  • complete – Function to call when the handler has completed

Returns

A pending task instance resolving to the handler task

run_task(coro: Coroutine, complete: Optional[Callable] = None, ident: Optional[str] = None) Task[source]

Run a task in the task queue, potentially blocking other handlers.

async setup()[source]

Perform async instance setup.

class aries_cloudagent.core.dispatcher.DispatcherResponder(context: RequestContext, inbound_message: InboundMessage, send_outbound: Coroutine, **kwargs)[source]

Bases: BaseResponder

Handle outgoing messages from message handlers.

async create_outbound(message: Union[AgentMessage, BaseMessage, str, bytes], **kwargs) OutboundMessage[source]

Create an OutboundMessage from a message body.

Parameters

message – The message payload

async send_outbound(message: OutboundMessage, **kwargs) OutboundSendStatus[source]

Send outbound message.

Parameters

message – The OutboundMessage to be sent

async send_webhook(topic: str, payload: dict)[source]

Dispatch a webhook. DEPRECATED: use the event bus instead.

Parameters
  • topic – the webhook topic identifier

  • payload – the webhook payload value

exception aries_cloudagent.core.dispatcher.ProblemReportParseError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: MessageParseError

Error to raise on failure to parse problem-report message.

aries_cloudagent.core.error module

Common exception classes.

exception aries_cloudagent.core.error.BaseError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: Exception

Generic exception class which other exceptions should inherit from.

property message: str

Accessor for the error message.

property roll_up: str

Accessor for nested error messages rolled into one line.

For display: aiohttp.web errors truncate after newline.

exception aries_cloudagent.core.error.ProfileDuplicateError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: ProfileError

Profile with the given name already exists.

exception aries_cloudagent.core.error.ProfileError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: BaseError

Base error for profile operations.

exception aries_cloudagent.core.error.ProfileNotFoundError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: ProfileError

Requested profile was not found.

exception aries_cloudagent.core.error.ProfileSessionInactiveError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: ProfileError

Error raised when a profile session is not currently active.

exception aries_cloudagent.core.error.ProtocolDefinitionValidationError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: BaseError

Error raised when there is a problem validating a protocol definition.

exception aries_cloudagent.core.error.ProtocolMinorVersionNotSupported(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: BaseError

Minimum minor version protocol error.

Error raised when protocol support exists but minimum minor version is higher than in @type parameter.

exception aries_cloudagent.core.error.StartupError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: BaseError

Error raised when there is a problem starting the conductor.

aries_cloudagent.core.event_bus module

A simple event bus.

class aries_cloudagent.core.event_bus.Event(topic: str, payload: Optional[Any] = None)[source]

Bases: object

A simple event object.

property payload

Return this event’s payload.

property topic

Return this event’s topic.

with_metadata(metadata: EventMetadata) EventWithMetadata[source]

Annotate event with metadata and return EventWithMetadata object.

class aries_cloudagent.core.event_bus.EventBus[source]

Bases: object

A simple event bus implementation.

async notify(profile: Profile, event: Event)[source]

Notify subscribers of event.

Parameters
  • profile (Profile) – context of the event

  • event (Event) – event to emit

subscribe(pattern: Pattern, processor: Callable)[source]

Subscribe to an event.

Parameters
  • pattern (Pattern) – compiled regular expression for matching topics

  • processor (Callable) – async callable accepting profile and event

unsubscribe(pattern: Pattern, processor: Callable)[source]

Unsubscribe from an event.

This method is idempotent. Repeated calls to unsubscribe will not result in errors.

Parameters
  • pattern (Pattern) – regular expression used to subscribe the processor

  • processor (Callable) – processor to unsubscribe

wait_for_event(waiting_profile: Profile, pattern: Pattern, cond: Optional[Callable[[Event], bool]] = None) Iterator[Awaitable[Event]][source]

Capture an event and retrieve its value.

class aries_cloudagent.core.event_bus.EventMetadata(pattern: Pattern, match: Match[str])[source]

Bases: NamedTuple

Metadata passed alongside events to add context.

match: Match[str]

Alias for field number 1

pattern: Pattern

Alias for field number 0

class aries_cloudagent.core.event_bus.EventWithMetadata(topic: str, payload: Any, metadata: EventMetadata)[source]

Bases: Event

Event with metadata passed alongside events to add context.

property metadata: EventMetadata

Return metadata.

class aries_cloudagent.core.event_bus.MockEventBus[source]

Bases: EventBus

A mock EventBus for testing.

async notify(profile: Profile, event: Event)[source]

Append the event to MockEventBus.events.

aries_cloudagent.core.goal_code_registry module

Handle registration and publication of supported goal codes.

class aries_cloudagent.core.goal_code_registry.GoalCodeRegistry[source]

Bases: object

Goal code registry.

goal_codes_matching_query(query: str) Sequence[str][source]

Return a list of goal codes matching a query string.

register_controllers(*controller_sets)[source]

Add new controllers.

Parameters

controller_sets – Mappings of controller to coroutines

aries_cloudagent.core.oob_processor module

aries_cloudagent.core.plugin_registry module

Handle registration of plugin modules for extending functionality.

class aries_cloudagent.core.plugin_registry.PluginRegistry(blocklist: Iterable[str] = [])[source]

Bases: object

Plugin registry for indexing application plugins.

async init_context(context: InjectionContext)[source]

Call plugin setup methods on the current context.

async load_protocol_version(context: InjectionContext, mod: module, version_definition: Optional[dict] = None)[source]

Load a particular protocol version.

async load_protocols(context: InjectionContext, plugin: module)[source]

For modules that don’t implement setup, register protocols manually.

property plugin_names: Sequence[str]

Accessor for a list of all plugin modules.

property plugins: Sequence[module]

Accessor for a list of all plugin modules.

post_process_routes(app)[source]

Call route binary file response OpenAPI fixups if applicable.

async register_admin_routes(app)[source]

Call route registration methods on the current context.

register_package(package_name: str) Sequence[module][source]

Register all modules (sub-packages) under a given package name.

register_plugin(module_name: str) module[source]

Register a plugin module.

register_protocol_events(context: InjectionContext)[source]

Call route register_events methods on the current context.

validate_version(version_list, module_name)[source]

Validate version dict format.

aries_cloudagent.core.profile module

Classes for managing profile information within a request context.

class aries_cloudagent.core.profile.Profile(*, context: Optional[InjectionContext] = None, name: Optional[str] = None, created: bool = False)[source]

Bases: ABC

Base abstraction for handling identity-related state.

BACKEND_NAME: str = None
DEFAULT_NAME: str = 'default'
property backend: str

Accessor for the backend implementation name.

async close()[source]

Close the profile instance.

property context: InjectionContext

Accessor for the injection context.

property created: bool

Accessor for the created flag indicating a new profile.

inject(base_cls: Type[InjectType], settings: Optional[Mapping[str, object]] = None) InjectType[source]

Get the provided instance of a given class identifier.

Parameters
  • cls – The base class to retrieve an instance of

  • settings – An optional mapping providing configuration to the provider

Returns

An instance of the base class, or None

inject_or(base_cls: Type[InjectType], settings: Optional[Mapping[str, object]] = None, default: Optional[InjectType] = None) Optional[InjectType][source]

Get the provided instance of a given class identifier or default if not found.

Parameters
  • base_cls – The base class to retrieve an instance of

  • settings – An optional dict providing configuration to the provider

  • default – default return value if no instance is found

Returns

An instance of the base class, or None

property name: str

Accessor for the profile name.

async notify(topic: str, payload: Any)[source]

Signal an event.

async remove()[source]

Remove the profile.

abstract session(context: Optional[InjectionContext] = None) ProfileSession[source]

Start a new interactive session with no transaction support requested.

property settings: BaseSettings

Accessor for scope-specific settings.

abstract transaction(context: Optional[InjectionContext] = None) ProfileSession[source]

Start a new interactive session with commit and rollback support.

If the current backend does not support transactions, then commit and rollback operations of the session will not have any effect.

class aries_cloudagent.core.profile.ProfileManager[source]

Bases: ABC

Handle provision and open for profile instances.

abstract async open(context: InjectionContext, config: Optional[Mapping[str, Any]] = None) Profile[source]

Open an instance of an existing profile.

abstract async provision(context: InjectionContext, config: Optional[Mapping[str, Any]] = None) Profile[source]

Provision a new instance of a profile.

class aries_cloudagent.core.profile.ProfileManagerProvider[source]

Bases: BaseProvider

The standard profile manager provider which keys off the selected wallet type.

MANAGER_TYPES = {'askar': 'aries_cloudagent.askar.profile.AskarProfileManager', 'askar-anoncreds': 'aries_cloudagent.askar.profile_anon.AskarAnonProfileManager', 'in_memory': 'aries_cloudagent.core.in_memory.InMemoryProfileManager', 'indy': 'aries_cloudagent.indy.sdk.profile.IndySdkProfileManager'}
provide(settings: BaseSettings, injector: BaseInjector)[source]

Create the profile manager instance.

class aries_cloudagent.core.profile.ProfileSession(profile: Profile, *, context: Optional[InjectionContext] = None, settings: Optional[Mapping[str, Any]] = None)[source]

Bases: ABC

An active connection to the profile management backend.

property active: bool

Accessor for the session active state.

async commit()[source]

Commit any updates performed within the transaction.

If the current session is not a transaction, then nothing is performed.

property context: InjectionContext

Accessor for the associated injection context.

async emit_event(topic: str, payload: Any, force_emit: bool = False)[source]

Emit an event.

If we are in an active transaction, just queue the event, otherwise emit it.

Parameters
  • session – The profile session to use

  • payload – The event payload

inject(base_cls: Type[InjectType], settings: Optional[Mapping[str, object]] = None) InjectType[source]

Get the provided instance of a given class identifier.

Parameters
  • cls – The base class to retrieve an instance of

  • settings – An optional mapping providing configuration to the provider

Returns

An instance of the base class, or None

inject_or(base_cls: Type[InjectType], settings: Optional[Mapping[str, object]] = None, default: Optional[InjectType] = None) Optional[InjectType][source]

Get the provided instance of a given class identifier or default if not found.

Parameters
  • base_cls – The base class to retrieve an instance of

  • settings – An optional dict providing configuration to the provider

  • default – default return value if no instance is found

Returns

An instance of the base class, or None

property is_transaction: bool

Check if the session supports commit and rollback operations.

property profile: Profile

Accessor for the associated profile instance.

async rollback()[source]

Roll back any updates performed within the transaction.

If the current session is not a transaction, then nothing is performed.

property settings: BaseSettings

Accessor for scope-specific settings.

aries_cloudagent.core.protocol_registry module

Handle registration and publication of supported protocols.

class aries_cloudagent.core.protocol_registry.ProtocolRegistry[source]

Bases: object

Protocol registry for indexing message families.

property controllers: Mapping[str, str]

Accessor for a list of all protocol controller functions.

create_msg_types_for_minor_version(typesets, version_definition)[source]

Return mapping of message type to module path for minor versions.

Parameters
  • typesets – Mappings of message types to register

  • version_definition – Optional version definition dict

Returns

Typesets mapping

property message_types: Sequence[str]

Accessor for a list of all message types.

parse_type_string(message_type)[source]

Parse message type string and return dict with info.

async prepare_disclosed(context: InjectionContext, protocols: Sequence[str])[source]

Call controllers and return publicly supported message families and roles.

property protocols: Sequence[str]

Accessor for a list of all message protocols.

protocols_matching_query(query: str) Sequence[str][source]

Return a list of message protocols matching a query string.

register_controllers(*controller_sets, version_definition=None)[source]

Add new controllers.

Parameters

controller_sets – Mappings of message families to coroutines

register_message_types(*typesets, version_definition=None)[source]

Add new supported message types.

Parameters
  • typesets – Mappings of message types to register

  • version_definition – Optional version definition dict

resolve_message_class(message_type: str) type[source]

Resolve a message_type to a message class.

Given a message type identifier, this method returns the corresponding registered message class.

Parameters

message_type – Message type to resolve

Returns

The resolved message class

aries_cloudagent.core.util module

Core utilities and constants.

aries_cloudagent.core.util.get_proto_default_version(def_path: str, major_version: int = 1) str[source]

Return default protocol version from version_definition.

async aries_cloudagent.core.util.get_proto_default_version_from_msg_class(profile: Profile, msg_class: type, major_version: int = 1) str[source]

Return default protocol version from version_definition.

async aries_cloudagent.core.util.get_version_def_from_msg_class(profile: Profile, msg_class: type, major_version: int = 1)[source]

Return version_definition of a protocol from msg_class.

aries_cloudagent.core.util.get_version_from_message(msg: AgentMessage) str[source]

Return version from provided AgentMessage.

aries_cloudagent.core.util.get_version_from_message_type(msg_type: str) str[source]

Return version from provided message_type.

async aries_cloudagent.core.util.validate_get_response_version(profile: Profile, rec_version: str, msg_class: type) Tuple[str, Optional[str]][source]

Return a tuple with version to respond with and warnings.

Process received version and protocol version definition, returns the tuple.

Parameters
  • profile – Profile

  • rec_version – received version from message

  • msg_class – type

Returns

Tuple with response version and any warnings