Source code for aries_cloudagent.transport.inbound.manager

"""Inbound transport manager."""

import logging

from .base import (
    BaseInboundTransport,
    InboundTransportConfiguration,
    InboundTransportRegistrationError,
)
from ...classloader import ClassLoader, ModuleLoadError, ClassNotFoundError

# Dotted package path handed to ClassLoader (together with
# BaseInboundTransport) when an InboundTransportManager is created.
# NOTE(review): presumably the prefix used to resolve short transport module
# names such as those in InboundTransportConfiguration.module -- confirm
# against ClassLoader.
MODULE_BASE_PATH = "aries_cloudagent.transport.inbound"


class InboundTransportManager:
    """Inbound transport manager class.

    Keeps a list of inbound transport instances and starts/stops them as a
    group. Transports are either loaded by module name through a
    `ClassLoader` (see `register`) or supplied ready-made (see
    `register_instance`).
    """

    def __init__(self) -> None:
        """Initialize an `InboundTransportManager` instance."""
        self.logger = logging.getLogger(__name__)
        # Loader built from the inbound transport package path and the
        # BaseInboundTransport base class.
        self.class_loader = ClassLoader(MODULE_BASE_PATH, BaseInboundTransport)
        # Transports in registration order; start() and stop() walk this list.
        self.registered_transports = []

    def register(
        self, config: InboundTransportConfiguration, message_handler, register_socket
    ) -> None:
        """
        Register transport module.

        Loads the transport class named by `config.module`, instantiates it
        and adds the instance to the registered transports.

        Args:
            config: Transport configuration; its `module`, `host` and `port`
                attributes are read here
            message_handler: The message handler for incoming messages
            register_socket: A coroutine for registering a new socket

        Raises:
            InboundTransportRegistrationError: If the transport module or
                class cannot be loaded

        """
        try:
            # NOTE(review): the meaning of the second positional argument
            # (True) is defined by ClassLoader.load -- confirm there.
            imported_class = self.class_loader.load(config.module, True)
        except (ModuleLoadError, ClassNotFoundError) as err:
            # Chain explicitly so the underlying load failure is reported as
            # the direct cause of the registration error.
            raise InboundTransportRegistrationError(
                f"Failed to load module {config.module}"
            ) from err

        instance = imported_class(
            config.host, config.port, message_handler, register_socket
        )
        self.register_instance(instance)

    def register_instance(self, transport: BaseInboundTransport) -> None:
        """
        Register a new inbound transport instance.

        Args:
            transport: Inbound transport instance to register

        """
        self.registered_transports.append(transport)

    async def start(self) -> None:
        """Start all registered transports, one at a time, in registration order."""
        for transport in self.registered_transports:
            await transport.start()

    async def stop(self) -> None:
        """Stop all registered transports, one at a time, in registration order."""
        for transport in self.registered_transports:
            await transport.stop()