"""Base classes for the queue module."""
from abc import ABC, abstractmethod
import asyncio
import logging
from typing import Union
from ....core.profile import Profile
from ...error import BaseError, TransportError
[docs]class BaseOutboundQueue(ABC):
"""Base class for the outbound queue generic type."""
def __init__(self, root_profile: Profile):
"""Initialize base queue type."""
self.logger = logging.getLogger(__name__)
def __str__(self):
"""Return string representation used in banner on startup."""
return type(self).__name__
async def __aenter__(self):
"""Async context manager enter."""
await self.open()
async def __aexit__(self, err_type, err_value, err_t):
"""Async context manager exit."""
if err_type and err_type != asyncio.CancelledError:
self.logger.exception("Exception in outbound queue")
await self.close()
[docs] async def start(self):
"""Start the queue."""
[docs] async def stop(self):
"""Stop the queue."""
[docs] async def open(self):
"""Start the queue."""
[docs] async def close(self):
"""Stop the queue."""
[docs] @abstractmethod
async def enqueue_message(
self,
payload: Union[str, bytes],
endpoint: str,
):
"""Prepare and send message to external queue."""
[docs]class OutboundQueueError(TransportError):
"""Generic outbound transport error."""
[docs]class OutboundQueueConfigurationError(BaseError):
"""An error with the queue configuration."""
def __init__(self, message):
"""Initialize the exception instance."""
super().__init__(message)