Source code for acapy_agent.protocols.routing.v1_0.manager

"""Routing manager classes for tracking and inspecting routing records."""

import asyncio
import logging
from typing import Optional, Sequence

from ....core.error import BaseError
from ....core.profile import Profile
from ....storage.error import StorageDuplicateError, StorageNotFoundError
from .models.route_record import RouteRecord

LOGGER = logging.getLogger(__name__)

RECIP_ROUTE_PAUSE = 0.1
RECIP_ROUTE_RETRY = 10


[docs] class RoutingManagerError(BaseError): """Generic routing error."""
[docs] class RouteNotFoundError(RoutingManagerError): """Requested route was not found."""
[docs] class RoutingManager: """Class for handling routing records.""" RECORD_TYPE = "forward_route" def __init__(self, profile: Profile): """Initialize a RoutingManager. Args: profile: The profile instance for this manager """ self._profile = profile if not profile: raise RoutingManagerError("Missing profile")
[docs] async def get_recipient(self, recip_verkey: str) -> RouteRecord: """Resolve the recipient for a verkey. Args: recip_verkey: The verkey ("to") of the incoming Forward message Returns: The `RouteRecord` associated with this verkey """ if not recip_verkey: raise RoutingManagerError("Must pass non-empty recip_verkey") i = 0 record = None while not record: try: LOGGER.debug("Fetching routing record for verkey: %s", recip_verkey) async with self._profile.session() as session: record = await RouteRecord.retrieve_by_recipient_key( session, recip_verkey ) LOGGER.debug("Found routing record for verkey: %s", recip_verkey) return record except StorageDuplicateError: LOGGER.info( "Duplicate routing records found for verkey: %s", recip_verkey ) raise RouteNotFoundError( f"More than one route record found with recipient key: {recip_verkey}" ) except StorageNotFoundError: LOGGER.debug("No routing record found for verkey: %s", recip_verkey) i += 1 if i > RECIP_ROUTE_RETRY: raise RouteNotFoundError( f"No route found with recipient key: {recip_verkey}" ) await asyncio.sleep(RECIP_ROUTE_PAUSE)
[docs] async def get_routes( self, client_connection_id: Optional[str] = None, tag_filter: Optional[dict] = None, ) -> Sequence[RouteRecord]: """Fetch all routes associated with the current connection. Args: client_connection_id: The ID of the connection record tag_filter: An optional dictionary of tag filters Returns: A sequence of route records found by the query """ # Routing protocol acts only as Server, filter out all client records filters = {"role": RouteRecord.ROLE_SERVER} if client_connection_id: filters["connection_id"] = client_connection_id if tag_filter: for key in ("recipient_key",): if key not in tag_filter: continue val = tag_filter[key] if isinstance(val, str): filters[key] = val elif isinstance(val, list): filters[key] = {"$in": val} else: raise RoutingManagerError( "Unsupported tag filter: '{}' = {}".format(key, val) ) async with self._profile.session() as session: results = await RouteRecord.query(session, tag_filter=filters) return results
[docs] async def delete_route_record(self, route: RouteRecord): """Remove an existing route record.""" async with self._profile.session() as session: await route.delete_record(session)
[docs] async def create_route_record( self, client_connection_id: Optional[str] = None, recipient_key: Optional[str] = None, internal_wallet_id: Optional[str] = None, ) -> RouteRecord: """Create and store a new RouteRecord. Args: client_connection_id: The ID of the connection record recipient_key: The recipient verkey of the route internal_wallet_id: The ID of the wallet record. Used for internal routing Returns: The new routing record """ if not (client_connection_id or internal_wallet_id): raise RoutingManagerError( "Either client_connection_id or internal_wallet_id is required" ) if not recipient_key: raise RoutingManagerError("Missing recipient_key") LOGGER.debug("Creating routing record for verkey: %s", recipient_key) route = RouteRecord( connection_id=client_connection_id, wallet_id=internal_wallet_id, recipient_key=recipient_key, ) async with self._profile.session() as session: await route.save(session, reason="Created new route") LOGGER.info("Created routing record for verkey: %s", recipient_key) return route