aries_cloudagent.utils package

Submodules

aries_cloudagent.utils.classloader module

The classloader provides utilties to dynamically load classes and modules.

class aries_cloudagent.utils.classloader.ClassLoader[source]

Bases: object

Class used to load classes from modules dynamically.

classmethod load_class(class_name: str, default_module: str = None, package: str = None)[source]

Resolve a complete class path (ie. typing.Dict) to the class itself.

Parameters:
  • class_name – the class name
  • default_module – the default module to load, if not part of in the class name
  • package – the parent package to search for the module
Returns:

The resolved class

Raises:
classmethod load_module(mod_path: str, package: str = None) → module[source]

Load a module by its absolute path.

Parameters:
  • mod_path – the absolute or relative module path
  • package – the parent package to search for the module
Returns:

The resolved module or None if the module cannot be found

Raises:

ModuleLoadError – If there was an error loading the module

classmethod load_subclass_of(base_class: Type[CT_co], mod_path: str, package: str = None)[source]

Resolve an implementation of a base path within a module.

Parameters:
  • base_class – the base class being implemented
  • mod_path – the absolute module path
  • package – the parent package to search for the module
Returns:

The resolved class

Raises:
classmethod scan_subpackages(package: str) → Sequence[str][source]

Return a list of sub-packages defined under a named package.

exception aries_cloudagent.utils.classloader.ClassNotFoundError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Class not found error.

exception aries_cloudagent.utils.classloader.ModuleLoadError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Module load error.

aries_cloudagent.utils.http module

HTTP utility methods.

exception aries_cloudagent.utils.http.FetchError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.core.error.BaseError

Error raised when an HTTP fetch fails.

aries_cloudagent.utils.http.fetch(url: str, *, headers: dict = None, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: <sphinx.ext.autodoc.importer._MockObject object at 0x7faf699de588> = None, session: <sphinx.ext.autodoc.importer._MockObject object at 0x7faf699de5c0> = None, json: bool = False)[source]

Fetch from an HTTP server with automatic retries and timeouts.

Parameters:
  • url – the address to fetch
  • headers – an optional dict of headers to send
  • retry – flag to retry the fetch
  • max_attempts – the maximum number of attempts to make
  • interval – the interval between retries, in seconds
  • backoff – the backoff interval, in seconds
  • request_timeout – the HTTP request timeout, in seconds
  • connector – an optional existing BaseConnector
  • session – a shared ClientSession
  • json – flag to parse the result as JSON

aries_cloudagent.utils.repeat module

Utils for repeating tasks.

class aries_cloudagent.utils.repeat.RepeatAttempt(seq: aries_cloudagent.utils.repeat.RepeatSequence, index: int = 1)[source]

Bases: object

Represents the current iteration in a repeat sequence.

final

Check if this is the last instance in the sequence.

next() → aries_cloudagent.utils.repeat.RepeatAttempt[source]

Get the next attempt instance.

next_interval

Calculate the interval before the next attempt.

timeout(interval: float = None)[source]

Create a context manager for timing out an attempt.

class aries_cloudagent.utils.repeat.RepeatSequence(limit: int = 0, interval: float = 0.0, backoff: float = 0.0)[source]

Bases: object

Represents a repetition sequence.

next_interval(index: int) → float[source]

Calculate the time before the next attempt.

start() → aries_cloudagent.utils.repeat.RepeatAttempt[source]

Get the first attempt in the sequence.

aries_cloudagent.utils.stats module

Classes for tracking performance and timing.

class aries_cloudagent.utils.stats.Collector(*, enabled: bool = True, log_path: str = None)[source]

Bases: object

Collector for a set of statistics.

enabled

Accessor for the collector’s enabled property.

extract(groups: Sequence[str] = None) → dict[source]

Extract statistics for a specific set of groups.

log(name: str, duration: float, start: float = None)[source]

Log an entry in the statistics if the collector is enabled.

mark(*names)[source]

Make a custom decorator function for adding to the set of groups.

reset()[source]

Reset the collector’s statistics.

results

Accessor for the current set of collected statistics.

timer(*groups)[source]

Create a new timer attached to this collector.

wrap(obj, prop_name: Union[str, Sequence[str]], groups: Sequence[str] = None, *, ignore_missing: bool = False)[source]

Wrap a method on a class or class instance.

wrap_coro(fn, groups: Sequence[str])[source]

Wrap a coroutine instance to collect timing statistics on execution.

wrap_fn(fn, groups: Sequence[str])[source]

Wrap a function instance to collect timing statistics on execution.

class aries_cloudagent.utils.stats.Stats[source]

Bases: object

A collection of statistics.

extract(names: Sequence[str] = None) → dict[source]

Summarize the stats in a dictionary.

log(name: str, duration: float)[source]

Log an entry in the stats.

class aries_cloudagent.utils.stats.Timer(collector: aries_cloudagent.utils.stats.Collector, groups: Sequence[str])[source]

Bases: object

Timer instance for a running task.

classmethod now()[source]

Fetch a standard timer value.

start() → aries_cloudagent.utils.stats.Timer[source]

Start the timer.

stop()[source]

Stop the timer.

aries_cloudagent.utils.task_queue module

Classes for managing a set of asyncio tasks.

class aries_cloudagent.utils.task_queue.CompletedTask(task: _asyncio.Task, exc_info: Tuple, ident: str = None, timing: dict = None)[source]

Bases: object

Represent the result of a queued task.

class aries_cloudagent.utils.task_queue.PendingTask(coro: Coroutine[T_co, T_contra, V_co], complete_hook: Callable = None, ident: str = None, task_future: _asyncio.Future = None, queued_time: float = None)[source]

Bases: object

Represent a task in the queue.

cancel()[source]

Cancel the pending task.

cancelled

Accessor for the cancelled property.

task

Accessor for the task.

class aries_cloudagent.utils.task_queue.TaskQueue(max_active: int = 0, timed: bool = False, trace_fn: Callable = None)[source]

Bases: object

A class for managing a set of asyncio tasks.

add_active(task: _asyncio.Task, task_complete: Callable = None, ident: str = None, timing: dict = None) → _asyncio.Task[source]

Register an active async task with an optional completion callback.

Parameters:
  • task – The asyncio task instance
  • task_complete – An optional callback to run on completion
  • ident – A string identifer for the task
  • timing – An optional dictionary of timing information
add_pending(pending: aries_cloudagent.utils.task_queue.PendingTask)[source]

Add a task to the pending queue.

Parameters:pending – The PendingTask to add to the task queue
cancel()[source]

Cancel any pending or active tasks in the queue.

cancel_pending()[source]

Cancel any pending tasks in the queue.

cancelled

Accessor for the cancelled property of the queue.

complete(timeout: float = None, cleanup: bool = True)[source]

Cancel any pending tasks and wait for, or cancel active tasks.

completed_task(task: _asyncio.Task, task_complete: Callable, ident: str, timing: dict = None)[source]

Clean up after a task has completed and run callbacks.

current_active

Accessor for the current number of active tasks in the queue.

current_pending

Accessor for the current number of pending tasks in the queue.

current_size

Accessor for the total number of tasks in the queue.

drain() → _asyncio.Task[source]

Start the process to run queued tasks.

flush()[source]

Wait for any active or pending tasks to be completed.

max_active

Accessor for the maximum number of active tasks in the queue.

put(coro: Coroutine[T_co, T_contra, V_co], task_complete: Callable = None, ident: str = None) → aries_cloudagent.utils.task_queue.PendingTask[source]

Add a new task to the queue, delaying execution if busy.

Parameters:
  • coro – The coroutine to run
  • task_complete – A callback to run on completion
  • ident – A string identifier for the task

Returns: a future resolving to the asyncio task instance once queued

ready

Accessor for the ready property of the queue.

run(coro: Coroutine[T_co, T_contra, V_co], task_complete: Callable = None, ident: str = None, timing: dict = None) → _asyncio.Task[source]

Start executing a coroutine as an async task, bypassing the pending queue.

Parameters:
  • coro – The coroutine to run
  • task_complete – An optional callback to run on completion
  • ident – A string identifier for the task
  • timing – An optional dictionary of timing information

Returns: the new asyncio task instance

wait_for(timeout: float)[source]

Wait for all queued tasks to complete with a timeout.

aries_cloudagent.utils.task_queue.coro_ident(coro: Coroutine[T_co, T_contra, V_co])[source]

Extract an identifier for a coroutine.

aries_cloudagent.utils.task_queue.coro_timed(coro: Coroutine[T_co, T_contra, V_co], timing: dict)[source]

Capture timing for a coroutine.

aries_cloudagent.utils.task_queue.task_exc_info(task: _asyncio.Task)[source]

Extract exception info from an asyncio task.