"""Connection handling admin routes."""
import json
from typing import cast
from aiohttp import web
from aiohttp_apispec import (
docs,
match_info_schema,
querystring_schema,
request_schema,
response_schema,
)
from marshmallow import fields, validate, validates_schema
from ....admin.request_context import AdminRequestContext
from ....cache.base import BaseCache
from ....connections.models.conn_record import ConnRecord, ConnRecordSchema
from ....messaging.models.base import BaseModelError
from ....messaging.models.openapi import OpenAPISchema
from ....messaging.valid import (
ENDPOINT_EXAMPLE,
ENDPOINT_VALIDATE,
INDY_DID_EXAMPLE,
INDY_DID_VALIDATE,
INDY_RAW_PUBLIC_KEY_EXAMPLE,
INDY_RAW_PUBLIC_KEY_VALIDATE,
UUID4_EXAMPLE,
UUID4_VALIDATE,
GENERIC_DID_VALIDATE,
)
from ....storage.error import StorageError, StorageNotFoundError
from ....wallet.error import WalletError
from .manager import ConnectionManager, ConnectionManagerError
from .message_types import SPEC_URI
from .messages.connection_invitation import (
ConnectionInvitation,
ConnectionInvitationSchema,
)
class ConnectionModuleResponseSchema(OpenAPISchema):
    """Field-less response schema for connection module routes."""
class ConnectionListSchema(OpenAPISchema):
    """Response schema wrapping a list of connection records."""

    # Each entry is a nested, serialized connection record.
    results = fields.List(
        fields.Nested(ConnRecordSchema()),
        metadata=dict(description="List of connection records"),
    )
class ReceiveInvitationRequestSchema(ConnectionInvitationSchema):
    """Schema for the body of a receive-invitation request."""

    @validates_schema
    def validate_fields(self, data, **kwargs):
        """Accept any payload at the middleware validation stage.

        Deliberately a no-op: marshmallow has no data yet when the middleware
        runs field validation, so nothing is checked here.
        """
        return None
class CreateInvitationRequestSchema(OpenAPISchema):
    """Body schema for a create-invitation request; every field is optional."""

    recipient_keys = fields.List(
        fields.Str(
            metadata=dict(
                description="Recipient public key",
                example=INDY_RAW_PUBLIC_KEY_EXAMPLE,
            ),
            validate=INDY_RAW_PUBLIC_KEY_VALIDATE,
        ),
        metadata=dict(description="List of recipient keys"),
        required=False,
    )
    service_endpoint = fields.Str(
        metadata=dict(
            description="Connection endpoint",
            example="http://192.168.56.102:8020",
        ),
        required=False,
    )
    routing_keys = fields.List(
        fields.Str(
            metadata=dict(
                description="Routing key",
                example=INDY_RAW_PUBLIC_KEY_EXAMPLE,
            ),
            validate=INDY_RAW_PUBLIC_KEY_VALIDATE,
        ),
        metadata=dict(description="List of routing keys"),
        required=False,
    )
    my_label = fields.Str(
        metadata=dict(
            description="Optional label for connection invitation",
            example="Bob",
        ),
        required=False,
    )
    # Free-form dictionary; no per-key validation is declared here.
    metadata = fields.Dict(
        metadata=dict(
            description=(
                "Optional metadata to attach to the connection created with the"
                " invitation"
            )
        ),
        required=False,
    )
    mediation_id = fields.Str(
        metadata=dict(
            description="Identifier for active mediation record to be used",
            example=UUID4_EXAMPLE,
        ),
        validate=UUID4_VALIDATE,
        required=False,
    )
class InvitationResultSchema(OpenAPISchema):
    """Response schema describing a newly created connection invitation."""

    connection_id = fields.Str(
        metadata=dict(description="Connection identifier", example=UUID4_EXAMPLE)
    )
    # The invitation message itself, nested in full.
    invitation = fields.Nested(ConnectionInvitationSchema())
    invitation_url = fields.Str(
        metadata=dict(
            description="Invitation URL",
            example="http://192.168.56.101:8020/invite?c_i=eyJAdHlwZSI6Li4ufQ==",
        )
    )
class ConnectionStaticRequestSchema(OpenAPISchema):
    """Body schema for creating a static connection; all fields are optional."""

    # --- local side ---
    my_seed = fields.Str(
        metadata=dict(description="Seed to use for the local DID"),
        required=False,
    )
    my_did = fields.Str(
        metadata=dict(description="Local DID", example=INDY_DID_EXAMPLE),
        validate=INDY_DID_VALIDATE,
        required=False,
    )

    # --- remote side ---
    their_seed = fields.Str(
        metadata=dict(description="Seed to use for the remote DID"),
        required=False,
    )
    their_did = fields.Str(
        metadata=dict(description="Remote DID", example=INDY_DID_EXAMPLE),
        validate=INDY_DID_VALIDATE,
        required=False,
    )
    their_verkey = fields.Str(
        metadata=dict(description="Remote verification key"),
        required=False,
    )
    their_endpoint = fields.Str(
        metadata=dict(
            description="URL endpoint for other party",
            example=ENDPOINT_EXAMPLE,
        ),
        validate=ENDPOINT_VALIDATE,
        required=False,
    )
    their_label = fields.Str(
        metadata=dict(description="Other party's label for this connection"),
        required=False,
    )

    alias = fields.Str(
        metadata=dict(description="Alias to assign to this connection"),
        required=False,
    )
class ConnectionStaticResultSchema(OpenAPISchema):
    """Response schema for a created static connection; all fields required."""

    # --- local side ---
    my_did = fields.Str(
        metadata=dict(description="Local DID", example=INDY_DID_EXAMPLE),
        validate=INDY_DID_VALIDATE,
        required=True,
    )
    my_verkey = fields.Str(
        metadata=dict(
            description="My verification key",
            example=INDY_RAW_PUBLIC_KEY_EXAMPLE,
        ),
        validate=INDY_RAW_PUBLIC_KEY_VALIDATE,
        required=True,
    )
    my_endpoint = fields.Str(
        metadata=dict(description="My URL endpoint", example=ENDPOINT_EXAMPLE),
        validate=ENDPOINT_VALIDATE,
        required=True,
    )

    # --- remote side ---
    their_did = fields.Str(
        metadata=dict(description="Remote DID", example=INDY_DID_EXAMPLE),
        validate=INDY_DID_VALIDATE,
        required=True,
    )
    their_verkey = fields.Str(
        metadata=dict(
            description="Remote verification key",
            example=INDY_RAW_PUBLIC_KEY_EXAMPLE,
        ),
        validate=INDY_RAW_PUBLIC_KEY_VALIDATE,
        required=True,
    )

    # The stored connection record, nested in full.
    record = fields.Nested(ConnRecordSchema, required=True)
class ConnectionsListQueryStringSchema(OpenAPISchema):
    """Parameters and validators for connections list request query string.

    Every parameter is optional and narrows the set of connection records
    returned by the list route.
    """

    alias = fields.Str(
        required=False, metadata={"description": "Alias", "example": "Barry"}
    )
    invitation_key = fields.Str(
        required=False,
        validate=INDY_RAW_PUBLIC_KEY_VALIDATE,
        metadata={
            "description": "invitation key",
            "example": INDY_RAW_PUBLIC_KEY_EXAMPLE,
        },
    )
    my_did = fields.Str(
        required=False,
        validate=GENERIC_DID_VALIDATE,
        metadata={"description": "My DID", "example": INDY_DID_EXAMPLE},
    )
    state = fields.Str(
        required=False,
        # Each state's value is iterated for its labels; the set removes any
        # duplicates and sorting keeps the choice order deterministic wherever
        # it is rendered (a bare set follows string hash order, which can
        # differ from run to run).
        validate=validate.OneOf(
            sorted({label for state in ConnRecord.State for label in state.value})
        ),
        metadata={"description": "Connection state"},
    )
    their_did = fields.Str(
        required=False,
        validate=GENERIC_DID_VALIDATE,
        metadata={"description": "Their DID", "example": INDY_DID_EXAMPLE},
    )
    their_public_did = fields.Str(
        required=False,
        validate=GENERIC_DID_VALIDATE,
        metadata={"description": "Their Public DID", "example": INDY_DID_EXAMPLE},
    )
    their_role = fields.Str(
        required=False,
        validate=validate.OneOf(
            [label for role in ConnRecord.Role for label in role.value]
        ),
        metadata={
            "description": "Their role in the connection protocol",
            "example": ConnRecord.Role.REQUESTER.rfc160,
        },
    )
    connection_protocol = fields.Str(
        required=False,
        validate=validate.OneOf(
            [proto.aries_protocol for proto in ConnRecord.Protocol]
        ),
        metadata={
            "description": "Connection protocol used",
            "example": ConnRecord.Protocol.RFC_0160.aries_protocol,
        },
    )
    invitation_msg_id = fields.UUID(
        required=False,
        metadata={
            "description": "Identifier of the associated Invitation Message",
            "example": UUID4_EXAMPLE,
        },
    )
class CreateInvitationQueryStringSchema(OpenAPISchema):
    """Query string parameters accepted by the create-invitation route."""

    alias = fields.Str(
        metadata=dict(description="Alias", example="Barry"),
        required=False,
    )

    # Optional boolean switches.
    auto_accept = fields.Boolean(
        metadata=dict(description="Auto-accept connection (defaults to configuration)"),
        required=False,
    )
    public = fields.Boolean(
        metadata=dict(description="Create invitation from public DID (default false)"),
        required=False,
    )
    multi_use = fields.Boolean(
        metadata=dict(description="Create invitation for multiple use (default false)"),
        required=False,
    )
class ReceiveInvitationQueryStringSchema(OpenAPISchema):
    """Query string parameters accepted by the receive-invitation route."""

    alias = fields.Str(
        metadata=dict(description="Alias", example="Barry"),
        required=False,
    )
    auto_accept = fields.Boolean(
        metadata=dict(description="Auto-accept connection (defaults to configuration)"),
        required=False,
    )
    mediation_id = fields.Str(
        metadata=dict(
            description="Identifier for active mediation record to be used",
            example=UUID4_EXAMPLE,
        ),
        validate=UUID4_VALIDATE,
        required=False,
    )
class AcceptInvitationQueryStringSchema(OpenAPISchema):
    """Query string parameters accepted by the accept-invitation route."""

    my_endpoint = fields.Str(
        metadata=dict(description="My URL endpoint", example=ENDPOINT_EXAMPLE),
        validate=ENDPOINT_VALIDATE,
        required=False,
    )
    my_label = fields.Str(
        metadata=dict(description="Label for connection", example="Broker"),
        required=False,
    )
    mediation_id = fields.Str(
        metadata=dict(
            description="Identifier for active mediation record to be used",
            example=UUID4_EXAMPLE,
        ),
        validate=UUID4_VALIDATE,
        required=False,
    )
class AcceptRequestQueryStringSchema(OpenAPISchema):
    """Query string parameters accepted by the accept-request route."""

    my_endpoint = fields.Str(
        metadata=dict(description="My URL endpoint", example=ENDPOINT_EXAMPLE),
        validate=ENDPOINT_VALIDATE,
        required=False,
    )
class ConnectionsConnIdMatchInfoSchema(OpenAPISchema):
    """Path parameter schema for routes addressed by a connection id."""

    conn_id = fields.Str(
        metadata=dict(description="Connection identifier", example=UUID4_EXAMPLE),
        required=True,
    )
class ConnIdRefIdMatchInfoSchema(OpenAPISchema):
    """Path parameter schema for routes taking a connection id and a ref id."""

    conn_id = fields.Str(
        metadata=dict(description="Connection identifier", example=UUID4_EXAMPLE),
        required=True,
    )
    ref_id = fields.Str(
        metadata=dict(
            description="Inbound connection identifier",
            example=UUID4_EXAMPLE,
        ),
        required=True,
    )
class EndpointsResultSchema(OpenAPISchema):
    """Response schema carrying both endpoints of a connection."""

    my_endpoint = fields.Str(
        metadata=dict(description="My endpoint", example=ENDPOINT_EXAMPLE),
        validate=ENDPOINT_VALIDATE,
    )
    their_endpoint = fields.Str(
        metadata=dict(description="Their endpoint", example=ENDPOINT_EXAMPLE),
        validate=ENDPOINT_VALIDATE,
    )
def connection_sort_key(conn):
    """Build the string used to order serialized connection records.

    The key is a one-character rank followed by the record's ``created_at``
    value: abandoned records rank "2", invitation-state records rank "1" and
    every other state ranks "0".

    Args:
        conn: mapping with at least "state" and "created_at" entries

    Returns:
        The rank prefix concatenated with ``conn["created_at"]``
    """
    state = ConnRecord.State.get(conn["state"])
    if state is ConnRecord.State.ABANDONED:
        return "2" + conn["created_at"]
    if state is ConnRecord.State.INVITATION:
        return "1" + conn["created_at"]
    return "0" + conn["created_at"]
@docs(
    tags=["connection"],
    summary="Query agent-to-agent connections",
)
@querystring_schema(ConnectionsListQueryStringSchema())
@response_schema(ConnectionListSchema(), 200, description="")
async def connections_list(request: web.BaseRequest):
    """Search connection records using the query string as filters.

    Args:
        request: aiohttp request object

    Returns:
        JSON response with the matching records, serialized and sorted by
        ``connection_sort_key``, under "results"

    Raises:
        web.HTTPBadRequest: on a storage or model error

    """
    context: AdminRequestContext = request["context"]
    query = request.query

    # Parameters handed to the record query as tag filters when non-empty.
    tag_params = (
        "invitation_id",
        "my_did",
        "their_did",
        "request_id",
        "invitation_key",
        "their_public_did",
        "invitation_msg_id",
    )
    tag_filter = {name: query[name] for name in tag_params if query.get(name)}

    # Parameters applied as positive post-filters.
    post_filter = {}
    alias = query.get("alias")
    if alias:
        post_filter["alias"] = alias
    state = query.get("state")
    if state:
        post_filter["state"] = list(ConnRecord.State.get(state).value)
    their_role = query.get("their_role")
    if their_role:
        post_filter["their_role"] = list(ConnRecord.Role.get(their_role).value)
    protocol = query.get("connection_protocol")
    if protocol:
        post_filter["connection_protocol"] = protocol

    profile = context.profile
    try:
        async with profile.session() as session:
            records = await ConnRecord.query(
                session, tag_filter, post_filter_positive=post_filter, alt=True
            )
        results = sorted(
            (record.serialize() for record in records), key=connection_sort_key
        )
    except (StorageError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response({"results": results})
@docs(tags=["connection"], summary="Fetch a single connection record")
@match_info_schema(ConnectionsConnIdMatchInfoSchema())
@response_schema(ConnRecordSchema(), 200, description="")
async def connections_retrieve(request: web.BaseRequest):
    """Look up one connection record by the id in the request path.

    Args:
        request: aiohttp request object

    Returns:
        JSON response with the serialized connection record

    Raises:
        web.HTTPNotFound: if no record exists for the connection id
        web.HTTPBadRequest: on a model error

    """
    context: AdminRequestContext = request["context"]
    conn_id = request.match_info["conn_id"]

    try:
        async with context.profile.session() as session:
            conn_rec = await ConnRecord.retrieve_by_id(session, conn_id)
        payload = conn_rec.serialize()
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except BaseModelError as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response(payload)
@docs(tags=["connection"], summary="Fetch connection remote endpoint")
@match_info_schema(ConnectionsConnIdMatchInfoSchema())
@response_schema(EndpointsResultSchema(), 200, description="")
async def connections_endpoints(request: web.BaseRequest):
    """Report the endpoints the connection manager returns for a connection.

    Args:
        request: aiohttp request object

    Returns:
        JSON response pairing "my_endpoint" and "their_endpoint" with the
        values returned by the manager, in that order

    Raises:
        web.HTTPNotFound: if no record exists for the connection id
        web.HTTPBadRequest: on a model, storage or wallet error

    """
    context: AdminRequestContext = request["context"]
    conn_id = request.match_info["conn_id"]
    manager = ConnectionManager(context.profile)

    try:
        endpoints = await manager.get_endpoints(conn_id)
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except (BaseModelError, StorageError, WalletError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    labels = ("my_endpoint", "their_endpoint")
    payload = {label: endpoint for label, endpoint in zip(labels, endpoints)}
    return web.json_response(payload)
@docs(tags=["connection"], summary="Fetch connection metadata")
@match_info_schema(ConnectionsConnIdMatchInfoSchema())
@querystring_schema(ConnectionMetadataQuerySchema())
@response_schema(ConnectionMetadataSchema(), 200, description="")
async def connections_metadata(request: web.BaseRequest):
    """Return metadata stored on a single connection record.

    When the query string carries a non-empty "key", only that entry is
    fetched; otherwise all metadata on the record is returned.

    Args:
        request: aiohttp request object

    Returns:
        JSON response with the fetched metadata under "results"

    Raises:
        web.HTTPNotFound: if no record exists for the connection id
        web.HTTPBadRequest: on a model error

    """
    context: AdminRequestContext = request["context"]
    conn_id = request.match_info["conn_id"]
    wanted_key = request.query.get("key")

    try:
        async with context.profile.session() as session:
            conn_rec = await ConnRecord.retrieve_by_id(session, conn_id)
            if not wanted_key:
                found = await conn_rec.metadata_get_all(session)
            else:
                found = await conn_rec.metadata_get(session, wanted_key)
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except BaseModelError as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response({"results": found})
@docs(tags=["connection"], summary="Set connection metadata")
@match_info_schema(ConnectionsConnIdMatchInfoSchema())
@request_schema(ConnectionMetadataSetRequestSchema())
@response_schema(ConnectionMetadataSchema(), 200, description="")
async def connections_metadata_set(request: web.BaseRequest):
    """Handle setting metadata on a single connection record.

    Each key/value pair under "metadata" in the request body is stored on the
    record, after which the record's full metadata is read back.

    Args:
        request: aiohttp request object

    Returns:
        JSON response with all metadata on the record after the update, under
        "results"

    Raises:
        web.HTTPNotFound: if no record exists for the connection id
        web.HTTPBadRequest: on a model error

    """
    context: AdminRequestContext = request["context"]
    connection_id = request.match_info["conn_id"]
    body = await request.json() if request.body_exists else {}
    # "metadata" may be absent or explicitly null; both mean nothing to set
    updates = body.get("metadata") or {}
    profile = context.profile
    try:
        async with profile.session() as session:
            record = await ConnRecord.retrieve_by_id(session, connection_id)
            for key, value in updates.items():
                await record.metadata_set(session, key, value)
            result = await record.metadata_get_all(session)
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except BaseModelError as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err
    return web.json_response({"results": result})
@docs(
    tags=["connection"],
    summary="Create a new connection invitation",
)
@querystring_schema(CreateInvitationQueryStringSchema())
@request_schema(CreateInvitationRequestSchema())
@response_schema(InvitationResultSchema(), 200, description="")
async def connections_create_invitation(request: web.BaseRequest):
    """Request handler for creating a new connection invitation.

    Args:
        request: aiohttp request object

    Returns:
        The connection invitation details

    Raises:
        web.HTTPBadRequest: if a boolean query flag is not valid JSON, or on a
            connection manager, storage or model error
        web.HTTPForbidden: if a public invitation is requested but the
            "public_invites" setting is not enabled

    """
    context: AdminRequestContext = request["context"]
    # The flags are decoded from the raw query string as JSON literals, so a
    # value such as "True" or "yes" is not parseable; answer with a 400
    # instead of letting the decode error escape as a server error.
    try:
        auto_accept = json.loads(request.query.get("auto_accept", "null"))
        public = json.loads(request.query.get("public", "false"))
        multi_use = json.loads(request.query.get("multi_use", "false"))
    except json.JSONDecodeError as err:
        raise web.HTTPBadRequest(
            reason=(
                "Query parameters auto_accept, public and multi_use"
                " must be JSON booleans (true or false)"
            )
        ) from err
    alias = request.query.get("alias")
    body = await request.json() if request.body_exists else {}
    my_label = body.get("my_label")
    recipient_keys = body.get("recipient_keys")
    service_endpoint = body.get("service_endpoint")
    routing_keys = body.get("routing_keys")
    metadata = body.get("metadata")
    mediation_id = body.get("mediation_id")
    if public and not context.settings.get("public_invites"):
        raise web.HTTPForbidden(
            reason="Configuration does not include public invitations"
        )
    profile = context.profile
    base_url = profile.settings.get("invite_base_url")
    connection_mgr = ConnectionManager(profile)
    try:
        (connection, invitation) = await connection_mgr.create_invitation(
            my_label=my_label,
            auto_accept=auto_accept,
            public=public,
            multi_use=multi_use,
            alias=alias,
            recipient_keys=recipient_keys,
            my_endpoint=service_endpoint,
            routing_keys=routing_keys,
            metadata=metadata,
            mediation_id=mediation_id,
        )
        invitation_url = invitation.to_url(base_url)
        base_endpoint = service_endpoint or cast(
            str, profile.settings.get("default_endpoint")
        )
        result = {
            # connection may be falsy (no record returned); pass that through
            "connection_id": connection and connection.connection_id,
            "invitation": invitation.serialize(),
            # a URL that is only a query string gets the endpoint prepended
            "invitation_url": (
                f"{base_endpoint}{invitation_url}"
                if invitation_url.startswith("?")
                else invitation_url
            ),
        }
    except (ConnectionManagerError, StorageError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err
    if connection and connection.alias:
        result["alias"] = connection.alias
    return web.json_response(result)
@docs(
    tags=["connection"],
    summary="Receive a new connection invitation",
)
@querystring_schema(ReceiveInvitationQueryStringSchema())
@request_schema(ReceiveInvitationRequestSchema())
@response_schema(ConnRecordSchema(), 200, description="")
async def connections_receive_invitation(request: web.BaseRequest):
    """Request handler for receiving a new connection invitation.

    Args:
        request: aiohttp request object

    Returns:
        The resulting connection record details

    Raises:
        web.HTTPForbidden: if the "admin.no_receive_invites" setting is enabled
        web.HTTPBadRequest: if auto_accept is not valid JSON, or on a
            connection manager, storage or model error

    """
    context: AdminRequestContext = request["context"]
    if context.settings.get("admin.no_receive_invites"):
        raise web.HTTPForbidden(
            reason="Configuration does not allow receipt of invitations"
        )
    profile = context.profile
    connection_mgr = ConnectionManager(profile)
    invitation_json = await request.json()
    # auto_accept is decoded from the raw query string as a JSON literal, so a
    # value such as "True" or "yes" is not parseable; answer with a 400
    # instead of letting the decode error escape as a server error.
    try:
        auto_accept = json.loads(request.query.get("auto_accept", "null"))
    except json.JSONDecodeError as err:
        raise web.HTTPBadRequest(
            reason="Query parameter auto_accept must be a JSON boolean (true or false)"
        ) from err
    alias = request.query.get("alias")
    mediation_id = request.query.get("mediation_id")
    try:
        invitation = ConnectionInvitation.deserialize(invitation_json)
        connection = await connection_mgr.receive_invitation(
            invitation, auto_accept=auto_accept, alias=alias, mediation_id=mediation_id
        )
        result = connection.serialize()
    except (ConnectionManagerError, StorageError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err
    return web.json_response(result)
@docs(
    tags=["connection"],
    summary="Accept a stored connection invitation",
)
@match_info_schema(ConnectionsConnIdMatchInfoSchema())
@querystring_schema(AcceptInvitationQueryStringSchema())
@response_schema(ConnRecordSchema(), 200, description="")
async def connections_accept_invitation(request: web.BaseRequest):
    """Request handler for accepting a stored connection invitation.

    Args:
        request: aiohttp request object

    Returns:
        The resulting connection record details

    Raises:
        web.HTTPNotFound: if no record exists for the connection id
        web.HTTPBadRequest: on a storage, wallet, connection manager or model
            error, including any storage error raised while creating the
            connection request

    """
    context: AdminRequestContext = request["context"]
    outbound_handler = request["outbound_message_router"]
    connection_id = request.match_info["conn_id"]
    profile = context.profile
    try:
        async with profile.session() as session:
            connection = await ConnRecord.retrieve_by_id(session, connection_id)
        connection_mgr = ConnectionManager(profile)
        my_label = request.query.get("my_label")
        my_endpoint = request.query.get("my_endpoint")
        mediation_id = request.query.get("mediation_id")
        try:
            # Kept under its own name so the web request parameter is not
            # rebound to the outbound message.
            conn_request = await connection_mgr.create_request(
                connection, my_label, my_endpoint, mediation_id=mediation_id
            )
        except StorageError as err:
            # Handle storage errors (including not found errors) from
            # create_request separately as these errors represent a bad request
            # rather than a bad url
            raise web.HTTPBadRequest(reason=err.roll_up) from err
        result = connection.serialize()
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except (StorageError, WalletError, ConnectionManagerError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err
    await outbound_handler(conn_request, connection_id=connection.connection_id)
    return web.json_response(result)
@docs(
    tags=["connection"],
    summary="Accept a stored connection request",
)
@match_info_schema(ConnectionsConnIdMatchInfoSchema())
@querystring_schema(AcceptRequestQueryStringSchema())
@response_schema(ConnRecordSchema(), 200, description="")
async def connections_accept_request(request: web.BaseRequest):
    """Accept a stored connection request and send out the response message.

    Args:
        request: aiohttp request object

    Returns:
        JSON response with the serialized connection record

    Raises:
        web.HTTPNotFound: if no record exists for the connection id
        web.HTTPBadRequest: on a storage, wallet, connection manager or model
            error

    """
    context: AdminRequestContext = request["context"]
    send_outbound = request["outbound_message_router"]
    conn_id = request.match_info["conn_id"]
    profile = context.profile

    try:
        async with profile.session() as session:
            conn_rec = await ConnRecord.retrieve_by_id(session, conn_id)
        manager = ConnectionManager(profile)
        # An empty my_endpoint value is treated the same as an absent one.
        endpoint = request.query.get("my_endpoint") or None
        reply = await manager.create_response(conn_rec, endpoint)
        payload = conn_rec.serialize()
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except (StorageError, WalletError, ConnectionManagerError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    await send_outbound(reply, connection_id=conn_rec.connection_id)
    return web.json_response(payload)
@docs(tags=["connection"], summary="Remove an existing connection record")
@match_info_schema(ConnectionsConnIdMatchInfoSchema())
@response_schema(ConnectionModuleResponseSchema, 200, description="")
async def connections_remove(request: web.BaseRequest):
    """Delete a connection record and clear its cached state entry.

    Args:
        request: aiohttp request object

    Returns:
        An empty JSON object response

    Raises:
        web.HTTPNotFound: if no record exists for the connection id
        web.HTTPBadRequest: on any other storage error

    """
    context: AdminRequestContext = request["context"]
    conn_id = request.match_info["conn_id"]

    try:
        async with context.profile.session() as session:
            conn_rec = await ConnRecord.retrieve_by_id(session, conn_id)
            await conn_rec.delete_record(session)
            # A cache is optional; clear the state key only when one is injected.
            state_cache = session.inject_or(BaseCache)
            if state_cache:
                await state_cache.clear(f"conn_rec_state::{conn_id}")
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except StorageError as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response({})
@docs(tags=["connection"], summary="Create a new static connection")
@request_schema(ConnectionStaticRequestSchema())
@response_schema(ConnectionStaticResultSchema(), 200, description="")
async def connections_create_static(request: web.BaseRequest):
    """Create a static connection from the values in the request body.

    Args:
        request: aiohttp request object

    Returns:
        JSON response with both parties' DID and verkey, the configured
        default endpoint, and the serialized connection record

    Raises:
        web.HTTPBadRequest: on a wallet, storage or model error

    """
    context: AdminRequestContext = request["context"]
    body = await request.json()
    manager = ConnectionManager(context.profile)

    # Body entries forwarded to the manager under the same keyword names.
    forwarded = (
        "my_seed",
        "my_did",
        "their_seed",
        "their_did",
        "their_verkey",
        "their_endpoint",
        "their_label",
        "alias",
    )
    try:
        # Missing or empty values are all passed along as None.
        params = {name: body.get(name) or None for name in forwarded}
        my_info, their_info, conn_rec = await manager.create_static_connection(
            **params
        )
        payload = {
            "my_did": my_info.did,
            "my_verkey": my_info.verkey,
            "my_endpoint": context.settings.get("default_endpoint"),
            "their_did": their_info.did,
            "their_verkey": their_info.verkey,
            "record": conn_rec.serialize(),
        }
    except (WalletError, StorageError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response(payload)
async def register(app: web.Application):
    """Attach the connection admin routes to the application.

    Args:
        app: aiohttp application the routes are added to

    """
    # Registration order is kept as-is; GET routes opt out of implicit HEAD.
    routes = [
        web.get("/connections", connections_list, allow_head=False),
        web.get("/connections/{conn_id}", connections_retrieve, allow_head=False),
        web.get(
            "/connections/{conn_id}/metadata",
            connections_metadata,
            allow_head=False,
        ),
        web.post("/connections/{conn_id}/metadata", connections_metadata_set),
        web.get(
            "/connections/{conn_id}/endpoints",
            connections_endpoints,
            allow_head=False,
        ),
        web.post("/connections/create-static", connections_create_static),
        web.post("/connections/create-invitation", connections_create_invitation),
        web.post("/connections/receive-invitation", connections_receive_invitation),
        web.post(
            "/connections/{conn_id}/accept-invitation",
            connections_accept_invitation,
        ),
        web.post(
            "/connections/{conn_id}/accept-request",
            connections_accept_request,
        ),
        web.delete("/connections/{conn_id}", connections_remove),
    ]
    app.add_routes(routes)
def post_process_routes(app: web.Application):
    """Add the "connection" tag description to the application's swagger dict.

    Args:
        app: aiohttp application whose swagger dictionary is amended

    """
    swagger = app._state["swagger_dict"]
    connection_tag = {
        "name": "connection",
        "description": "Connection management",
        "externalDocs": {"description": "Specification", "url": SPEC_URI},
    }
    # Create the top-level tag list on first use, then append to it.
    swagger.setdefault("tags", []).append(connection_tag)