Source code for acapy_agent.transport.outbound.ws

"""Websockets outbound transport."""

import logging
from typing import Optional, Union

from aiohttp import ClientSession, DummyCookieJar

from ...core.profile import Profile
from .base import BaseOutboundTransport


[docs] class WsTransport(BaseOutboundTransport): """Websockets outbound transport class.""" schemes = ("ws", "wss") is_external = False def __init__(self, **kwargs) -> None: """Initialize an `WsTransport` instance.""" super().__init__(**kwargs) self.logger = logging.getLogger(__name__)
[docs] async def start(self): """Start the outbound transport.""" self.client_session = ClientSession(cookie_jar=DummyCookieJar(), trust_env=True) return self
[docs] async def stop(self): """Stop the outbound transport.""" await self.client_session.close() self.client_session = None
[docs] async def handle_message( self, profile: Profile, payload: Union[str, bytes], endpoint: str, metadata: Optional[dict] = None, api_key: Optional[str] = None, ): """Handle message from queue. Args: profile: the profile that produced the message payload: message payload in string or byte format endpoint: URI endpoint for delivery metadata: Additional metadata associated with the payload api_key: API key for the endpoint """ # aiohttp should automatically handle websocket sessions async with self.client_session.ws_connect(endpoint, headers=metadata) as ws: self.logger.debug("Sending outbound websocket message %s", payload) if isinstance(payload, bytes): await ws.send_bytes(payload) else: await ws.send_str(payload)