aries_cloudagent.messaging package

Submodules

aries_cloudagent.messaging.agent_message module

Agent message base class and schema.

class aries_cloudagent.messaging.agent_message.AgentMessage(_id: str = None, _decorators: aries_cloudagent.messaging.decorators.base.BaseDecoratorSet = None)[source]

Bases: aries_cloudagent.messaging.models.base.BaseModel

Agent message base class.

Handler

Accessor for the agent message’s handler class.

Returns:Handler class
class Meta[source]

Bases: object

AgentMessage metadata.

handler_class = None
message_type = None
schema_class = None
assign_thread_from(msg: aries_cloudagent.messaging.agent_message.AgentMessage)[source]

Copy thread information from a previous message.

Parameters:msg – The received message containing optional thread information
assign_thread_id(thid: str, pthid: str = None)[source]

Assign a specific thread ID.

Parameters:
  • thid – The thread identifier
  • pthid – The parent thread identifier
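
The two threading methods above can be illustrated with a small stand-in. This sketch does not import aries_cloudagent: `SketchMessage` and its `thread` dict are hypothetical names, and falling back to the previous message's own ID when it carries no thread information is an assumption, not documented behavior.

```python
import uuid
from typing import Optional


class SketchMessage:
    """Hypothetical stand-in for AgentMessage; not the library class."""

    def __init__(self, _id: Optional[str] = None):
        self._id = _id or str(uuid.uuid4())
        self.thread: Optional[dict] = None  # stands in for the thread decorator

    def assign_thread_id(self, thid: str, pthid: Optional[str] = None) -> None:
        """Assign a specific thread ID and optional parent thread ID."""
        self.thread = {"thid": thid, "pthid": pthid}

    def assign_thread_from(self, msg: Optional["SketchMessage"]) -> None:
        """Copy thread information from a previous message, if any."""
        if msg is None:
            return
        if msg.thread:
            self.assign_thread_id(msg.thread["thid"], msg.thread["pthid"])
        else:
            # Assumption: a message without thread info starts the thread,
            # so its own ID becomes the thread ID.
            self.assign_thread_id(msg._id)


request = SketchMessage("msg-1")
reply = SketchMessage()
reply.assign_thread_from(request)

follow_up = SketchMessage()
follow_up.assign_thread_from(reply)
```

Under these assumptions, every message in the exchange ends up carrying the thread ID of the message that opened it.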
get_signature(field_name: str) → aries_cloudagent.messaging.decorators.signature_decorator.SignatureDecorator[source]

Get the signature for a named field.

Parameters:field_name – Field name to get the signature for
Returns:A SignatureDecorator for the requested field name
set_signature(field_name: str, signature: aries_cloudagent.messaging.decorators.signature_decorator.SignatureDecorator)[source]

Add or replace the signature for a named field.

Parameters:
  • field_name – Field to set signature on
  • signature – Signature for the field
sign_field(field_name: str, signer_verkey: str, wallet: aries_cloudagent.wallet.base.BaseWallet, timestamp=None) → aries_cloudagent.messaging.decorators.signature_decorator.SignatureDecorator[source]

Create and store a signature for a named field.

Parameters:
  • field_name – Field to sign
  • signer_verkey – Verkey of signer
  • wallet – Wallet to use for signature
  • timestamp – Optional timestamp for signature
Returns:

A SignatureDecorator for the newly created signature

Raises:

ValueError – If field_name doesn’t exist on this message

verify_signatures(wallet: aries_cloudagent.wallet.base.BaseWallet) → bool[source]

Verify all associated field signatures.

Parameters:wallet – Wallet to use in verification
Returns:True if all signatures verify, else False
verify_signed_field(field_name: str, wallet: aries_cloudagent.wallet.base.BaseWallet, signer_verkey: str = None) → str[source]

Verify a specific field signature.

Parameters:
  • field_name – The field name to verify
  • wallet – Wallet to use for the verification
  • signer_verkey – Verkey of signer to use
Returns:

The verkey of the signer

Raises:
  • ValueError – If field_name does not exist on this message
  • ValueError – If the verification fails
  • ValueError – If the verkey of the signature does not match the provided verkey
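
The error contract of sign_field and verify_signed_field can be sketched without the real wallet. Everything below is hypothetical: `SketchWallet` uses HMAC-SHA256 as a stand-in for the wallet's actual signing scheme, `SignedMessage` is not the library class, and the methods are written synchronously for brevity.

```python
import hashlib
import hmac
from typing import Dict, Optional, Tuple


class SketchWallet:
    """Hypothetical wallet: maps verkeys to secrets and signs with HMAC-SHA256."""

    def __init__(self, secrets: Dict[str, bytes]):
        self._secrets = secrets

    def sign(self, verkey: str, data: bytes) -> bytes:
        return hmac.new(self._secrets[verkey], data, hashlib.sha256).digest()

    def verify(self, verkey: str, data: bytes, sig: bytes) -> bool:
        if verkey not in self._secrets:
            return False
        return hmac.compare_digest(self.sign(verkey, data), sig)


class SignedMessage:
    """Hypothetical message holding plain fields and per-field signatures."""

    def __init__(self, **fields):
        self.fields = dict(fields)
        self.signatures: Dict[str, Tuple[str, bytes]] = {}

    def sign_field(self, field_name: str, signer_verkey: str, wallet: SketchWallet):
        if field_name not in self.fields:
            raise ValueError(f"Missing field: {field_name}")
        data = repr(self.fields[field_name]).encode()
        self.signatures[field_name] = (signer_verkey, wallet.sign(signer_verkey, data))
        return self.signatures[field_name]

    def verify_signed_field(
        self, field_name: str, wallet: SketchWallet, signer_verkey: Optional[str] = None
    ) -> str:
        if field_name not in self.fields or field_name not in self.signatures:
            raise ValueError(f"Missing field signature: {field_name}")
        verkey, sig = self.signatures[field_name]
        data = repr(self.fields[field_name]).encode()
        if not wallet.verify(verkey, data, sig):
            raise ValueError(f"Signature verification failed: {field_name}")
        if signer_verkey is not None and verkey != signer_verkey:
            raise ValueError(f"Signer verkey mismatch: {field_name}")
        return verkey


wallet = SketchWallet({"verkey-A": b"secret-a"})
msg = SignedMessage(connection={"did": "abc"})
msg.sign_field("connection", "verkey-A", wallet)
signer = msg.verify_signed_field("connection", wallet)
```

The three `ValueError` branches mirror the three documented failure cases: an unknown field, a failed verification, and a signer that differs from the expected verkey.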
exception aries_cloudagent.messaging.agent_message.AgentMessageError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.messaging.models.base.BaseModelError

Base exception for agent message issues.

class aries_cloudagent.messaging.agent_message.AgentMessageSchema(*args, **kwargs)[source]

Bases: aries_cloudagent.messaging.models.base.BaseModelSchema

AgentMessage schema.

class Meta[source]

Bases: object

AgentMessageSchema metadata.

model_class = None
signed_fields = None
check_dump_decorators(obj)[source]

Pre-dump hook to validate and load the message decorators.

Parameters:obj – The AgentMessage object
Raises:BaseModelError – If a decorator does not validate
dump_decorators(data)[source]

Post-dump hook to write the decorators to the serialized output.

Parameters:data – The serialized data
Returns:The modified data
extract_decorators(data)[source]

Pre-load hook to extract the decorators and check the signed fields.

Parameters:

data – Incoming data to parse

Returns:

Parsed and modified data

Raises:
  • ValidationError – If a field signature does not correlate to a field in the message
  • ValidationError – If the message defines both a field signature and a value for the same field
  • ValidationError – If there is a missing field signature
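
The three validation rules listed above can be sketched as a standalone check. This is an illustration only: `check_signed_fields` and `SketchValidationError` are hypothetical names, and storing a field signature under the key `<field>~sig` is an assumption about the serialized form.

```python
from typing import Dict, Iterable, Mapping

SIG_SUFFIX = "~sig"  # assumption: signatures are serialized as "<field>~sig"


class SketchValidationError(Exception):
    """Hypothetical stand-in for the schema's ValidationError."""


def check_signed_fields(
    data: Mapping[str, object],
    known_fields: Iterable[str],
    signed_fields: Iterable[str],
) -> Dict[str, object]:
    """Return the field signatures found in data, enforcing the three rules."""
    known = set(known_fields)
    found: Dict[str, object] = {}
    for key, value in data.items():
        if not key.endswith(SIG_SUFFIX):
            continue
        name = key[: -len(SIG_SUFFIX)]
        # Rule 1: a signature must correlate to a field in the message.
        if name not in known:
            raise SketchValidationError(f"Signature for unknown field: {name}")
        # Rule 2: a field may not carry both a value and a signature.
        if name in data:
            raise SketchValidationError(f"Value and signature both present: {name}")
        found[name] = value
    # Rule 3: every field declared as signed must have a signature.
    for name in signed_fields:
        if name not in found:
            raise SketchValidationError(f"Missing signature for field: {name}")
    return found


signatures = check_signed_fields(
    {"label": "Alice", "connection~sig": {"signer": "verkey-A"}},
    known_fields=["label", "connection"],
    signed_fields=["connection"],
)
```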
populate_decorators(obj)[source]

Post-load hook to populate decorators on the message.

Parameters:obj – The AgentMessage object
Returns:The AgentMessage object with populated decorators
replace_signatures(data)[source]

Post-dump hook to write the signatures to the serialized output.

Parameters:data – The serialized data
Returns:The modified data

aries_cloudagent.messaging.base_context module

Abstract RequestContext base class.

class aries_cloudagent.messaging.base_context.BaseRequestContext[source]

Bases: abc.ABC

Abstract RequestContext base class.

This class is only for resolving recursive import issues.

aries_cloudagent.messaging.base_handler module

A base handler class for all message handlers.

class aries_cloudagent.messaging.base_handler.BaseHandler[source]

Bases: abc.ABC

Abstract base class for handlers.

handle(context: aries_cloudagent.messaging.request_context.RequestContext, responder: aries_cloudagent.messaging.responder.BaseResponder)[source]

Abstract method for handler logic.

Parameters:
  • context – Request context object
  • responder – A responder object
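
A concrete handler subclasses the abstract base and implements handle. The sketch below uses hypothetical stand-ins (`SketchBaseHandler`, `SketchContext`, `SketchResponder`) rather than the library classes, and it assumes handle is a coroutine, which the signature above does not state.

```python
import asyncio
from abc import ABC, abstractmethod


class SketchContext:
    """Hypothetical request context carrying the deserialized message."""

    def __init__(self, message):
        self.message = message


class SketchResponder:
    """Hypothetical responder that records replies instead of sending them."""

    def __init__(self):
        self.replies = []

    async def send_reply(self, message):
        self.replies.append(message)


class SketchBaseHandler(ABC):
    """Hypothetical abstract base; subclasses must implement handle()."""

    @abstractmethod
    async def handle(self, context: SketchContext, responder: SketchResponder):
        """Handler logic goes here."""


class EchoHandler(SketchBaseHandler):
    async def handle(self, context: SketchContext, responder: SketchResponder):
        # Reply with the content of the message being handled.
        await responder.send_reply({"echo": context.message})


responder = SketchResponder()
asyncio.run(EchoHandler().handle(SketchContext("ping"), responder))
```

Because handle is abstract, instantiating the base class directly raises `TypeError`, so a handler that forgets to implement it fails as soon as it is created.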
exception aries_cloudagent.messaging.base_handler.HandlerException(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.error.BaseError

Exception base class for generic handler errors.

aries_cloudagent.messaging.error module

Messaging-related error classes and codes.

exception aries_cloudagent.messaging.error.MessageParseError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.error.BaseError

Message parse error.

error_code = 'message_parse_error'
exception aries_cloudagent.messaging.error.MessagePrepareError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.error.BaseError

Message preparation error.

error_code = 'message_prepare_error'
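
The error classes above pair a class-level error_code with an optional error_code keyword argument. One plausible reading of that pattern is sketched below. `SketchBaseError` is a hypothetical stand-in for aries_cloudagent.error.BaseError, and the override behavior is an assumption.

```python
from typing import Optional


class SketchBaseError(Exception):
    """Hypothetical stand-in for the library's base error class."""

    error_code: Optional[str] = None  # class-level default

    def __init__(self, *args, error_code: Optional[str] = None):
        super().__init__(*args)
        if error_code:
            # Assumption: a per-instance code overrides the class default.
            self.error_code = error_code


class SketchMessageParseError(SketchBaseError):
    error_code = "message_parse_error"


default_err = SketchMessageParseError("bad JSON")
custom_err = SketchMessageParseError("bad JSON", error_code="custom_code")
```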

aries_cloudagent.messaging.message_delivery module

Classes for representing message delivery details.

class aries_cloudagent.messaging.message_delivery.MessageDelivery(*, connection_id: str = None, direct_response: bool = False, direct_response_requested: str = None, in_time: datetime.datetime = None, raw_message: str = None, recipient_verkey: str = None, recipient_did: str = None, recipient_did_public: str = None, sender_did: str = None, sender_verkey: str = None, socket_id: str = None, thread_id: str = None, transport_type: str = None)[source]

Bases: object

Properties of an agent message’s delivery.

connection_id

Accessor for the pairwise connection identifier.

Returns:This context’s connection identifier
direct_response

Accessor for the flag indicating that direct responses are preferred.

Returns:This context’s direct response flag
direct_response_requested

Accessor for the requested direct response mode.

Returns:This context’s requested direct response mode
in_time

Accessor for the datetime the message was received.

Returns:This context’s received time
raw_message

Accessor for the raw message text.

Returns:The raw message text
recipient_did

Accessor for the recipient DID which corresponds with the verkey.

Returns:The recipient DID
recipient_did_public

Check if the recipient DID is public.

Indicates whether the message is associated with a public (ledger) recipient DID.

Returns:True if the recipient’s DID is public, else False
recipient_verkey

Accessor for the recipient verkey used to pack the incoming request.

Returns:The recipient verkey
sender_did

Accessor for the sender DID which corresponds with the verkey.

Returns:The sender DID
sender_verkey

Accessor for the sender public key used to pack the incoming request.

Returns:This context’s sender’s verkey
socket_id

Accessor for the identifier of the incoming socket connection.

Returns:This context’s socket identifier
thread_id

Accessor for the identifier of the message thread.

Returns:The delivery thread ID
transport_type

Accessor for the transport type used to receive the message.

Returns:This context’s transport type

aries_cloudagent.messaging.outbound_message module

Outbound message representation.

class aries_cloudagent.messaging.outbound_message.OutboundMessage(payload: Union[str, bytes], *, connection_id: str = None, encoded: bool = False, endpoint: str = None, reply_socket_id: str = None, reply_thread_id: str = None, reply_to_verkey: str = None, target: aries_cloudagent.messaging.connections.models.connection_target.ConnectionTarget = None)[source]

Bases: object

Represents an outgoing message.

endpoint

Return the endpoint of the outbound message.

Defaults to the endpoint of the connection target.
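
The endpoint fallback described above can be sketched as a property. `SketchOutboundMessage` and `SketchTarget` are hypothetical stand-ins, and returning None when neither an explicit endpoint nor a target is available is an assumption.

```python
from typing import Optional, Union


class SketchTarget:
    """Hypothetical stand-in for ConnectionTarget."""

    def __init__(self, endpoint: Optional[str] = None):
        self.endpoint = endpoint


class SketchOutboundMessage:
    """Hypothetical stand-in for OutboundMessage."""

    def __init__(
        self,
        payload: Union[str, bytes],
        *,
        endpoint: Optional[str] = None,
        target: Optional[SketchTarget] = None,
    ):
        self.payload = payload
        self._endpoint = endpoint
        self.target = target

    @property
    def endpoint(self) -> Optional[str]:
        # An explicit endpoint wins; otherwise fall back to the target's.
        if self._endpoint:
            return self._endpoint
        return self.target.endpoint if self.target else None


target = SketchTarget("http://agent.example/inbox")  # placeholder URL
via_target = SketchOutboundMessage("hi", target=target)
explicit = SketchOutboundMessage("hi", endpoint="ws://other.example", target=target)
```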

aries_cloudagent.messaging.protocol_registry module

Handle registration and publication of supported message families.

class aries_cloudagent.messaging.protocol_registry.ProtocolRegistry[source]

Bases: object

Protocol registry for indexing message families.

controllers

Accessor for a list of all protocol controller functions.

message_types

Accessor for a list of all message types.

prepare_disclosed(context: aries_cloudagent.config.injection_context.InjectionContext, protocols: Sequence[str])[source]

Call controllers and return publicly supported message families and roles.

protocols

Accessor for a list of all message protocols.

protocols_matching_query(query: str) → Sequence[str][source]

Return a list of message protocols matching a query string.

register_controllers(*controller_sets)[source]

Add new controllers.

Parameters:controller_sets – Mappings of message families to coroutines
register_message_types(*typesets)[source]

Add new supported message types.

Parameters:typesets – Mappings of message types to register
resolve_message_class(message_type: str) → type[source]

Resolve a message_type to a message class.

Given a message type identifier, this method returns the corresponding registered message class.

Parameters:message_type – Message type to resolve
Returns:The resolved message class
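
The registration and lookup methods above can be sketched with a plain dictionary. `SketchRegistry` is a hypothetical stand-in. It assumes that a protocol is a message type minus its final path segment and that a trailing `*` in a query means prefix matching. The message type strings are placeholders.

```python
from typing import Dict, List, Optional


class SketchRegistry:
    """Hypothetical registry indexing message types by identifier."""

    def __init__(self):
        self._typemap: Dict[str, type] = {}

    @property
    def message_types(self) -> List[str]:
        return sorted(self._typemap)

    @property
    def protocols(self) -> List[str]:
        # Assumption: "<protocol>/<message name>" is the message type layout.
        return sorted({mt.rsplit("/", 1)[0] for mt in self._typemap})

    def register_message_types(self, *typesets: Dict[str, type]) -> None:
        for typeset in typesets:
            self._typemap.update(typeset)

    def resolve_message_class(self, message_type: str) -> Optional[type]:
        return self._typemap.get(message_type)

    def protocols_matching_query(self, query: str) -> List[str]:
        if query.endswith("*"):
            prefix = query[:-1]
            return [p for p in self.protocols if p.startswith(prefix)]
        return [p for p in self.protocols if p == query]


class Ping:
    pass


class BasicMessage:
    pass


registry = SketchRegistry()
registry.register_message_types(
    {"example/trustping/1.0/ping": Ping},
    {"example/basicmessage/1.0/message": BasicMessage},
)
```

With this layout, `registry.resolve_message_class("example/trustping/1.0/ping")` returns `Ping`, and a query such as `"example/trust*"` selects only the trust ping protocol.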

aries_cloudagent.messaging.request_context module

Request context class.

A request context provides everything required by handlers and other parts of the system to process a message.

class aries_cloudagent.messaging.request_context.RequestContext(*, base_context: aries_cloudagent.config.injection_context.InjectionContext = None, settings: Mapping[str, object] = None)[source]

Bases: aries_cloudagent.config.injection_context.InjectionContext

Context established by the Conductor and passed into message handlers.

connection_ready

Accessor for the flag indicating an active connection with the sender.

Returns:True if the connection is active, else False
connection_record

Accessor for the related connection record.

copy() → aries_cloudagent.messaging.request_context.RequestContext[source]

Produce a copy of the request context instance.

default_endpoint

Accessor for the default agent endpoint (from agent config).

Returns:The default agent endpoint
default_label

Accessor for the default agent label (from agent config).

Returns:The default label
message

Accessor for the deserialized message instance.

Returns:This context’s agent message
message_delivery

Accessor for the message delivery information.

Returns:This context’s message delivery information

aries_cloudagent.messaging.responder module

A message responder.

The responder is provided to message handlers to enable them to send a new message in response to the message being handled.

class aries_cloudagent.messaging.responder.BaseResponder(*, connection_id: str = None, reply_socket_id: str = None, reply_to_verkey: str = None)[source]

Bases: abc.ABC

Interface for message handlers to send responses.

create_outbound(message: Union[aries_cloudagent.messaging.agent_message.AgentMessage, str, bytes], *, connection_id: str = None, reply_socket_id: str = None, reply_thread_id: str = None, reply_to_verkey: str = None, target: aries_cloudagent.messaging.connections.models.connection_target.ConnectionTarget = None) → aries_cloudagent.messaging.outbound_message.OutboundMessage[source]

Create an OutboundMessage from a message payload.

send(message: Union[aries_cloudagent.messaging.agent_message.AgentMessage, str, bytes], **kwargs)[source]

Convert a message to an OutboundMessage and send it.

send_outbound(message: aries_cloudagent.messaging.outbound_message.OutboundMessage)[source]

Send an outbound message.

Parameters:message – The OutboundMessage to be sent
send_reply(message: Union[aries_cloudagent.messaging.agent_message.AgentMessage, str, bytes], *, connection_id: str = None, target: aries_cloudagent.messaging.connections.models.connection_target.ConnectionTarget = None)[source]

Send a reply to an incoming message.

Parameters:
  • message – the AgentMessage, or pre-packed str or bytes to reply with
  • connection_id – optionally override the target connection ID
  • target – optionally specify a ConnectionTarget to send to
Raises:

ResponderError – If there is no active connection

send_webhook(topic: str, payload: dict)[source]

Dispatch a webhook.

Parameters:
  • topic – the webhook topic identifier
  • payload – the webhook payload value
class aries_cloudagent.messaging.responder.MockResponder[source]

Bases: aries_cloudagent.messaging.responder.BaseResponder

Mock responder implementation for use by tests.

send(message: Union[aries_cloudagent.messaging.agent_message.AgentMessage, str, bytes], **kwargs)[source]

Convert a message to an OutboundMessage and send it.

send_outbound(message: aries_cloudagent.messaging.outbound_message.OutboundMessage)[source]

Send an outbound message.

send_reply(message: Union[aries_cloudagent.messaging.agent_message.AgentMessage, str, bytes], **kwargs)[source]

Send a reply to an incoming message.

send_webhook(topic: str, payload: dict)[source]

Dispatch a webhook.

exception aries_cloudagent.messaging.responder.ResponderError(*args, error_code: str = None, **kwargs)[source]

Bases: aries_cloudagent.error.BaseError

Responder error.
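
The responder interface above can be sketched as an abstract base whose send and send_reply both funnel into send_outbound. All names here are hypothetical stand-ins, the coroutine signatures are an assumption, and `RecordingResponder` only illustrates the kind of collecting mock a test might use.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import List, Optional, Union


class SketchResponderError(Exception):
    """Hypothetical stand-in for ResponderError."""


class SketchOutbound:
    """Hypothetical stand-in for OutboundMessage."""

    def __init__(self, payload: Union[str, bytes], connection_id: Optional[str] = None):
        self.payload = payload
        self.connection_id = connection_id


class SketchBaseResponder(ABC):
    def __init__(self, *, connection_id: Optional[str] = None):
        self.connection_id = connection_id

    def create_outbound(self, message, *, connection_id=None) -> SketchOutbound:
        return SketchOutbound(message, connection_id)

    async def send(self, message, **kwargs):
        # Convert the payload to an outbound message, then send it.
        await self.send_outbound(self.create_outbound(message, **kwargs))

    async def send_reply(self, message, *, connection_id=None):
        target_id = connection_id or self.connection_id
        if not target_id:
            raise SketchResponderError("No active connection")
        await self.send_outbound(self.create_outbound(message, connection_id=target_id))

    @abstractmethod
    async def send_outbound(self, message: SketchOutbound):
        """Deliver the outbound message."""


class RecordingResponder(SketchBaseResponder):
    """Collects outbound messages so a test can inspect them."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.messages: List[SketchOutbound] = []

    async def send_outbound(self, message: SketchOutbound):
        self.messages.append(message)


responder = RecordingResponder(connection_id="conn-1")
asyncio.run(responder.send_reply("hello"))
```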

aries_cloudagent.messaging.serializer module

Standard message serializer classes.

class aries_cloudagent.messaging.serializer.MessageSerializer[source]

Bases: object

Standard DIDComm message parser and serializer.

encode_message(context: aries_cloudagent.config.injection_context.InjectionContext, message_json: Union[str, bytes], recipient_keys: Sequence[str], routing_keys: Sequence[str], sender_key: str) → Union[str, bytes][source]

Encode an outgoing message for transport.

Parameters:
  • context – The injection context for settings and services
  • message_json – The message body to serialize
  • recipient_keys – A sequence of recipient verkeys
  • routing_keys – A sequence of routing verkeys
  • sender_key – The verification key of the sending agent
Returns:

The encoded message

extract_message_type(parsed_msg: dict) → str[source]

Extract the message type identifier from a parsed message.

Raises:MessageParseError – If the message doesn’t specify a type
parse_message(context: aries_cloudagent.config.injection_context.InjectionContext, message_body: Union[str, bytes], transport_type: str) → Tuple[dict, aries_cloudagent.messaging.message_delivery.MessageDelivery][source]

Deserialize an incoming message and further populate the request context.

Parameters:
  • context – The injection context for settings and services
  • message_body – The body of the message
  • transport_type – The transport the message was received on
Returns:

A tuple of the parsed message dict and a message delivery object with details on the parsed message

Raises:
  • MessageParseError – If the JSON parsing failed
  • MessageParseError – If a wallet is required but can’t be located
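
The JSON-parsing and type-extraction steps can be sketched independently of wallets and transports. The helper `parse_json_body` and the exception class are hypothetical names. Reading the type from the `@type` attribute follows the DIDComm message convention, but that the library does exactly this is an assumption.

```python
import json
from typing import Union


class SketchMessageParseError(Exception):
    """Hypothetical stand-in for MessageParseError."""

    error_code = "message_parse_error"


def parse_json_body(message_body: Union[str, bytes]) -> dict:
    """Parse an unencrypted message body into a dict."""
    try:
        parsed = json.loads(message_body)
    except ValueError as err:  # covers JSONDecodeError and bad UTF-8 bytes
        raise SketchMessageParseError("Message JSON parsing failed") from err
    if not isinstance(parsed, dict):
        raise SketchMessageParseError("Message JSON result is not an object")
    return parsed


def extract_message_type(parsed_msg: dict) -> str:
    """Return the message type identifier, or raise if it is absent."""
    msg_type = parsed_msg.get("@type")
    if not msg_type:
        raise SketchMessageParseError("Message does not specify a type")
    return msg_type


parsed = parse_json_body(b'{"@type": "example/trustping/1.0/ping", "@id": "1"}')
message_type = extract_message_type(parsed)
```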

aries_cloudagent.messaging.socket module

Duplex connection handling classes.

class aries_cloudagent.messaging.socket.SocketInfo(*, connection_id: str = None, handler: Coroutine[T_co, T_contra, V_co] = None, reply_mode: str = None, reply_thread_ids: Sequence[str] = None, reply_verkeys: Sequence[str] = None, single_response: _asyncio.Future = None, socket_id: str = None)[source]

Bases: object

Track an open transport connection for direct routing of outbound messages.

REPLY_MODE_ALL = 'all'
REPLY_MODE_NONE = 'none'
REPLY_MODE_THREAD = 'thread'
add_reply_thread_id(thid: str)[source]

Add a thread ID to the set of potential reply targets.

add_reply_verkey(verkey: str)[source]

Add a verkey to the set of potential reply targets.

closed

Accessor for the socket closed state.

dispatch_complete()[source]

Indicate that a message handler has completed.

process_incoming(parsed_msg: dict, delivery: aries_cloudagent.messaging.message_delivery.MessageDelivery)[source]

Process an incoming message and update the socket metadata as necessary.

Parameters:
  • parsed_msg – The unserialized message body
  • delivery – The message delivery metadata
reply_mode

Accessor for the socket reply mode.

select_outgoing(message: aries_cloudagent.messaging.outbound_message.OutboundMessage) → bool[source]

Determine if an outbound message should be sent to this socket.

Parameters:message – The outbound message to be checked
send(message: aries_cloudagent.messaging.outbound_message.OutboundMessage)[source]

Send an outbound message to this socket.
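
How the three reply modes might drive select_outgoing is sketched below. The matching rules are assumptions, not documented behavior: `none` never selects, `all` selects any message addressed to a known verkey, and `thread` additionally requires a known thread ID. `SketchSocket` is a hypothetical stand-in that takes the relevant message attributes directly.

```python
from typing import Optional, Set

REPLY_MODE_ALL = "all"
REPLY_MODE_NONE = "none"
REPLY_MODE_THREAD = "thread"


class SketchSocket:
    """Hypothetical stand-in for SocketInfo."""

    def __init__(self, reply_mode: str = REPLY_MODE_NONE):
        self.reply_mode = reply_mode
        self.reply_verkeys: Set[str] = set()
        self.reply_thread_ids: Set[str] = set()
        self.closed = False

    def add_reply_verkey(self, verkey: str) -> None:
        self.reply_verkeys.add(verkey)

    def add_reply_thread_id(self, thid: str) -> None:
        self.reply_thread_ids.add(thid)

    def select_outgoing(
        self, reply_to_verkey: Optional[str], reply_thread_id: Optional[str]
    ) -> bool:
        """Decide whether a message with these attributes goes to this socket."""
        if self.closed or self.reply_mode == REPLY_MODE_NONE:
            return False
        if reply_to_verkey not in self.reply_verkeys:
            return False
        if self.reply_mode == REPLY_MODE_THREAD:
            return reply_thread_id in self.reply_thread_ids
        return self.reply_mode == REPLY_MODE_ALL


sock = SketchSocket(REPLY_MODE_THREAD)
sock.add_reply_verkey("verkey-A")
sock.add_reply_thread_id("thread-1")
```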

class aries_cloudagent.messaging.socket.SocketRef(socket_id: str, close: Coroutine[T_co, T_contra, V_co])[source]

Bases: object

A reference to a registered duplex connection.

aries_cloudagent.messaging.util module

Utils for messages.

aries_cloudagent.messaging.util.datetime_now() → datetime.datetime[source]

Timestamp in UTC.

aries_cloudagent.messaging.util.datetime_to_str(dt: Union[str, datetime.datetime]) → str[source]

Convert a datetime object to an indy-standard datetime string.

Parameters:dt – May be a string or datetime to allow automatic conversion
aries_cloudagent.messaging.util.str_to_datetime(dt: Union[str, datetime.datetime]) → datetime.datetime[source]

Convert an indy-standard datetime string to a datetime.

Uses a fairly lax regex pattern to match slightly different formats. On Python 3.7 and later, datetime.fromisoformat might be used instead.

Parameters:dt – May be a string or datetime to allow automatic conversion
aries_cloudagent.messaging.util.time_now() → str[source]

Timestamp in ISO format.
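
The conversion helpers above can be sketched with the standard library alone. This is not the module's implementation. The string layout (space separator, trailing `Z`) is an assumption about the indy-standard format, and the regex is only one example of a lax pattern.

```python
import re
from datetime import datetime, timezone
from typing import Union

# Lax pattern: "T" or space separator, optional seconds and fraction,
# optional "Z" or "+00:00" suffix. Other UTC offsets are not handled.
_DT_PATTERN = re.compile(
    r"^(\d{4})-(\d\d)-(\d\d)[T ](\d\d):(\d\d)"
    r"(?::(\d\d)(?:\.(\d{1,6}))?)?\s*(?:Z|\+00(?::?00)?)?$"
)


def datetime_now() -> datetime:
    """Timestamp in UTC."""
    return datetime.now(timezone.utc)


def str_to_datetime(dt: Union[str, datetime]) -> datetime:
    """Convert a datetime string to a timezone-aware UTC datetime."""
    if isinstance(dt, datetime):
        return dt
    match = _DT_PATTERN.match(dt)
    if not match:
        raise ValueError(f"String does not match expected time format: {dt}")
    year, month, day, hour, minute, second, fraction = match.groups()
    microsecond = int((fraction or "0").ljust(6, "0"))
    return datetime(
        int(year), int(month), int(day), int(hour), int(minute),
        int(second or 0), microsecond, tzinfo=timezone.utc,
    )


def datetime_to_str(dt: Union[str, datetime]) -> str:
    """Convert a datetime (or datetime string) to the assumed string layout."""
    if isinstance(dt, str):
        dt = str_to_datetime(dt)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # treat naive values as UTC
    return dt.astimezone(timezone.utc).isoformat(" ").replace("+00:00", "Z")


def time_now() -> str:
    """Current UTC timestamp as a string."""
    return datetime_to_str(datetime_now())
```

Accepting either a string or a datetime in both converters lets callers pass values through without checking their type first, which matches the parameter descriptions above.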