"""Http Transport classes and functions."""
import asyncio
import logging
from typing import Coroutine
from aiohttp import web
from .base import BaseInboundTransport, InboundTransportSetupError
[docs]class HttpTransport(BaseInboundTransport):
"""Http Transport class."""
def __init__(
self,
host: str,
port: int,
message_router: Coroutine,
register_socket: Coroutine,
) -> None:
"""
Initialize a Transport instance.
Args:
host: Host to listen on
port: Port to listen on
message_router: Function to pass incoming messages to
register_socket: A coroutine for registering a new socket
"""
self.host = host
self.port = port
self.message_router = message_router
self.register_socket = register_socket
self.site = None
self._scheme = "http"
self.logger = logging.getLogger(__name__)
@property
def scheme(self):
"""Accessor for this transport's scheme."""
return self._scheme
[docs] async def make_application(self) -> web.Application:
"""Construct the aiohttp application."""
app = web.Application()
app.add_routes([web.get("/", self.invite_message_handler)])
app.add_routes([web.post("/", self.inbound_message_handler)])
return app
[docs] async def start(self) -> None:
"""
Start this transport.
Raises:
InboundTransportSetupError: If there was an error starting the webserver
"""
app = await self.make_application()
runner = web.AppRunner(app)
await runner.setup()
self.site = web.TCPSite(runner, host=self.host, port=self.port)
try:
await self.site.start()
except OSError:
raise InboundTransportSetupError(
"Unable to start webserver with host "
+ f"'{self.host}' and port '{self.port}'\n"
)
[docs] async def stop(self) -> None:
"""Stop this transport."""
if self.site:
await self.site.stop()
self.site = None
[docs] async def inbound_message_handler(self, request: web.BaseRequest):
"""
Message handler for inbound messages.
Args:
request: aiohttp request object
Returns:
The web response
"""
ctype = request.headers.get("content-type", "")
if ctype.split(";", 1)[0].lower() == "application/json":
body = await request.text()
else:
body = await request.read()
try:
response = asyncio.Future()
await self.message_router(body, self._scheme, single_response=response)
except Exception:
self.logger.exception("Error handling message")
return web.Response(status=400)
try:
await asyncio.wait_for(response, 30)
except asyncio.TimeoutError:
if not response.done():
response.cancel()
return web.Response(status=504)
except asyncio.CancelledError:
return web.Response(status=200)
message = response.result()
if message:
if isinstance(message, bytes):
return web.Response(
body=message,
status=200,
headers={"Content-Type": "application/ssi-agent-wire"},
)
else:
return web.Response(
text=message,
status=200,
headers={"Content-Type": "application/json"},
)
return web.Response(status=200)
[docs] async def invite_message_handler(self, request: web.BaseRequest):
"""
Message handler for invites.
Args:
request: aiohttp request object
Returns:
The web response
"""
if request.query.get("c_i"):
return web.Response(
text="You have received a connection invitation. To accept the "
"invitation, paste it into your agent application."
)
else:
return web.Response(status=200)