aries_cloudagent.utils package

Submodules

aries_cloudagent.utils.classloader module

The classloader provides utilties to dynamically load classes and modules.

class aries_cloudagent.utils.classloader.ClassLoader[source]

Bases: object

Class used to load classes from modules dynamically.

classmethod load_class(class_name: str, default_module: Optional[str] = None, package: Optional[str] = None)[source]

Resolve a complete class path (ie. typing.Dict) to the class itself.

Parameters
  • class_name – the class name

  • default_module – the default module to load, if not part of in the class name

  • package – the parent package to search for the module

Returns

The resolved class

Raises
classmethod load_module(mod_path: str, package: Optional[str] = None) module[source]

Load a module by its absolute path.

Parameters
  • mod_path – the absolute or relative module path

  • package – the parent package to search for the module

Returns

The resolved module or None if the module cannot be found

Raises

ModuleLoadError – If there was an error loading the module

classmethod load_subclass_of(base_class: Type, mod_path: str, package: Optional[str] = None)[source]

Resolve an implementation of a base path within a module.

Parameters
  • base_class – the base class being implemented

  • mod_path – the absolute module path

  • package – the parent package to search for the module

Returns

The resolved class

Raises
classmethod scan_subpackages(package: str) Sequence[str][source]

Return a list of sub-packages defined under a named package.

exception aries_cloudagent.utils.classloader.ClassNotFoundError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Class not found error.

class aries_cloudagent.utils.classloader.DeferLoad(cls_path: str)[source]

Bases: object

Helper to defer loading of a class definition.

property resolved

Accessor for the resolved class instance.

exception aries_cloudagent.utils.classloader.ModuleLoadError(*args, error_code: Optional[str] = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Module load error.

aries_cloudagent.utils.dependencies module

Dependency related util methods.

aries_cloudagent.utils.dependencies.assert_ursa_bbs_signatures_installed()[source]

Assert ursa_bbs_signatures module is installed.

aries_cloudagent.utils.dependencies.is_indy_sdk_module_installed()[source]

Check whether indy (indy-sdk) module is installed.

Returns

Whether indy (indy-sdk) is installed.

Return type

bool

aries_cloudagent.utils.dependencies.is_ursa_bbs_signatures_module_installed()[source]

Check whether ursa_bbs_signatures module is installed.

Returns

Whether ursa_bbs_signatures is installed.

Return type

bool

aries_cloudagent.utils.env module

Environment utility methods.

aries_cloudagent.utils.env.storage_path(*subpaths, create: bool = False) pathlib.Path[source]

Get the default aca-py home directory.

aries_cloudagent.utils.http module

aries_cloudagent.utils.jwe module

aries_cloudagent.utils.outofband module

aries_cloudagent.utils.repeat module

aries_cloudagent.utils.stats module

Classes for tracking performance and timing.

class aries_cloudagent.utils.stats.Collector(*, enabled: bool = True, log_path: Optional[str] = None)[source]

Bases: object

Collector for a set of statistics.

property enabled: bool

Accessor for the collector’s enabled property.

extract(groups: Optional[Sequence[str]] = None) dict[source]

Extract statistics for a specific set of groups.

log(name: str, duration: float, start: Optional[float] = None)[source]

Log an entry in the statistics if the collector is enabled.

mark(*names)[source]

Make a custom decorator function for adding to the set of groups.

reset()[source]

Reset the collector’s statistics.

property results: dict

Accessor for the current set of collected statistics.

timer(*groups)[source]

Create a new timer attached to this collector.

wrap(obj, prop_name: Union[str, Sequence[str]], groups: Optional[Sequence[str]] = None, *, ignore_missing: bool = False)[source]

Wrap a method on a class or class instance.

wrap_coro(fn, groups: Sequence[str])[source]

Wrap a coroutine instance to collect timing statistics on execution.

wrap_fn(fn, groups: Sequence[str])[source]

Wrap a function instance to collect timing statistics on execution.

class aries_cloudagent.utils.stats.Stats[source]

Bases: object

A collection of statistics.

extract(names: Optional[Sequence[str]] = None) dict[source]

Summarize the stats in a dictionary.

log(name: str, duration: float)[source]

Log an entry in the stats.

class aries_cloudagent.utils.stats.Timer(collector: aries_cloudagent.utils.stats.Collector, groups: Sequence[str])[source]

Bases: object

Timer instance for a running task.

classmethod now()[source]

Fetch a standard timer value.

start() aries_cloudagent.utils.stats.Timer[source]

Start the timer.

stop()[source]

Stop the timer.

aries_cloudagent.utils.task_queue module

Classes for managing a set of asyncio tasks.

class aries_cloudagent.utils.task_queue.CompletedTask(task: _asyncio.Task, exc_info: Tuple, ident: Optional[str] = None, timing: Optional[dict] = None)[source]

Bases: object

Represent the result of a queued task.

class aries_cloudagent.utils.task_queue.PendingTask(coro: Coroutine, complete_hook: Optional[Callable] = None, ident: Optional[str] = None, task_future: Optional[_asyncio.Future] = None, queued_time: Optional[float] = None)[source]

Bases: object

Represent a task in the queue.

cancel()[source]

Cancel the pending task.

property cancelled

Accessor for the cancelled property.

property task: _asyncio.Task

Accessor for the task.

class aries_cloudagent.utils.task_queue.TaskQueue(max_active: int = 0, timed: bool = False, trace_fn: Optional[Callable] = None)[source]

Bases: object

A class for managing a set of asyncio tasks.

add_active(task: _asyncio.Task, task_complete: Optional[Callable] = None, ident: Optional[str] = None, timing: Optional[dict] = None) _asyncio.Task[source]

Register an active async task with an optional completion callback.

Parameters
  • task – The asyncio task instance

  • task_complete – An optional callback to run on completion

  • ident – A string identifer for the task

  • timing – An optional dictionary of timing information

add_pending(pending: aries_cloudagent.utils.task_queue.PendingTask)[source]

Add a task to the pending queue.

Parameters

pending – The PendingTask to add to the task queue

cancel()[source]

Cancel any pending or active tasks in the queue.

cancel_pending()[source]

Cancel any pending tasks in the queue.

property cancelled: bool

Accessor for the cancelled property of the queue.

async complete(timeout: Optional[float] = None, cleanup: bool = True)[source]

Cancel any pending tasks and wait for, or cancel active tasks.

completed_task(task: _asyncio.Task, task_complete: Callable, ident: str, timing: Optional[dict] = None)[source]

Clean up after a task has completed and run callbacks.

property current_active: int

Accessor for the current number of active tasks in the queue.

property current_pending: int

Accessor for the current number of pending tasks in the queue.

property current_size: int

Accessor for the total number of tasks in the queue.

drain() _asyncio.Task[source]

Start the process to run queued tasks.

async flush()[source]

Wait for any active or pending tasks to be completed.

property max_active: int

Accessor for the maximum number of active tasks in the queue.

put(coro: Coroutine, task_complete: Optional[Callable] = None, ident: Optional[str] = None) aries_cloudagent.utils.task_queue.PendingTask[source]

Add a new task to the queue, delaying execution if busy.

Parameters
  • coro – The coroutine to run

  • task_complete – A callback to run on completion

  • ident – A string identifier for the task

Returns: a future resolving to the asyncio task instance once queued

property ready: bool

Accessor for the ready property of the queue.

run(coro: Coroutine, task_complete: Optional[Callable] = None, ident: Optional[str] = None, timing: Optional[dict] = None) _asyncio.Task[source]

Start executing a coroutine as an async task, bypassing the pending queue.

Parameters
  • coro – The coroutine to run

  • task_complete – An optional callback to run on completion

  • ident – A string identifier for the task

  • timing – An optional dictionary of timing information

Returns: the new asyncio task instance

async wait_for(timeout: float)[source]

Wait for all queued tasks to complete with a timeout.

aries_cloudagent.utils.task_queue.coro_ident(coro: Coroutine)[source]

Extract an identifier for a coroutine.

async aries_cloudagent.utils.task_queue.coro_timed(coro: Coroutine, timing: dict)[source]

Capture timing for a coroutine.

aries_cloudagent.utils.task_queue.task_exc_info(task: _asyncio.Task)[source]

Extract exception info from an asyncio task.

aries_cloudagent.utils.tracing module