aries_cloudagent.utils package

Subpackages

Submodules

aries_cloudagent.utils.classloader module

The classloader provides utilities to dynamically load classes and modules.

class aries_cloudagent.utils.classloader.ClassLoader[source]

Bases: object

Class used to load classes from modules dynamically.

classmethod load_class(class_name: str, default_module: Optional[str] = None, package: Optional[str] = None)[source]

Resolve a complete class path (e.g. typing.Dict) to the class itself.

Parameters
  • class_name – the class name

  • default_module – the default module to load, if not part of the class name

  • package – the parent package to search for the module

Returns

The resolved class

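Raises
  • ClassNotFoundError – If the class could not be resolved at path

  • ModuleLoadError – If there was an error loading the module

classmethod load_module(mod_path: str, package: Optional[str] = None) → module[source]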

Load a module by its absolute or relative path.

Parameters
  • mod_path – the absolute or relative module path

  • package – the parent package to search for the module

Returns

The resolved module or None if the module cannot be found

Raises

ModuleLoadError – If there was an error loading the module

classmethod load_subclass_of(base_class: Type, mod_path: str, package: Optional[str] = None)[source]

Resolve an implementation of a base class within a module.

Parameters
  • base_class – the base class being implemented

  • mod_path – the absolute module path

  • package – the parent package to search for the module

Returns

The resolved class

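Raises
  • ClassNotFoundError – If the module or class implementation could not be found

  • ModuleLoadError – If there was an error loading the module

classmethod scan_subpackages(package: str) → Sequence[str][source]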

Return a list of sub-packages defined under a named package.
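
As an illustration of the dotted-path resolution described for load_class, the following is a minimal standard-library sketch. It is not the ClassLoader implementation: the function name resolve_class and the built-in exceptions it raises are assumptions made for the example, and the real class raises ClassNotFoundError and ModuleLoadError instead.

```python
from importlib import import_module
from typing import Optional


def resolve_class(class_name: str, default_module: Optional[str] = None) -> type:
    """Resolve a dotted path such as 'collections.OrderedDict' to a class.

    Hypothetical helper for illustration only.
    """
    if "." in class_name:
        # Split 'pkg.module.ClassName' into the module path and the class name.
        mod_path, _, name = class_name.rpartition(".")
    elif default_module:
        # A bare class name falls back to the default module.
        mod_path, name = default_module, class_name
    else:
        raise ValueError(f"Cannot resolve class name: {class_name}")

    module = import_module(mod_path)
    resolved = getattr(module, name, None)
    if not isinstance(resolved, type):
        raise LookupError(f"Class not found: {class_name}")
    return resolved


ordered_dict_cls = resolve_class("collections.OrderedDict")
counter_cls = resolve_class("Counter", default_module="collections")
```

The default_module fallback mirrors the parameter of the same name above: it is consulted only when the class name carries no module path of its own.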

exception aries_cloudagent.utils.classloader.ClassNotFoundError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: BaseError

Class not found error.

class aries_cloudagent.utils.classloader.DeferLoad(cls_path: str)[source]

Bases: object

Helper to defer loading of a class definition.

property resolved

Accessor for the resolved class instance.
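
Deferred loading can be sketched with a property that imports the target only on first access. The class below is a hypothetical stand-in written against the standard library; apart from the cls_path argument and the resolved property named above, nothing here is taken from DeferLoad itself.

```python
from importlib import import_module


class LazyClass:
    """Hypothetical stand-in: resolve a class path on first access only."""

    def __init__(self, cls_path: str):
        self._cls_path = cls_path
        self._resolved = None

    @property
    def resolved(self):
        # Import lazily, then cache the result for later accesses.
        if self._resolved is None:
            mod_path, _, name = self._cls_path.rpartition(".")
            self._resolved = getattr(import_module(mod_path), name)
        return self._resolved


lazy = LazyClass("decimal.Decimal")
value = lazy.resolved("1.5")  # the import happens here, not at construction
```

Deferring the import this way is typically used to avoid circular imports or to delay loading optional dependencies.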

exception aries_cloudagent.utils.classloader.ModuleLoadError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: BaseError

Module load error.

aries_cloudagent.utils.dependencies module

Dependency related util methods.

aries_cloudagent.utils.dependencies.assert_ursa_bbs_signatures_installed()[source]

Assert ursa_bbs_signatures module is installed.

aries_cloudagent.utils.dependencies.is_indy_sdk_module_installed()[source]

Check whether indy (indy-sdk) module is installed.

Returns

Whether indy (indy-sdk) is installed.

Return type

bool

aries_cloudagent.utils.dependencies.is_ursa_bbs_signatures_module_installed()[source]

Check whether ursa_bbs_signatures module is installed.

Returns

Whether ursa_bbs_signatures is installed.

Return type

bool
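
One way to check for an optional dependency without importing it is importlib.util.find_spec. The helpers below are a sketch under that assumption; their names are hypothetical, and the functions documented above may instead attempt the import and catch the failure.

```python
from importlib.util import find_spec


def is_module_installed(name: str) -> bool:
    """Return True if a top-level module can be located on the import path."""
    return find_spec(name) is not None


def assert_module_installed(name: str) -> None:
    """Raise if the named top-level module is not available."""
    if not is_module_installed(name):
        raise RuntimeError(f"{name} is not installed")


json_available = is_module_installed("json")
missing_available = is_module_installed("surely_not_a_real_module_xyz")
```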

aries_cloudagent.utils.env module

Environment utility methods.

aries_cloudagent.utils.env.storage_path(*subpaths, create: bool = False) → Path[source]

Get the default aca-py home directory.

aries_cloudagent.utils.http module

HTTP utility methods.

exception aries_cloudagent.utils.http.FetchError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: BaseError

Error raised when an HTTP fetch fails.

exception aries_cloudagent.utils.http.PutError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: BaseError

Error raised when an HTTP put fails.

async aries_cloudagent.utils.http.fetch(url: str, *, headers: Optional[dict] = None, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: Optional[aiohttp.BaseConnector] = None, session: Optional[aiohttp.ClientSession] = None, json: bool = False)[source]

Fetch from an HTTP server with automatic retries and timeouts.

Parameters
  • url – the address to fetch

  • headers – an optional dict of headers to send

  • retry – flag to retry the fetch

  • max_attempts – the maximum number of attempts to make

  • interval – the interval between retries, in seconds

  • backoff – the backoff interval, in seconds

  • request_timeout – the HTTP request timeout, in seconds

  • connector – an optional existing BaseConnector

  • session – a shared ClientSession

  • json – flag to parse the result as JSON
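
The retry parameters above can be illustrated with a small asyncio loop. This is a sketch, not the fetch implementation: it uses no HTTP client, the names retry_call, FetchFailed and flaky_request are hypothetical, and treating backoff as a multiplicative growth factor on the interval is an assumption of the example.

```python
import asyncio


class FetchFailed(Exception):
    """Hypothetical error raised once all attempts are used up."""


async def retry_call(make_request, *, max_attempts=5, interval=1.0,
                     backoff=0.25, request_timeout=10.0):
    """Await make_request() until it succeeds or attempts run out."""
    delay = interval
    for attempt in range(1, max_attempts + 1):
        try:
            # Each attempt is bounded by its own timeout.
            return await asyncio.wait_for(make_request(), timeout=request_timeout)
        except (OSError, asyncio.TimeoutError) as err:
            if attempt == max_attempts:
                raise FetchFailed(f"gave up after {attempt} attempts") from err
            await asyncio.sleep(delay)
            # Assumption: each wait grows by a factor of (1 + backoff).
            delay *= 1.0 + backoff


calls = []


async def flaky_request():
    """Simulated request that fails twice before succeeding."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("simulated network failure")
    return "ok"


result = asyncio.run(retry_call(flaky_request, interval=0.01))
```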

async aries_cloudagent.utils.http.fetch_stream(url: str, *, headers: Optional[dict] = None, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: Optional[aiohttp.BaseConnector] = None, session: Optional[aiohttp.ClientSession] = None)[source]

Fetch from an HTTP server with automatic retries and timeouts.

Parameters
  • url – the address to fetch

  • headers – an optional dict of headers to send

  • retry – flag to retry the fetch

  • max_attempts – the maximum number of attempts to make

  • interval – the interval between retries, in seconds

  • backoff – the backoff interval, in seconds

  • request_timeout – the HTTP request timeout, in seconds

  • connector – an optional existing BaseConnector

  • session – a shared ClientSession

async aries_cloudagent.utils.http.put_file(url: str, file_data: dict, extra_data: dict, *, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: Optional[aiohttp.BaseConnector] = None, session: Optional[aiohttp.ClientSession] = None, json: bool = False)[source]

Put to HTTP server with automatic retries and timeouts.

Parameters
  • url – the address to use

  • file_data – dict with data key and path of file to upload

  • extra_data – further content to include in data to put

  • retry – flag to retry the put

  • max_attempts – the maximum number of attempts to make

  • interval – the interval between retries, in seconds

  • backoff – the backoff interval, in seconds

  • request_timeout – the HTTP request timeout, in seconds

  • connector – an optional existing BaseConnector

  • session – a shared ClientSession

  • json – flag to parse the result as JSON

aries_cloudagent.utils.jwe module

JSON Web Encryption utilities.

class aries_cloudagent.utils.jwe.B64Value(*args: Any, **kwargs: Any)[source]

Bases: Str

A marshmallow-compatible wrapper for base64-URL values.

class aries_cloudagent.utils.jwe.JweEnvelope(*, protected: Optional[dict] = None, protected_b64: Optional[bytes] = None, unprotected: Optional[dict] = None, ciphertext: Optional[bytes] = None, iv: Optional[bytes] = None, tag: Optional[bytes] = None, aad: Optional[bytes] = None, with_protected_recipients: bool = False, with_flatten_recipients: bool = True)[source]

Bases: object

JWE envelope instance.

add_recipient(recip: JweRecipient)[source]

Add a recipient to the JWE envelope.

property combined_aad: bytes

Accessor for the additional authenticated data.

classmethod deserialize(message: Mapping[str, Any]) → JweEnvelope[source]

Deserialize a JWE envelope from a mapping.

classmethod from_json(message: Union[bytes, str]) → JweEnvelope[source]

Decode a JWE envelope from a JSON string or bytes value.

get_recipient(kid: str) → JweRecipient[source]

Find a recipient by key ID.

property protected_bytes: bytes

Access the protected data encoded as bytes.

This value is used in the additional authenticated data when encrypting.

property recipient_key_ids: Iterable[JweRecipient]

Accessor for an iterator over the JWE recipient key identifiers.

property recipients: Iterable[JweRecipient]

Accessor for an iterator over the JWE recipients.

The headers for each recipient include protected and unprotected headers from the outer envelope.

property recipients_json: List[Dict[str, Any]]

Encode the current recipients for JSON.

serialize() → dict[source]

Serialize the JWE envelope to a mapping.

set_payload(ciphertext: bytes, iv: bytes, tag: bytes, aad: Optional[bytes] = None)[source]

Set the payload of the JWE envelope.

set_protected(protected: Mapping[str, Any])[source]

Set the protected headers of the JWE envelope.

to_json() → str[source]

Serialize the JWE envelope to a JSON string.

class aries_cloudagent.utils.jwe.JweRecipient(*, encrypted_key: bytes, header: Optional[dict] = None)[source]

Bases: object

A single message recipient.

classmethod deserialize(entry: Mapping[str, Any]) → JweRecipient[source]

Deserialize a JWE recipient from a mapping.

serialize() → dict[source]

Serialize the JWE recipient to a mapping.

class aries_cloudagent.utils.jwe.JweRecipientSchema(*args: Any, **kwargs: Any)[source]

Bases: Schema

JWE recipient schema.

class aries_cloudagent.utils.jwe.JweSchema(*args: Any, **kwargs: Any)[source]

Bases: Schema

JWE envelope schema.

aries_cloudagent.utils.jwe.b64url(value: Union[bytes, str]) → str[source]

Encode a string or bytes value as unpadded base64-URL.

aries_cloudagent.utils.jwe.from_b64url(value: str) → bytes[source]

Decode an unpadded base64-URL value.
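
Unpadded base64-URL encoding can be reproduced with the standard library's base64 module. The sketch below uses hypothetical function names (encode_b64url, decode_b64url) and is not the code behind b64url and from_b64url; it only shows the padding handling the descriptions above imply.

```python
import base64
from typing import Union


def encode_b64url(value: Union[bytes, str]) -> str:
    """Encode to URL-safe base64 and strip the trailing '=' padding."""
    if isinstance(value, str):
        value = value.encode("utf-8")
    return base64.urlsafe_b64encode(value).rstrip(b"=").decode("ascii")


def decode_b64url(value: str) -> bytes:
    """Restore the padding to a multiple of four characters, then decode."""
    padding = "=" * (-len(value) % 4)
    return base64.urlsafe_b64decode(value + padding)


encoded = encode_b64url("hi")      # "aGk" rather than the padded "aGk="
decoded = decode_b64url(encoded)   # b"hi"
```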

aries_cloudagent.utils.multi_ledger module

Multiledger related utility methods.

aries_cloudagent.utils.multi_ledger.get_write_ledger_config_for_profile(settings: BaseSettings) → dict[source]

Return initial/default write ledger config on profile creation.

aries_cloudagent.utils.outofband module

Utilities for creating out-of-band messages.

aries_cloudagent.utils.outofband.serialize_outofband(message: AgentMessage, did: DIDInfo, endpoint: str) → str[source]

Serialize the agent message as an out-of-band message.

Returns

An OOB message in URL format.

aries_cloudagent.utils.repeat module

Utils for repeating tasks.

class aries_cloudagent.utils.repeat.RepeatAttempt(seq: RepeatSequence, index: int = 1)[source]

Bases: object

Represents the current iteration in a repeat sequence.

property final: bool

Check if this is the last instance in the sequence.

next() → RepeatAttempt[source]

Get the next attempt instance.

property next_interval: float

Calculate the interval before the next attempt.

timeout(interval: Optional[float] = None)[source]

Create a context manager for timing out an attempt.

class aries_cloudagent.utils.repeat.RepeatSequence(limit: int = 0, interval: float = 0.0, backoff: float = 0.0)[source]

Bases: object

Represents a repetition sequence.

next_interval(index: int) → float[source]

Calculate the time before the next attempt.

start() → RepeatAttempt[source]

Get the first attempt in the sequence.
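
The attempt-and-sequence model above can be sketched as an immutable object that knows its position, whether it is the last attempt, and how long to wait before the next one. Everything in this example is hypothetical: the class name Attempt, folding the sequence settings into the attempt, and the exponent used for the interval are assumptions, not the RepeatAttempt or RepeatSequence code.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Attempt:
    """Hypothetical attempt record carrying its sequence settings."""

    limit: int
    interval: float
    backoff: float
    index: int = 1

    @property
    def final(self) -> bool:
        # A limit of 0 is treated here as "no limit".
        return bool(self.limit) and self.index >= self.limit

    @property
    def next_interval(self) -> float:
        # Assumption: the wait grows by (1 + backoff) after each attempt.
        return self.interval * (1.0 + self.backoff) ** (self.index - 1)

    def next(self) -> "Attempt":
        if self.final:
            raise RuntimeError("no attempts remaining")
        return replace(self, index=self.index + 1)


attempt = Attempt(limit=3, interval=1.0, backoff=0.5)
schedule = []
while not attempt.final:
    schedule.append(attempt.next_interval)
    attempt = attempt.next()
```

With these settings the loop records a wait of 1.0 and then 1.5 seconds before reaching the third and final attempt.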

aries_cloudagent.utils.stats module

Classes for tracking performance and timing.

class aries_cloudagent.utils.stats.Collector(*, enabled: bool = True, log_path: Optional[str] = None)[source]

Bases: object

Collector for a set of statistics.

property enabled: bool

Accessor for the collector’s enabled property.

extract(groups: Optional[Sequence[str]] = None) → dict[source]

Extract statistics for a specific set of groups.

log(name: str, duration: float, start: Optional[float] = None)[source]

Log an entry in the statistics if the collector is enabled.

mark(*names)[source]

Make a custom decorator function for adding to the set of groups.

reset()[source]

Reset the collector’s statistics.

property results: dict

Accessor for the current set of collected statistics.

timer(*groups)[source]

Create a new timer attached to this collector.

wrap(obj, prop_name: Union[str, Sequence[str]], groups: Optional[Sequence[str]] = None, *, ignore_missing: bool = False)[source]

Wrap a method on a class or class instance.

wrap_coro(fn, groups: Sequence[str])[source]

Wrap a coroutine instance to collect timing statistics on execution.

wrap_fn(fn, groups: Sequence[str])[source]

Wrap a function instance to collect timing statistics on execution.

class aries_cloudagent.utils.stats.Stats[source]

Bases: object

A collection of statistics.

extract(names: Optional[Sequence[str]] = None) → dict[source]

Summarize the stats in a dictionary.

log(name: str, duration: float)[source]

Log an entry in the stats.

class aries_cloudagent.utils.stats.Timer(collector: Collector, groups: Sequence[str])[source]

Bases: object

Timer instance for a running task.

classmethod now()[source]

Fetch a standard timer value.

start() → Timer[source]

Start the timer.

stop()[source]

Stop the timer.
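
The relationship between a statistics store, a timer and a wrapping decorator can be sketched as follows. This is an illustration built on time.perf_counter, not the Collector, Stats or Timer classes: the names SimpleStats and timed, and the shape of the extracted dictionary, are assumptions.

```python
import functools
import time
from collections import defaultdict


class SimpleStats:
    """Hypothetical store of call counts and total durations per name."""

    def __init__(self):
        self.count = defaultdict(int)
        self.total = defaultdict(float)

    def log(self, name: str, duration: float) -> None:
        self.count[name] += 1
        self.total[name] += duration

    def extract(self) -> dict:
        return {
            name: {
                "count": self.count[name],
                "total": self.total[name],
                "avg": self.total[name] / self.count[name],
            }
            for name in self.count
        }


def timed(stats: SimpleStats, name: str):
    """Build a decorator that logs each call's duration under the given name."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                # Log even when the wrapped function raises.
                stats.log(name, time.perf_counter() - start)

        return wrapper

    return decorator


stats = SimpleStats()


@timed(stats, "square")
def square(x):
    return x * x


values = [square(2), square(3)]
summary = stats.extract()
```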

aries_cloudagent.utils.task_queue module

Classes for managing a set of asyncio tasks.

class aries_cloudagent.utils.task_queue.CompletedTask(task: Task, exc_info: Tuple, ident: Optional[str] = None, timing: Optional[dict] = None)[source]

Bases: object

Represent the result of a queued task.

class aries_cloudagent.utils.task_queue.PendingTask(coro: Coroutine, complete_hook: Optional[Callable] = None, ident: Optional[str] = None, task_future: Optional[Future] = None, queued_time: Optional[float] = None)[source]

Bases: object

Represent a task in the queue.

cancel()[source]

Cancel the pending task.

property cancelled

Accessor for the cancelled property.

property task: Task

Accessor for the task.

class aries_cloudagent.utils.task_queue.TaskQueue(max_active: int = 0, timed: bool = False, trace_fn: Optional[Callable] = None)[source]

Bases: object

A class for managing a set of asyncio tasks.

add_active(task: Task, task_complete: Optional[Callable] = None, ident: Optional[str] = None, timing: Optional[dict] = None) → Task[source]

Register an active async task with an optional completion callback.

Parameters
  • task – The asyncio task instance

  • task_complete – An optional callback to run on completion

  • ident – A string identifier for the task

  • timing – An optional dictionary of timing information

add_pending(pending: PendingTask)[source]

Add a task to the pending queue.

Parameters

pending – The PendingTask to add to the task queue

cancel()[source]

Cancel any pending or active tasks in the queue.

cancel_pending()[source]

Cancel any pending tasks in the queue.

property cancelled: bool

Accessor for the cancelled property of the queue.

async complete(timeout: Optional[float] = None, cleanup: bool = True)[source]

Cancel any pending tasks and wait for, or cancel, active tasks.

completed_task(task: Task, task_complete: Callable, ident: str, timing: Optional[dict] = None)[source]

Clean up after a task has completed and run callbacks.

property current_active: int

Accessor for the current number of active tasks in the queue.

property current_pending: int

Accessor for the current number of pending tasks in the queue.

property current_size: int

Accessor for the total number of tasks in the queue.

drain() → Task[source]

Start the process to run queued tasks.

async flush()[source]

Wait for any active or pending tasks to be completed.

property max_active: int

Accessor for the maximum number of active tasks in the queue.

put(coro: Coroutine, task_complete: Optional[Callable] = None, ident: Optional[str] = None) → PendingTask[source]

Add a new task to the queue, delaying execution if busy.

Parameters
  • coro – The coroutine to run

  • task_complete – A callback to run on completion

  • ident – A string identifier for the task

Returns

A future resolving to the asyncio task instance once queued

property ready: bool

Accessor for the ready property of the queue.

run(coro: Coroutine, task_complete: Optional[Callable] = None, ident: Optional[str] = None, timing: Optional[dict] = None) → Task[source]

Start executing a coroutine as an async task, bypassing the pending queue.

Parameters
  • coro – The coroutine to run

  • task_complete – An optional callback to run on completion

  • ident – A string identifier for the task

  • timing – An optional dictionary of timing information

Returns

The new asyncio task instance

async wait_for(timeout: float)[source]

Wait for all queued tasks to complete with a timeout.
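
The effect of a max_active limit, where extra work waits until a slot frees up, can be approximated with asyncio.Semaphore. This sketch swaps in a semaphore for the pending queue described above and is not how TaskQueue is implemented; run_limited and work are hypothetical names.

```python
import asyncio


async def run_limited(coros, max_active: int):
    """Run coroutines with at most max_active executing at once."""
    semaphore = asyncio.Semaphore(max_active)
    active = 0
    peak = 0

    async def guarded(coro):
        nonlocal active, peak
        async with semaphore:
            # Track how many coroutines hold a slot at the same time.
            active += 1
            peak = max(peak, active)
            try:
                return await coro
            finally:
                active -= 1

    results = await asyncio.gather(*(guarded(c) for c in coros))
    return results, peak


async def work(n: int) -> int:
    await asyncio.sleep(0.01)
    return n * 2


async def main():
    return await run_limited([work(n) for n in range(5)], max_active=2)


results, peak = asyncio.run(main())
```

asyncio.gather returns results in submission order, so the output order does not depend on which coroutine finishes first.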

aries_cloudagent.utils.task_queue.coro_ident(coro: Coroutine)[source]

Extract an identifier for a coroutine.

async aries_cloudagent.utils.task_queue.coro_timed(coro: Coroutine, timing: dict)[source]

Capture timing for a coroutine.

aries_cloudagent.utils.task_queue.task_exc_info(task: Task)[source]

Extract exception info from an asyncio task.

aries_cloudagent.utils.tracing module

Event tracing.

class aries_cloudagent.utils.tracing.AdminAPIMessageTracingSchema(*args: Any, **kwargs: Any)[source]

Bases: OpenAPISchema

Request/result schema including agent message tracing.

This is to be used as a superclass for aca-py admin input/output messages that need to support tracing.

aries_cloudagent.utils.tracing.decode_inbound_message(message)[source]

Return bundled message if appropriate.

aries_cloudagent.utils.tracing.get_timer() → float[source]

Return a timer.

aries_cloudagent.utils.tracing.trace_event(context, message, handler: Optional[str] = None, outcome: Optional[str] = None, perf_counter: Optional[float] = None, force_trace: bool = False, raise_errors: bool = False) → float[source]

Log a trace event to a configured target.

Parameters
  • context – The application context; attributes of interest are:
      context[“trace.enabled”]: True if we are logging events
      context[“trace.target”]: Trace target (“log”, “message” or an http endpoint)
      context[“trace.tag”]: Tag to be included in trace output

  • message – the current message, can be an AgentMessage, InboundMessage, OutboundMessage or Exchange record

  • event – Dict that will be converted to json and posted to the target

aries_cloudagent.utils.tracing.tracing_enabled(context, message) → bool[source]

Determine whether to log trace messages or not.