# Source code for aries_cloudagent.transport.queue.base

"""Abstract message queue."""

from abc import ABC, abstractmethod
import asyncio


class BaseMessageQueue(ABC):
    """Abstract message queue class.

    Concrete subclasses provide the queue operations declared below. This
    base class layers the async-iterator protocol on top of ``dequeue`` so
    that an instance can be consumed with ``async for``.
    """

    @abstractmethod
    async def enqueue(self, message):
        """
        Enqueue a message.

        Args:
            message: The message to add to the end of the queue

        Raises:
            asyncio.CancelledError if the queue has been stopped

        """

    @abstractmethod
    async def dequeue(self, *, timeout: int = None):
        """
        Dequeue a message.

        Args:
            timeout: Optional keyword-only limit on how long to wait;
                defaults to None. (Presumably seconds, with None meaning
                "wait indefinitely" -- confirm against the implementation.)

        Returns:
            The dequeued message, or None if a timeout occurs

        Raises:
            asyncio.CancelledError if the queue has been stopped
            asyncio.TimeoutError if the timeout is reached

        NOTE(review): the Returns and Raises sections disagree about what
        happens on timeout (None vs. asyncio.TimeoutError). Callers should
        be prepared for either until the implementations are checked.

        """

    @abstractmethod
    async def join(self):
        """Wait for the queue to empty."""

    @abstractmethod
    def task_done(self):
        """Indicate that the current task is complete."""

    @abstractmethod
    def stop(self):
        """Cancel active iteration of the queue."""

    @abstractmethod
    def reset(self):
        """Empty the queue and reset the stop event."""

    def __aiter__(self):
        """Async iterator magic method.

        Returns:
            This queue itself, which acts as its own async iterator

        """
        return self

    async def __anext__(self):
        """Async iterator magic method.

        Awaits ``dequeue()`` with no timeout and returns its result.

        Raises:
            StopAsyncIteration if ``dequeue`` raises asyncio.CancelledError

        """
        try:
            message = await self.dequeue()
        except asyncio.CancelledError:
            # A cancelled dequeue ends the ``async for`` loop cleanly rather
            # than propagating the cancellation to the consumer.
            raise StopAsyncIteration
        return message