Source code for aries_cloudagent.transport.outbound.http

"""Http outbound transport."""

import logging
from typing import Union

from aiohttp import ClientSession, DummyCookieJar, TCPConnector

from ...core.profile import Profile

from ..stats import StatsTracer
from ..wire_format import DIDCOMM_V0_MIME_TYPE, DIDCOMM_V1_MIME_TYPE

from .base import BaseOutboundTransport, OutboundTransportError


[docs]class HttpTransport(BaseOutboundTransport): """Http outbound transport class.""" schemes = ("http", "https") is_external = False def __init__(self, **kwargs) -> None: """Initialize an `HttpTransport` instance.""" super().__init__(**kwargs) self.client_session: ClientSession = None self.connector: TCPConnector = None self.logger = logging.getLogger(__name__)
[docs] async def start(self): """Start the transport.""" self.connector = TCPConnector(limit=200, limit_per_host=50) session_args = { "cookie_jar": DummyCookieJar(), "connector": self.connector, "trust_env": True, } if self.collector: session_args["trace_configs"] = [ StatsTracer(self.collector, "outbound-http:") ] self.client_session = ClientSession(**session_args) return self
[docs] async def stop(self): """Stop the transport.""" await self.client_session.close() self.client_session = None
[docs] async def handle_message( self, profile: Profile, payload: Union[str, bytes], endpoint: str, metadata: dict = None, api_key: str = None, ): """Handle message from queue. Args: profile: the profile that produced the message payload: message payload in string or byte format endpoint: URI endpoint for delivery metadata: Additional metadata associated with the payload """ if not endpoint: raise OutboundTransportError("No endpoint provided") headers = metadata or {} if api_key is not None: headers["x-api-key"] = api_key if isinstance(payload, bytes): if profile.settings.get("emit_new_didcomm_mime_type"): headers["Content-Type"] = DIDCOMM_V1_MIME_TYPE else: headers["Content-Type"] = DIDCOMM_V0_MIME_TYPE else: headers["Content-Type"] = "application/json" self.logger.debug( "Posting to %s; Data: %s; Headers: %s", endpoint, payload, headers ) async with self.client_session.post( endpoint, data=payload, headers=headers ) as response: if response.status < 200 or response.status > 299: raise OutboundTransportError( ( f"Unexpected response status {response.status}, " f"caused by: {response.reason}" ) )