# Source code for aries_cloudagent.core.conductor

"""
The Conductor.

The conductor is responsible for coordinating messages that are received
over the network, communicating with the ledger, passing messages to handlers,
instantiating concrete implementations of required modules and storing data in the
wallet.

"""

import hashlib
import logging

from ..admin.base_server import BaseAdminServer
from ..admin.server import AdminServer
from ..config.default_context import ContextBuilder
from ..config.injection_context import InjectionContext
from ..config.ledger import ledger_config
from ..config.logging import LoggingConfigurator
from ..config.wallet import wallet_config
from ..messaging.responder import BaseResponder

# FIXME: We shouldn't rely on a hardcoded message version here.
from ..protocols.connections.v1_0.manager import (
    ConnectionManager,
    ConnectionManagerError,
)
from ..transport.inbound.manager import InboundTransportManager
from ..transport.inbound.message import InboundMessage
from ..transport.outbound.base import OutboundDeliveryError
from ..transport.outbound.manager import OutboundTransportManager
from ..transport.outbound.message import OutboundMessage
from ..transport.wire_format import BaseWireFormat
from ..utils.task_queue import CompletedTask, TaskQueue
from ..utils.stats import Collector

from .dispatcher import Dispatcher

LOGGER = logging.getLogger(__name__)


class Conductor:
    """
    Conductor class.

    Class responsible for initializing concrete implementations
    of our required interfaces and routing inbound and outbound message data.
    """

    def __init__(self, context_builder: ContextBuilder) -> None:
        """
        Initialize an instance of Conductor.

        All managers start out as ``None``; they are created by ``setup()``.

        Args:
            context_builder: Builder whose ``build()`` coroutine is awaited in
                ``setup()`` to produce the injection context

        """
        self.admin_server = None
        self.context: InjectionContext = None
        self.context_builder = context_builder
        self.dispatcher: Dispatcher = None
        self.inbound_transport_manager: InboundTransportManager = None
        self.outbound_transport_manager: OutboundTransportManager = None

    async def setup(self):
        """
        Initialize the global request context.

        Builds the context, then creates the dispatcher, the inbound and
        outbound transport managers and (when ``admin.enabled`` is set) the
        admin server. ``self.context`` is assigned last, only after every
        step above has succeeded.

        Raises:
            Exception: Any error raised while registering the admin server is
                logged and re-raised

        """
        context = await self.context_builder.build()

        self.dispatcher = Dispatcher(context)
        await self.dispatcher.setup()

        # Share the dispatcher's task queue with the wire format, if it has one
        wire_format = await context.inject(BaseWireFormat, required=False)
        if wire_format and hasattr(wire_format, "task_queue"):
            wire_format.task_queue = self.dispatcher.task_queue

        # Register all inbound transports
        self.inbound_transport_manager = InboundTransportManager(
            context, self.inbound_message_router, self.handle_not_returned
        )
        await self.inbound_transport_manager.setup()

        # Register all outbound transports
        self.outbound_transport_manager = OutboundTransportManager(
            context, self.handle_not_delivered
        )
        await self.outbound_transport_manager.setup()

        # Admin API
        if context.settings.get("admin.enabled"):
            try:
                admin_host = context.settings.get("admin.host", "0.0.0.0")
                admin_port = context.settings.get("admin.port", "80")
                self.admin_server = AdminServer(
                    admin_host,
                    admin_port,
                    context,
                    self.outbound_message_router,
                    self.webhook_router,
                    self.dispatcher.task_queue,
                    self.get_stats,
                )
                webhook_urls = context.settings.get("admin.webhook_urls")
                if webhook_urls:
                    for url in webhook_urls:
                        self.admin_server.add_webhook_target(url)
                context.injector.bind_instance(BaseAdminServer, self.admin_server)
                # Webhooks are routed through the outbound transport manager
                # (see webhook_router), so make sure "http" is registered
                if "http" not in self.outbound_transport_manager.registered_schemes:
                    self.outbound_transport_manager.register("http")
            except Exception:
                LOGGER.exception("Unable to register admin server")
                raise

        # Fetch stats collector, if any
        collector = await context.inject(Collector, required=False)
        if collector:
            # add stats to our own methods
            collector.wrap(
                self,
                (
                    # "inbound_message_router",
                    "outbound_message_router",
                    # "create_inbound_session",
                ),
            )
            # at the class level (!) should not be performed multiple times
            collector.wrap(
                ConnectionManager,
                (
                    # "get_connection_targets",
                    "fetch_did_document",
                    "find_inbound_connection",
                ),
            )

        self.context = context

    async def start(self) -> None:
        """
        Start the agent.

        Configures the wallet and ledger, starts the transports and the admin
        server, prints the startup banner and, depending on the ``debug.*``
        settings, creates a static test-suite connection and/or prints an
        invitation URL.

        Raises:
            Exception: Any error raised while starting the inbound or outbound
                transports is logged and re-raised

        """
        context = self.context

        # Configure the wallet
        public_did = await wallet_config(context)

        # Configure the ledger
        await ledger_config(context, public_did)

        # Start up transports
        try:
            await self.inbound_transport_manager.start()
        except Exception:
            LOGGER.exception("Unable to start inbound transports")
            raise
        try:
            await self.outbound_transport_manager.start()
        except Exception:
            LOGGER.exception("Unable to start outbound transports")
            raise

        # Start up Admin server
        if self.admin_server:
            try:
                await self.admin_server.start()
            except Exception:
                # NOTE: unlike the transports, a failure here is only logged;
                # startup continues without re-raising.
                LOGGER.exception("Unable to start administration API")
            # Make admin responder available during message parsing
            # This allows webhooks to be called when a connection is marked active,
            # for example
            context.injector.bind_instance(BaseResponder, self.admin_server.responder)

        # Get agent label
        default_label = context.settings.get("default_label")

        # Show some details about the configuration to the user
        LoggingConfigurator.print_banner(
            default_label,
            self.inbound_transport_manager.registered_transports,
            self.outbound_transport_manager.registered_transports,
            public_did,
            self.admin_server,
        )

        # Create a static connection for use by the test-suite
        if context.settings.get("debug.test_suite_endpoint"):
            mgr = ConnectionManager(self.context)
            their_endpoint = context.settings["debug.test_suite_endpoint"]
            # Seeds are derived from fixed byte strings, so both sides of the
            # static connection get the same seeds on every run
            test_conn = await mgr.create_static_connection(
                my_seed=hashlib.sha256(b"aries-protocol-test-subject").digest(),
                their_seed=hashlib.sha256(b"aries-protocol-test-suite").digest(),
                their_endpoint=their_endpoint,
                their_role="tester",
                alias="test-suite",
            )
            print("Created static connection for test suite")
            print(" - My DID:", test_conn.my_did)
            print(" - Their DID:", test_conn.their_did)
            print(" - Their endpoint:", their_endpoint)
            print()

        # Print an invitation to the terminal
        if context.settings.get("debug.print_invitation"):
            try:
                mgr = ConnectionManager(self.context)
                _connection, invitation = await mgr.create_invitation(
                    their_role=context.settings.get("debug.invite_role"),
                    my_label=context.settings.get("debug.invite_label"),
                    multi_use=context.settings.get("debug.invite_multi_use", False),
                    public=context.settings.get("debug.invite_public", False),
                )
                base_url = context.settings.get("invite_base_url")
                invite_url = invitation.to_url(base_url)
                print("Invitation URL:")
                print(invite_url, flush=True)
            except Exception:
                # Best effort: a failed debug invitation must not stop the agent
                LOGGER.exception("Error creating invitation")

    async def stop(self, timeout: float = 1.0):
        """
        Stop the agent.

        Schedules shutdown of every component that was created and waits for
        them on a dedicated task queue.

        Args:
            timeout: Value passed to ``TaskQueue.complete()`` when waiting for
                the shutdown tasks

        """
        shutdown = TaskQueue()
        # Each component may still be None if setup() never ran or failed early
        if self.dispatcher:
            shutdown.run(self.dispatcher.complete())
        if self.admin_server:
            shutdown.run(self.admin_server.stop())
        if self.inbound_transport_manager:
            shutdown.run(self.inbound_transport_manager.stop())
        if self.outbound_transport_manager:
            shutdown.run(self.outbound_transport_manager.stop())
        await shutdown.complete(timeout)

    def inbound_message_router(
        self, message: InboundMessage, can_respond: bool = False
    ):
        """
        Route inbound messages.

        Args:
            message: The inbound message instance
            can_respond: If the session supports return routing

        """
        if message.receipt.direct_response_requested and not can_respond:
            LOGGER.warning(
                "Direct response requested, but not supported by transport: %s",
                message.transport_type,
            )

        # Note: at this point we could send the message to a shared queue
        # if this pod is too busy to process it

        self.dispatcher.queue_message(
            message,
            self.outbound_message_router,
            # Evaluates to None when no admin server is configured
            self.admin_server and self.admin_server.send_webhook,
            lambda completed: self.dispatch_complete(message, completed),
        )

    def dispatch_complete(self, message: InboundMessage, completed: CompletedTask):
        """
        Handle completion of message dispatch.

        Args:
            message: The inbound message that was dispatched
            completed: The completed task; its ``exc_info`` is logged when set

        """
        if completed.exc_info:
            LOGGER.exception(
                "Exception in message handler:", exc_info=completed.exc_info
            )
        self.inbound_transport_manager.dispatch_complete(message, completed)

    async def get_stats(self) -> dict:
        """
        Get the current stats tracked by the conductor.

        Returns:
            A dict with the inbound session count, the number of buffered
            outbound messages in the encode and deliver states, and the
            dispatcher task queue counters

        """
        stats = {
            "in_sessions": len(self.inbound_transport_manager.sessions),
            "out_encode": 0,
            "out_deliver": 0,
            "task_active": self.dispatcher.task_queue.current_active,
            "task_done": self.dispatcher.task_queue.total_done,
            "task_failed": self.dispatcher.task_queue.total_failed,
            "task_pending": self.dispatcher.task_queue.current_pending,
        }
        # Messages in any other state are not counted
        for m in self.outbound_transport_manager.outbound_buffer:
            if m.state == m.STATE_ENCODE:
                stats["out_encode"] += 1
            if m.state == m.STATE_DELIVER:
                stats["out_deliver"] += 1
        return stats

    async def outbound_message_router(
        self,
        context: InjectionContext,
        outbound: OutboundMessage,
        inbound: InboundMessage = None,
    ) -> None:
        """
        Route an outbound message.

        A reply without an explicit target is first offered to an open inbound
        session; it is queued for outbound delivery only if no session accepts
        it and it is not marked ``to_session_only``.

        Args:
            context: The request context
            outbound: An outbound message to be sent
            inbound: The inbound message that produced this response, if available

        """
        if not outbound.target and outbound.reply_to_verkey:
            if not outbound.reply_from_verkey and inbound:
                outbound.reply_from_verkey = inbound.receipt.recipient_verkey
            # return message to an inbound session
            if self.inbound_transport_manager.return_to_session(outbound):
                return

        if not outbound.to_session_only:
            await self.queue_outbound(context, outbound, inbound)

    def handle_not_returned(self, context: InjectionContext, outbound: OutboundMessage):
        """
        Handle a message that failed delivery via an inbound session.

        Args:
            context: The request context
            outbound: The message to queue for outbound delivery instead

        """
        self.dispatcher.run_task(self.queue_outbound(context, outbound))

    async def queue_outbound(
        self,
        context: InjectionContext,
        outbound: OutboundMessage,
        inbound: InboundMessage = None,
    ):
        """
        Queue an outbound message.

        Args:
            context: The request context
            outbound: An outbound message to be sent
            inbound: The inbound message that produced this response, if available

        """
        # populate connection target(s)
        if not outbound.target and not outbound.target_list and outbound.connection_id:
            # using provided request context
            mgr = ConnectionManager(context)
            try:
                outbound.target_list = await self.dispatcher.run_task(
                    mgr.get_connection_targets(connection_id=outbound.connection_id)
                )
            except ConnectionManagerError:
                # Without targets the message cannot be sent; drop it here
                LOGGER.exception("Error preparing outbound message for transmission")
                return

        try:
            self.outbound_transport_manager.enqueue_message(context, outbound)
        except OutboundDeliveryError:
            LOGGER.warning("Cannot queue message for delivery, no supported transport")
            self.handle_not_delivered(context, outbound)

    def handle_not_delivered(
        self, context: InjectionContext, outbound: OutboundMessage
    ):
        """
        Handle a message that failed delivery via outbound transports.

        Args:
            context: The request context (not used by this handler)
            outbound: The undelivered message, handed back to the inbound
                transport manager

        """
        self.inbound_transport_manager.return_undelivered(outbound)

    def webhook_router(
        self, topic: str, payload: dict, endpoint: str, max_attempts: int = None
    ):
        """
        Route a webhook through the outbound transport manager.

        Args:
            topic: The webhook topic
            payload: The webhook payload
            endpoint: The endpoint of the webhook target
            max_attempts: The maximum number of attempts

        """
        try:
            self.outbound_transport_manager.enqueue_webhook(
                topic, payload, endpoint, max_attempts
            )
        except OutboundDeliveryError:
            LOGGER.warning(
                "Cannot queue message webhook for delivery, no supported transport"
            )