aries_cloudagent.utils package
Submodules
aries_cloudagent.utils.classloader module
The classloader provides utilities to dynamically load classes and modules.
- class aries_cloudagent.utils.classloader.ClassLoader[source]
Bases:
object
Class used to load classes from modules dynamically.
- classmethod load_class(class_name: str, default_module: Optional[str] = None, package: Optional[str] = None)[source]
Resolve a complete class path (e.g. typing.Dict) to the class itself.
- Parameters
class_name – the class name
default_module – the default module to load, if not part of the class name
package – the parent package to search for the module
- Returns
The resolved class
- Raises
ClassNotFoundError – If the class could not be resolved at path
ModuleLoadError – If there was an error loading the module
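The resolution step described above can be illustrated with a minimal standard-library sketch. This is not the ClassLoader implementation: the helper name resolve_class is hypothetical, and it raises the built-in ValueError, ImportError and LookupError rather than ClassNotFoundError or ModuleLoadError.

```python
import importlib
from typing import Optional


def resolve_class(class_name: str, default_module: Optional[str] = None) -> type:
    """Resolve a dotted path such as 'collections.OrderedDict' to a class.

    Hypothetical helper for illustration only.
    """
    if "." in class_name:
        # Split "package.module.ClassName" into module path and class name.
        mod_path, _, name = class_name.rpartition(".")
    elif default_module:
        # A bare class name falls back to the default module.
        mod_path, name = default_module, class_name
    else:
        raise ValueError(f"No module given for class name: {class_name}")

    module = importlib.import_module(mod_path)  # raises ImportError if missing
    resolved = getattr(module, name, None)
    if not isinstance(resolved, type):
        raise LookupError(f"No class {name!r} in module {mod_path!r}")
    return resolved


ordered_dict_cls = resolve_class("collections.OrderedDict")
same_cls = resolve_class("OrderedDict", default_module="collections")
```

Both calls return the same class object; the second shows how a default module is applied when the class name carries no module path.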
- classmethod load_module(mod_path: str, package: Optional[str] = None) module [source]
Load a module by its absolute or relative path.
- Parameters
mod_path – the absolute or relative module path
package – the parent package to search for the module
- Returns
The resolved module or None if the module cannot be found
- Raises
ModuleLoadError – If there was an error loading the module
- classmethod load_subclass_of(base_class: Type, mod_path: str, package: Optional[str] = None)[source]
Resolve an implementation of a base class within a module.
- Parameters
base_class – the base class being implemented
mod_path – the absolute module path
package – the parent package to search for the module
- Returns
The resolved class
- Raises
ClassNotFoundError – If the module or class implementation could not be found
ModuleLoadError – If there was an error loading the module
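One way to picture the subclass lookup is to import the module and scan its members for a class derived from the base class. The sketch below assumes that approach. The name find_subclass is hypothetical, and the real method may select or filter candidates differently.

```python
import importlib
import inspect


def find_subclass(base_class: type, mod_path: str) -> type:
    """Return a strict subclass of base_class found in the module at mod_path.

    Hypothetical helper for illustration only.
    """
    module = importlib.import_module(mod_path)  # raises ImportError if missing
    # getmembers returns (name, value) pairs sorted by name.
    for _name, candidate in inspect.getmembers(module, inspect.isclass):
        if issubclass(candidate, base_class) and candidate is not base_class:
            return candidate
    raise LookupError(f"No subclass of {base_class.__name__} in {mod_path!r}")


# The standard-library collections module exposes several dict subclasses.
found = find_subclass(dict, "collections")
```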
- exception aries_cloudagent.utils.classloader.ClassNotFoundError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
aries_cloudagent.core.error.BaseError
Class not found error.
- class aries_cloudagent.utils.classloader.DeferLoad(cls_path: str)[source]
Bases:
object
Helper to defer loading of a class definition.
- property resolved
Accessor for the resolved class instance.
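Deferred loading can be sketched as a wrapper that stores only the class path and imports the class on first access. The class LazyClass below is a hypothetical stand-in written against the standard library, not the DeferLoad source. Caching of the resolved class is an assumption.

```python
import importlib


class LazyClass:
    """Hold a dotted class path and import the class only when needed."""

    def __init__(self, cls_path: str):
        self._cls_path = cls_path
        self._resolved = None  # nothing is imported at construction time

    @property
    def resolved(self) -> type:
        """Import and cache the class on first access (caching is assumed)."""
        if self._resolved is None:
            mod_path, _, name = self._cls_path.rpartition(".")
            self._resolved = getattr(importlib.import_module(mod_path), name)
        return self._resolved


lazy = LazyClass("collections.OrderedDict")
instance = lazy.resolved()  # the import happens here, not in the constructor
```

This pattern is commonly used to avoid circular imports between modules that reference each other's classes.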
- exception aries_cloudagent.utils.classloader.ModuleLoadError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
aries_cloudagent.core.error.BaseError
Module load error.
aries_cloudagent.utils.dependencies module
Dependency related util methods.
- aries_cloudagent.utils.dependencies.assert_ursa_bbs_signatures_installed()[source]
Assert ursa_bbs_signatures module is installed.
aries_cloudagent.utils.env module
Environment utility methods.
- aries_cloudagent.utils.env.storage_path(*subpaths, create: bool = False) pathlib.Path [source]
Get the default aca-py home directory.
aries_cloudagent.utils.http module
HTTP utility methods.
- exception aries_cloudagent.utils.http.FetchError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
aries_cloudagent.core.error.BaseError
Error raised when an HTTP fetch fails.
- exception aries_cloudagent.utils.http.PutError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
aries_cloudagent.core.error.BaseError
Error raised when an HTTP put fails.
- async aries_cloudagent.utils.http.fetch(url: str, *, headers: Optional[dict] = None, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: Optional[aiohttp.BaseConnector] = None, session: Optional[aiohttp.ClientSession] = None, json: bool = False)[source]
Fetch from an HTTP server with automatic retries and timeouts.
- Parameters
url – the address to fetch
headers – an optional dict of headers to send
retry – flag to retry the fetch
max_attempts – the maximum number of attempts to make
interval – the interval between retries, in seconds
backoff – the backoff interval, in seconds
request_timeout – the HTTP request timeout, in seconds
connector – an optional existing BaseConnector
session – a shared ClientSession
json – flag to parse the result as JSON
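The interaction of max_attempts, interval, backoff and request_timeout can be sketched with asyncio alone. The reference above does not state how the delay is computed, so the multiplicative formula below is an assumption, and with_retries is a hypothetical helper rather than part of this module.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retries(
    operation: Callable[[], Awaitable[T]],
    *,
    max_attempts: int = 5,
    interval: float = 1.0,
    backoff: float = 0.25,
    request_timeout: float = 10.0,
) -> T:
    """Run operation, retrying on failure with a growing delay."""
    for attempt in range(1, max_attempts + 1):
        try:
            # Bound each individual attempt by the request timeout.
            return await asyncio.wait_for(operation(), timeout=request_timeout)
        except (OSError, asyncio.TimeoutError):
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Assumption: the delay grows by a factor of (1 + backoff) per retry.
            await asyncio.sleep(interval * (1.0 + backoff) ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")


attempts = []


async def flaky() -> str:
    """Fail twice with a transient error, then succeed."""
    attempts.append(1)
    if len(attempts) < 3:
        raise OSError("temporary failure")
    return "ok"


result = asyncio.run(with_retries(flaky, interval=0.001))
```

Under the assumed formula and the default values, the waits between attempts would be 1.0 s, 1.25 s, 1.5625 s and so on.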
- async aries_cloudagent.utils.http.fetch_stream(url: str, *, headers: Optional[dict] = None, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: Optional[aiohttp.BaseConnector] = None, session: Optional[aiohttp.ClientSession] = None)[source]
Fetch from an HTTP server with automatic retries and timeouts.
- Parameters
url – the address to fetch
headers – an optional dict of headers to send
retry – flag to retry the fetch
max_attempts – the maximum number of attempts to make
interval – the interval between retries, in seconds
backoff – the backoff interval, in seconds
request_timeout – the HTTP request timeout, in seconds
connector – an optional existing BaseConnector
session – a shared ClientSession
- async aries_cloudagent.utils.http.put_file(url: str, file_data: dict, extra_data: dict, *, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: Optional[aiohttp.BaseConnector] = None, session: Optional[aiohttp.ClientSession] = None, json: bool = False)[source]
Put to HTTP server with automatic retries and timeouts.
- Parameters
url – the address to use
file_data – dict with data key and path of file to upload
extra_data – further content to include in data to put
retry – flag to retry the put
max_attempts – the maximum number of attempts to make
interval – the interval between retries, in seconds
backoff – the backoff interval, in seconds
request_timeout – the HTTP request timeout, in seconds
connector – an optional existing BaseConnector
session – a shared ClientSession
json – flag to parse the result as JSON
aries_cloudagent.utils.jwe module
JSON Web Encryption utilities.
- class aries_cloudagent.utils.jwe.B64Value(*args: Any, **kwargs: Any)[source]
Bases:
marshmallow.fields.
A marshmallow-compatible wrapper for base64-URL values.
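For readers unfamiliar with base64-URL values, the following sketch shows the encoding with the standard library. It assumes the unpadded form used by the JOSE specifications. The helper names are hypothetical and are not part of B64Value.

```python
import base64


def b64url_encode(value: bytes) -> str:
    """Encode bytes with the URL-safe alphabet and strip '=' padding."""
    return base64.urlsafe_b64encode(value).rstrip(b"=").decode("ascii")


def b64url_decode(value: str) -> bytes:
    """Restore the padding that was stripped, then decode."""
    padding = "=" * (-len(value) % 4)
    return base64.urlsafe_b64decode(value + padding)


encoded = b64url_encode(b"\xfb\xff")  # "-_8" (standard base64 gives "+/8=")
decoded = b64url_decode(encoded)
```

The URL-safe alphabet replaces "+" and "/" with "-" and "_", so encoded values can be embedded in URLs and JSON without escaping.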
- class aries_cloudagent.utils.jwe.JweEnvelope(*, protected: Optional[dict] = None, protected_b64: Optional[bytes] = None, unprotected: Optional[dict] = None, ciphertext: Optional[bytes] = None, iv: Optional[bytes] = None, tag: Optional[bytes] = None, aad: Optional[bytes] = None, with_protected_recipients: bool = False, with_flatten_recipients: bool = True)[source]
Bases:
object
JWE envelope instance.
- add_recipient(recip: aries_cloudagent.utils.jwe.JweRecipient)[source]
Add a recipient to the JWE envelope.
- classmethod deserialize(message: Mapping[str, Any]) aries_cloudagent.utils.jwe.JweEnvelope [source]
Deserialize a JWE envelope from a mapping.
- classmethod from_json(message: Union[bytes, str]) aries_cloudagent.utils.jwe.JweEnvelope [source]
Decode a JWE envelope from a JSON string or bytes value.
- get_recipient(kid: str) aries_cloudagent.utils.jwe.JweRecipient [source]
Find a recipient by key ID.
- property protected_bytes: bytes
Access the protected data encoded as bytes.
This value is used in the additional authenticated data when encrypting.
- property recipient_key_ids: Iterable[aries_cloudagent.utils.jwe.JweRecipient]
Accessor for an iterator over the JWE recipient key identifiers.
- property recipients: Iterable[aries_cloudagent.utils.jwe.JweRecipient]
Accessor for an iterator over the JWE recipients.
The headers for each recipient include protected and unprotected headers from the outer envelope.
- class aries_cloudagent.utils.jwe.JweRecipient(*, encrypted_key: bytes, header: Optional[dict] = None)[source]
Bases:
object
A single message recipient.
- classmethod deserialize(entry: Mapping[str, Any]) aries_cloudagent.utils.jwe.JweRecipient [source]
Deserialize a JWE recipient from a mapping.
- class aries_cloudagent.utils.jwe.JweRecipientSchema(*args: Any, **kwargs: Any)[source]
Bases:
marshmallow.
JWE recipient schema.
- encrypted_key
A marshmallow-compatible wrapper for base64-URL values.
- header
- class aries_cloudagent.utils.jwe.JweSchema(*args: Any, **kwargs: Any)[source]
Bases:
marshmallow.
JWE envelope schema.
- aad
A marshmallow-compatible wrapper for base64-URL values.
- ciphertext
A marshmallow-compatible wrapper for base64-URL values.
- encrypted_key
A marshmallow-compatible wrapper for base64-URL values.
- header
- iv
A marshmallow-compatible wrapper for base64-URL values.
- protected
- recipients
- tag
A marshmallow-compatible wrapper for base64-URL values.
- unprotected
aries_cloudagent.utils.outofband module
Utilities for creating out-of-band messages.
- aries_cloudagent.utils.outofband.serialize_outofband(message: aries_cloudagent.messaging.agent_message.AgentMessage, did: aries_cloudagent.wallet.did_info.DIDInfo, endpoint: str) str [source]
Serialize the agent message as an out-of-band message.
- Returns
An OOB message in URL format.
aries_cloudagent.utils.repeat module
Utils for repeating tasks.
- class aries_cloudagent.utils.repeat.RepeatAttempt(seq: aries_cloudagent.utils.repeat.RepeatSequence, index: int = 1)[source]
Bases:
object
Represents the current iteration in a repeat sequence.
- next() aries_cloudagent.utils.repeat.RepeatAttempt [source]
Get the next attempt instance.
aries_cloudagent.utils.stats module
Classes for tracking performance and timing.
- class aries_cloudagent.utils.stats.Collector(*, enabled: bool = True, log_path: Optional[str] = None)[source]
Bases:
object
Collector for a set of statistics.
- extract(groups: Optional[Sequence[str]] = None) dict [source]
Extract statistics for a specific set of groups.
- log(name: str, duration: float, start: Optional[float] = None)[source]
Log an entry in the statistics if the collector is enabled.
- wrap(obj, prop_name: Union[str, Sequence[str]], groups: Optional[Sequence[str]] = None, *, ignore_missing: bool = False)[source]
Wrap a method on a class or class instance.
- class aries_cloudagent.utils.stats.Timer(collector: aries_cloudagent.utils.stats.Collector, groups: Sequence[str])[source]
Bases:
object
Timer instance for a running task.
- start() aries_cloudagent.utils.stats.Timer [source]
Start the timer.
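The idea of wrapping a method so that each call is timed and logged can be sketched as follows. MiniCollector is a hypothetical, much reduced stand-in: it takes a single group name and keeps only call counts and total durations, which is an assumption about what such a collector might record.

```python
import time
from collections import defaultdict
from functools import wraps


class MiniCollector:
    """Record call counts and total durations per group name."""

    def __init__(self, enabled: bool = True):
        self.enabled = enabled
        self.counts = defaultdict(int)
        self.totals = defaultdict(float)

    def log(self, name: str, duration: float) -> None:
        """Record one entry if the collector is enabled."""
        if self.enabled:
            self.counts[name] += 1
            self.totals[name] += duration

    def wrap(self, obj, prop_name: str, group: str) -> None:
        """Replace obj.prop_name with a version that times every call."""
        original = getattr(obj, prop_name)

        @wraps(original)
        def timed(*args, **kwargs):
            start = time.perf_counter()
            try:
                return original(*args, **kwargs)
            finally:
                # Log the duration even when the call raises.
                self.log(group, time.perf_counter() - start)

        setattr(obj, prop_name, timed)


class Service:
    def work(self, x: int) -> int:
        return x * 2


service = Service()
stats = MiniCollector()
stats.wrap(service, "work", "Service.work")
answer = service.work(21)
```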
aries_cloudagent.utils.task_queue module
Classes for managing a set of asyncio tasks.
- class aries_cloudagent.utils.task_queue.CompletedTask(task: _asyncio.Task, exc_info: Tuple, ident: Optional[str] = None, timing: Optional[dict] = None)[source]
Bases:
object
Represent the result of a queued task.
- class aries_cloudagent.utils.task_queue.PendingTask(coro: Coroutine, complete_hook: Optional[Callable] = None, ident: Optional[str] = None, task_future: Optional[_asyncio.Future] = None, queued_time: Optional[float] = None)[source]
Bases:
object
Represent a task in the queue.
- property cancelled
Accessor for the cancelled property.
- property task: _asyncio.Task
Accessor for the task.
- class aries_cloudagent.utils.task_queue.TaskQueue(max_active: int = 0, timed: bool = False, trace_fn: Optional[Callable] = None)[source]
Bases:
object
A class for managing a set of asyncio tasks.
- add_active(task: _asyncio.Task, task_complete: Optional[Callable] = None, ident: Optional[str] = None, timing: Optional[dict] = None) _asyncio.Task [source]
Register an active async task with an optional completion callback.
- Parameters
task – The asyncio task instance
task_complete – An optional callback to run on completion
ident – A string identifier for the task
timing – An optional dictionary of timing information
- add_pending(pending: aries_cloudagent.utils.task_queue.PendingTask)[source]
Add a task to the pending queue.
- Parameters
pending – The PendingTask to add to the task queue
- async complete(timeout: Optional[float] = None, cleanup: bool = True)[source]
Cancel any pending tasks and wait for, or cancel, active tasks.
- completed_task(task: _asyncio.Task, task_complete: Callable, ident: str, timing: Optional[dict] = None)[source]
Clean up after a task has completed and run callbacks.
- put(coro: Coroutine, task_complete: Optional[Callable] = None, ident: Optional[str] = None) aries_cloudagent.utils.task_queue.PendingTask [source]
Add a new task to the queue, delaying execution if busy.
- Parameters
coro – The coroutine to run
task_complete – A callback to run on completion
ident – A string identifier for the task
- Returns
A future resolving to the asyncio task instance once queued
- run(coro: Coroutine, task_complete: Optional[Callable] = None, ident: Optional[str] = None, timing: Optional[dict] = None) _asyncio.Task [source]
Start executing a coroutine as an async task, bypassing the pending queue.
- Parameters
coro – The coroutine to run
task_complete – An optional callback to run on completion
ident – A string identifier for the task
timing – An optional dictionary of timing information
- Returns
The new asyncio task instance
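The difference between starting a coroutine immediately and holding it in a pending queue can be sketched with a small asyncio class. MiniTaskQueue is hypothetical. Treating max_active = 0 as "no limit" and starting the oldest pending coroutine whenever an active task finishes are both assumptions, not documented TaskQueue behaviour.

```python
import asyncio
from collections import deque
from typing import Coroutine, Deque, Set


class MiniTaskQueue:
    """Run at most max_active coroutines at once and queue the rest."""

    def __init__(self, max_active: int = 0):
        self.max_active = max_active  # 0 means no limit in this sketch
        self.active: Set[asyncio.Task] = set()
        self.pending: Deque[Coroutine] = deque()

    def put(self, coro: Coroutine) -> None:
        """Start the coroutine now if there is capacity, otherwise queue it."""
        if self.max_active and len(self.active) >= self.max_active:
            self.pending.append(coro)
        else:
            self._start(coro)

    def _start(self, coro: Coroutine) -> None:
        task = asyncio.get_running_loop().create_task(coro)
        self.active.add(task)
        task.add_done_callback(self._on_done)

    def _on_done(self, task: asyncio.Task) -> None:
        # Free the slot and promote the oldest pending coroutine, if any.
        self.active.discard(task)
        if self.pending:
            self._start(self.pending.popleft())

    async def flush(self) -> None:
        """Wait until no active tasks remain."""
        while self.active:
            await asyncio.wait(set(self.active))


async def demo():
    queue = MiniTaskQueue(max_active=2)
    running = 0
    peak = 0
    finished = []

    async def job(n: int) -> None:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.001)
        running -= 1
        finished.append(n)

    for n in range(5):
        queue.put(job(n))
    queued = len(queue.pending)  # jobs held back by the concurrency limit
    await queue.flush()
    return queued, peak, sorted(finished)


queued, peak, finished = asyncio.run(demo())
```

With a limit of two, three of the five jobs wait in the pending queue and no more than two run at the same time.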
- aries_cloudagent.utils.task_queue.coro_ident(coro: Coroutine)[source]
Extract an identifier for a coroutine.
aries_cloudagent.utils.tracing module
Event tracing.
- class aries_cloudagent.utils.tracing.AdminAPIMessageTracingSchema(*args: Any, **kwargs: Any)[source]
Bases:
marshmallow.
Request/result schema including agent message tracing.
This is to be used as a superclass for aca-py admin input/output messages that need to support tracing.
- trace
- aries_cloudagent.utils.tracing.decode_inbound_message(message)[source]
Return bundled message if appropriate.
- aries_cloudagent.utils.tracing.trace_event(context, message, handler: Optional[str] = None, outcome: Optional[str] = None, perf_counter: Optional[float] = None, force_trace: bool = False, raise_errors: bool = False) float [source]
Log a trace event to a configured target.
- Parameters
context – The application context. Attributes of interest are:
context["trace.enabled"]: True if we are logging events
context["trace.target"]: Trace target ("log", "message" or an HTTP endpoint)
context["trace.tag"]: Tag to be included in trace output
message – the current message, can be an AgentMessage, InboundMessage, OutboundMessage or Exchange record
event – Dict that will be converted to JSON and posted to the target