Source code for aries_cloudagent.admin.server

"""Admin server classes."""

import asyncio
import logging
from typing import Callable, Coroutine, Sequence, Set
import uuid

from aiohttp import web
from aiohttp_apispec import (
    docs,
    response_schema,
    setup_aiohttp_apispec,
    validation_middleware,
)
import aiohttp_cors
import jwt

from marshmallow import fields

from ..config.injection_context import InjectionContext
from ..core.profile import Profile
from ..core.plugin_registry import PluginRegistry
from ..ledger.error import LedgerConfigError, LedgerTransactionError
from ..messaging.models.openapi import OpenAPISchema
from ..messaging.responder import BaseResponder
from ..transport.queue.basic import BasicMessageQueue
from ..transport.outbound.message import OutboundMessage
from ..utils.stats import Collector
from ..utils.task_queue import TaskQueue
from ..version import __version__
from ..multitenant.manager import MultitenantManager, MultitenantManagerError

from ..storage.error import StorageNotFoundError
from .base_server import BaseAdminServer
from .error import AdminSetupError
from .request_context import AdminRequestContext


LOGGER = logging.getLogger(__name__)


[docs]class AdminModulesSchema(OpenAPISchema): """Schema for the modules endpoint.""" result = fields.List( fields.Str(description="admin module"), description="List of admin modules" )
[docs]class AdminStatusSchema(OpenAPISchema): """Schema for the status endpoint."""
[docs]class AdminStatusLivelinessSchema(OpenAPISchema): """Schema for the liveliness endpoint.""" alive = fields.Boolean(description="Liveliness status", example=True)
[docs]class AdminStatusReadinessSchema(OpenAPISchema): """Schema for the readiness endpoint.""" ready = fields.Boolean(description="Readiness status", example=True)
[docs]class AdminShutdownSchema(OpenAPISchema): """Response schema for admin Module."""
[docs]class AdminResponder(BaseResponder): """Handle outgoing messages from message handlers.""" def __init__( self, profile: Profile, send: Coroutine, webhook: Coroutine, **kwargs, ): """ Initialize an instance of `AdminResponder`. Args: send: Function to send outbound message """ super().__init__(**kwargs) self._profile = profile self._send = send self._webhook = webhook
[docs] async def send_outbound(self, message: OutboundMessage): """ Send outbound message. Args: message: The `OutboundMessage` to be sent """ await self._send(self._profile, message)
[docs] async def send_webhook(self, topic: str, payload: dict): """ Dispatch a webhook. Args: topic: the webhook topic identifier payload: the webhook payload value """ await self._webhook(self._profile, topic, payload)
@property def send_fn(self) -> Coroutine: """Accessor for async function to send outbound message.""" return self._send @property def webhook_fn(self) -> Coroutine: """Accessor for the async function to dispatch a webhook.""" return self._webhook
[docs]class WebhookTarget: """Class for managing webhook target information.""" def __init__( self, endpoint: str, topic_filter: Sequence[str] = None, max_attempts: int = None, ): """Initialize the webhook target.""" self.endpoint = endpoint self.max_attempts = max_attempts self._topic_filter = None self.topic_filter = topic_filter # call setter @property def topic_filter(self) -> Set[str]: """Accessor for the target's topic filter.""" return self._topic_filter @topic_filter.setter def topic_filter(self, val: Sequence[str]): """Setter for the target's topic filter.""" filter = set(val) if val else None if filter and "*" in filter: filter = None self._topic_filter = filter
[docs]@web.middleware async def ready_middleware(request: web.BaseRequest, handler: Coroutine): """Only continue if application is ready to take work.""" if ( str(request.rel_url).rstrip("/") in ( "/status/live", "/status/ready", ) or request.app._state.get("ready") ): try: return await handler(request) except (LedgerConfigError, LedgerTransactionError) as e: # fatal, signal server shutdown LOGGER.error("Shutdown with %s", str(e)) request.app._state["ready"] = False request.app._state["alive"] = False raise except web.HTTPFound as e: # redirect, typically / -> /api/doc LOGGER.info("Handler redirect to: %s", e.location) raise except asyncio.CancelledError: # redirection spawns new task and cancels old LOGGER.debug("Task cancelled") raise except Exception as e: # some other error? LOGGER.error("Handler error with exception: %s", str(e)) import traceback print("\n=================") traceback.print_exc() raise raise web.HTTPServiceUnavailable(reason="Shutdown in progress")
[docs]@web.middleware async def debug_middleware(request: web.BaseRequest, handler: Coroutine): """Show request detail in debug log.""" if LOGGER.isEnabledFor(logging.DEBUG): LOGGER.debug(f"Incoming request: {request.method} {request.path_qs}") LOGGER.debug(f"Match info: {request.match_info}") body = await request.text() if request.body_exists else None LOGGER.debug(f"Body: {body}") return await handler(request)
[docs]class AdminServer(BaseAdminServer): """Admin HTTP server class.""" def __init__( self, host: str, port: int, context: InjectionContext, root_profile: Profile, outbound_message_router: Coroutine, webhook_router: Callable, conductor_stop: Coroutine, task_queue: TaskQueue = None, conductor_stats: Coroutine = None, ): """ Initialize an AdminServer instance. Args: host: Host to listen on port: Port to listen on context: The application context instance outbound_message_router: Coroutine for delivering outbound messages webhook_router: Callable for delivering webhooks conductor_stop: Conductor (graceful) stop for shutdown API call task_queue: An optional task queue for handlers """ self.app = None self.admin_api_key = context.settings.get("admin.admin_api_key") self.admin_insecure_mode = bool( context.settings.get("admin.admin_insecure_mode") ) self.host = host self.port = port self.context = context self.conductor_stop = conductor_stop self.conductor_stats = conductor_stats self.loaded_modules = [] self.outbound_message_router = outbound_message_router self.root_profile = root_profile self.task_queue = task_queue self.webhook_router = webhook_router self.webhook_targets = {} self.websocket_queues = {} self.site = None self.multitenant_manager = context.inject(MultitenantManager, required=False) self.server_paths = []
[docs] async def make_application(self) -> web.Application: """Get the aiohttp application instance.""" middlewares = [ready_middleware, debug_middleware, validation_middleware] # admin-token and admin-token are mutually exclusive and required. # This should be enforced during parameter parsing but to be sure, # we check here. assert self.admin_insecure_mode ^ bool(self.admin_api_key) def is_unprotected_path(path: str): return ( path in [ "/api/doc", "/api/docs/swagger.json", "/favicon.ico", "/ws", # ws handler checks authentication ] or path.startswith("/static/swagger/") ) # If admin_api_key is None, then admin_insecure_mode must be set so # we can safely enable the admin server with no security if self.admin_api_key: @web.middleware async def check_token(request: web.Request, handler): header_admin_api_key = request.headers.get("x-api-key") valid_key = self.admin_api_key == header_admin_api_key if valid_key or is_unprotected_path(request.path): return await handler(request) else: raise web.HTTPUnauthorized() middlewares.append(check_token) collector = self.context.inject(Collector, required=False) if self.multitenant_manager: @web.middleware async def check_multitenant_authorization(request: web.Request, handler): authorization_header = request.headers.get("Authorization") path = request.path is_multitenancy_path = path.startswith("/multitenancy") is_server_path = path in self.server_paths or path == "/features" # subwallets are not allowed to access multitenancy routes if authorization_header and is_multitenancy_path: raise web.HTTPUnauthorized() # base wallet is not allowed to perform ssi related actions. # Only multitenancy and general server actions if ( not authorization_header and not is_multitenancy_path and not is_server_path and not is_unprotected_path(path) ): raise web.HTTPUnauthorized() return await handler(request) middlewares.append(check_multitenant_authorization) @web.middleware async def setup_context(request: web.Request, handler): authorization_header = request.headers.get("Authorization") profile = self.root_profile # Multitenancy context setup if self.multitenant_manager and authorization_header: try: bearer, _, token = authorization_header.partition(" ") if bearer != "Bearer": raise web.HTTPUnauthorized( reason="Invalid Authorization header structure" ) profile = await self.multitenant_manager.get_profile_for_token( self.context, token ) except MultitenantManagerError as err: raise web.HTTPUnauthorized(err.roll_up) except (jwt.InvalidTokenError, StorageNotFoundError): raise web.HTTPUnauthorized() # Create a responder with the request specific context responder = AdminResponder( profile, self.outbound_message_router, self.send_webhook, ) profile.context.injector.bind_instance(BaseResponder, responder) # TODO may dynamically adjust the profile used here according to # headers or other parameters admin_context = AdminRequestContext(profile) request["context"] = admin_context request["outbound_message_router"] = responder.send if collector: handler = collector.wrap_coro(handler, [handler.__qualname__]) if self.task_queue: task = await self.task_queue.put(handler(request)) return await task return await handler(request) middlewares.append(setup_context) app = web.Application(middlewares=middlewares) server_routes = [ web.get("/", self.redirect_handler, allow_head=False), web.get("/plugins", self.plugins_handler, allow_head=False), web.get("/status", self.status_handler, allow_head=False), web.post("/status/reset", self.status_reset_handler), web.get("/status/live", self.liveliness_handler, allow_head=False), web.get("/status/ready", self.readiness_handler, allow_head=False), web.get("/shutdown", self.shutdown_handler, allow_head=False), web.get("/ws", self.websocket_handler, allow_head=False), ] # Store server_paths for multitenant authorization handling self.server_paths = [route.path for route in server_routes] app.add_routes(server_routes) plugin_registry = self.context.inject(PluginRegistry, required=False) if plugin_registry: await plugin_registry.register_admin_routes(app) cors = aiohttp_cors.setup( app, defaults={ "*": aiohttp_cors.ResourceOptions( allow_credentials=True, expose_headers="*", allow_headers="*", allow_methods="*", ) }, ) for route in app.router.routes(): cors.add(route) # get agent label agent_label = self.context.settings.get("default_label") version_string = f"v{__version__}" setup_aiohttp_apispec( app=app, title=agent_label, version=version_string, swagger_path="/api/doc" ) app.on_startup.append(self.on_startup) # ensure we always have status values app._state["ready"] = False app._state["alive"] = False return app
[docs] async def start(self) -> None: """ Start the webserver. Raises: AdminSetupError: If there was an error starting the webserver """ def sort_dict(raw: dict) -> dict: """Order (JSON, string keys) dict asciibetically by key, recursively.""" for (k, v) in raw.items(): if isinstance(v, dict): raw[k] = sort_dict(v) return dict(sorted([item for item in raw.items()], key=lambda x: x[0])) self.app = await self.make_application() runner = web.AppRunner(self.app) await runner.setup() plugin_registry = self.context.inject(PluginRegistry, required=False) if plugin_registry: plugin_registry.post_process_routes(self.app) # order tags alphabetically, parameters deterministically and pythonically swagger_dict = self.app._state["swagger_dict"] swagger_dict.get("tags", []).sort(key=lambda t: t["name"]) # sort content per path and sort paths for path_spec in swagger_dict["paths"].values(): for method_spec in path_spec.values(): method_spec["parameters"].sort( key=lambda p: (p["in"], not p["required"], p["name"]) ) for path in sorted([p for p in swagger_dict["paths"]]): swagger_dict["paths"][path] = swagger_dict["paths"].pop(path) # order definitions alphabetically by dict key swagger_dict["definitions"] = sort_dict(swagger_dict["definitions"]) self.site = web.TCPSite(runner, host=self.host, port=self.port) try: await self.site.start() self.app._state["ready"] = True self.app._state["alive"] = True except OSError: raise AdminSetupError( "Unable to start webserver with host " + f"'{self.host}' and port '{self.port}'\n" )
[docs] async def stop(self) -> None: """Stop the webserver.""" self.app._state["ready"] = False # in case call does not come through OpenAPI for queue in self.websocket_queues.values(): queue.stop() if self.site: await self.site.stop() self.site = None
[docs] async def on_startup(self, app: web.Application): """Perform webserver startup actions.""" security_definitions = {} security = [] if self.admin_api_key: security_definitions["ApiKeyHeader"] = { "type": "apiKey", "in": "header", "name": "X-API-KEY", } security.append({"ApiKeyHeader": []}) if self.multitenant_manager: security_definitions["AuthorizationHeader"] = { "type": "apiKey", "in": "header", "name": "Authorization", "description": "Bearer token. Be sure to preprend token with 'Bearer '", } # If multitenancy is enabled we need Authorization header multitenant_security = {"AuthorizationHeader": []} # If admin api key is also enabled, we need both for subwallet requests if self.admin_api_key: multitenant_security["ApiKeyHeader"] = [] security.append(multitenant_security) if self.admin_api_key or self.multitenant_manager: swagger = app["swagger_dict"] swagger["securityDefinitions"] = security_definitions swagger["security"] = security
[docs] @docs(tags=["server"], summary="Fetch the list of loaded plugins") @response_schema(AdminModulesSchema(), 200, description="") async def plugins_handler(self, request: web.BaseRequest): """ Request handler for the loaded plugins list. Args: request: aiohttp request object Returns: The module list response """ registry = self.context.inject(PluginRegistry, required=False) plugins = registry and sorted(registry.plugin_names) or [] return web.json_response({"result": plugins})
[docs] @docs(tags=["server"], summary="Fetch the server status") @response_schema(AdminStatusSchema(), 200, description="") async def status_handler(self, request: web.BaseRequest): """ Request handler for the server status information. Args: request: aiohttp request object Returns: The web response """ status = {"version": __version__} status["label"] = self.context.settings.get("default_label") collector = self.context.inject(Collector, required=False) if collector: status["timing"] = collector.results if self.conductor_stats: status["conductor"] = await self.conductor_stats() return web.json_response(status)
[docs] @docs(tags=["server"], summary="Reset statistics") @response_schema(AdminStatusSchema(), 200, description="") async def status_reset_handler(self, request: web.BaseRequest): """ Request handler for resetting the timing statistics. Args: request: aiohttp request object Returns: The web response """ collector = self.context.inject(Collector, required=False) if collector: collector.reset() return web.json_response({})
[docs] async def redirect_handler(self, request: web.BaseRequest): """Perform redirect to documentation.""" raise web.HTTPFound("/api/doc")
[docs] @docs(tags=["server"], summary="Liveliness check") @response_schema(AdminStatusLivelinessSchema(), 200, description="") async def liveliness_handler(self, request: web.BaseRequest): """ Request handler for liveliness check. Args: request: aiohttp request object Returns: The web response, always indicating True """ app_live = self.app._state["alive"] if app_live: return web.json_response({"alive": app_live}) else: raise web.HTTPServiceUnavailable(reason="Service not available")
[docs] @docs(tags=["server"], summary="Readiness check") @response_schema(AdminStatusReadinessSchema(), 200, description="") async def readiness_handler(self, request: web.BaseRequest): """ Request handler for liveliness check. Args: request: aiohttp request object Returns: The web response, indicating readiness for further calls """ app_ready = self.app._state["ready"] and self.app._state["alive"] if app_ready: return web.json_response({"ready": app_ready}) else: raise web.HTTPServiceUnavailable(reason="Service not ready")
[docs] @docs(tags=["server"], summary="Shut down server") @response_schema(AdminShutdownSchema(), description="") async def shutdown_handler(self, request: web.BaseRequest): """ Request handler for server shutdown. Args: request: aiohttp request object Returns: The web response (empty production) """ self.app._state["ready"] = False loop = asyncio.get_event_loop() asyncio.ensure_future(self.conductor_stop(), loop=loop) return web.json_response({})
[docs] def notify_fatal_error(self): """Set our readiness flags to force a restart (openshift).""" LOGGER.error("Received shutdown request notify_fatal_error()") self.app._state["ready"] = False self.app._state["alive"] = False
[docs] async def websocket_handler(self, request): """Send notifications to admin client over websocket.""" ws = web.WebSocketResponse() await ws.prepare(request) socket_id = str(uuid.uuid4()) queue = BasicMessageQueue() loop = asyncio.get_event_loop() if self.admin_insecure_mode: # open to send websocket messages without api key auth queue.authenticated = True else: header_admin_api_key = request.headers.get("x-api-key") # authenticated via http header? queue.authenticated = header_admin_api_key == self.admin_api_key try: self.websocket_queues[socket_id] = queue await queue.enqueue( { "topic": "settings", "payload": { "authenticated": queue.authenticated, "label": self.context.settings.get("default_label"), "endpoint": self.context.settings.get("default_endpoint"), "no_receive_invites": self.context.settings.get( "admin.no_receive_invites", False ), "help_link": self.context.settings.get("admin.help_link"), }, } ) closed = False receive = loop.create_task(ws.receive_json()) send = loop.create_task(queue.dequeue(timeout=5.0)) while not closed: try: await asyncio.wait( (receive, send), return_when=asyncio.FIRST_COMPLETED ) if ws.closed: closed = True if receive.done(): if not closed: msg_received = None msg_api_key = None try: # this call can re-raise exeptions from inside the task msg_received = receive.result() msg_api_key = msg_received.get("x-api-key") except Exception: LOGGER.exception( "Exception in websocket receiving task:" ) if self.admin_api_key and self.admin_api_key == msg_api_key: # authenticated via websocket message queue.authenticated = True receive = loop.create_task(ws.receive_json()) if send.done(): try: msg = send.result() except asyncio.TimeoutError: msg = None if msg is None: # we send fake pings because the JS client # can't detect real ones msg = { "topic": "ping", "authenticated": queue.authenticated, } if not closed: if msg: await ws.send_json(msg) send = loop.create_task(queue.dequeue(timeout=5.0)) except asyncio.CancelledError: closed = True if not receive.done(): receive.cancel() if not send.done(): send.cancel() finally: del self.websocket_queues[socket_id] return ws
[docs] def add_webhook_target( self, target_url: str, topic_filter: Sequence[str] = None, max_attempts: int = None, ): """Add a webhook target.""" self.webhook_targets[target_url] = WebhookTarget( target_url, topic_filter, max_attempts )
[docs] def remove_webhook_target(self, target_url: str): """Remove a webhook target.""" if target_url in self.webhook_targets: del self.webhook_targets[target_url]
[docs] async def send_webhook(self, profile: Profile, topic: str, payload: dict): """Add a webhook to the queue, to send to all registered targets.""" wallet_id = profile.settings.get("wallet.id") webhook_urls = profile.settings.get("admin.webhook_urls") metadata = None if wallet_id: metadata = {"x-wallet-id": wallet_id} if self.webhook_router: # for idx, target in self.webhook_targets.items(): # if not target.topic_filter or topic in target.topic_filter: for endpoint in webhook_urls: self.webhook_router( topic, payload, endpoint, None, metadata, ) # set ws webhook body, optionally add wallet id for multitenant mode webhook_body = {"topic": topic, "payload": payload} if wallet_id: webhook_body["wallet_id"] = wallet_id for queue in self.websocket_queues.values(): if queue.authenticated or topic in ("ping", "settings"): await queue.enqueue(webhook_body)