Source code for aries_cloudagent.transport.outbound.http

"""Http outbound transport."""

import logging
from typing import Union

from aiohttp import ClientSession, DummyCookieJar, TCPConnector

from ...config.injection_context import InjectionContext

from ..stats import StatsTracer

from .base import BaseOutboundTransport, OutboundTransportError

[docs]class HttpTransport(BaseOutboundTransport): """Http outbound transport class.""" schemes = ("http", "https") def __init__(self) -> None: """Initialize an `HttpTransport` instance.""" super(HttpTransport, self).__init__() self.client_session: ClientSession = None self.connector: TCPConnector = None self.logger = logging.getLogger(__name__)
[docs] async def start(self): """Start the transport.""" session_args = {} self.connector = TCPConnector(limit=200, limit_per_host=50) if self.collector: session_args["trace_configs"] = [ StatsTracer(self.collector, "outbound-http:") ] session_args["cookie_jar"] = DummyCookieJar() session_args["connector"] = self.connector self.client_session = ClientSession(**session_args) return self
[docs] async def stop(self): """Stop the transport.""" await self.client_session.close() self.client_session = None
[docs] async def handle_message( self, context: InjectionContext, payload: Union[str, bytes], endpoint: str ): """ Handle message from queue. Args: context: the context that produced the message payload: message payload in string or byte format endpoint: URI endpoint for delivery """ if not endpoint: raise OutboundTransportError("No endpoint provided") headers = {} if isinstance(payload, bytes): headers["Content-Type"] = "application/ssi-agent-wire" else: headers["Content-Type"] = "application/json" async with endpoint, data=payload, headers=headers ) as response: if response.status < 200 or response.status > 299: raise OutboundTransportError("Unexpected response status")