Source code for aries_cloudagent.transport.outbound.tests.test_http_transport

import asyncio

from aiohttp.test_utils import AioHTTPTestCase, unittest_run_loop
from aiohttp import web

from ....messaging.outbound_message import OutboundMessage

from ..http import HttpTransport


class TestHttpTransport(AioHTTPTestCase):
    """Exercise ``HttpTransport`` against a local aiohttp test server.

    The test application exposes a single ``POST /`` route whose handler
    appends every JSON payload it receives to ``self.message_results``.
    """

    async def setUpAsync(self):
        """Reset the recorded payloads, then run the base-class setup hook.

        On aiohttp 3.8+ the application, server and client are created inside
        ``AioHTTPTestCase.setUpAsync``, so the parent implementation must be
        awaited; otherwise ``self.server`` never exists.  On earlier releases
        the parent hook is empty, so awaiting it is harmless.
        """
        self.message_results = []
        await super().setUpAsync()

    async def receive_message(self, request):
        """Store the JSON body of an incoming request and reply with HTTP 200.

        The 200 response is produced by raising ``web.HTTPOk``.
        """
        payload = await request.json()
        self.message_results.append(payload)
        raise web.HTTPOk()

    async def get_application(self):
        """Return the aiohttp application served for the duration of a test.

        Overrides the ``AioHTTPTestCase`` hook; the application routes
        ``POST /`` to :meth:`receive_message`.
        """
        app = web.Application()
        app.add_routes([web.post("/", self.receive_message)])
        return app

    @unittest_run_loop
    async def test_handle_message(self):
        """Send one message through the transport and verify it was received."""
        server_addr = f"http://localhost:{self.server.port}"

        async def send_message(transport, message):
            # The transport is used as an async context manager around the
            # single delivery.
            async with transport:
                await transport.handle_message(message)

        transport = HttpTransport()
        message = OutboundMessage("{}", endpoint=server_addr)

        # Bound the delivery so a hung request fails the test instead of
        # blocking the run indefinitely.
        await asyncio.wait_for(send_message(transport, message), 5.0)

        # The body "{}" must have been received as a single empty JSON object.
        assert self.message_results == [{}]