aries_cloudagent.utils package

Submodules

aries_cloudagent.utils.classloader module

The classloader provides utilities to dynamically load classes and modules.

class aries_cloudagent.utils.classloader.ClassLoader[source]

Bases: object

Class used to load classes from modules dynamically.

classmethod load_class(class_name: str, default_module: Optional[str] = None, package: Optional[str] = None)[source]

Resolve a complete class path (e.g. typing.Dict) to the class itself.

Parameters:
  • class_name – the class name
  • default_module – the default module to load, if not part of the class name
  • package – the parent package to search for the module
Returns:

The resolved class

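Raises:
  • ClassNotFoundError – If the class could not be resolved at the given path
  • ModuleLoadError – If there was an error loading the module

classmethod load_module(mod_path: str, package: str = None) → module[source]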

Load a module by its absolute path.

Parameters:
  • mod_path – the absolute or relative module path
  • package – the parent package to search for the module
Returns:

The resolved module or None if the module cannot be found

Raises:

ModuleLoadError – If there was an error loading the module

classmethod load_subclass_of(base_class: Type, mod_path: str, package: str = None)[source]

Resolve an implementation of a base class within a module.

Parameters:
  • base_class – the base class being implemented
  • mod_path – the absolute module path
  • package – the parent package to search for the module
Returns:

The resolved class

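Raises:
  • ClassNotFoundError – If the module or class implementation could not be found
  • ModuleLoadError – If there was an error loading the module

classmethod scan_subpackages(package: str) → Sequence[str][source]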

Return a list of sub-packages defined under a named package.
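The lookups described for load_class and load_subclass_of can be illustrated with the standard library alone. The following is a minimal sketch, not the ClassLoader implementation: the helper names resolve_class and find_subclass_of are hypothetical, and the sketch raises the built-in LookupError rather than the package's own exceptions.

```python
import importlib
import inspect
from typing import Optional


def resolve_class(class_name: str, default_module: Optional[str] = None):
    """Resolve a dotted path such as "typing.Dict" to the object it names."""
    if "." in class_name:
        # Split "package.module.ClassName" into module path and attribute name.
        mod_path, _, attr = class_name.rpartition(".")
    elif default_module:
        # A bare class name falls back to the default module.
        mod_path, attr = default_module, class_name
    else:
        raise LookupError(f"No module given for class {class_name!r}")
    module = importlib.import_module(mod_path)
    try:
        return getattr(module, attr)
    except AttributeError as err:
        raise LookupError(f"{attr!r} not found in {mod_path!r}") from err


def find_subclass_of(base_class: type, mod_path: str):
    """Return the first class in a module that derives from base_class."""
    module = importlib.import_module(mod_path)
    for _, obj in inspect.getmembers(module, inspect.isclass):
        if issubclass(obj, base_class) and obj is not base_class:
            return obj
    raise LookupError(f"No subclass of {base_class.__name__} in {mod_path!r}")


ordered = resolve_class("collections.OrderedDict")
fallback = resolve_class("OrderedDict", default_module="collections")
decode_error = find_subclass_of(ValueError, "json.decoder")
print(ordered.__name__, fallback.__name__, decode_error.__name__)
```

Splitting on the last dot keeps nested package paths intact, which is why rpartition is used rather than split.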

exception aries_cloudagent.utils.classloader.ClassNotFoundError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Class not found error.

class aries_cloudagent.utils.classloader.DeferLoad(cls_path: str)[source]

Bases: object

Helper to defer loading of a class definition.

resolved

Accessor for the resolved class instance.
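Deferred loading of this kind is commonly used to avoid import cycles or to delay expensive imports. The sketch below shows the general pattern under stated assumptions: the class name LazyClass is hypothetical, and caching the result on first access is an assumption rather than documented DeferLoad behaviour.

```python
import importlib


class LazyClass:
    """Hypothetical stand-in: resolve a dotted class path on first access."""

    def __init__(self, cls_path: str):
        self._cls_path = cls_path
        self._resolved = None

    @property
    def resolved(self):
        # Import only once; later accesses reuse the cached class.
        if self._resolved is None:
            mod_path, _, name = self._cls_path.rpartition(".")
            self._resolved = getattr(importlib.import_module(mod_path), name)
        return self._resolved


# Nothing is imported until .resolved is read.
lazy_decimal = LazyClass("decimal.Decimal")
value = lazy_decimal.resolved("1.5")
print(value)
```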

exception aries_cloudagent.utils.classloader.ModuleLoadError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Module load error.

aries_cloudagent.utils.dependencies module

Dependency related util methods.

aries_cloudagent.utils.dependencies.assert_ursa_bbs_signatures_installed()[source]

Assert ursa_bbs_signatures module is installed.

aries_cloudagent.utils.dependencies.is_indy_sdk_module_installed()[source]

Check whether indy (indy-sdk) module is installed.

Returns: Whether indy (indy-sdk) is installed.
Return type: bool

aries_cloudagent.utils.dependencies.is_ursa_bbs_signatures_module_installed()[source]

Check whether ursa_bbs_signatures module is installed.

Returns: Whether ursa_bbs_signatures is installed.
Return type: bool
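A check for an optional dependency can be sketched with importlib. This is an illustration only: the helper names below are hypothetical, and the actual functions in this module may detect the packages differently (for example by attempting the import).

```python
import importlib.util


def is_module_installed(name: str) -> bool:
    """Return True if a top-level module can be located without importing it."""
    try:
        return importlib.util.find_spec(name) is not None
    except (ImportError, ValueError):
        # find_spec can raise for broken or partially initialised modules.
        return False


def assert_module_installed(name: str) -> None:
    """Raise if the module is missing, in the style of an assert_* helper."""
    if not is_module_installed(name):
        raise RuntimeError(f"{name} module is not installed")


has_json = is_module_installed("json")
has_missing = is_module_installed("surely_not_a_real_module_xyz")
print(has_json, has_missing)
```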

aries_cloudagent.utils.env module

Environment utility methods.

aries_cloudagent.utils.env.storage_path(*subpaths, create: bool = False) → pathlib.Path[source]

Get the default aca-py home directory.

aries_cloudagent.utils.http module

HTTP utility methods.

exception aries_cloudagent.utils.http.FetchError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Error raised when an HTTP fetch fails.

exception aries_cloudagent.utils.http.PutError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Error raised when an HTTP put fails.

aries_cloudagent.utils.http.fetch(url: str, *, headers: dict = None, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: BaseConnector = None, session: ClientSession = None, json: bool = False)[source]

Fetch from an HTTP server with automatic retries and timeouts.

Parameters:
  • url – the address to fetch
  • headers – an optional dict of headers to send
  • retry – flag to retry the fetch
  • max_attempts – the maximum number of attempts to make
  • interval – the interval between retries, in seconds
  • backoff – the backoff interval, in seconds
  • request_timeout – the HTTP request timeout, in seconds
  • connector – an optional existing BaseConnector
  • session – a shared ClientSession
  • json – flag to parse the result as JSON
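How the retry, max_attempts, interval, backoff and request_timeout parameters might interact can be sketched without any network access. This is a rough illustration, not the fetch implementation: retry_call and flaky_request are hypothetical names, and the assumption that each wait grows by the backoff fraction is not stated in this reference.

```python
import asyncio


async def retry_call(make_request, *, max_attempts=5, interval=1.0,
                     backoff=0.25, request_timeout=10.0):
    """Call make_request() until it succeeds or the attempts run out."""
    delay = interval
    for attempt in range(1, max_attempts + 1):
        try:
            # Each attempt is bounded by its own timeout.
            return await asyncio.wait_for(make_request(), timeout=request_timeout)
        except (OSError, asyncio.TimeoutError):
            if attempt == max_attempts:
                raise
            await asyncio.sleep(delay)
            # Assumption: the wait grows by the backoff fraction each retry.
            delay *= 1.0 + backoff


calls = []


async def flaky_request():
    """Simulated request that fails twice before succeeding."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("simulated failure")
    return "ok"


result = asyncio.run(retry_call(flaky_request, interval=0.01))
print(result, len(calls))
```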
aries_cloudagent.utils.http.fetch_stream(url: str, *, headers: dict = None, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: BaseConnector = None, session: ClientSession = None)[source]

Fetch from an HTTP server with automatic retries and timeouts.

Parameters:
  • url – the address to fetch
  • headers – an optional dict of headers to send
  • retry – flag to retry the fetch
  • max_attempts – the maximum number of attempts to make
  • interval – the interval between retries, in seconds
  • backoff – the backoff interval, in seconds
  • request_timeout – the HTTP request timeout, in seconds
  • connector – an optional existing BaseConnector
  • session – a shared ClientSession
aries_cloudagent.utils.http.put_file(url: str, file_data: dict, extra_data: dict, *, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: BaseConnector = None, session: ClientSession = None, json: bool = False)[source]

Put to an HTTP server with automatic retries and timeouts.

Parameters:
  • url – the address to use
  • file_data – dict with data key and path of file to upload
  • extra_data – further content to include in data to put
  • headers – an optional dict of headers to send
  • retry – flag to retry the put
  • max_attempts – the maximum number of attempts to make
  • interval – the interval between retries, in seconds
  • backoff – the backoff interval, in seconds
  • request_timeout – the HTTP request timeout, in seconds
  • connector – an optional existing BaseConnector
  • session – a shared ClientSession
  • json – flag to parse the result as JSON

aries_cloudagent.utils.jwe module

JSON Web Encryption utilities.

class aries_cloudagent.utils.jwe.B64Value(*args, **kwargs)[source]

Bases: marshmallow.fields.Str

A marshmallow-compatible wrapper for base64-URL values.

class aries_cloudagent.utils.jwe.JweEnvelope(*, protected: dict = None, protected_b64: bytes = None, unprotected: dict = None, ciphertext: bytes = None, iv: bytes = None, tag: bytes = None, aad: bytes = None, with_protected_recipients: bool = False, with_flatten_recipients: bool = True)[source]

Bases: object

JWE envelope instance.

add_recipient(recip: aries_cloudagent.utils.jwe.JweRecipient)[source]

Add a recipient to the JWE envelope.

combined_aad

Accessor for the additional authenticated data.

classmethod deserialize(message: Mapping[str, Any]) → aries_cloudagent.utils.jwe.JweEnvelope[source]

Deserialize a JWE envelope from a mapping.

classmethod from_json(message: Union[bytes, str]) → aries_cloudagent.utils.jwe.JweEnvelope[source]

Decode a JWE envelope from a JSON string or bytes value.

get_recipient(kid: str) → aries_cloudagent.utils.jwe.JweRecipient[source]

Find a recipient by key ID.

protected_bytes

Access the protected data encoded as bytes.

This value is used in the additional authenticated data when encrypting.

recipient_key_ids

Accessor for an iterator over the JWE recipient key identifiers.

recipients

Accessor for an iterator over the JWE recipients.

The headers for each recipient include protected and unprotected headers from the outer envelope.

recipients_json

Encode the current recipients for JSON.

serialize() → dict[source]

Serialize the JWE envelope to a mapping.

set_payload(ciphertext: bytes, iv: bytes, tag: bytes, aad: bytes = None)[source]

Set the payload of the JWE envelope.

set_protected(protected: Mapping[str, Any])[source]

Set the protected headers of the JWE envelope.

to_json() → str[source]

Serialize the JWE envelope to a JSON string.
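The relationship between protected_bytes and combined_aad can be illustrated using the rule from RFC 7516: the additional authenticated data is the base64url-encoded protected header, followed by a period and the base64url-encoded AAD when an AAD value is present. The sketch below assumes JweEnvelope follows that rule; the helper names and the example header are hypothetical.

```python
import base64
import json
from typing import Optional


def b64url_nopad(data: bytes) -> bytes:
    """Base64url-encode and strip the trailing '=' padding."""
    return base64.urlsafe_b64encode(data).rstrip(b"=")


def encode_protected(protected: dict) -> bytes:
    """Encode the protected headers as compact JSON, then base64url."""
    compact = json.dumps(protected, separators=(",", ":"))
    return b64url_nopad(compact.encode("utf-8"))


def combine_aad(protected_b64: bytes, aad: Optional[bytes] = None) -> bytes:
    """Build the AAD input for the cipher, per RFC 7516."""
    if aad is None:
        return protected_b64
    return protected_b64 + b"." + b64url_nopad(aad)


# Illustrative header only; real envelopes carry more fields.
protected_b64 = encode_protected({"enc": "A256GCM"})
print(combine_aad(protected_b64, b"hello"))
```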

class aries_cloudagent.utils.jwe.JweRecipient(*, encrypted_key: bytes, header: dict = None)[source]

Bases: object

A single message recipient.

classmethod deserialize(entry: Mapping[str, Any]) → aries_cloudagent.utils.jwe.JweRecipient[source]

Deserialize a JWE recipient from a mapping.

serialize() → dict[source]

Serialize the JWE recipient to a mapping.

class aries_cloudagent.utils.jwe.JweRecipientSchema(*args, **kwargs)[source]

Bases: marshmallow.Schema

JWE recipient schema.

encrypted_key

A marshmallow-compatible wrapper for base64-URL values.

header

class aries_cloudagent.utils.jwe.JweSchema(*args, **kwargs)[source]

Bases: marshmallow.Schema

JWE envelope schema.

aad

A marshmallow-compatible wrapper for base64-URL values.

ciphertext

A marshmallow-compatible wrapper for base64-URL values.

encrypted_key

A marshmallow-compatible wrapper for base64-URL values.

header

iv

A marshmallow-compatible wrapper for base64-URL values.

protected

recipients

tag

A marshmallow-compatible wrapper for base64-URL values.

unprotected

aries_cloudagent.utils.jwe.b64url(value: Union[bytes, str]) → str[source]

Encode a string or bytes value as unpadded base64-URL.

aries_cloudagent.utils.jwe.from_b64url(value: str) → bytes[source]

Decode an unpadded base64-URL value.
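Unpadded base64-URL encoding can be reproduced with the standard base64 module. The following sketch uses the hypothetical names to_b64url and parse_b64url to avoid implying it is the code behind b64url and from_b64url; it shows the padding handling that such helpers need.

```python
import base64
from typing import Union


def to_b64url(value: Union[bytes, str]) -> str:
    """Encode bytes (or a UTF-8 string) as base64url without '=' padding."""
    if isinstance(value, str):
        value = value.encode("utf-8")
    return base64.urlsafe_b64encode(value).rstrip(b"=").decode("ascii")


def parse_b64url(value: str) -> bytes:
    """Decode base64url text, restoring the padding the decoder expects."""
    padding = "=" * (-len(value) % 4)
    return base64.urlsafe_b64decode(value + padding)


encoded = to_b64url("hello")
decoded = parse_b64url(encoded)
print(encoded, decoded)
```

The expression -len(value) % 4 yields the number of padding characters needed to reach a multiple of four, which is zero when the input is already aligned.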

aries_cloudagent.utils.outofband module

Utilities for creating out-of-band messages.

aries_cloudagent.utils.outofband.serialize_outofband(message: aries_cloudagent.messaging.agent_message.AgentMessage, did: aries_cloudagent.wallet.did_info.DIDInfo, endpoint: str) → str[source]

Serialize the agent message as an out-of-band message.

Returns: An OOB message in URL format.

aries_cloudagent.utils.repeat module

Utils for repeating tasks.

class aries_cloudagent.utils.repeat.RepeatAttempt(seq: aries_cloudagent.utils.repeat.RepeatSequence, index: int = 1)[source]

Bases: object

Represents the current iteration in a repeat sequence.

final

Check if this is the last instance in the sequence.

next() → aries_cloudagent.utils.repeat.RepeatAttempt[source]

Get the next attempt instance.

next_interval

Calculate the interval before the next attempt.

timeout(interval: float = None)[source]

Create a context manager for timing out an attempt.

class aries_cloudagent.utils.repeat.RepeatSequence(limit: int = 0, interval: float = 0.0, backoff: float = 0.0)[source]

Bases: object

Represents a repetition sequence.

next_interval(index: int) → float[source]

Calculate the time before the next attempt.

start() → aries_cloudagent.utils.repeat.RepeatAttempt[source]

Get the first attempt in the sequence.
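A repeat sequence with a limit, a base interval and a backoff factor can be modelled in a few lines. This is a sketch under assumptions: RetrySchedule is a hypothetical name, the geometric growth formula is a guess at what backoff means, and treating a limit of 0 as unlimited is likewise an assumption.

```python
from dataclasses import dataclass
from typing import Iterator


@dataclass
class RetrySchedule:
    limit: int = 0         # assumption: 0 means no upper bound on attempts
    interval: float = 0.0  # base wait between attempts, in seconds
    backoff: float = 0.0   # fractional growth applied per attempt

    def next_interval(self, index: int) -> float:
        """Wait after attempt `index`; assumed to grow geometrically."""
        return self.interval * (1.0 + self.backoff) ** (index - 1)

    def attempts(self) -> Iterator[int]:
        """Yield 1-based attempt numbers until the limit is reached."""
        index = 1
        while not self.limit or index <= self.limit:
            yield index
            index += 1


schedule = RetrySchedule(limit=3, interval=1.0, backoff=0.5)
indexes = list(schedule.attempts())
waits = [schedule.next_interval(i) for i in indexes]
print(indexes, waits)
```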

aries_cloudagent.utils.stats module

Classes for tracking performance and timing.

class aries_cloudagent.utils.stats.Collector(*, enabled: bool = True, log_path: str = None)[source]

Bases: object

Collector for a set of statistics.

enabled

Accessor for the collector’s enabled property.

extract(groups: Sequence[str] = None) → dict[source]

Extract statistics for a specific set of groups.

log(name: str, duration: float, start: float = None)[source]

Log an entry in the statistics if the collector is enabled.

mark(*names)[source]

Make a custom decorator function for adding to the set of groups.

reset()[source]

Reset the collector’s statistics.

results

Accessor for the current set of collected statistics.

timer(*groups)[source]

Create a new timer attached to this collector.

wrap(obj, prop_name: Union[str, Sequence[str]], groups: Sequence[str] = None, *, ignore_missing: bool = False)[source]

Wrap a method on a class or class instance.

wrap_coro(fn, groups: Sequence[str])[source]

Wrap a coroutine instance to collect timing statistics on execution.

wrap_fn(fn, groups: Sequence[str])[source]

Wrap a function instance to collect timing statistics on execution.

class aries_cloudagent.utils.stats.Stats[source]

Bases: object

A collection of statistics.

extract(names: Sequence[str] = None) → dict[source]

Summarize the stats in a dictionary.

log(name: str, duration: float)[source]

Log an entry in the stats.

class aries_cloudagent.utils.stats.Timer(collector: aries_cloudagent.utils.stats.Collector, groups: Sequence[str])[source]

Bases: object

Timer instance for a running task.

classmethod now()[source]

Fetch a standard timer value.

start() → aries_cloudagent.utils.stats.Timer[source]

Start the timer.

stop()[source]

Stop the timer.
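The idea of wrapping a function so that each call is timed and logged under one or more group names can be sketched as follows. TimingCollector is a hypothetical, simplified stand-in: it records only a call count and a total duration, and makes no claim about what Collector or Stats actually store.

```python
import time
from collections import defaultdict
from functools import wraps


class TimingCollector:
    """Hypothetical collector recording call counts and total durations."""

    def __init__(self, enabled: bool = True):
        self.enabled = enabled
        self._stats = defaultdict(lambda: {"count": 0, "total": 0.0})

    def log(self, name: str, duration: float) -> None:
        if self.enabled:
            entry = self._stats[name]
            entry["count"] += 1
            entry["total"] += duration

    def wrap_fn(self, fn, groups):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                # Log even when fn raises, so failures are still timed.
                duration = time.perf_counter() - start
                for name in groups:
                    self.log(name, duration)
        return wrapper

    def extract(self, groups=None) -> dict:
        names = groups if groups is not None else list(self._stats)
        return {n: dict(self._stats[n]) for n in names if n in self._stats}


def add(a, b):
    return a + b


collector = TimingCollector()
timed_add = collector.wrap_fn(add, ["math"])
total = timed_add(2, 4)
timed_add(1, 1)
stats = collector.extract(["math"])
print(total, stats["math"]["count"])
```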

aries_cloudagent.utils.task_queue module

Classes for managing a set of asyncio tasks.

class aries_cloudagent.utils.task_queue.CompletedTask(task: _asyncio.Task, exc_info: Tuple, ident: str = None, timing: dict = None)[source]

Bases: object

Represent the result of a queued task.

class aries_cloudagent.utils.task_queue.PendingTask(coro: Coroutine, complete_hook: Callable = None, ident: str = None, task_future: _asyncio.Future = None, queued_time: float = None)[source]

Bases: object

Represent a task in the queue.

cancel()[source]

Cancel the pending task.

cancelled

Accessor for the cancelled property.

task

Accessor for the task.

class aries_cloudagent.utils.task_queue.TaskQueue(max_active: int = 0, timed: bool = False, trace_fn: Callable = None)[source]

Bases: object

A class for managing a set of asyncio tasks.

add_active(task: _asyncio.Task, task_complete: Callable = None, ident: str = None, timing: dict = None) → _asyncio.Task[source]

Register an active async task with an optional completion callback.

Parameters:
  • task – The asyncio task instance
  • task_complete – An optional callback to run on completion
  • ident – A string identifier for the task
  • timing – An optional dictionary of timing information

add_pending(pending: aries_cloudagent.utils.task_queue.PendingTask)[source]

Add a task to the pending queue.

Parameters: pending – The PendingTask to add to the task queue

cancel()[source]

Cancel any pending or active tasks in the queue.

cancel_pending()[source]

Cancel any pending tasks in the queue.

cancelled

Accessor for the cancelled property of the queue.

complete(timeout: float = None, cleanup: bool = True)[source]

Cancel any pending tasks and wait for, or cancel, active tasks.

completed_task(task: _asyncio.Task, task_complete: Callable, ident: str, timing: dict = None)[source]

Clean up after a task has completed and run callbacks.

current_active

Accessor for the current number of active tasks in the queue.

current_pending

Accessor for the current number of pending tasks in the queue.

current_size

Accessor for the total number of tasks in the queue.

drain() → _asyncio.Task[source]

Start the process to run queued tasks.

flush()[source]

Wait for any active or pending tasks to be completed.

max_active

Accessor for the maximum number of active tasks in the queue.

put(coro: Coroutine, task_complete: Callable = None, ident: str = None) → aries_cloudagent.utils.task_queue.PendingTask[source]

Add a new task to the queue, delaying execution if busy.

Parameters:
  • coro – The coroutine to run
  • task_complete – A callback to run on completion
  • ident – A string identifier for the task

Returns: a future resolving to the asyncio task instance once queued

ready

Accessor for the ready property of the queue.

run(coro: Coroutine, task_complete: Callable = None, ident: str = None, timing: dict = None) → _asyncio.Task[source]

Start executing a coroutine as an async task, bypassing the pending queue.

Parameters:
  • coro – The coroutine to run
  • task_complete – An optional callback to run on completion
  • ident – A string identifier for the task
  • timing – An optional dictionary of timing information

Returns: the new asyncio task instance

wait_for(timeout: float)[source]

Wait for all queued tasks to complete with a timeout.
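The effect of a max_active limit, where extra work waits until a running task finishes, can be illustrated with an asyncio.Semaphore. This swaps in a different technique from the pending queue described above and is only a sketch: run_limited and work are hypothetical names.

```python
import asyncio


async def run_limited(coros, max_active: int):
    """Run coroutines concurrently, never more than max_active at once."""
    semaphore = asyncio.Semaphore(max_active)
    active = 0
    peak = 0

    async def guarded(coro):
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)  # track the highest concurrency seen
            try:
                return await coro
            finally:
                active -= 1

    results = await asyncio.gather(*(guarded(c) for c in coros))
    return results, peak


async def work(n: int) -> int:
    await asyncio.sleep(0.01)
    return n * 2


async def main():
    return await run_limited([work(i) for i in range(5)], max_active=2)


results, peak = asyncio.run(main())
print(results, peak)
```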

aries_cloudagent.utils.task_queue.coro_ident(coro: Coroutine)[source]

Extract an identifier for a coroutine.

aries_cloudagent.utils.task_queue.coro_timed(coro: Coroutine, timing: dict)[source]

Capture timing for a coroutine.

aries_cloudagent.utils.task_queue.task_exc_info(task: _asyncio.Task)[source]

Extract exception info from an asyncio task.
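Extracting exception details from a finished task typically yields a (type, value, traceback) tuple in the style of sys.exc_info(). The sketch below is an assumption about the general approach, using the hypothetical name exc_info_from_task; the return shape of task_exc_info itself is not specified in this reference.

```python
import asyncio


def exc_info_from_task(task: asyncio.Task):
    """Return (type, value, traceback) for a failed task, else None."""
    if not task.done():
        return None
    if task.cancelled():
        # task.exception() would raise here, so build the error directly.
        exc = asyncio.CancelledError()
    else:
        exc = task.exception()
    if exc is None:
        return None
    return type(exc), exc, exc.__traceback__


async def failing():
    raise ValueError("boom")


async def main():
    task = asyncio.ensure_future(failing())
    await asyncio.wait([task])  # wait without re-raising the task's error
    return exc_info_from_task(task)


info = asyncio.run(main())
print(info[0].__name__, info[1])
```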

aries_cloudagent.utils.tracing module

Event tracing.

class aries_cloudagent.utils.tracing.AdminAPIMessageTracingSchema(*args, **kwargs)[source]

Bases: aries_cloudagent.messaging.models.openapi.OpenAPISchema

Request/result schema including agent message tracing.

This is to be used as a superclass for aca-py admin input/output messages that need to support tracing.

trace

aries_cloudagent.utils.tracing.decode_inbound_message(message)[source]

Return bundled message if appropriate.

aries_cloudagent.utils.tracing.get_timer() → float[source]

Return a timer.

aries_cloudagent.utils.tracing.trace_event(context, message, handler: str = None, outcome: str = None, perf_counter: float = None, force_trace: bool = False, raise_errors: bool = False) → float[source]

Log a trace event to a configured target.

Parameters:
  • context – The application context; attributes of interest are:
      • context["trace.enabled"]: True if we are logging events
      • context["trace.target"]: Trace target ("log", "message" or an HTTP endpoint)
      • context["trace.tag"]: Tag to be included in trace output
  • message – the current message, can be an AgentMessage, InboundMessage, OutboundMessage or Exchange record
  • event – Dict that will be converted to JSON and posted to the target

aries_cloudagent.utils.tracing.tracing_enabled(context, message) → bool[source]

Determine whether to log trace messages or not.