aries_cloudagent.utils package¶
Submodules¶
aries_cloudagent.utils.classloader module¶
The classloader provides utilties to dynamically load classes and modules.
-
class
aries_cloudagent.utils.classloader.
ClassLoader
[source]¶ Bases:
object
Class used to load classes from modules dynamically.
-
classmethod
load_class
(class_name: str, default_module: str = None, package: str = None)[source]¶ Resolve a complete class path (ie. typing.Dict) to the class itself.
Parameters: - class_name – the class name
- default_module – the default module to load, if not part of in the class name
- package – the parent package to search for the module
Returns: The resolved class
Raises: ClassNotFoundError
– If the class could not be resolved at pathModuleLoadError
– If there was an error loading the module
-
classmethod
load_module
(mod_path: str, package: str = None) → module[source]¶ Load a module by its absolute path.
Parameters: - mod_path – the absolute or relative module path
- package – the parent package to search for the module
Returns: The resolved module or None if the module cannot be found
Raises: ModuleLoadError
– If there was an error loading the module
-
classmethod
load_subclass_of
(base_class: Type[CT_co], mod_path: str, package: str = None)[source]¶ Resolve an implementation of a base path within a module.
Parameters: - base_class – the base class being implemented
- mod_path – the absolute module path
- package – the parent package to search for the module
Returns: The resolved class
Raises: ClassNotFoundError
– If the module or class implementation could not be foundModuleLoadError
– If there was an error loading the module
-
classmethod
-
exception
aries_cloudagent.utils.classloader.
ClassNotFoundError
(*args, error_code: str = None, **kwargs)[source]¶ Bases:
aries_cloudagent.core.error.BaseError
Class not found error.
-
class
aries_cloudagent.utils.classloader.
DeferLoad
(cls_path: str)[source]¶ Bases:
object
Helper to defer loading of a class definition.
-
resolved
¶ Accessor for the resolved class instance.
-
-
exception
aries_cloudagent.utils.classloader.
ModuleLoadError
(*args, error_code: str = None, **kwargs)[source]¶ Bases:
aries_cloudagent.core.error.BaseError
Module load error.
aries_cloudagent.utils.http module¶
HTTP utility methods.
-
exception
aries_cloudagent.utils.http.
FetchError
(*args, error_code: str = None, **kwargs)[source]¶ Bases:
aries_cloudagent.core.error.BaseError
Error raised when an HTTP fetch fails.
-
exception
aries_cloudagent.utils.http.
PutError
(*args, error_code: str = None, **kwargs)[source]¶ Bases:
aries_cloudagent.core.error.BaseError
Error raised when an HTTP put fails.
-
aries_cloudagent.utils.http.
fetch
(url: str, *, headers: dict = None, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: <sphinx.ext.autodoc.importer._MockObject object at 0x7fb05627a390> = None, session: <sphinx.ext.autodoc.importer._MockObject object at 0x7fb055797310> = None, json: bool = False)[source]¶ Fetch from an HTTP server with automatic retries and timeouts.
Parameters: - url – the address to fetch
- headers – an optional dict of headers to send
- retry – flag to retry the fetch
- max_attempts – the maximum number of attempts to make
- interval – the interval between retries, in seconds
- backoff – the backoff interval, in seconds
- request_timeout – the HTTP request timeout, in seconds
- connector – an optional existing BaseConnector
- session – a shared ClientSession
- json – flag to parse the result as JSON
-
aries_cloudagent.utils.http.
fetch_stream
(url: str, *, headers: dict = None, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: <sphinx.ext.autodoc.importer._MockObject object at 0x7fb05627a390> = None, session: <sphinx.ext.autodoc.importer._MockObject object at 0x7fb055797310> = None)[source]¶ Fetch from an HTTP server with automatic retries and timeouts.
Parameters: - url – the address to fetch
- headers – an optional dict of headers to send
- retry – flag to retry the fetch
- max_attempts – the maximum number of attempts to make
- interval – the interval between retries, in seconds
- backoff – the backoff interval, in seconds
- request_timeout – the HTTP request timeout, in seconds
- connector – an optional existing BaseConnector
- session – a shared ClientSession
- json – flag to parse the result as JSON
-
aries_cloudagent.utils.http.
put_file
(url: str, file_data: dict, extra_data: dict, *, retry: bool = True, max_attempts: int = 5, interval: float = 1.0, backoff: float = 0.25, request_timeout: float = 10.0, connector: <sphinx.ext.autodoc.importer._MockObject object at 0x7fb05627a390> = None, session: <sphinx.ext.autodoc.importer._MockObject object at 0x7fb055797310> = None, json: bool = False)[source]¶ Put to HTTP server with automatic retries and timeouts.
Parameters: - url – the address to use
- file_data – dict with data key and path of file to upload
- extra_data – further content to include in data to put
- headers – an optional dict of headers to send
- retry – flag to retry the fetch
- max_attempts – the maximum number of attempts to make
- interval – the interval between retries, in seconds
- backoff – the backoff interval, in seconds
- request_timeout – the HTTP request timeout, in seconds
- connector – an optional existing BaseConnector
- session – a shared ClientSession
- json – flag to parse the result as JSON
aries_cloudagent.utils.outofband module¶
Utilities for creating out-of-band messages.
aries_cloudagent.utils.repeat module¶
Utils for repeating tasks.
-
class
aries_cloudagent.utils.repeat.
RepeatAttempt
(seq: aries_cloudagent.utils.repeat.RepeatSequence, index: int = 1)[source]¶ Bases:
object
Represents the current iteration in a repeat sequence.
-
final
¶ Check if this is the last instance in the sequence.
-
next_interval
¶ Calculate the interval before the next attempt.
-
aries_cloudagent.utils.stats module¶
Classes for tracking performance and timing.
-
class
aries_cloudagent.utils.stats.
Collector
(*, enabled: bool = True, log_path: str = None)[source]¶ Bases:
object
Collector for a set of statistics.
-
enabled
¶ Accessor for the collector’s enabled property.
-
extract
(groups: Sequence[str] = None) → dict[source]¶ Extract statistics for a specific set of groups.
-
log
(name: str, duration: float, start: float = None)[source]¶ Log an entry in the statistics if the collector is enabled.
-
results
¶ Accessor for the current set of collected statistics.
-
wrap
(obj, prop_name: Union[str, Sequence[str]], groups: Sequence[str] = None, *, ignore_missing: bool = False)[source]¶ Wrap a method on a class or class instance.
-
aries_cloudagent.utils.task_queue module¶
Classes for managing a set of asyncio tasks.
-
class
aries_cloudagent.utils.task_queue.
CompletedTask
(task: _asyncio.Task, exc_info: Tuple, ident: str = None, timing: dict = None)[source]¶ Bases:
object
Represent the result of a queued task.
-
class
aries_cloudagent.utils.task_queue.
PendingTask
(coro: Coroutine[T_co, T_contra, V_co], complete_hook: Callable = None, ident: str = None, task_future: _asyncio.Future = None, queued_time: float = None)[source]¶ Bases:
object
Represent a task in the queue.
-
cancelled
¶ Accessor for the cancelled property.
-
task
¶ Accessor for the task.
-
-
class
aries_cloudagent.utils.task_queue.
TaskQueue
(max_active: int = 0, timed: bool = False, trace_fn: Callable = None)[source]¶ Bases:
object
A class for managing a set of asyncio tasks.
-
add_active
(task: _asyncio.Task, task_complete: Callable = None, ident: str = None, timing: dict = None) → _asyncio.Task[source]¶ Register an active async task with an optional completion callback.
Parameters: - task – The asyncio task instance
- task_complete – An optional callback to run on completion
- ident – A string identifer for the task
- timing – An optional dictionary of timing information
-
add_pending
(pending: aries_cloudagent.utils.task_queue.PendingTask)[source]¶ Add a task to the pending queue.
Parameters: pending – The PendingTask to add to the task queue
-
cancelled
¶ Accessor for the cancelled property of the queue.
-
complete
(timeout: float = None, cleanup: bool = True)[source]¶ Cancel any pending tasks and wait for, or cancel active tasks.
-
completed_task
(task: _asyncio.Task, task_complete: Callable, ident: str, timing: dict = None)[source]¶ Clean up after a task has completed and run callbacks.
-
current_active
¶ Accessor for the current number of active tasks in the queue.
-
current_pending
¶ Accessor for the current number of pending tasks in the queue.
-
current_size
¶ Accessor for the total number of tasks in the queue.
-
max_active
¶ Accessor for the maximum number of active tasks in the queue.
-
put
(coro: Coroutine[T_co, T_contra, V_co], task_complete: Callable = None, ident: str = None) → aries_cloudagent.utils.task_queue.PendingTask[source]¶ Add a new task to the queue, delaying execution if busy.
Parameters: - coro – The coroutine to run
- task_complete – A callback to run on completion
- ident – A string identifier for the task
Returns: a future resolving to the asyncio task instance once queued
-
ready
¶ Accessor for the ready property of the queue.
-
run
(coro: Coroutine[T_co, T_contra, V_co], task_complete: Callable = None, ident: str = None, timing: dict = None) → _asyncio.Task[source]¶ Start executing a coroutine as an async task, bypassing the pending queue.
Parameters: - coro – The coroutine to run
- task_complete – An optional callback to run on completion
- ident – A string identifier for the task
- timing – An optional dictionary of timing information
Returns: the new asyncio task instance
-
-
aries_cloudagent.utils.task_queue.
coro_ident
(coro: Coroutine[T_co, T_contra, V_co])[source]¶ Extract an identifier for a coroutine.
aries_cloudagent.utils.tracing module¶
Event tracing.
-
class
aries_cloudagent.utils.tracing.
AdminAPIMessageTracingSchema
(*args, **kwargs)[source]¶ Bases:
aries_cloudagent.messaging.models.openapi.OpenAPISchema
Request/result schema including agent message tracing.
This is to be used as a superclass for aca-py admin input/output messages that need to support tracing.
-
trace
= <fields.Boolean(default=False, attribute=None, validate=None, required=False, load_only=False, dump_only=False, missing=<marshmallow.missing>, allow_none=False, error_messages={'required': 'Missing data for required field.', 'null': 'Field may not be null.', 'validator_failed': 'Invalid value.', 'invalid': 'Not a valid boolean.'})>¶
-
-
aries_cloudagent.utils.tracing.
decode_inbound_message
(message)[source]¶ Return bundled message if appropriate.
-
aries_cloudagent.utils.tracing.
trace_event
(context, message, handler: str = None, outcome: str = None, perf_counter: float = None, force_trace: bool = False, raise_errors: bool = False) → float[source]¶ Log a trace event to a configured target.
Parameters: - context – The application context, attributes of interest are: context[“trace.enabled”]: True if we are logging events context[“trace.target”]: Trace target (“log”, “message” or an http endpoint) context[“trace.tag”]: Tag to be included in trace output
- message – the current message, can be an AgentMessage, InboundMessage, OutboundMessage or Exchange record
- event – Dict that will be converted to json and posted to the target