"""
The Conductor.
The conductor is responsible for coordinating messages that are received
over the network, communicating with the ledger, passing messages to handlers,
instantiating concrete implementations of required modules and storing data in the
wallet.
"""
import hashlib
import json
import logging
from ..admin.base_server import BaseAdminServer
from ..admin.server import AdminResponder, AdminServer
from ..config.default_context import ContextBuilder
from ..config.injection_context import InjectionContext
from ..config.ledger import get_genesis_transactions, ledger_config
from ..config.logging import LoggingConfigurator
from ..config.wallet import wallet_config
from ..connections.models.conn_record import ConnRecord
from ..core.profile import Profile
from ..ledger.error import LedgerConfigError, LedgerTransactionError
from ..messaging.responder import BaseResponder
from ..multitenant.manager import MultitenantManager
from ..protocols.connections.v1_0.manager import (
ConnectionManager,
ConnectionManagerError,
)
from ..protocols.connections.v1_0.messages.connection_invitation import (
ConnectionInvitation,
)
from ..protocols.coordinate_mediation.v1_0.manager import MediationManager
from ..protocols.out_of_band.v1_0.manager import OutOfBandManager
from ..protocols.out_of_band.v1_0.messages.invitation import HSProto, InvitationMessage
from ..transport.inbound.manager import InboundTransportManager
from ..transport.inbound.message import InboundMessage
from ..transport.outbound.base import OutboundDeliveryError
from ..transport.outbound.manager import OutboundTransportManager, QueuedOutboundMessage
from ..transport.outbound.message import OutboundMessage
from ..transport.outbound.queue.base import BaseOutboundQueue
from ..transport.outbound.queue.loader import get_outbound_queue
from ..transport.outbound.status import OutboundSendStatus
from ..transport.wire_format import BaseWireFormat
from ..utils.stats import Collector
from ..utils.task_queue import CompletedTask, TaskQueue
from ..vc.ld_proofs.document_loader import DocumentLoader
from ..wallet.did_info import DIDInfo
from .dispatcher import Dispatcher
LOGGER = logging.getLogger(__name__)
[docs]class Conductor:
"""
Conductor class.
Class responsible for initializing concrete implementations
of our require interfaces and routing inbound and outbound message data.
"""
def __init__(self, context_builder: ContextBuilder) -> None:
"""
Initialize an instance of Conductor.
Args:
inbound_transports: Configuration for inbound transports
outbound_transports: Configuration for outbound transports
settings: Dictionary of various settings
"""
self.admin_server = None
self.context_builder = context_builder
self.dispatcher: Dispatcher = None
self.inbound_transport_manager: InboundTransportManager = None
self.outbound_transport_manager: OutboundTransportManager = None
self.root_profile: Profile = None
self.setup_public_did: DIDInfo = None
self.outbound_queue: BaseOutboundQueue = None
@property
def context(self) -> InjectionContext:
"""Accessor for the injection context."""
return self.root_profile.context
[docs] async def setup(self):
"""Initialize the global request context."""
context = await self.context_builder.build_context()
# Fetch genesis transactions if necessary
await get_genesis_transactions(context.settings)
# Configure the root profile
self.root_profile, self.setup_public_did = await wallet_config(context)
context = self.root_profile.context
# Configure the ledger
if not await ledger_config(
self.root_profile, self.setup_public_did and self.setup_public_did.did
):
LOGGER.warning("No ledger configured")
# Register all inbound transports
self.inbound_transport_manager = InboundTransportManager(
self.root_profile, self.inbound_message_router, self.handle_not_returned
)
await self.inbound_transport_manager.setup()
context.injector.bind_instance(
InboundTransportManager, self.inbound_transport_manager
)
# Register all outbound transports
self.outbound_transport_manager = OutboundTransportManager(
context, self.handle_not_delivered
)
await self.outbound_transport_manager.setup()
# Initialize dispatcher
self.dispatcher = Dispatcher(self.root_profile)
await self.dispatcher.setup()
wire_format = context.inject(BaseWireFormat, required=False)
if wire_format and hasattr(wire_format, "task_queue"):
wire_format.task_queue = self.dispatcher.task_queue
# Bind manager for multitenancy related tasks
if context.settings.get("multitenant.enabled"):
multitenant_mgr = MultitenantManager(self.root_profile)
context.injector.bind_instance(MultitenantManager, multitenant_mgr)
# Bind default PyLD document loader
context.injector.bind_instance(
DocumentLoader, DocumentLoader(self.root_profile)
)
self.outbound_queue = get_outbound_queue(context.settings)
# Admin API
if context.settings.get("admin.enabled"):
try:
admin_host = context.settings.get("admin.host", "0.0.0.0")
admin_port = context.settings.get("admin.port", "80")
self.admin_server = AdminServer(
admin_host,
admin_port,
context,
self.root_profile,
self.outbound_message_router,
self.webhook_router,
self.stop,
self.dispatcher.task_queue,
self.get_stats,
)
context.injector.bind_instance(BaseAdminServer, self.admin_server)
except Exception:
LOGGER.exception("Unable to register admin server")
raise
# Fetch stats collector, if any
collector = context.inject(Collector, required=False)
if collector:
# add stats to our own methods
collector.wrap(
self,
(
# "inbound_message_router",
"outbound_message_router",
# "create_inbound_session",
),
)
# at the class level (!) should not be performed multiple times
collector.wrap(
ConnectionManager,
(
# "get_connection_targets",
"fetch_did_document",
"find_inbound_connection",
),
)
[docs] async def start(self) -> None:
"""Start the agent."""
context = self.root_profile.context
# Start up transports
try:
await self.inbound_transport_manager.start()
except Exception:
LOGGER.exception("Unable to start inbound transports")
raise
try:
await self.outbound_transport_manager.start()
except Exception:
LOGGER.exception("Unable to start outbound transports")
raise
# Start up Admin server
if self.admin_server:
try:
await self.admin_server.start()
except Exception:
LOGGER.exception("Unable to start administration API")
# Make admin responder available during message parsing
# This allows webhooks to be called when a connection is marked active,
# for example
responder = AdminResponder(
self.root_profile,
self.admin_server.outbound_message_router,
)
context.injector.bind_instance(BaseResponder, responder)
# Get agent label
default_label = context.settings.get("default_label")
# Show some details about the configuration to the user
LoggingConfigurator.print_banner(
default_label,
self.inbound_transport_manager.registered_transports,
self.outbound_transport_manager.registered_transports,
self.outbound_queue,
self.setup_public_did and self.setup_public_did.did,
self.admin_server,
)
# Create a static connection for use by the test-suite
if context.settings.get("debug.test_suite_endpoint"):
async with self.root_profile.session() as session:
mgr = ConnectionManager(session)
their_endpoint = context.settings["debug.test_suite_endpoint"]
test_conn = await mgr.create_static_connection(
my_seed=hashlib.sha256(b"aries-protocol-test-subject").digest(),
their_seed=hashlib.sha256(b"aries-protocol-test-suite").digest(),
their_endpoint=their_endpoint,
alias="test-suite",
)
print("Created static connection for test suite")
print(" - My DID:", test_conn.my_did)
print(" - Their DID:", test_conn.their_did)
print(" - Their endpoint:", their_endpoint)
print()
del mgr
# Clear default mediator
if context.settings.get("mediation.clear"):
mediation_mgr = MediationManager(self.root_profile)
await mediation_mgr.clear_default_mediator()
print("Default mediator cleared.")
# Clear default mediator
# Set default mediator by id
default_mediator_id = context.settings.get("mediation.default_id")
if default_mediator_id:
mediation_mgr = MediationManager(self.root_profile)
try:
await mediation_mgr.set_default_mediator_by_id(default_mediator_id)
print(f"Default mediator set to {default_mediator_id}")
except Exception:
LOGGER.exception("Error updating default mediator")
# Print an invitation to the terminal
if context.settings.get("debug.print_invitation"):
try:
async with self.root_profile.session() as session:
mgr = OutOfBandManager(session)
invi_rec = await mgr.create_invitation(
my_label=context.settings.get("debug.invite_label"),
public=context.settings.get("debug.invite_public", False),
multi_use=context.settings.get("debug.invite_multi_use", False),
hs_protos=[HSProto.RFC23],
metadata=json.loads(
context.settings.get("debug.invite_metadata_json", "{}")
),
)
base_url = context.settings.get("invite_base_url")
invite_url = invi_rec.invitation.to_url(base_url)
print("Invitation URL:")
print(invite_url, flush=True)
del mgr
except Exception:
LOGGER.exception("Error creating invitation")
# Print connections protocol invitation to the terminal
if context.settings.get("debug.print_connections_invitation"):
try:
async with self.root_profile.session() as session:
mgr = ConnectionManager(session)
_record, invite = await mgr.create_invitation(
my_label=context.settings.get("debug.invite_label"),
public=context.settings.get("debug.invite_public", False),
multi_use=context.settings.get("debug.invite_multi_use", False),
metadata=json.loads(
context.settings.get("debug.invite_metadata_json", "{}")
),
)
base_url = context.settings.get("invite_base_url")
print("Invitation URL (Connections protocol):")
print(invite.to_url(base_url), flush=True)
del mgr
except Exception:
LOGGER.exception("Error creating invitation")
# Accept mediation invitation if specified
mediation_invitation = context.settings.get("mediation.invite")
if mediation_invitation:
try:
mediation_connections_invite = context.settings.get(
"mediation.connections_invite", False
)
if mediation_connections_invite:
async with self.root_profile.session() as session:
mgr = ConnectionManager(session)
conn_record = await mgr.receive_invitation(
invitation=ConnectionInvitation.from_url(
mediation_invitation
),
auto_accept=True,
)
await conn_record.metadata_set(
session, MediationManager.SEND_REQ_AFTER_CONNECTION, True
)
await conn_record.metadata_set(
session, MediationManager.SET_TO_DEFAULT_ON_GRANTED, True
)
print("Attempting to connect to mediator...")
del mgr
else:
async with self.root_profile.session() as session:
mgr = OutOfBandManager(session)
conn_record_dict = await mgr.receive_invitation(
invi_msg=InvitationMessage.from_url(mediation_invitation),
auto_accept=True,
)
conn_record = ConnRecord.deserialize(conn_record_dict)
await conn_record.metadata_set(
session, MediationManager.SEND_REQ_AFTER_CONNECTION, True
)
await conn_record.metadata_set(
session, MediationManager.SET_TO_DEFAULT_ON_GRANTED, True
)
print("Attempting to connect to mediator...")
del mgr
except Exception:
LOGGER.exception("Error accepting mediation invitation")
[docs] async def stop(self, timeout=1.0):
"""Stop the agent."""
shutdown = TaskQueue()
if self.dispatcher:
shutdown.run(self.dispatcher.complete())
if self.admin_server:
shutdown.run(self.admin_server.stop())
if self.inbound_transport_manager:
shutdown.run(self.inbound_transport_manager.stop())
if self.outbound_transport_manager:
shutdown.run(self.outbound_transport_manager.stop())
# close multitenant profiles
multitenant_mgr = self.context.inject(MultitenantManager, required=False)
if multitenant_mgr:
for profile in multitenant_mgr._instances.values():
shutdown.run(profile.close())
if self.root_profile:
shutdown.run(self.root_profile.close())
await shutdown.complete(timeout)
[docs] def inbound_message_router(
self,
profile: Profile,
message: InboundMessage,
can_respond: bool = False,
):
"""
Route inbound messages.
Args:
context: The context associated with the inbound message
message: The inbound message instance
can_respond: If the session supports return routing
"""
if message.receipt.direct_response_requested and not can_respond:
LOGGER.warning(
"Direct response requested, but not supported by transport: %s",
message.transport_type,
)
# Note: at this point we could send the message to a shared queue
# if this pod is too busy to process it
try:
self.dispatcher.queue_message(
profile,
message,
self.outbound_message_router,
lambda completed: self.dispatch_complete(message, completed),
)
except (LedgerConfigError, LedgerTransactionError) as e:
LOGGER.error("Shutdown on ledger error %s", str(e))
if self.admin_server:
self.admin_server.notify_fatal_error()
raise
[docs] def dispatch_complete(self, message: InboundMessage, completed: CompletedTask):
"""Handle completion of message dispatch."""
if completed.exc_info:
LOGGER.exception(
"Exception in message handler:", exc_info=completed.exc_info
)
if isinstance(completed.exc_info[1], LedgerConfigError) or isinstance(
completed.exc_info[1], LedgerTransactionError
):
LOGGER.error(
"%shutdown on ledger error %s",
"S" if self.admin_server else "No admin server to s",
str(completed.exc_info[1]),
)
if self.admin_server:
self.admin_server.notify_fatal_error()
else:
LOGGER.error(
"DON'T shutdown on %s %s",
completed.exc_info[0].__name__,
str(completed.exc_info[1]),
)
self.inbound_transport_manager.dispatch_complete(message, completed)
[docs] async def get_stats(self) -> dict:
"""Get the current stats tracked by the conductor."""
stats = {
"in_sessions": len(self.inbound_transport_manager.sessions),
"out_encode": 0,
"out_deliver": 0,
"task_active": self.dispatcher.task_queue.current_active,
"task_done": self.dispatcher.task_queue.total_done,
"task_failed": self.dispatcher.task_queue.total_failed,
"task_pending": self.dispatcher.task_queue.current_pending,
}
for m in self.outbound_transport_manager.outbound_buffer:
if m.state == QueuedOutboundMessage.STATE_ENCODE:
stats["out_encode"] += 1
if m.state == QueuedOutboundMessage.STATE_DELIVER:
stats["out_deliver"] += 1
return stats
[docs] async def outbound_message_router(
self,
profile: Profile,
outbound: OutboundMessage,
inbound: InboundMessage = None,
) -> OutboundSendStatus:
"""
Route an outbound message.
Args:
profile: The active profile for the request
message: An outbound message to be sent
inbound: The inbound message that produced this response, if available
"""
if not outbound.target and outbound.reply_to_verkey:
if not outbound.reply_from_verkey and inbound:
outbound.reply_from_verkey = inbound.receipt.recipient_verkey
# return message to an inbound session
if self.inbound_transport_manager.return_to_session(outbound):
return OutboundSendStatus.SENT_TO_SESSION
if not outbound.to_session_only:
return await self.queue_outbound(profile, outbound, inbound)
[docs] def handle_not_returned(self, profile: Profile, outbound: OutboundMessage):
"""Handle a message that failed delivery via an inbound session."""
try:
self.dispatcher.run_task(self.queue_outbound(profile, outbound))
except (LedgerConfigError, LedgerTransactionError) as e:
LOGGER.error("Shutdown on ledger error %s", str(e))
if self.admin_server:
self.admin_server.notify_fatal_error()
raise
[docs] async def queue_outbound(
self,
profile: Profile,
outbound: OutboundMessage,
inbound: InboundMessage = None,
) -> OutboundSendStatus:
"""
Queue an outbound message for transport.
Args:
profile: The active profile
message: An outbound message to be sent
inbound: The inbound message that produced this response, if available
"""
# populate connection target(s)
if not outbound.target and not outbound.target_list and outbound.connection_id:
async with profile.session() as session:
conn_mgr = ConnectionManager(session)
try:
outbound.target_list = await self.dispatcher.run_task(
conn_mgr.get_connection_targets(
connection_id=outbound.connection_id
)
)
except ConnectionManagerError:
LOGGER.exception(
"Error preparing outbound message for transmission"
)
return
except (LedgerConfigError, LedgerTransactionError) as e:
LOGGER.error("Shutdown on ledger error %s", str(e))
if self.admin_server:
self.admin_server.notify_fatal_error()
raise
del conn_mgr
# If ``self.outbound_queue`` is specified (usually set via
# outbound queue `-oq` commandline option), use that external
# queue. Else save the message to an internal queue. This
# internal queue usually results in the message to be sent over
# ACA-py `-ot` transport.
if self.outbound_queue:
return await self._queue_external(profile, outbound)
else:
return self._queue_internal(profile, outbound)
async def _queue_external(
self,
profile: Profile,
outbound: OutboundMessage,
) -> OutboundSendStatus:
"""Save the message to an external outbound queue."""
async with self.outbound_queue:
targets = (
[outbound.target] if outbound.target else (outbound.target_list or [])
)
for target in targets:
await self.outbound_queue.enqueue_message(
outbound.payload, target.endpoint
)
return OutboundSendStatus.SENT_TO_EXTERNAL_QUEUE
def _queue_internal(
self, profile: Profile, outbound: OutboundMessage
) -> OutboundSendStatus:
"""Save the message to an internal outbound queue."""
try:
self.outbound_transport_manager.enqueue_message(profile, outbound)
return OutboundSendStatus.QUEUED_FOR_DELIVERY
except OutboundDeliveryError:
LOGGER.warning("Cannot queue message for delivery, no supported transport")
return self.handle_not_delivered(profile, outbound)
[docs] def handle_not_delivered(
self, profile: Profile, outbound: OutboundMessage
) -> OutboundSendStatus:
"""Handle a message that failed delivery via outbound transports."""
queued_for_inbound = self.inbound_transport_manager.return_undelivered(outbound)
return (
OutboundSendStatus.WAITING_FOR_PICKUP
if queued_for_inbound
else OutboundSendStatus.UNDELIVERABLE
)
[docs] def webhook_router(
self,
topic: str,
payload: dict,
endpoint: str,
max_attempts: int = None,
metadata: dict = None,
):
"""
Route a webhook through the outbound transport manager.
Args:
topic: The webhook topic
payload: The webhook payload
endpoint: The endpoint of the webhook target
max_attempts: The maximum number of attempts
metadata: Additional metadata associated with the payload
"""
try:
self.outbound_transport_manager.enqueue_webhook(
topic, payload, endpoint, max_attempts, metadata
)
except OutboundDeliveryError:
LOGGER.warning(
"Cannot queue message webhook for delivery, no supported transport"
)