Source code for aries_cloudagent.protocols.connections.routes

"""Connection handling admin routes."""

from aiohttp import web
from aiohttp_apispec import docs, request_schema, response_schema

from marshmallow import fields, Schema

from ...connections.models.connection_record import (
    ConnectionRecord,
    ConnectionRecordSchema,
)
from ...messaging.valid import IndyDID, UUIDFour
from ...storage.error import StorageNotFoundError

from .manager import ConnectionManager
from .messages.connection_invitation import (
    ConnectionInvitation,
    ConnectionInvitationSchema,
)


[docs]class ConnectionListSchema(Schema): """Result schema for connection list.""" results = fields.List( fields.Nested(ConnectionRecordSchema()), description="List of connection records", )
[docs]class InvitationResultSchema(Schema): """Result schema for a new connection invitation.""" connection_id = fields.Str( description="Connection identifier", example=UUIDFour.EXAMPLE ) invitation = fields.Nested(ConnectionInvitationSchema()) invitation_url = fields.Str( description="Invitation URL", example="http://192.168.56.101:8020/invite?c_i=eyJAdHlwZSI6Li4ufQ==", )
[docs]class ConnectionStaticRequestSchema(Schema): """Request schema for a new static connection.""" my_seed = fields.Str(description="Seed to use for the local DID", required=False) my_did = fields.Str( description="Local DID", required=False, example=IndyDID.EXAMPLE ) their_seed = fields.Str( description="Seed to use for the remote DID", required=False ) their_did = fields.Str( description="Remote DID", required=False, example=IndyDID.EXAMPLE ) their_verkey = fields.Str(description="Remote verification key", required=False) their_endpoint = fields.Str( description="URL endpoint for the other party", required=False, example="http://192.168.56.101:5000", ) their_role = fields.Str( description="Role to assign to this connection", required=False ) their_label = fields.Str( description="Label to assign to this connection", required=False ) alias = fields.Str(description="Alias to assign to this connection", required=False)
[docs]class ConnectionStaticResultSchema(Schema): """Result schema for new static connection.""" my_did = fields.Str( description="Local DID", required=True, example=IndyDID.EXAMPLE ) mv_verkey = fields.Str(description="My verification key", required=True) my_endpoint = fields.Str(description="My endpoint", required=True) their_did = fields.Str( description="Remote DID", required=True, example=IndyDID.EXAMPLE ) their_verkey = fields.Str(description="Remote verification key", required=True) record = fields.Nested(ConnectionRecordSchema, required=True)
[docs]def connection_sort_key(conn): """Get the sorting key for a particular connection.""" if conn["state"] == ConnectionRecord.STATE_INACTIVE: pfx = "2" elif conn["state"] == ConnectionRecord.STATE_INVITATION: pfx = "1" else: pfx = "0" return pfx + conn["created_at"]
[docs]@docs( tags=["connection"], summary="Query agent-to-agent connections", parameters=[ { "name": "alias", "in": "query", "schema": {"type": "string"}, "required": False, }, { "name": "initiator", "in": "query", "schema": {"type": "string", "enum": ["self", "external"]}, "required": False, }, { "name": "invitation_key", "in": "query", "schema": {"type": "string"}, "required": False, }, { "name": "my_did", "in": "query", "schema": {"type": "string"}, "required": False, }, { "name": "state", "in": "query", "schema": { "type": "string", "enum": [ "init", "invitation", "request", "response", "active", "error", "inactive", ], }, "required": False, }, { "name": "their_did", "in": "query", "schema": {"type": "string"}, "required": False, }, { "name": "their_role", "in": "query", "schema": {"type": "string"}, "required": False, }, ], ) @response_schema(ConnectionListSchema(), 200) async def connections_list(request: web.BaseRequest): """ Request handler for searching connection records. Args: request: aiohttp request object Returns: The connection list response """ context = request.app["request_context"] tag_filter = {} for param_name in ( "invitation_id", "my_did", "their_did", "request_id", ): if param_name in request.query and request.query[param_name] != "": tag_filter[param_name] = request.query[param_name] post_filter = {} for param_name in ( "alias", "initiator", "state", "their_role", ): if param_name in request.query and request.query[param_name] != "": post_filter[param_name] = request.query[param_name] records = await ConnectionRecord.query(context, tag_filter, post_filter) results = [record.serialize() for record in records] results.sort(key=connection_sort_key) return web.json_response({"results": results})
[docs]@docs(tags=["connection"], summary="Fetch a single connection record") @response_schema(ConnectionRecordSchema(), 200) async def connections_retrieve(request: web.BaseRequest): """ Request handler for fetching a single connection record. Args: request: aiohttp request object Returns: The connection record response """ context = request.app["request_context"] connection_id = request.match_info["id"] try: record = await ConnectionRecord.retrieve_by_id(context, connection_id) except StorageNotFoundError: raise web.HTTPNotFound() return web.json_response(record.serialize())
[docs]@docs( tags=["connection"], summary="Create a new connection invitation", parameters=[ { "name": "alias", "in": "query", "schema": {"type": "string"}, "required": False, }, { "name": "accept", "in": "query", "schema": {"type": "string", "enum": ["none", "auto"]}, "required": False, }, { "name": "public", "in": "query", "schema": {"type": "int"}, "required": False }, { "name": "multi_use", "in": "query", "schema": {"type": "int"}, "required": False }, ], ) @response_schema(InvitationResultSchema(), 200) async def connections_create_invitation(request: web.BaseRequest): """ Request handler for creating a new connection invitation. Args: request: aiohttp request object Returns: The connection invitation details """ context = request.app["request_context"] accept = request.query.get("accept") alias = request.query.get("alias") public = request.query.get("public") multi_use = request.query.get("multi_use") if public and not context.settings.get("public_invites"): raise web.HTTPForbidden() base_url = context.settings.get("invite_base_url") connection_mgr = ConnectionManager(context) connection, invitation = await connection_mgr.create_invitation( accept=accept, public=bool(public), multi_use=bool(multi_use), alias=alias ) result = { "connection_id": connection and connection.connection_id, "invitation": invitation.serialize(), "invitation_url": invitation.to_url(base_url), } if connection and connection.alias: result["alias"] = connection.alias return web.json_response(result)
[docs]@docs( tags=["connection"], summary="Receive a new connection invitation", parameters=[ { "name": "alias", "in": "query", "schema": {"type": "string"}, "required": False, }, { "name": "accept", "in": "query", "schema": {"type": "string", "enum": ["none", "auto"]}, "required": False, }, ], ) @request_schema(ConnectionInvitationSchema()) @response_schema(ConnectionRecordSchema(), 200) async def connections_receive_invitation(request: web.BaseRequest): """ Request handler for receiving a new connection invitation. Args: request: aiohttp request object Returns: The resulting connection record details """ context = request.app["request_context"] if context.settings.get("admin.no_receive_invites"): raise web.HTTPForbidden() connection_mgr = ConnectionManager(context) invitation_json = await request.json() invitation = ConnectionInvitation.deserialize(invitation_json) accept = request.query.get("accept") alias = request.query.get("alias") connection = await connection_mgr.receive_invitation( invitation, accept=accept, alias=alias ) return web.json_response(connection.serialize())
[docs]@docs( tags=["connection"], summary="Accept a stored connection invitation", parameters=[ { "name": "my_endpoint", "in": "query", "schema": {"type": "string"}, "required": False, }, { "name": "my_label", "in": "query", "schema": {"type": "string"}, "required": False, }, ], ) @response_schema(ConnectionRecordSchema(), 200) async def connections_accept_invitation(request: web.BaseRequest): """ Request handler for accepting a stored connection invitation. Args: request: aiohttp request object Returns: The resulting connection record details """ context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] connection_id = request.match_info["id"] try: connection = await ConnectionRecord.retrieve_by_id(context, connection_id) except StorageNotFoundError: raise web.HTTPNotFound() connection_mgr = ConnectionManager(context) my_label = request.query.get("my_label") or None my_endpoint = request.query.get("my_endpoint") or None request = await connection_mgr.create_request(connection, my_label, my_endpoint) await outbound_handler(request, connection_id=connection.connection_id) return web.json_response(connection.serialize())
[docs]@docs( tags=["connection"], summary="Accept a stored connection request", parameters=[ { "name": "my_endpoint", "in": "query", "schema": {"type": "string"}, "required": False, } ], ) @response_schema(ConnectionRecordSchema(), 200) async def connections_accept_request(request: web.BaseRequest): """ Request handler for accepting a stored connection request. Args: request: aiohttp request object Returns: The resulting connection record details """ context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] connection_id = request.match_info["id"] try: connection = await ConnectionRecord.retrieve_by_id(context, connection_id) except StorageNotFoundError: raise web.HTTPNotFound() connection_mgr = ConnectionManager(context) my_endpoint = request.query.get("my_endpoint") or None response = await connection_mgr.create_response(connection, my_endpoint) await outbound_handler(response, connection_id=connection.connection_id) return web.json_response(connection.serialize())
[docs]@docs( tags=["connection"], summary="Assign another connection as the inbound connection" ) async def connections_establish_inbound(request: web.BaseRequest): """ Request handler for setting the inbound connection on a connection record. Args: request: aiohttp request object """ context = request.app["request_context"] connection_id = request.match_info["id"] outbound_handler = request.app["outbound_message_router"] inbound_connection_id = request.match_info["ref_id"] try: connection = await ConnectionRecord.retrieve_by_id(context, connection_id) except StorageNotFoundError: raise web.HTTPNotFound() connection_mgr = ConnectionManager(context) await connection_mgr.establish_inbound( connection, inbound_connection_id, outbound_handler ) return web.json_response({})
[docs]@docs(tags=["connection"], summary="Remove an existing connection record") async def connections_remove(request: web.BaseRequest): """ Request handler for removing a connection record. Args: request: aiohttp request object """ context = request.app["request_context"] connection_id = request.match_info["id"] try: connection = await ConnectionRecord.retrieve_by_id(context, connection_id) except StorageNotFoundError: raise web.HTTPNotFound() await connection.delete_record(context) return web.json_response({})
[docs]@docs(tags=["connection"], summary="Create a new static connection") @request_schema(ConnectionStaticRequestSchema()) @response_schema(ConnectionStaticResultSchema(), 200) async def connections_create_static(request: web.BaseRequest): """ Request handler for creating a new static connection. Args: request: aiohttp request object Returns: The new connection record """ context = request.app["request_context"] body = await request.json() connection_mgr = ConnectionManager(context) ( my_info, their_info, connection ) = await connection_mgr.create_static_connection( my_seed=body.get("my_seed") or None, my_did=body.get("my_did") or None, their_seed=body.get("their_seed") or None, their_did=body.get("their_did") or None, their_verkey=body.get("their_verkey") or None, their_endpoint=body.get("their_endpoint") or None, their_role=body.get("their_role") or None, their_label=body.get("their_label") or None, alias=body.get("alias") or None, ) response = { 'my_did': my_info.did, 'my_verkey': my_info.verkey, 'my_endpoint': context.settings.get('default_endpoint'), 'their_did': their_info.did, 'their_verkey': their_info.verkey, 'record': connection.serialize() } return web.json_response(response)
[docs]async def register(app: web.Application): """Register routes.""" app.add_routes( [ web.get("/connections", connections_list), web.get("/connections/{id}", connections_retrieve), web.post("/connections/create-static", connections_create_static), web.post("/connections/create-invitation", connections_create_invitation), web.post("/connections/receive-invitation", connections_receive_invitation), web.post( "/connections/{id}/accept-invitation", connections_accept_invitation ), web.post("/connections/{id}/accept-request", connections_accept_request), web.post( "/connections/{id}/establish-inbound/{ref_id}", connections_establish_inbound, ), web.post("/connections/{id}/remove", connections_remove), ] )