Source code for aries_cloudagent.commands.start

"""Entrypoint."""

import asyncio
import functools
import logging
import os
import signal
from argparse import ArgumentParser
from typing import Coroutine, Sequence

try:
    import uvloop
except ImportError:
    uvloop = None

from ..core.conductor import Conductor
from ..config import argparse as arg
from ..config.default_context import DefaultContextBuilder
from ..config.util import common_config

LOGGER = logging.getLogger(__name__)


[docs]async def start_app(conductor: Conductor): """Start up.""" await conductor.setup() await conductor.start()
[docs]async def shutdown_app(conductor: Conductor): """Shut down.""" print("\nShutting down") await conductor.stop()
[docs]def init_argument_parser(parser: ArgumentParser): """Initialize an argument parser with the module's arguments.""" return arg.load_argument_groups(parser, *arg.group.get_registered(arg.CAT_START))
[docs]def execute(argv: Sequence[str] = None): """Entrypoint.""" parser = ArgumentParser() parser.prog += " start" get_settings = init_argument_parser(parser) args = parser.parse_args(argv) settings = get_settings(args) common_config(settings) # set ledger to read only if explicitely specified settings["ledger.read_only"] = settings.get("read_only_ledger", False) # Support WEBHOOK_URL environment variable webhook_url = os.environ.get("WEBHOOK_URL") if webhook_url: webhook_urls = list(settings.get("admin.webhook_urls") or []) webhook_urls.append(webhook_url) settings["admin.webhook_urls"] = webhook_urls # Create the Conductor instance context_builder = DefaultContextBuilder(settings) conductor = Conductor(context_builder) # Run the application if uvloop: uvloop.install() print("uvloop installed") run_loop(start_app(conductor), shutdown_app(conductor))
[docs]def run_loop(startup: Coroutine, shutdown: Coroutine): """Execute the application, handling signals and ctrl-c.""" async def init(cleanup): """Perform startup, terminating if an exception occurs.""" try: await startup except Exception: LOGGER.exception("Exception during startup:") cleanup() async def done(): """Run shutdown and clean up any outstanding tasks.""" await shutdown tasks = [ task for task in asyncio.Task.all_tasks() if task is not asyncio.Task.current_task() ] for task in tasks: task.cancel() if tasks: await asyncio.gather(*tasks, return_exceptions=True) asyncio.get_event_loop().stop() loop = asyncio.get_event_loop() cleanup = functools.partial(asyncio.ensure_future, done(), loop=loop) loop.add_signal_handler(signal.SIGTERM, cleanup) asyncio.ensure_future(init(cleanup), loop=loop) try: loop.run_forever() except KeyboardInterrupt: loop.run_until_complete(done())
if __name__ == "__main__": execute()