aries_cloudagent.utils package
Subpackages
Submodules
aries_cloudagent.utils.classloader module
The classloader provides utilties to dynamically load classes and modules.
- class aries_cloudagent.utils.classloader.ClassLoader[source]
Bases:
object
Class used to load classes from modules dynamically.
- classmethod load_class(class_name: str, default_module: Optional[str] = None, package: Optional[str] = None)[source]
Resolve a complete class path (ie. typing.Dict) to the class itself.
- Parameters
class_name – the class name
default_module – the default module to load, if not part of in the class name
package – the parent package to search for the module
- Returns
The resolved class
- Raises
ClassNotFoundError – If the class could not be resolved at path
ModuleLoadError – If there was an error loading the module
- classmethod load_module(mod_path: str, package: Optional[str] = None) module [source]
Load a module by its absolute path.
- Parameters
mod_path – the absolute or relative module path
package – the parent package to search for the module
- Returns
The resolved module or None if the module cannot be found
- Raises
ModuleLoadError – If there was an error loading the module
- classmethod load_subclass_of(base_class: Type, mod_path: str, package: Optional[str] = None)[source]
Resolve an implementation of a base path within a module.
- Parameters
base_class – the base class being implemented
mod_path – the absolute module path
package – the parent package to search for the module
- Returns
The resolved class
- Raises
ClassNotFoundError – If the module or class implementation could not be found
ModuleLoadError – If there was an error loading the module
- exception aries_cloudagent.utils.classloader.ClassNotFoundError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
BaseError
Class not found error.
aries_cloudagent.utils.dependencies module
Dependency related util methods.
- aries_cloudagent.utils.dependencies.assert_ursa_bbs_signatures_installed()[source]
Assert ursa_bbs_signatures module is installed.
aries_cloudagent.utils.endorsement_setup module
aries_cloudagent.utils.env module
Environment utility methods.
aries_cloudagent.utils.http module
HTTP utility methods.
- exception aries_cloudagent.utils.http.FetchError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
BaseError
Error raised when an HTTP fetch fails.
- exception aries_cloudagent.utils.http.PutError(*args, error_code: Optional[str] = None, **kwargs)[source]
Bases:
BaseError
Error raised when an HTTP put fails.
- async aries_cloudagent.utils.http.fetch(url: str, *, headers: Optional[dict] = None, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: Optional[aiohttp.BaseConnector] = None, session: Optional[aiohttp.ClientSession] = None, json: bool = False)[source]
Fetch from an HTTP server with automatic retries and timeouts.
- Parameters
url – the address to fetch
headers – an optional dict of headers to send
retry – flag to retry the fetch
max_attempts – the maximum number of attempts to make
interval – the interval between retries, in seconds
backoff – the backoff interval, in seconds
request_timeout – the HTTP request timeout, in seconds
connector – an optional existing BaseConnector
session – a shared ClientSession
json – flag to parse the result as JSON
- async aries_cloudagent.utils.http.fetch_stream(url: str, *, headers: Optional[dict] = None, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: Optional[aiohttp.BaseConnector] = None, session: Optional[aiohttp.ClientSession] = None)[source]
Fetch from an HTTP server with automatic retries and timeouts.
- Parameters
url – the address to fetch
headers – an optional dict of headers to send
retry – flag to retry the fetch
max_attempts – the maximum number of attempts to make
interval – the interval between retries, in seconds
backoff – the backoff interval, in seconds
request_timeout – the HTTP request timeout, in seconds
connector – an optional existing BaseConnector
session – a shared ClientSession
json – flag to parse the result as JSON
- async aries_cloudagent.utils.http.put_file(url: str, file_data: dict, extra_data: dict, *, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: Optional[aiohttp.BaseConnector] = None, session: Optional[aiohttp.ClientSession] = None, json: bool = False)[source]
Put to HTTP server with automatic retries and timeouts.
- Parameters
url – the address to use
file_data – dict with data key and path of file to upload
extra_data – further content to include in data to put
headers – an optional dict of headers to send
retry – flag to retry the fetch
max_attempts – the maximum number of attempts to make
interval – the interval between retries, in seconds
backoff – the backoff interval, in seconds
request_timeout – the HTTP request timeout, in seconds
connector – an optional existing BaseConnector
session – a shared ClientSession
json – flag to parse the result as JSON
aries_cloudagent.utils.jwe module
JSON Web Encryption utilities.
- class aries_cloudagent.utils.jwe.B64Value(*args: Any, **kwargs: Any)[source]
Bases:
Str
A marshmallow-compatible wrapper for base64-URL values.
- class aries_cloudagent.utils.jwe.JweEnvelope(*, protected: Optional[dict] = None, protected_b64: Optional[bytes] = None, unprotected: Optional[dict] = None, ciphertext: Optional[bytes] = None, iv: Optional[bytes] = None, tag: Optional[bytes] = None, aad: Optional[bytes] = None, with_protected_recipients: bool = False, with_flatten_recipients: bool = True)[source]
Bases:
object
JWE envelope instance.
- add_recipient(recip: JweRecipient)[source]
Add a recipient to the JWE envelope.
- classmethod deserialize(message: Mapping[str, Any]) JweEnvelope [source]
Deserialize a JWE envelope from a mapping.
- classmethod from_json(message: Union[bytes, str]) JweEnvelope [source]
Decode a JWE envelope from a JSON string or bytes value.
- get_recipient(kid: str) JweRecipient [source]
Find a recipient by key ID.
- property protected_bytes: bytes
Access the protected data encoded as bytes.
This value is used in the additional authenticated data when encrypting.
- property recipient_key_ids: Iterable[JweRecipient]
Accessor for an iterator over the JWE recipient key identifiers.
- property recipients: Iterable[JweRecipient]
Accessor for an iterator over the JWE recipients.
The headers for each recipient include protected and unprotected headers from the outer envelope.
- class aries_cloudagent.utils.jwe.JweRecipient(*, encrypted_key: bytes, header: Optional[dict] = None)[source]
Bases:
object
A single message recipient.
- class aries_cloudagent.utils.jwe.JweRecipientSchema(*args: Any, **kwargs: Any)[source]
Bases:
Schema
JWE recipient schema.
- class aries_cloudagent.utils.jwe.JweSchema(*args: Any, **kwargs: Any)[source]
Bases:
Schema
JWE envelope schema.
aries_cloudagent.utils.multi_ledger module
Multiledger related utility methods.
- aries_cloudagent.utils.multi_ledger.get_write_ledger_config_for_profile(settings: BaseSettings) dict [source]
Return initial/default write ledger config on profile creation.
aries_cloudagent.utils.outofband module
Utilities for creating out-of-band messages.
aries_cloudagent.utils.profiles module
Profile utilities.
- aries_cloudagent.utils.profiles.is_anoncreds_profile_raise_web_exception(profile: Profile) None [source]
Raise a web exception when the supplied profile is anoncreds.
aries_cloudagent.utils.repeat module
Utils for repeating tasks.
- class aries_cloudagent.utils.repeat.RepeatAttempt(seq: RepeatSequence, index: int = 1)[source]
Bases:
object
Represents the current iteration in a repeat sequence.
- next() RepeatAttempt [source]
Get the next attempt instance.
aries_cloudagent.utils.stats module
Classes for tracking performance and timing.
- class aries_cloudagent.utils.stats.Collector(*, enabled: bool = True, log_path: Optional[str] = None)[source]
Bases:
object
Collector for a set of statistics.
- extract(groups: Optional[Sequence[str]] = None) dict [source]
Extract statistics for a specific set of groups.
- log(name: str, duration: float, start: Optional[float] = None)[source]
Log an entry in the statistics if the collector is enabled.
- wrap(obj, prop_name: Union[str, Sequence[str]], groups: Optional[Sequence[str]] = None, *, ignore_missing: bool = False)[source]
Wrap a method on a class or class instance.
aries_cloudagent.utils.task_queue module
Classes for managing a set of asyncio tasks.
- class aries_cloudagent.utils.task_queue.CompletedTask(task: Task, exc_info: Tuple, ident: Optional[str] = None, timing: Optional[dict] = None)[source]
Bases:
object
Represent the result of a queued task.
- class aries_cloudagent.utils.task_queue.PendingTask(coro: Coroutine, complete_hook: Optional[Callable] = None, ident: Optional[str] = None, task_future: Optional[Future] = None, queued_time: Optional[float] = None)[source]
Bases:
object
Represent a task in the queue.
- property cancelled
Accessor for the cancelled property.
- property task: Task
Accessor for the task.
- class aries_cloudagent.utils.task_queue.TaskQueue(max_active: int = 0, timed: bool = False, trace_fn: Optional[Callable] = None)[source]
Bases:
object
A class for managing a set of asyncio tasks.
- add_active(task: Task, task_complete: Optional[Callable] = None, ident: Optional[str] = None, timing: Optional[dict] = None) Task [source]
Register an active async task with an optional completion callback.
- Parameters
task – The asyncio task instance
task_complete – An optional callback to run on completion
ident – A string identifer for the task
timing – An optional dictionary of timing information
- add_pending(pending: PendingTask)[source]
Add a task to the pending queue.
- Parameters
pending – The PendingTask to add to the task queue
- async complete(timeout: Optional[float] = None, cleanup: bool = True)[source]
Cancel any pending tasks and wait for, or cancel active tasks.
- completed_task(task: Task, task_complete: Callable, ident: str, timing: Optional[dict] = None)[source]
Clean up after a task has completed and run callbacks.
- put(coro: Coroutine, task_complete: Optional[Callable] = None, ident: Optional[str] = None) PendingTask [source]
Add a new task to the queue, delaying execution if busy.
- Parameters
coro – The coroutine to run
task_complete – A callback to run on completion
ident – A string identifier for the task
Returns: a future resolving to the asyncio task instance once queued
- run(coro: Coroutine, task_complete: Optional[Callable] = None, ident: Optional[str] = None, timing: Optional[dict] = None) Task [source]
Start executing a coroutine as an async task, bypassing the pending queue.
- Parameters
coro – The coroutine to run
task_complete – An optional callback to run on completion
ident – A string identifier for the task
timing – An optional dictionary of timing information
Returns: the new asyncio task instance
- aries_cloudagent.utils.task_queue.coro_ident(coro: Coroutine)[source]
Extract an identifier for a coroutine.
aries_cloudagent.utils.tracing module
Event tracing.
- class aries_cloudagent.utils.tracing.AdminAPIMessageTracingSchema(*args: Any, **kwargs: Any)[source]
Bases:
OpenAPISchema
Request/result schema including agent message tracing.
This is to be used as a superclass for aca-py admin input/output messages that need to support tracing.
- aries_cloudagent.utils.tracing.decode_inbound_message(message)[source]
Return bundled message if appropriate.
- aries_cloudagent.utils.tracing.trace_event(context, message, handler: Optional[str] = None, outcome: Optional[str] = None, perf_counter: Optional[float] = None, force_trace: bool = False, raise_errors: bool = False) float [source]
Log a trace event to a configured target.
- Parameters
context – The application context, attributes of interest are: context[“trace.enabled”]: True if we are logging events context[“trace.target”]: Trace target (“log”, “message” or an http endpoint) context[“trace.tag”]: Tag to be included in trace output
message – the current message, can be an AgentMessage, InboundMessage, OutboundMessage or Exchange record
event – Dict that will be converted to json and posted to the target