aries_cloudagent.utils package¶
Submodules¶
aries_cloudagent.utils.classloader module¶
The classloader provides utilities to dynamically load classes and modules.
-
class
aries_cloudagent.utils.classloader.
ClassLoader
[source]¶ Bases:
object
Class used to load classes from modules dynamically.
-
classmethod
load_class
(class_name: str, default_module: Optional[str] = None, package: Optional[str] = None)[source]¶ Resolve a complete class path (e.g. typing.Dict) to the class itself.
Parameters: - class_name – the class name
- default_module – the default module to load, if not part of the class name
- package – the parent package to search for the module
Returns: The resolved class
Raises: - ClassNotFoundError – If the class could not be resolved at path
- ModuleLoadError – If there was an error loading the module
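The path resolution described for load_class can be illustrated with the standard-library importlib. This is a minimal sketch of the idea, not the ClassLoader implementation: the helper name resolve_class is hypothetical, the package argument is left out, and plain ImportError, AttributeError and ValueError stand in for ModuleLoadError and ClassNotFoundError.

```python
import importlib
from typing import Optional


def resolve_class(class_name: str, default_module: Optional[str] = None):
    """Resolve a dotted path such as "typing.Dict" to the object it names.

    Hypothetical helper: a stand-in for ClassLoader.load_class, not its code.
    """
    if "." in class_name:
        # Split "package.module.Name" into a module path and an attribute name.
        mod_path, _, attr = class_name.rpartition(".")
    elif default_module:
        # Bare class name: fall back to the default module.
        mod_path, attr = default_module, class_name
    else:
        raise ValueError(f"Cannot resolve module for class name: {class_name}")

    module = importlib.import_module(mod_path)  # raises ImportError if missing
    return getattr(module, attr)  # raises AttributeError if not defined


ordered = resolve_class("collections.OrderedDict")
fraction = resolve_class("Fraction", default_module="fractions")
```

The second call shows the role of default_module: a bare name carries no module path of its own, so the default supplies one.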
-
classmethod
load_module
(mod_path: str, package: str = None) → module[source]¶ Load a module by its absolute path.
Parameters: - mod_path – the absolute or relative module path
- package – the parent package to search for the module
Returns: The resolved module or None if the module cannot be found
Raises: ModuleLoadError
– If there was an error loading the module
-
classmethod
load_subclass_of
(base_class: Type, mod_path: str, package: str = None)[source]¶ Resolve an implementation of a base class within a module.
Parameters: - base_class – the base class being implemented
- mod_path – the absolute module path
- package – the parent package to search for the module
Returns: The resolved class
Raises: - ClassNotFoundError – If the module or class implementation could not be found
- ModuleLoadError – If there was an error loading the module
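One way to find an implementation of a base class inside a module is to scan the module's members with inspect. The sketch below assumes that approach; find_subclass is a hypothetical name, LookupError stands in for ClassNotFoundError, and the library may select the class differently.

```python
import importlib
import inspect


def find_subclass(base_class: type, mod_path: str) -> type:
    """Return the first class in a module that subclasses base_class.

    Hypothetical helper: a stand-in for ClassLoader.load_subclass_of.
    """
    module = importlib.import_module(mod_path)
    # getmembers returns (name, object) pairs sorted by name.
    for _name, obj in inspect.getmembers(module, inspect.isclass):
        # The base class itself does not count as an implementation.
        if obj is not base_class and issubclass(obj, base_class):
            return obj
    raise LookupError(f"No subclass of {base_class.__name__} in {mod_path}")


# json.decoder defines JSONDecodeError, a subclass of ValueError.
found = find_subclass(ValueError, "json.decoder")
```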
-
classmethod
-
exception
aries_cloudagent.utils.classloader.
ClassNotFoundError
(*args, error_code: str = None, **kwargs)[source]¶ Bases:
aries_cloudagent.core.error.BaseError
Class not found error.
-
class
aries_cloudagent.utils.classloader.
DeferLoad
(cls_path: str)[source]¶ Bases:
object
Helper to defer loading of a class definition.
-
resolved
¶ Accessor for the resolved class instance.
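Deferred loading usually means storing the class path and importing only on first access. The following sketch shows that pattern under stated assumptions: LazyClass is a hypothetical stand-in for DeferLoad, and the caching and the __call__ shortcut are choices made for the example rather than confirmed library behaviour.

```python
import importlib


class LazyClass:
    """Hypothetical stand-in for DeferLoad: import the class on first use."""

    def __init__(self, cls_path: str):
        self._cls_path = cls_path
        self._resolved = None  # filled in on first access

    @property
    def resolved(self):
        """Import and cache the class named by the stored path."""
        if self._resolved is None:
            mod_path, _, attr = self._cls_path.rpartition(".")
            self._resolved = getattr(importlib.import_module(mod_path), attr)
        return self._resolved

    def __call__(self, *args, **kwargs):
        """Instantiate the resolved class."""
        return self.resolved(*args, **kwargs)


lazy_decimal = LazyClass("decimal.Decimal")  # only the path is stored here
value = lazy_decimal("1.5")  # the lookup happens on this call
```

Deferring the lookup this way is a common means of avoiding circular imports between modules that reference each other's classes.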
-
-
exception
aries_cloudagent.utils.classloader.
ModuleLoadError
(*args, error_code: str = None, **kwargs)[source]¶ Bases:
aries_cloudagent.core.error.BaseError
Module load error.
aries_cloudagent.utils.dependencies module¶
Dependency related util methods.
-
aries_cloudagent.utils.dependencies.
assert_ursa_bbs_signatures_installed
()[source]¶ Assert ursa_bbs_signatures module is installed.
aries_cloudagent.utils.env module¶
Environment utility methods.
aries_cloudagent.utils.http module¶
HTTP utility methods.
-
exception
aries_cloudagent.utils.http.
FetchError
(*args, error_code: str = None, **kwargs)[source]¶ Bases:
aries_cloudagent.core.error.BaseError
Error raised when an HTTP fetch fails.
-
exception
aries_cloudagent.utils.http.
PutError
(*args, error_code: str = None, **kwargs)[source]¶ Bases:
aries_cloudagent.core.error.BaseError
Error raised when an HTTP put fails.
-
aries_cloudagent.utils.http.
fetch
(url: str, *, headers: dict = None, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: BaseConnector = None, session: ClientSession = None, json: bool = False)[source]¶ Fetch from an HTTP server with automatic retries and timeouts.
Parameters: - url – the address to fetch
- headers – an optional dict of headers to send
- retry – flag to retry the fetch
- max_attempts – the maximum number of attempts to make
- interval – the interval between retries, in seconds
- backoff – the backoff interval, in seconds
- request_timeout – the HTTP request timeout, in seconds
- connector – an optional existing BaseConnector
- session – a shared ClientSession
- json – flag to parse the result as JSON
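The retry parameters above can be pictured with a small asyncio loop. This is a sketch under assumptions, not the library's fetch: fetch_with_retry and flaky are hypothetical names, the operation is a stand-in for an HTTP request, and the rule that each delay grows by a factor of (1 + backoff) is an assumption about how interval and backoff combine.

```python
import asyncio


async def fetch_with_retry(operation, *, max_attempts=5, interval=1.0, backoff=0.25):
    """Await operation() until it succeeds or max_attempts is reached.

    Assumption: the delay grows by a factor of (1 + backoff) per attempt.
    The library may combine interval and backoff differently.
    """
    delay = interval
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except OSError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(delay)
            delay *= 1 + backoff


calls = []


async def flaky():
    """Hypothetical operation that fails twice before succeeding."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = asyncio.run(fetch_with_retry(flaky, interval=0.01))
```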
-
aries_cloudagent.utils.http.
fetch_stream
(url: str, *, headers: dict = None, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: BaseConnector = None, session: ClientSession = None)[source]¶ Fetch from an HTTP server with automatic retries and timeouts.
Parameters: - url – the address to fetch
- headers – an optional dict of headers to send
- retry – flag to retry the fetch
- max_attempts – the maximum number of attempts to make
- interval – the interval between retries, in seconds
- backoff – the backoff interval, in seconds
- request_timeout – the HTTP request timeout, in seconds
- connector – an optional existing BaseConnector
- session – a shared ClientSession
-
aries_cloudagent.utils.http.
put_file
(url: str, file_data: dict, extra_data: dict, *, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: BaseConnector = None, session: ClientSession = None, json: bool = False)[source]¶ Put to an HTTP server with automatic retries and timeouts.
Parameters: - url – the address to use
- file_data – dict with data key and path of file to upload
- extra_data – further content to include in data to put
- headers – an optional dict of headers to send
- retry – flag to retry the fetch
- max_attempts – the maximum number of attempts to make
- interval – the interval between retries, in seconds
- backoff – the backoff interval, in seconds
- request_timeout – the HTTP request timeout, in seconds
- connector – an optional existing BaseConnector
- session – a shared ClientSession
- json – flag to parse the result as JSON
aries_cloudagent.utils.jwe module¶
JSON Web Encryption utilities.
-
class
aries_cloudagent.utils.jwe.
B64Value
(*args, **kwargs)[source]¶ Bases:
sphinx.ext.autodoc.importer._MockObject
A marshmallow-compatible wrapper for base64-URL values.
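JOSE formats such as JWE use URL-safe base64 without trailing padding. The sketch below shows that encoding with the standard library only; the helper names are hypothetical, and it is an assumption that B64Value handles padding in exactly this way.

```python
import base64


def b64url_encode(data: bytes) -> str:
    """Encode bytes as URL-safe base64 with the trailing '=' padding removed."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Decode URL-safe base64, restoring any padding that was stripped."""
    padding = "=" * (-len(text) % 4)
    return base64.urlsafe_b64decode(text + padding)


# These two bytes exercise the '-' and '_' characters of the URL-safe alphabet.
encoded = b64url_encode(b"\xfb\xff")
decoded = b64url_decode(encoded)
```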
-
class
aries_cloudagent.utils.jwe.
JweEnvelope
(*, protected: dict = None, protected_b64: bytes = None, unprotected: dict = None, ciphertext: bytes = None, iv: bytes = None, tag: bytes = None, aad: bytes = None, with_protected_recipients: bool = False, with_flatten_recipients: bool = True)[source]¶ Bases:
object
JWE envelope instance.
-
add_recipient
(recip: aries_cloudagent.utils.jwe.JweRecipient)[source]¶ Add a recipient to the JWE envelope.
-
combined_aad
¶ Accessor for the additional authenticated data.
-
classmethod
deserialize
(message: Mapping[str, Any]) → aries_cloudagent.utils.jwe.JweEnvelope[source]¶ Deserialize a JWE envelope from a mapping.
-
classmethod
from_json
(message: Union[bytes, str]) → aries_cloudagent.utils.jwe.JweEnvelope[source]¶ Decode a JWE envelope from a JSON string or bytes value.
-
get_recipient
(kid: str) → aries_cloudagent.utils.jwe.JweRecipient[source]¶ Find a recipient by key ID.
-
protected_bytes
¶ Access the protected data encoded as bytes.
This value is used in the additional authenticated data when encrypting.
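RFC 7516 defines the additional authenticated data as the base64url-encoded protected header, followed by a period and the base64url-encoded AAD when a separate AAD value is present. The sketch below follows that rule; the function names are hypothetical, the header values are example data, and the compact JSON encoding of the header is an assumption rather than confirmed JweEnvelope behaviour.

```python
import base64
import json
from typing import Optional


def b64url(data: bytes) -> bytes:
    """URL-safe base64 without padding, returned as ASCII bytes."""
    return base64.urlsafe_b64encode(data).rstrip(b"=")


def combined_aad(protected: dict, aad: Optional[bytes] = None) -> bytes:
    """Build the AAD encryption parameter from the protected header.

    Hypothetical helper following RFC 7516, section 5.1, step 14.
    """
    encoded_header = json.dumps(protected, separators=(",", ":")).encode("utf-8")
    protected_b64 = b64url(encoded_header)
    if aad is None:
        return protected_b64
    # With a separate AAD value: header, a period, then the encoded AAD.
    return protected_b64 + b"." + b64url(aad)


header = {"alg": "ECDH-ES+A256KW", "enc": "A256GCM"}  # example header values
without_aad = combined_aad(header)
with_aad = combined_aad(header, b"extra")
```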
-
recipient_key_ids
¶ Accessor for an iterator over the JWE recipient key identifiers.
-
recipients
¶ Accessor for an iterator over the JWE recipients.
The headers for each recipient include protected and unprotected headers from the outer envelope.
-
recipients_json
¶ Encode the current recipients for JSON.
-
-
class
aries_cloudagent.utils.jwe.
JweRecipient
(*, encrypted_key: bytes, header: dict = None)[source]¶ Bases:
object
A single message recipient.
-
class
aries_cloudagent.utils.jwe.
JweRecipientSchema
(*args, **kwargs)[source]¶ Bases:
sphinx.ext.autodoc.importer._MockObject
JWE recipient schema.
-
encrypted_key
¶ A marshmallow-compatible wrapper for base64-URL values.
-
header
¶ Used by autodoc_mock_imports.
-
-
class
aries_cloudagent.utils.jwe.
JweSchema
(*args, **kwargs)[source]¶ Bases:
sphinx.ext.autodoc.importer._MockObject
JWE envelope schema.
-
aad
¶ A marshmallow-compatible wrapper for base64-URL values.
-
ciphertext
¶ A marshmallow-compatible wrapper for base64-URL values.
-
encrypted_key
¶ A marshmallow-compatible wrapper for base64-URL values.
-
header
¶ Used by autodoc_mock_imports.
-
iv
¶ A marshmallow-compatible wrapper for base64-URL values.
-
protected
¶ Used by autodoc_mock_imports.
-
recipients
¶ Used by autodoc_mock_imports.
-
tag
¶ A marshmallow-compatible wrapper for base64-URL values.
-
unprotected
¶ Used by autodoc_mock_imports.
-
aries_cloudagent.utils.outofband module¶
Utilities for creating out-of-band messages.
aries_cloudagent.utils.repeat module¶
Utils for repeating tasks.
-
class
aries_cloudagent.utils.repeat.
RepeatAttempt
(seq: aries_cloudagent.utils.repeat.RepeatSequence, index: int = 1)[source]¶ Bases:
object
Represents the current iteration in a repeat sequence.
-
final
¶ Check if this is the last instance in the sequence.
-
next_interval
¶ Calculate the interval before the next attempt.
-
aries_cloudagent.utils.stats module¶
Classes for tracking performance and timing.
-
class
aries_cloudagent.utils.stats.
Collector
(*, enabled: bool = True, log_path: str = None)[source]¶ Bases:
object
Collector for a set of statistics.
-
enabled
¶ Accessor for the collector’s enabled property.
-
extract
(groups: Sequence[str] = None) → dict[source]¶ Extract statistics for a specific set of groups.
-
log
(name: str, duration: float, start: float = None)[source]¶ Log an entry in the statistics if the collector is enabled.
-
results
¶ Accessor for the current set of collected statistics.
-
wrap
(obj, prop_name: Union[str, Sequence[str]], groups: Sequence[str] = None, *, ignore_missing: bool = False)[source]¶ Wrap a method on a class or class instance.
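Wrapping a method for timing typically means replacing the attribute with a function that measures the call and logs the duration. The sketch below illustrates that pattern with a simplified, hypothetical TimingCollector; the result layout and the naming of entries are assumptions and do not describe the library's Collector.

```python
import time
from collections import defaultdict
from functools import wraps


class TimingCollector:
    """Hypothetical, simplified collector; not the library's Collector."""

    def __init__(self, enabled: bool = True):
        self.enabled = enabled
        self.results = defaultdict(lambda: {"count": 0, "total": 0.0})

    def log(self, name: str, duration: float) -> None:
        """Record one timing entry if the collector is enabled."""
        if self.enabled:
            entry = self.results[name]
            entry["count"] += 1
            entry["total"] += duration

    def wrap(self, obj, prop_name: str) -> None:
        """Replace obj.prop_name with a version that times each call."""
        original = getattr(obj, prop_name)
        # Use the class name in the label whether obj is a class or an instance.
        owner = obj if isinstance(obj, type) else type(obj)
        label = f"{owner.__name__}.{prop_name}"

        @wraps(original)
        def timed(*args, **kwargs):
            start = time.perf_counter()
            try:
                return original(*args, **kwargs)
            finally:
                # Log even when the wrapped call raises.
                self.log(label, time.perf_counter() - start)

        setattr(obj, prop_name, timed)


class Greeter:
    def greet(self, name: str) -> str:
        return f"hello {name}"


collector = TimingCollector()
greeter = Greeter()
collector.wrap(greeter, "greet")
greeting = greeter.greet("agent")
```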
-
aries_cloudagent.utils.task_queue module¶
Classes for managing a set of asyncio tasks.
-
class
aries_cloudagent.utils.task_queue.
CompletedTask
(task: _asyncio.Task, exc_info: Tuple, ident: str = None, timing: dict = None)[source]¶ Bases:
object
Represent the result of a queued task.
-
class
aries_cloudagent.utils.task_queue.
PendingTask
(coro: Coroutine, complete_hook: Callable = None, ident: str = None, task_future: _asyncio.Future = None, queued_time: float = None)[source]¶ Bases:
object
Represent a task in the queue.
-
cancelled
¶ Accessor for the cancelled property.
-
task
¶ Accessor for the task.
-
-
class
aries_cloudagent.utils.task_queue.
TaskQueue
(max_active: int = 0, timed: bool = False, trace_fn: Callable = None)[source]¶ Bases:
object
A class for managing a set of asyncio tasks.
-
add_active
(task: _asyncio.Task, task_complete: Callable = None, ident: str = None, timing: dict = None) → _asyncio.Task[source]¶ Register an active async task with an optional completion callback.
Parameters: - task – The asyncio task instance
- task_complete – An optional callback to run on completion
- ident – A string identifier for the task
- timing – An optional dictionary of timing information
-
add_pending
(pending: aries_cloudagent.utils.task_queue.PendingTask)[source]¶ Add a task to the pending queue.
Parameters: pending – The PendingTask to add to the task queue
-
cancelled
¶ Accessor for the cancelled property of the queue.
-
complete
(timeout: float = None, cleanup: bool = True)[source]¶ Cancel any pending tasks and wait for, or cancel active tasks.
-
completed_task
(task: _asyncio.Task, task_complete: Callable, ident: str, timing: dict = None)[source]¶ Clean up after a task has completed and run callbacks.
-
current_active
¶ Accessor for the current number of active tasks in the queue.
-
current_pending
¶ Accessor for the current number of pending tasks in the queue.
-
current_size
¶ Accessor for the total number of tasks in the queue.
-
max_active
¶ Accessor for the maximum number of active tasks in the queue.
-
put
(coro: Coroutine, task_complete: Callable = None, ident: str = None) → aries_cloudagent.utils.task_queue.PendingTask[source]¶ Add a new task to the queue, delaying execution if busy.
Parameters: - coro – The coroutine to run
- task_complete – A callback to run on completion
- ident – A string identifier for the task
Returns: a future resolving to the asyncio task instance once queued
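Delaying execution when the queue is busy amounts to capping the number of coroutines that run at once. The sketch below shows one way to do that with an asyncio semaphore; BoundedQueue is a hypothetical, much simplified stand-in and does not reproduce TaskQueue's pending queue, callbacks or timing.

```python
import asyncio


class BoundedQueue:
    """Hypothetical, simplified queue; not the library's TaskQueue."""

    def __init__(self, max_active: int):
        self._slots = asyncio.Semaphore(max_active)
        self._tasks = []
        self.active = 0
        self.peak_active = 0

    def put(self, coro) -> asyncio.Task:
        """Schedule a coroutine; it waits for a free slot before running."""
        task = asyncio.create_task(self._run(coro))
        self._tasks.append(task)
        return task

    async def _run(self, coro):
        async with self._slots:  # blocks while max_active coroutines are running
            self.active += 1
            self.peak_active = max(self.peak_active, self.active)
            try:
                return await coro
            finally:
                self.active -= 1

    async def complete(self):
        """Wait for every queued coroutine and return the results in order."""
        return await asyncio.gather(*self._tasks)


async def work(n: int) -> int:
    await asyncio.sleep(0.01)
    return n * 2


async def main():
    queue = BoundedQueue(max_active=2)
    for n in range(5):
        queue.put(work(n))
    results = await queue.complete()
    return results, queue.peak_active


results, peak = asyncio.run(main())
```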
-
ready
¶ Accessor for the ready property of the queue.
-
run
(coro: Coroutine, task_complete: Callable = None, ident: str = None, timing: dict = None) → _asyncio.Task[source]¶ Start executing a coroutine as an async task, bypassing the pending queue.
Parameters: - coro – The coroutine to run
- task_complete – An optional callback to run on completion
- ident – A string identifier for the task
- timing – An optional dictionary of timing information
Returns: the new asyncio task instance
-
-
aries_cloudagent.utils.task_queue.
coro_ident
(coro: Coroutine)[source]¶ Extract an identifier for a coroutine.
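A coroutine object carries the qualified name of the function that created it, which makes a convenient identifier. The sketch below assumes that approach; describe_coro and handle_message are hypothetical names, and the library's coro_ident may derive its identifier differently.

```python
def describe_coro(coro) -> str:
    """Return the coroutine's qualified name, falling back to its repr."""
    return getattr(coro, "__qualname__", None) or repr(coro)


async def handle_message():
    """Hypothetical coroutine function used only for the example."""
    return None


pending = handle_message()  # creates the coroutine object without running it
ident = describe_coro(pending)
pending.close()  # avoid a "never awaited" warning for the unused coroutine
```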
aries_cloudagent.utils.tracing module¶
Event tracing.
-
class
aries_cloudagent.utils.tracing.
AdminAPIMessageTracingSchema
(*args, **kwargs)[source]¶ Bases:
aries_cloudagent.messaging.models.openapi.OpenAPISchema
Request/result schema including agent message tracing.
This is to be used as a superclass for ACA-Py admin input/output messages that need to support tracing.
-
trace
¶ Used by autodoc_mock_imports.
-
-
aries_cloudagent.utils.tracing.
decode_inbound_message
(message)[source]¶ Return bundled message if appropriate.
-
aries_cloudagent.utils.tracing.
trace_event
(context, message, handler: str = None, outcome: str = None, perf_counter: float = None, force_trace: bool = False, raise_errors: bool = False) → float[source]¶ Log a trace event to a configured target.
Parameters: - context – The application context; the attributes of interest are:
  context["trace.enabled"]: True if events are being logged
  context["trace.target"]: the trace target ("log", "message" or an HTTP endpoint)
  context["trace.tag"]: a tag to be included in the trace output
- message – the current message, can be an AgentMessage, InboundMessage, OutboundMessage or Exchange record
- event – Dict that will be converted to JSON and posted to the target