Source code for aries_cloudagent.transport.inbound.base

"""Base inbound transport class."""

from abc import ABC, abstractmethod
from collections import namedtuple
from typing import Awaitable, Callable

from ..error import TransportError
from ..wire_format import BaseWireFormat

from .session import InboundSession


class BaseInboundTransport(ABC):
    """Abstract foundation shared by all inbound transports."""

    def __init__(
        self,
        scheme: str,
        create_session: Callable,
        *,
        max_message_size: int = 0,
        wire_format: BaseWireFormat = None,
    ):
        """
        Set up the state common to every inbound transport.

        Args:
            scheme: The transport scheme identifier
            create_session: Callable used to create a new inbound session
            max_message_size: Value reported by the ``max_message_size``
                property
            wire_format: Wire format passed to new sessions unless a
                per-session override is supplied

        """
        self._scheme = scheme
        self._max_message_size = max_message_size
        self._create_session = create_session
        self.wire_format: BaseWireFormat = wire_format

    @property
    def max_message_size(self):
        """Return the max message size configured for this transport."""
        return self._max_message_size

    @property
    def scheme(self):
        """Return the scheme identifier of this transport."""
        return self._scheme

    def create_session(
        self,
        *,
        accept_undelivered: bool = False,
        can_respond: bool = False,
        client_info: dict = None,
        wire_format: BaseWireFormat = None,
    ) -> Awaitable[InboundSession]:
        """
        Build a new inbound session through the configured factory.

        Args:
            accept_undelivered: Flag for accepting undelivered messages
            can_respond: Flag indicating that the transport can send responses
            client_info: Request-specific client information
            wire_format: Optionally override the session wire format

        Returns:
            Whatever the ``create_session`` callable given to the
            constructor returns.

        """
        # A falsy override falls back to the transport-level wire format.
        session_format = wire_format if wire_format else self.wire_format
        session_kwargs = {
            "accept_undelivered": accept_undelivered,
            "can_respond": can_respond,
            "client_info": client_info,
            "wire_format": session_format,
            # Read through the property so subclass overrides are honoured.
            "transport_type": self.scheme,
        }
        return self._create_session(**session_kwargs)

    @abstractmethod
    async def start(self) -> None:
        """Start listening on this transport."""

    @abstractmethod
    async def stop(self) -> None:
        """Stop listening on this transport."""
class InboundTransportError(TransportError):
    """General-purpose error for inbound transports."""
class InboundTransportRegistrationError(InboundTransportError):
    """Raised when an inbound transport fails to load."""
class InboundTransportSetupError(InboundTransportError):
    """Raised when setting up an inbound transport fails."""
# Immutable record with the fields ``module``, ``host`` and ``port``.
InboundTransportConfiguration = namedtuple(
    "InboundTransportConfiguration",
    ["module", "host", "port"],
)