aries_cloudagent.utils package


aries_cloudagent.utils.classloader module

The classloader provides utilties to dynamically load classes and modules.

class aries_cloudagent.utils.classloader.ClassLoader[source]

Bases: object

Class used to load classes from modules dynamically.

classmethod load_class(class_name: str, default_module: str = None, package: str = None)[source]

Resolve a complete class path (ie. typing.Dict) to the class itself.

  • class_name – the class name
  • default_module – the default module to load, if not part of in the class name
  • package – the parent package to search for the module

The resolved class

classmethod load_module(mod_path: str, package: str = None) → module[source]

Load a module by its absolute path.

  • mod_path – the absolute or relative module path
  • package – the parent package to search for the module

The resolved module or None if the module cannot be found


ModuleLoadError – If there was an error loading the module

classmethod load_subclass_of(base_class: Type[CT_co], mod_path: str, package: str = None)[source]

Resolve an implementation of a base path within a module.

  • base_class – the base class being implemented
  • mod_path – the absolute module path
  • package – the parent package to search for the module

The resolved class

classmethod scan_subpackages(package: str) → Sequence[str][source]

Return a list of sub-packages defined under a named package.

exception aries_cloudagent.utils.classloader.ClassNotFoundError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Class not found error.

exception aries_cloudagent.utils.classloader.ModuleLoadError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Module load error.

aries_cloudagent.utils.http module

HTTP utility methods.

exception aries_cloudagent.utils.http.FetchError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Error raised when an HTTP fetch fails.

aries_cloudagent.utils.http.fetch(url: str, *, headers: dict = None, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: <sphinx.ext.autodoc.importer._MockObject object at 0x7faf699de588> = None, session: <sphinx.ext.autodoc.importer._MockObject object at 0x7faf699de5c0> = None, json: bool = False)[source]

Fetch from an HTTP server with automatic retries and timeouts.

  • url – the address to fetch
  • headers – an optional dict of headers to send
  • retry – flag to retry the fetch
  • max_attempts – the maximum number of attempts to make
  • interval – the interval between retries, in seconds
  • backoff – the backoff interval, in seconds
  • request_timeout – the HTTP request timeout, in seconds
  • connector – an optional existing BaseConnector
  • session – a shared ClientSession
  • json – flag to parse the result as JSON

aries_cloudagent.utils.repeat module

Utils for repeating tasks.

class aries_cloudagent.utils.repeat.RepeatAttempt(seq: aries_cloudagent.utils.repeat.RepeatSequence, index: int = 1)[source]

Bases: object

Represents the current iteration in a repeat sequence.


Check if this is the last instance in the sequence.

next() → aries_cloudagent.utils.repeat.RepeatAttempt[source]

Get the next attempt instance.


Calculate the interval before the next attempt.

timeout(interval: float = None)[source]

Create a context manager for timing out an attempt.

class aries_cloudagent.utils.repeat.RepeatSequence(limit: int = 0, interval: float = 0.0, backoff: float = 0.0)[source]

Bases: object

Represents a repetition sequence.

next_interval(index: int) → float[source]

Calculate the time before the next attempt.

start() → aries_cloudagent.utils.repeat.RepeatAttempt[source]

Get the first attempt in the sequence.

aries_cloudagent.utils.stats module

Classes for tracking performance and timing.

class aries_cloudagent.utils.stats.Collector(*, enabled: bool = True, log_path: str = None)[source]

Bases: object

Collector for a set of statistics.


Accessor for the collector’s enabled property.

extract(groups: Sequence[str] = None) → dict[source]

Extract statistics for a specific set of groups.

log(name: str, duration: float, start: float = None)[source]

Log an entry in the statistics if the collector is enabled.


Make a custom decorator function for adding to the set of groups.


Reset the collector’s statistics.


Accessor for the current set of collected statistics.


Create a new timer attached to this collector.

wrap(obj, prop_name: Union[str, Sequence[str]], groups: Sequence[str] = None, *, ignore_missing: bool = False)[source]

Wrap a method on a class or class instance.

wrap_coro(fn, groups: Sequence[str])[source]

Wrap a coroutine instance to collect timing statistics on execution.

wrap_fn(fn, groups: Sequence[str])[source]

Wrap a function instance to collect timing statistics on execution.

class aries_cloudagent.utils.stats.Stats[source]

Bases: object

A collection of statistics.

extract(names: Sequence[str] = None) → dict[source]

Summarize the stats in a dictionary.

log(name: str, duration: float)[source]

Log an entry in the stats.

class aries_cloudagent.utils.stats.Timer(collector: aries_cloudagent.utils.stats.Collector, groups: Sequence[str])[source]

Bases: object

Timer instance for a running task.

classmethod now()[source]

Fetch a standard timer value.

start() → aries_cloudagent.utils.stats.Timer[source]

Start the timer.


Stop the timer.

aries_cloudagent.utils.task_queue module

Classes for managing a set of asyncio tasks.

class aries_cloudagent.utils.task_queue.CompletedTask(task: _asyncio.Task, exc_info: Tuple, ident: str = None, timing: dict = None)[source]

Bases: object

Represent the result of a queued task.

class aries_cloudagent.utils.task_queue.PendingTask(coro: Coroutine[T_co, T_contra, V_co], complete_hook: Callable = None, ident: str = None, task_future: _asyncio.Future = None, queued_time: float = None)[source]

Bases: object

Represent a task in the queue.


Cancel the pending task.


Accessor for the cancelled property.


Accessor for the task.

class aries_cloudagent.utils.task_queue.TaskQueue(max_active: int = 0, timed: bool = False, trace_fn: Callable = None)[source]

Bases: object

A class for managing a set of asyncio tasks.

add_active(task: _asyncio.Task, task_complete: Callable = None, ident: str = None, timing: dict = None) → _asyncio.Task[source]

Register an active async task with an optional completion callback.

  • task – The asyncio task instance
  • task_complete – An optional callback to run on completion
  • ident – A string identifer for the task
  • timing – An optional dictionary of timing information
add_pending(pending: aries_cloudagent.utils.task_queue.PendingTask)[source]

Add a task to the pending queue.

Parameters:pending – The PendingTask to add to the task queue

Cancel any pending or active tasks in the queue.


Cancel any pending tasks in the queue.


Accessor for the cancelled property of the queue.

complete(timeout: float = None, cleanup: bool = True)[source]

Cancel any pending tasks and wait for, or cancel active tasks.

completed_task(task: _asyncio.Task, task_complete: Callable, ident: str, timing: dict = None)[source]

Clean up after a task has completed and run callbacks.


Accessor for the current number of active tasks in the queue.


Accessor for the current number of pending tasks in the queue.


Accessor for the total number of tasks in the queue.

drain() → _asyncio.Task[source]

Start the process to run queued tasks.


Wait for any active or pending tasks to be completed.


Accessor for the maximum number of active tasks in the queue.

put(coro: Coroutine[T_co, T_contra, V_co], task_complete: Callable = None, ident: str = None) → aries_cloudagent.utils.task_queue.PendingTask[source]

Add a new task to the queue, delaying execution if busy.

  • coro – The coroutine to run
  • task_complete – A callback to run on completion
  • ident – A string identifier for the task

Returns: a future resolving to the asyncio task instance once queued


Accessor for the ready property of the queue.

run(coro: Coroutine[T_co, T_contra, V_co], task_complete: Callable = None, ident: str = None, timing: dict = None) → _asyncio.Task[source]

Start executing a coroutine as an async task, bypassing the pending queue.

  • coro – The coroutine to run
  • task_complete – An optional callback to run on completion
  • ident – A string identifier for the task
  • timing – An optional dictionary of timing information

Returns: the new asyncio task instance

wait_for(timeout: float)[source]

Wait for all queued tasks to complete with a timeout.

aries_cloudagent.utils.task_queue.coro_ident(coro: Coroutine[T_co, T_contra, V_co])[source]

Extract an identifier for a coroutine.

aries_cloudagent.utils.task_queue.coro_timed(coro: Coroutine[T_co, T_contra, V_co], timing: dict)[source]

Capture timing for a coroutine.

aries_cloudagent.utils.task_queue.task_exc_info(task: _asyncio.Task)[source]

Extract exception info from an asyncio task.