Source code for aries_cloudagent.protocols.routing.v1_0.messages.route_query_request

"""Query existing forwarding routes."""

from marshmallow import EXCLUDE, fields

from .....messaging.agent_message import AgentMessage, AgentMessageSchema

from ..message_types import PROTOCOL_PACKAGE, ROUTE_QUERY_REQUEST
from ..models.paginate import Paginate, PaginateSchema

HANDLER_CLASS = (
    f"{PROTOCOL_PACKAGE}.handlers.route_query_request_handler.RouteQueryRequestHandler"
)


[docs]class RouteQueryRequest(AgentMessage): """Query existing routes from a routing agent."""
[docs] class Meta: """RouteQueryRequest metadata.""" handler_class = HANDLER_CLASS message_type = ROUTE_QUERY_REQUEST schema_class = "RouteQueryRequestSchema"
def __init__(self, *, filter: dict = None, paginate: Paginate = None, **kwargs): """ Initialize a RouteQueryRequest message instance. Args: filter: Filter results according to specific field values """ super().__init__(**kwargs) self.filter = filter self.paginate = paginate
[docs]class RouteQueryRequestSchema(AgentMessageSchema): """RouteQueryRequest message schema used in serialization/deserialization."""
[docs] class Meta: """RouteQueryRequestSchema metadata.""" model_class = RouteQueryRequest unknown = EXCLUDE
filter = fields.Dict( keys=fields.Str(description="field"), values=fields.List( fields.Str(description="value"), description="List of values" ), required=False, allow_none=True, description="Filter by field name and value", ) paginate = fields.Nested(PaginateSchema(), required=False, allow_none=True)