Source code for aries_cloudagent.conductor

"""
The Conductor.

The conductor is responsible for coordinating messages that are received
over the network, communicating with the ledger, passing messages to handlers,
instantiating concrete implementations of required modules and storing data in the
wallet.

"""

import asyncio
from collections import OrderedDict
import logging
from typing import Coroutine, Union

from .admin.base_server import BaseAdminServer
from .admin.server import AdminServer
from .config.default_context import ContextBuilder
from .config.injection_context import InjectionContext
from .config.ledger import ledger_config
from .config.logging import LoggingConfigurator
from .config.wallet import wallet_config
from .dispatcher import Dispatcher
from .messaging.connections.manager import ConnectionManager, ConnectionManagerError
from .messaging.connections.models.connection_record import ConnectionRecord
from .messaging.error import MessageParseError, MessagePrepareError
from .messaging.outbound_message import OutboundMessage
from .messaging.responder import BaseResponder
from .messaging.serializer import MessageSerializer
from .messaging.socket import SocketInfo, SocketRef
from .stats import Collector
from .storage.error import StorageNotFoundError
from .transport.inbound.base import InboundTransportConfiguration
from .transport.inbound.manager import InboundTransportManager
from .transport.outbound.manager import OutboundTransportManager
from .transport.outbound.queue.base import BaseOutboundMessageQueue


[docs]class Conductor: """ Conductor class. Class responsible for initializing concrete implementations of our require interfaces and routing inbound and outbound message data. """ def __init__(self, context_builder: ContextBuilder) -> None: """ Initialize an instance of Conductor. Args: inbound_transports: Configuration for inbound transports outbound_transports: Configuration for outbound transports settings: Dictionary of various settings """ self.admin_server = None self.context: InjectionContext = None self.context_builder = context_builder self.dispatcher: Dispatcher = None self.logger = logging.getLogger(__name__) self.message_serializer: MessageSerializer = None self.inbound_transport_manager: InboundTransportManager = None self.outbound_transport_manager: OutboundTransportManager = None self.sockets = OrderedDict()
[docs] async def setup(self): """Initialize the global request context.""" context = await self.context_builder.build() # Populate message serializer self.message_serializer = await context.inject(MessageSerializer) # Register all inbound transports self.inbound_transport_manager = InboundTransportManager() inbound_transports = context.settings.get("transport.inbound_configs") or [] for transport in inbound_transports: try: module, host, port = transport self.inbound_transport_manager.register( InboundTransportConfiguration(module=module, host=host, port=port), self.inbound_message_router, self.register_socket, ) except Exception: self.logger.exception("Unable to register inbound transport") raise # Register all outbound transports outbound_queue = await context.inject(BaseOutboundMessageQueue) self.outbound_transport_manager = OutboundTransportManager(outbound_queue) outbound_transports = context.settings.get("transport.outbound_configs") or [] for outbound_transport in outbound_transports: try: self.outbound_transport_manager.register(outbound_transport) except Exception: self.logger.exception("Unable to register outbound transport") raise # Admin API if context.settings.get("admin.enabled"): try: admin_host = context.settings.get("admin.host", "0.0.0.0") admin_port = context.settings.get("admin.port", "80") self.admin_server = AdminServer( admin_host, admin_port, context, self.outbound_message_router ) webhook_urls = context.settings.get("admin.webhook_urls") if webhook_urls: for url in webhook_urls: self.admin_server.add_webhook_target(url) context.injector.bind_instance(BaseAdminServer, self.admin_server) except Exception: self.logger.exception("Unable to register admin server") raise self.context = context self.dispatcher = Dispatcher(self.context) collector = await context.inject(Collector, required=False) if collector: # add stats to our own methods collector.wrap( self, ( "inbound_message_router", "outbound_message_router", "prepare_outbound_message", ), ) collector.wrap(self.dispatcher, "dispatch") # at the class level (!) should not be performed multiple times collector.wrap( ConnectionManager, ("get_connection_target", "fetch_did_document", "find_connection"), )
[docs] async def start(self) -> None: """Start the agent.""" context = self.context # Configure the wallet public_did = await wallet_config(context) # Configure the ledger await ledger_config(context, public_did) # Start up transports try: await self.inbound_transport_manager.start() except Exception: self.logger.exception("Unable to start inbound transports") raise try: await self.outbound_transport_manager.start() except Exception: self.logger.exception("Unable to start outbound transports") raise # Start up Admin server if self.admin_server: try: await self.admin_server.start() except Exception: self.logger.exception("Unable to start administration API") # Make admin responder available during message parsing # This allows webhooks to be called when a connection is marked active, # for example context.injector.bind_instance(BaseResponder, self.admin_server.responder) # Show some details about the configuration to the user LoggingConfigurator.print_banner( self.inbound_transport_manager.registered_transports, self.outbound_transport_manager.registered_transports, public_did, self.admin_server, ) # Print an invitation to the terminal if context.settings.get("debug.print_invitation"): try: mgr = ConnectionManager(self.context) _connection, invitation = await mgr.create_invitation() invite_url = invitation.to_url() print("Invitation URL:") print(invite_url) except Exception: self.logger.exception("Error creating invitation")
[docs] async def stop(self, timeout=0.1): """Stop the agent.""" tasks = [] if self.admin_server: tasks.append(self.admin_server.stop()) tasks.append(self.inbound_transport_manager.stop()) tasks.append(self.outbound_transport_manager.stop()) await asyncio.wait_for(asyncio.gather(*tasks), timeout)
[docs] async def register_socket( self, *, handler: Coroutine = None, single_response: asyncio.Future = None ) -> SocketRef: """Register a new duplex connection.""" socket = SocketInfo(handler=handler, single_response=single_response) socket_id = socket.socket_id self.sockets[socket_id] = socket async def close_socket(): socket.closed = True return SocketRef(socket_id=socket_id, close=close_socket)
[docs] async def inbound_message_router( self, message_body: Union[str, bytes], transport_type: str = None, socket_id: str = None, single_response: asyncio.Future = None, ) -> asyncio.Future: """ Route inbound messages. Args: message_body: Body of the incoming message transport_type: Type of transport this message came from socket_id: The identifier of the incoming socket connection single_response: A future to contain the first direct response message """ try: parsed_msg, delivery = await self.message_serializer.parse_message( self.context, message_body, transport_type ) except MessageParseError: self.logger.exception("Error expanding message") raise connection_mgr = ConnectionManager(self.context) connection = await connection_mgr.find_message_connection(delivery) if connection: delivery.connection_id = connection.connection_id if single_response and not socket_id: socket = SocketInfo(single_response=single_response) socket_id = socket.socket_id self.sockets[socket_id] = socket if socket_id: if socket_id not in self.sockets: self.logger.warning( "Inbound message on unregistered socket ID: %s", socket_id ) socket_id = None elif self.sockets[socket_id].closed: self.logger.warning( "Inbound message on closed socket ID: %s", socket_id ) socket_id = None delivery.socket_id = socket_id socket = self.sockets[socket_id] if socket_id else None if socket: socket.process_incoming(parsed_msg, delivery) elif ( delivery.direct_response_requested and delivery.direct_response_requested != SocketInfo.REPLY_MODE_NONE ): self.logger.warning( "Direct response requested, but not supported by transport: %s", delivery.transport_type, ) complete = await self.dispatcher.dispatch( parsed_msg, delivery, connection, self.outbound_message_router ) if socket: complete.add_done_callback(lambda fut: socket.dispatch_complete()) return complete
[docs] async def prepare_outbound_message( self, message: OutboundMessage, context: InjectionContext = None ): """Prepare a response message for transmission. Args: message: An outbound message to be sent context: Optional request context """ context = context or self.context if message.connection_id and not message.target: try: record = await ConnectionRecord.retrieve_by_id( context, message.connection_id ) except StorageNotFoundError as e: raise MessagePrepareError( "Could not locate connection record: {}".format( message.connection_id ) ) from e mgr = ConnectionManager(context) try: target = await mgr.get_connection_target(record) except ConnectionManagerError as e: raise MessagePrepareError(str(e)) from e if not target: raise MessagePrepareError( "No connection target for message: {}".format(message.connection_id) ) message.target = target if not message.encoded and message.target: target = message.target message.payload = await self.message_serializer.encode_message( context, message.payload, target.recipient_keys, target.routing_keys, target.sender_key, ) message.encoded = True
[docs] async def outbound_message_router( self, message: OutboundMessage, context: InjectionContext = None ) -> None: """ Route an outbound message. Args: message: An outbound message to be sent context: Optional request context """ try: await self.prepare_outbound_message(message, context) except MessagePrepareError: self.logger.exception("Error preparing outbound message for transmission") return # try socket connections first, preferring the same socket ID socket_id = message.reply_socket_id sel_socket = None if ( socket_id and socket_id in self.sockets and self.sockets[socket_id].select_outgoing(message) ): sel_socket = self.sockets[socket_id] else: for socket in self.sockets.values(): if socket.select_outgoing(message): sel_socket = socket break if sel_socket: await sel_socket.send(message) self.logger.debug("Returned message to socket %s", sel_socket.socket_id) return # deliver directly to endpoint if message.endpoint: await self.outbound_transport_manager.send_message(message) return self.logger.warning("No endpoint or direct route for outbound message, dropped")