Source code for aries_cloudagent.transport.inbound.delivery_queue

"""
The Delivery Queue.

The delivery queue holds and manages messages that have not yet
been delivered to their intended destination.
"""

import time

from ..outbound.message import OutboundMessage

class QueuedMessage:
    """
    Envelope for a message sitting in the delivery queue.

    Keeps the message together with the time it was queued so that its
    age can be checked later.
    """

    def __init__(self, msg: OutboundMessage):
        """
        Wrap a message for queueing.

        The creation time is taken from ``time.time()`` when the wrapper
        is constructed.

        Args:
            msg: The message being queued

        """
        self.timestamp = time.time()
        self.msg = msg

    def older_than(self, compare_timestamp: float) -> bool:
        """
        Tell whether this entry was queued before a given moment.

        Args:
            compare_timestamp: The timestamp to compare against

        Returns:
            True when the stored timestamp is strictly earlier than
            ``compare_timestamp``, False otherwise

        """
        queued_before = self.timestamp < compare_timestamp
        return queued_before
class DeliveryQueue:
    """
    DeliveryQueue class.

    Manages undelivered messages. Messages are held in memory, grouped by
    recipient key, and handed out in the order they were added.
    """

    def __init__(self) -> None:
        """
        Initialize an instance of DeliveryQueue.

        This uses an in memory structure to queue messages.
        """
        # Maps a recipient key to the list of wrapped messages waiting for
        # that key; new entries are appended, so index 0 is the oldest.
        self.queue_by_key = {}
        self.ttl_seconds = 604800  # one week

    def expire_messages(self, ttl=None):
        """
        Expire messages that are past the time limit.

        Keys whose queue becomes empty are removed.

        Args:
            ttl: Optional. Allows override of configured ttl. An explicit
                value of 0 is honoured rather than replaced by the default.

        """
        # `is None` (not truthiness) so that ttl=0 is not mistaken for "unset".
        ttl_seconds = self.ttl_seconds if ttl is None else ttl
        horizon = time.time() - ttl_seconds
        # Iterate over a snapshot because keys may be deleted along the way.
        for key, queue in list(self.queue_by_key.items()):
            kept = [wm for wm in queue if not wm.older_than(horizon)]
            if kept:
                self.queue_by_key[key] = kept
            else:
                del self.queue_by_key[key]

    def add_message(self, msg: OutboundMessage):
        """
        Add an OutboundMessage to delivery queue.

        The message is added once per recipient key

        Args:
            msg: The OutboundMessage to add

        """
        keys = set()
        # NOTE(review): this first branch was garbled in the reviewed source
        # and has been restored as "collect the target's recipient keys" to
        # match the docstring above -- confirm against the upstream module.
        if msg.target:
            keys.update(msg.target.recipient_keys)
        if msg.reply_to_verkey:
            keys.add(msg.reply_to_verkey)
        # One wrapper (and so one timestamp) is shared by every key's queue.
        wrapped_msg = QueuedMessage(msg)
        for recipient_key in keys:
            self.queue_by_key.setdefault(recipient_key, []).append(wrapped_msg)

    def has_message_for_key(self, key: str) -> bool:
        """
        Check for queued messages by key.

        Args:
            key: The key to use for lookup

        Returns:
            True if at least one message is queued for the key

        """
        return bool(self.queue_by_key.get(key))

    def message_count_for_key(self, key: str) -> int:
        """
        Count of queued messages by key.

        Args:
            key: The key to use for lookup

        Returns:
            The number of queued messages, 0 when the key is unknown

        """
        return len(self.queue_by_key.get(key, ()))

    def get_one_message_for_key(self, key: str):
        """
        Remove and return a matching message.

        Args:
            key: The key to use for lookup

        Returns:
            The oldest queued message for the key, or None when nothing is
            queued for it

        """
        queue = self.queue_by_key.get(key)
        if not queue:
            # Unknown key or an empty queue: nothing to hand out.
            return None
        wrapped_msg = queue.pop(0)
        if not queue:
            # Drop drained keys so empty lists do not accumulate.
            del self.queue_by_key[key]
        return wrapped_msg.msg

    def inspect_all_messages_for_key(self, key: str):
        """
        Return all messages for key.

        This is a generator; the messages stay in the queue.

        Args:
            key: The key to use for lookup

        """
        for wrapped_msg in self.queue_by_key.get(key, ()):
            yield wrapped_msg.msg

    def remove_message_for_key(self, key: str, msg: OutboundMessage):
        """
        Remove specified message from queue for key.

        Only the first entry whose message equals ``msg`` is removed. The
        key itself is removed once its queue is empty.

        Args:
            key: The key to use for lookup
            msg: The message to remove from the queue

        """
        queue = self.queue_by_key.get(key)
        if queue is None:
            return
        for index, wrapped_msg in enumerate(queue):
            if wrapped_msg.msg == msg:
                del queue[index]
                break  # exit processing loop
        if not queue:
            del self.queue_by_key[key]