Source code for aries_cloudagent.transport.outbound.queue.redis

"""Redis outbound transport."""

from typing import Union

import aioredis
import msgpack

from ....core.profile import Profile
from .base import BaseOutboundQueue, OutboundQueueConfigurationError, OutboundQueueError


[docs]class RedisOutboundQueue(BaseOutboundQueue): """Redis outbound transport class.""" config_key = "redis_queue" def __init__(self, root_profile: Profile) -> None: """Set initial state.""" try: plugin_config = root_profile.settings["plugin_config"] or {} config = plugin_config[self.config_key] self.connection = config["connection"] except KeyError as error: raise OutboundQueueConfigurationError( "Configuration missing for redis queue" ) from error self.prefix = config.get("prefix", "acapy") self.pool = aioredis.ConnectionPool.from_url( self.connection, max_connections=10 ) self.redis = aioredis.Redis(connection_pool=self.pool) def __str__(self): """Return string representation of the outbound queue.""" return ( f"RedisOutboundQueue(" f"connection={self.connection}, " f"prefix={self.prefix}" f")" )
[docs] async def start(self): """Start the transport."""
# aioredis will lazily connect but we can eagerly trigger connection with: # await self.redis.ping() # Calling this on enter to `async with` just before another queue # operation is made does not make sense and we should just let aioredis # do lazy connection.
[docs] async def stop(self): """Stop the transport."""
# aioredis cleans up automatically but we can clean up manually with: # await self.pool.disconnect() # However, calling this on exit of `async with` does not make sense and # we should just let aioredis handle the connection lifecycle.
[docs] async def push(self, key: bytes, message: bytes): """Push a ``message`` to redis on ``key``.""" try: return await self.redis.rpush(key, message) except aioredis.RedisError as error: raise OutboundQueueError("Unexpected redis client exception") from error
[docs] async def enqueue_message( self, payload: Union[str, bytes], endpoint: str, ): """Prepare and send message to external redis. Args: payload: message payload in string or byte format endpoint: URI endpoint for delivery """ if not endpoint: raise OutboundQueueError("No endpoint provided") if isinstance(payload, bytes): content_type = "application/ssi-agent-wire" else: content_type = "application/json" payload = payload.encode(encoding="utf-8") message = msgpack.packb( { "headers": {"Content-Type": content_type}, "endpoint": endpoint, "payload": payload, } ) key = f"{self.prefix}.outbound_transport".encode() return await self.push(key, message)