Source code for aries_cloudagent.tests.test_task_processor

import asyncio

from asynctest import TestCase as AsyncTestCase
from asynctest import mock as async_mock

from ..task_processor import TaskProcessor, PendingTask


[docs]class RetryTask: def __init__(self, retries: int, result): self.attempts = 0 self.retries = retries self.result = result
[docs] async def run(self, pending: PendingTask): self.attempts += 1 if self.attempts <= self.retries: raise Exception() return self.result
[docs]class TestTaskProcessor(AsyncTestCase):
[docs] async def test_coro(self): collected = [] async def test_task(val): collected.append(val) return val processor = TaskProcessor() await processor.run_task(test_task(1)) await processor.run_task(test_task(2)) future = await processor.run_task(test_task(3)) result = await asyncio.wait_for(future, timeout=5.0) assert result == 3 await asyncio.wait_for(processor.wait_done(), timeout=5.0) collected.sort() assert collected == [1, 2, 3]
[docs] async def test_error(self): async def test_error(): raise ValueError("test") processor = TaskProcessor() future = await processor.run_task(test_error()) with self.assertRaises(ValueError): result = await asyncio.wait_for(future, timeout=5.0) await asyncio.wait_for(processor.wait_done(), timeout=5.0)
[docs] async def test_retry(self): test_value = "test_value" task = RetryTask(1, test_value) processor = TaskProcessor() future = await processor.run_retry( lambda pending: task.run(pending), retries=5, retry_delay=0.01 ) result = await asyncio.wait_for(future, timeout=5.0) assert result == test_value await asyncio.wait_for(processor.wait_done(), timeout=5.0) assert task.attempts == 2