aries_cloudagent.core package
Subpackages
Submodules
aries_cloudagent.core.conductor module
aries_cloudagent.core.dispatcher module
The Dispatcher.
The dispatcher is responsible for coordinating data flow between handlers, providing lifecycle hook callbacks storing state for message threads, etc.
- class aries_cloudagent.core.dispatcher.Dispatcher(profile: Profile)[source]
Bases:
object
Dispatcher class.
Class responsible for dispatching messages to message handlers and responding to other agents.
- async handle_message(profile: Profile, inbound_message: InboundMessage, send_outbound: Coroutine)[source]
Configure responder and message context and invoke the message handler.
- Parameters
profile – The profile associated with the inbound message
inbound_message – The inbound message instance
send_outbound – Async function to send outbound messages
# Raises: # MessageParseError: If the message type version is not supported
- Returns
The response from the handler
- log_task(task: CompletedTask)[source]
Log a completed task using the stats collector.
- async make_message(profile: Profile, parsed_msg: dict) Tuple[BaseMessage, Optional[str]] [source]
Deserialize a message dict into the appropriate message instance.
Given a dict describing a message, this method returns an instance of the related message class.
- Parameters
parsed_msg – The parsed message
profile – Profile
- Returns
An instance of the corresponding message class for this message
- Raises
MessageParseError – If the message doesn’t specify @type
MessageParseError – If there is no message class registered to handle
the given type –
- put_task(coro: Coroutine, complete: Optional[Callable] = None, ident: Optional[str] = None) PendingTask [source]
Run a task in the task queue, potentially blocking other handlers.
- queue_message(profile: Profile, inbound_message: InboundMessage, send_outbound: Coroutine, complete: Optional[Callable] = None) PendingTask [source]
Add a message to the processing queue for handling.
- Parameters
profile – The profile associated with the inbound message
inbound_message – The inbound message instance
send_outbound – Async function to send outbound messages
complete – Function to call when the handler has completed
- Returns
A pending task instance resolving to the handler task
- class aries_cloudagent.core.dispatcher.DispatcherResponder(context: RequestContext, inbound_message: InboundMessage, send_outbound: Coroutine, **kwargs)[source]
Bases:
BaseResponder
Handle outgoing messages from message handlers.
- async create_outbound(message: Union[AgentMessage, BaseMessage, str, bytes], **kwargs) OutboundMessage [source]
Create an OutboundMessage from a message body.
- Parameters
message – The message payload
- async send_outbound(message: OutboundMessage, **kwargs) OutboundSendStatus [source]
Send outbound message.
- Parameters
message – The OutboundMessage to be sent
- exception aries_cloudagent.core.dispatcher.ProblemReportParseError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
MessageParseError
Error to raise on failure to parse problem-report message.
aries_cloudagent.core.error module
Common exception classes.
- exception aries_cloudagent.core.error.BaseError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
Exception
Generic exception class which other exceptions should inherit from.
- exception aries_cloudagent.core.error.ProfileDuplicateError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
ProfileError
Profile with the given name already exists.
- exception aries_cloudagent.core.error.ProfileError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
BaseError
Base error for profile operations.
- exception aries_cloudagent.core.error.ProfileNotFoundError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
ProfileError
Requested profile was not found.
- exception aries_cloudagent.core.error.ProfileSessionInactiveError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
ProfileError
Error raised when a profile session is not currently active.
- exception aries_cloudagent.core.error.ProtocolDefinitionValidationError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
BaseError
Error raised when there is a problem validating a protocol definition.
aries_cloudagent.core.event_bus module
A simple event bus.
- class aries_cloudagent.core.event_bus.Event(topic: str, payload: Optional[Any] = None)[source]
Bases:
object
A simple event object.
- property payload
Return this event’s payload.
- property topic
Return this event’s topic.
- with_metadata(metadata: EventMetadata) EventWithMetadata [source]
Annotate event with metadata and return EventWithMetadata object.
- class aries_cloudagent.core.event_bus.EventBus[source]
Bases:
object
A simple event bus implementation.
- async notify(profile: Profile, event: Event)[source]
Notify subscribers of event.
- Parameters
profile (Profile) – context of the event
event (Event) – event to emit
- subscribe(pattern: Pattern, processor: Callable)[source]
Subscribe to an event.
- Parameters
pattern (Pattern) – compiled regular expression for matching topics
processor (Callable) – async callable accepting profile and event
- unsubscribe(pattern: Pattern, processor: Callable)[source]
Unsubscribe from an event.
This method is idempotent. Repeated calls to unsubscribe will not result in errors.
- Parameters
pattern (Pattern) – regular expression used to subscribe the processor
processor (Callable) – processor to unsubscribe
- class aries_cloudagent.core.event_bus.EventMetadata(pattern: Pattern, match: Match[str])[source]
Bases:
NamedTuple
Metadata passed alongside events to add context.
- class aries_cloudagent.core.event_bus.EventWithMetadata(topic: str, payload: Any, metadata: EventMetadata)[source]
Bases:
Event
Event with metadata passed alongside events to add context.
- property metadata: EventMetadata
Return metadata.
aries_cloudagent.core.goal_code_registry module
Handle registration and publication of supported goal codes.
aries_cloudagent.core.oob_processor module
aries_cloudagent.core.plugin_registry module
Handle registration of plugin modules for extending functionality.
- class aries_cloudagent.core.plugin_registry.PluginRegistry(blocklist: Iterable[str] = [])[source]
Bases:
object
Plugin registry for indexing application plugins.
- async init_context(context: InjectionContext)[source]
Call plugin setup methods on the current context.
- async load_protocol_version(context: InjectionContext, mod: module, version_definition: Optional[dict] = None)[source]
Load a particular protocol version.
- async load_protocols(context: InjectionContext, plugin: module)[source]
For modules that don’t implement setup, register protocols manually.
- register_package(package_name: str) Sequence[module] [source]
Register all modules (sub-packages) under a given package name.
- register_protocol_events(context: InjectionContext)[source]
Call route register_events methods on the current context.
aries_cloudagent.core.profile module
Classes for managing profile information within a request context.
- class aries_cloudagent.core.profile.Profile(*, context: Optional[InjectionContext] = None, name: Optional[str] = None, created: bool = False)[source]
Bases:
ABC
Base abstraction for handling identity-related state.
- property context: InjectionContext
Accessor for the injection context.
- inject(base_cls: Type[InjectType], settings: Optional[Mapping[str, object]] = None) InjectType [source]
Get the provided instance of a given class identifier.
- Parameters
cls – The base class to retrieve an instance of
settings – An optional mapping providing configuration to the provider
- Returns
An instance of the base class, or None
- inject_or(base_cls: Type[InjectType], settings: Optional[Mapping[str, object]] = None, default: Optional[InjectType] = None) Optional[InjectType] [source]
Get the provided instance of a given class identifier or default if not found.
- Parameters
base_cls – The base class to retrieve an instance of
settings – An optional dict providing configuration to the provider
default – default return value if no instance is found
- Returns
An instance of the base class, or None
- abstract session(context: Optional[InjectionContext] = None) ProfileSession [source]
Start a new interactive session with no transaction support requested.
- property settings: BaseSettings
Accessor for scope-specific settings.
- abstract transaction(context: Optional[InjectionContext] = None) ProfileSession [source]
Start a new interactive session with commit and rollback support.
If the current backend does not support transactions, then commit and rollback operations of the session will not have any effect.
- class aries_cloudagent.core.profile.ProfileManager[source]
Bases:
ABC
Handle provision and open for profile instances.
- class aries_cloudagent.core.profile.ProfileManagerProvider[source]
Bases:
BaseProvider
The standard profile manager provider which keys off the selected wallet type.
- MANAGER_TYPES = {'askar': 'aries_cloudagent.askar.profile.AskarProfileManager', 'askar-anoncreds': 'aries_cloudagent.askar.profile_anon.AskarAnonProfileManager', 'in_memory': 'aries_cloudagent.core.in_memory.InMemoryProfileManager', 'indy': 'aries_cloudagent.indy.sdk.profile.IndySdkProfileManager'}
- provide(settings: BaseSettings, injector: BaseInjector)[source]
Create the profile manager instance.
- class aries_cloudagent.core.profile.ProfileSession(profile: Profile, *, context: Optional[InjectionContext] = None, settings: Optional[Mapping[str, Any]] = None)[source]
Bases:
ABC
An active connection to the profile management backend.
- async commit()[source]
Commit any updates performed within the transaction.
If the current session is not a transaction, then nothing is performed.
- property context: InjectionContext
Accessor for the associated injection context.
- async emit_event(topic: str, payload: Any, force_emit: bool = False)[source]
Emit an event.
If we are in an active transaction, just queue the event, otherwise emit it.
- Parameters
session – The profile session to use
payload – The event payload
- inject(base_cls: Type[InjectType], settings: Optional[Mapping[str, object]] = None) InjectType [source]
Get the provided instance of a given class identifier.
- Parameters
cls – The base class to retrieve an instance of
settings – An optional mapping providing configuration to the provider
- Returns
An instance of the base class, or None
- inject_or(base_cls: Type[InjectType], settings: Optional[Mapping[str, object]] = None, default: Optional[InjectType] = None) Optional[InjectType] [source]
Get the provided instance of a given class identifier or default if not found.
- Parameters
base_cls – The base class to retrieve an instance of
settings – An optional dict providing configuration to the provider
default – default return value if no instance is found
- Returns
An instance of the base class, or None
- async rollback()[source]
Roll back any updates performed within the transaction.
If the current session is not a transaction, then nothing is performed.
- property settings: BaseSettings
Accessor for scope-specific settings.
aries_cloudagent.core.protocol_registry module
Handle registration and publication of supported protocols.
- class aries_cloudagent.core.protocol_registry.ProtocolRegistry[source]
Bases:
object
Protocol registry for indexing message families.
- create_msg_types_for_minor_version(typesets, version_definition)[source]
Return mapping of message type to module path for minor versions.
- Parameters
typesets – Mappings of message types to register
version_definition – Optional version definition dict
- Returns
Typesets mapping
- async prepare_disclosed(context: InjectionContext, protocols: Sequence[str])[source]
Call controllers and return publicly supported message families and roles.
- protocols_matching_query(query: str) Sequence[str] [source]
Return a list of message protocols matching a query string.
- register_controllers(*controller_sets, version_definition=None)[source]
Add new controllers.
- Parameters
controller_sets – Mappings of message families to coroutines
aries_cloudagent.core.util module
Core utilities and constants.
- aries_cloudagent.core.util.get_proto_default_version(def_path: str, major_version: int = 1) str [source]
Return default protocol version from version_definition.
- async aries_cloudagent.core.util.get_proto_default_version_from_msg_class(profile: Profile, msg_class: type, major_version: int = 1) str [source]
Return default protocol version from version_definition.
- async aries_cloudagent.core.util.get_version_def_from_msg_class(profile: Profile, msg_class: type, major_version: int = 1)[source]
Return version_definition of a protocol from msg_class.
- aries_cloudagent.core.util.get_version_from_message(msg: AgentMessage) str [source]
Return version from provided AgentMessage.
- aries_cloudagent.core.util.get_version_from_message_type(msg_type: str) str [source]
Return version from provided message_type.
- async aries_cloudagent.core.util.validate_get_response_version(profile: Profile, rec_version: str, msg_class: type) Tuple[str, Optional[str]] [source]
Return a tuple with version to respond with and warnings.
Process received version and protocol version definition, returns the tuple.
- Parameters
profile – Profile
rec_version – received version from message
msg_class – type
- Returns
Tuple with response version and any warnings