Source code for aries_cloudagent.protocols.coordinate_mediation.v1_0.routes
"""coordinate mediation admin routes."""
from aiohttp import web
from aiohttp_apispec import (
docs,
match_info_schema,
querystring_schema,
request_schema,
response_schema,
)
from marshmallow import fields, validate
from ....admin.request_context import AdminRequestContext
from ....connections.models.conn_record import ConnRecord
from ....messaging.models.base import BaseModelError
from ....messaging.models.openapi import OpenAPISchema
from ....messaging.valid import UUIDFour
from ....storage.error import StorageError, StorageNotFoundError
from ...connections.v1_0.routes import ConnectionsConnIdMatchInfoSchema
from ...routing.v1_0.models.route_record import RouteRecord, RouteRecordSchema
from .manager import MediationManager, MediationManagerError
from .message_types import SPEC_URI
from .messages.inner.keylist_update_rule import (
KeylistUpdateRule,
KeylistUpdateRuleSchema,
)
from .messages.keylist_query import KeylistQuerySchema
from .messages.keylist_update import KeylistUpdateSchema
from .messages.mediate_deny import MediationDenySchema
from .messages.mediate_grant import MediationGrantSchema
from .models.mediation_record import MediationRecord, MediationRecordSchema
# Reusable marshmallow field definitions. Each instance below is assigned to
# one or more schema classes later in this module.

# Optional connection identifier, declared as a UUID field.
CONNECTION_ID_SCHEMA = fields.UUID(
    description="Connection identifier (optional)",
    required=False,
    example=UUIDFour.EXAMPLE,
)
# Mediation record identifier, declared as a UUID field.
MEDIATION_ID_SCHEMA = fields.UUID(
    description="Mediation record identifier",
    example=UUIDFour.EXAMPLE,
)
# Optional mediation state. The accepted values are collected from every
# attribute defined directly on MediationRecord whose name starts with
# "STATE_", so the validator follows the record class automatically.
MEDIATION_STATE_SCHEMA = fields.Str(
    description="Mediation state (optional)",
    required=False,
    validate=validate.OneOf(
        [
            getattr(MediationRecord, m)
            for m in vars(MediationRecord)
            if m.startswith("STATE_")
        ]
    ),
    example=MediationRecord.STATE_GRANTED,
)
# Optional list of strings: terms the mediator requires of the recipient.
MEDIATOR_TERMS_SCHEMA = fields.List(
    fields.Str(
        description=(
            "Indicate terms to which the mediator requires the recipient to agree"
        )
    ),
    required=False,
    description="List of mediator rules for recipient",
)
# Optional list of strings: terms the recipient requires of the mediator.
RECIPIENT_TERMS_SCHEMA = fields.List(
    fields.Str(
        description=(
            "Indicate terms to which the recipient requires the mediator to agree"
        )
    ),
    required=False,
    description="List of recipient rules for mediation",
)
class MediationListSchema(OpenAPISchema):
    """Response schema for the mediation record listing."""

    # One nested MediationRecordSchema entry per record in the response.
    results = fields.List(
        fields.Nested(MediationRecordSchema),
        description="List of mediation records",
    )
class MediationListQueryStringSchema(OpenAPISchema):
    """Query string schema for the mediation record list request."""

    # All four fields reuse the module-level field definitions.
    conn_id = CONNECTION_ID_SCHEMA
    mediator_terms = MEDIATOR_TERMS_SCHEMA
    recipient_terms = RECIPIENT_TERMS_SCHEMA
    state = MEDIATION_STATE_SCHEMA
class MediationCreateRequestSchema(OpenAPISchema):
    """Request body schema for creating a mediation request."""

    # Both term lists are optional.
    mediator_terms = MEDIATOR_TERMS_SCHEMA
    recipient_terms = RECIPIENT_TERMS_SCHEMA
class AdminMediationDenySchema(OpenAPISchema):
    """Request body schema for the admin request that denies mediation."""

    # Both term lists are optional.
    mediator_terms = MEDIATOR_TERMS_SCHEMA
    recipient_terms = RECIPIENT_TERMS_SCHEMA
class MediationIdMatchInfoSchema(OpenAPISchema):
    """Path parameter schema for routes addressed by mediation record id."""

    mediation_id = MEDIATION_ID_SCHEMA
class GetKeylistQuerySchema(OpenAPISchema):
    """Get keylist query string parameters."""

    conn_id = CONNECTION_ID_SCHEMA
    # The description is built from adjacent string literals rather than
    # backslash continuations inside one literal, so source indentation can
    # never end up inside the published text.
    role = fields.Str(
        description=(
            f"Filter on role, '{MediationRecord.ROLE_CLIENT}' for keys "
            f"mediated by other agents, '{MediationRecord.ROLE_SERVER}' for keys "
            "mediated by this agent"
        ),
        validate=validate.OneOf(
            [MediationRecord.ROLE_CLIENT, MediationRecord.ROLE_SERVER]
        ),
        missing=MediationRecord.ROLE_SERVER,
        required=False,
    )
class KeylistSchema(OpenAPISchema):
    """Response schema for the keylist listing."""

    # One nested RouteRecordSchema entry per keylist record in the response.
    results = fields.List(
        fields.Nested(RouteRecordSchema),
        description="List of keylist records",
    )
class KeylistQueryFilterRequestSchema(OpenAPISchema):
    """Request body schema carrying the optional keylist query filter."""

    filter = fields.Dict(
        required=False,
        description="Filter for keylist query",
    )
class KeylistQueryPaginateQuerySchema(OpenAPISchema):
    """Query string schema for paginating a keylist query."""

    # Declared default when absent: -1.
    paginate_limit = fields.Int(
        required=False, missing=-1, description="limit number of results"
    )
    # Declared default when absent: 0.
    paginate_offset = fields.Int(
        required=False, missing=0, description="offset to use in pagination"
    )
class KeylistUpdateRequestSchema(OpenAPISchema):
    """Request body schema for a keylist update."""

    # List of nested keylist update rules.
    updates = fields.List(fields.Nested(KeylistUpdateRuleSchema()))
def mediation_sort_key(mediation: dict):
    """Build the sort key for one serialized mediation record.

    The key is a one-character rank followed by the record's ``created_at``
    value: "2" for denied records, "1" for requests and "0" for every other
    state, so records are grouped by rank first and creation time second.

    Args:
        mediation: serialized record with "state" and "created_at" entries

    Returns:
        The rank prefix concatenated with ``mediation["created_at"]``.

    """
    state = mediation["state"]
    rank = (
        "2"
        if state == MediationRecord.STATE_DENIED
        else "1"
        if state == MediationRecord.STATE_REQUEST
        else "0"  # anything else, e.g. granted
    )
    return rank + mediation["created_at"]
@docs(
    tags=["mediation"],
    summary="Query mediation requests, returns list of all mediation records",
)
@querystring_schema(MediationListQueryStringSchema())
@response_schema(MediationListSchema(), 200)
async def list_mediation_requests(request: web.BaseRequest):
    """List mediation records, optionally filtered by connection and state.

    Only the ``conn_id`` and ``state`` query parameters are used to build the
    tag filter; the term lists accepted by the query schema are not consulted.

    Args:
        request: aiohttp request object

    Returns:
        JSON response with the serialized records under "results", ordered
        by ``mediation_sort_key``.

    Raises:
        web.HTTPBadRequest: on a storage or model error

    """
    context: AdminRequestContext = request["context"]
    conn_id = request.query.get("conn_id")
    state = request.query.get("state")

    tag_filter = {}
    if conn_id:
        tag_filter["connection_id"] = conn_id
    if state:
        tag_filter["state"] = state

    try:
        # Enter the session as a context manager so that its exit hook runs
        # even when the query raises.
        async with context.session() as session:
            records = await MediationRecord.query(session, tag_filter)
        results = [record.serialize() for record in records]
        results.sort(key=mediation_sort_key)
    except (StorageError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response({"results": results})
@docs(tags=["mediation"], summary="Retrieve mediation request record")
@match_info_schema(MediationIdMatchInfoSchema())
@response_schema(MediationRecordSchema(), 200)
async def retrieve_mediation_request(request: web.BaseRequest):
    """Retrieve a single mediation record by its identifier.

    Args:
        request: aiohttp request object; ``mediation_id`` comes from the path

    Returns:
        JSON response with the serialized mediation record.

    Raises:
        web.HTTPNotFound: if no record exists for the identifier
        web.HTTPBadRequest: on any other storage or model error

    """
    context: AdminRequestContext = request["context"]
    mediation_id = request.match_info["mediation_id"]

    try:
        # Enter the session as a context manager so that its exit hook runs
        # even when the lookup raises.
        async with context.session() as session:
            mediation_record = await MediationRecord.retrieve_by_id(
                session, mediation_id
            )
        result = mediation_record.serialize()
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except (BaseModelError, StorageError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response(result)
@docs(tags=["mediation"], summary="Delete mediation request by ID")
@match_info_schema(MediationIdMatchInfoSchema())
@response_schema(MediationRecordSchema(), 200)
async def delete_mediation_request(request: web.BaseRequest):
    """Delete a mediation record by its identifier.

    The record is serialized before deletion so the response can echo what
    was removed.

    Args:
        request: aiohttp request object; ``mediation_id`` comes from the path

    Returns:
        JSON response with the serialized record as it was before deletion.

    Raises:
        web.HTTPNotFound: if no record exists for the identifier
        web.HTTPBadRequest: on any other storage or model error

    """
    context: AdminRequestContext = request["context"]
    mediation_id = request.match_info["mediation_id"]

    try:
        # Enter the session as a context manager so that its exit hook runs
        # even when retrieval or deletion raises.
        async with context.session() as session:
            mediation_record = await MediationRecord.retrieve_by_id(
                session, mediation_id
            )
            result = mediation_record.serialize()
            await mediation_record.delete_record(session)
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except (BaseModelError, StorageError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response(result)
@docs(tags=["mediation"], summary="Request mediation from connection")
@match_info_schema(ConnectionsConnIdMatchInfoSchema())
@request_schema(MediationCreateRequestSchema())
@response_schema(MediationRecordSchema(), 201)
async def request_mediation(request: web.BaseRequest):
    """Request mediation from the agent on the given connection.

    Args:
        request: aiohttp request object; ``conn_id`` comes from the path and
            the optional term lists from the JSON body

    Returns:
        JSON response (status 201) with the serialized mediation record.

    Raises:
        web.HTTPNotFound: if the connection record is not found
        web.HTTPBadRequest: if the connection is not ready, a mediation
            record already exists for it, or on a storage or model error

    """
    context: AdminRequestContext = request["context"]
    outbound_message_router = request["outbound_message_router"]
    conn_id = request.match_info["conn_id"]

    # Every body field is optional, so the request may carry no body at all;
    # only parse JSON when one is present.
    body = await request.json() if request.body_exists else {}
    mediator_terms = body.get("mediator_terms")
    recipient_terms = body.get("recipient_terms")

    try:
        async with context.session() as session:
            connection_record = await ConnRecord.retrieve_by_id(session, conn_id)

            if not connection_record.is_ready:
                raise web.HTTPBadRequest(reason="requested connection is not ready")

            if await MediationRecord.exists_for_connection_id(session, conn_id):
                raise web.HTTPBadRequest(
                    reason=f"MediationRecord already exists for connection {conn_id}"
                )

        mediation_record, mediation_request = await MediationManager(
            context.profile
        ).prepare_request(
            connection_id=conn_id,
            mediator_terms=mediator_terms,
            recipient_terms=recipient_terms,
        )
        result = mediation_record.serialize()
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except (StorageError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    # Send the request message only once the record has been prepared.
    await outbound_message_router(mediation_request, connection_id=conn_id)

    return web.json_response(result, status=201)
@docs(tags=["mediation"], summary="Grant received mediation")
@match_info_schema(MediationIdMatchInfoSchema())
@response_schema(MediationGrantSchema(), 201)
async def mediation_request_grant(request: web.BaseRequest):
    """Grant a stored mediation request and send the grant message.

    Args:
        request: aiohttp request object; ``mediation_id`` comes from the path

    Returns:
        JSON response (status 201) with the serialized mediation record.

    Raises:
        web.HTTPNotFound: if the mediation record is not found
        web.HTTPBadRequest: on a manager, storage or model error

    """
    context: AdminRequestContext = request["context"]
    send_outbound = request["outbound_message_router"]
    record_id = request.match_info.get("mediation_id")

    try:
        manager = MediationManager(context.profile)
        granted = await manager.grant_request(mediation_id=record_id)
        record, grant_request = granted
        result = record.serialize()
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except (MediationManagerError, StorageError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    # Deliver the grant over the connection the record belongs to.
    await send_outbound(grant_request, connection_id=record.connection_id)

    return web.json_response(result, status=201)
@docs(tags=["mediation"], summary="Deny a stored mediation request")
@match_info_schema(MediationIdMatchInfoSchema())
@request_schema(AdminMediationDenySchema())
@response_schema(MediationDenySchema(), 201)
async def mediation_request_deny(request: web.BaseRequest):
    """Deny a stored mediation request and send the deny message.

    Args:
        request: aiohttp request object; ``mediation_id`` comes from the path
            and the optional term lists from the JSON body

    Returns:
        JSON response (status 201) with the serialized mediation record.

    Raises:
        web.HTTPNotFound: if the mediation record is not found
        web.HTTPBadRequest: on a manager, storage or model error

    """
    context: AdminRequestContext = request["context"]
    outbound_handler = request["outbound_message_router"]
    mediation_id = request.match_info.get("mediation_id")

    # Every body field is optional, so the request may carry no body at all;
    # only parse JSON when one is present.
    body = await request.json() if request.body_exists else {}
    mediator_terms = body.get("mediator_terms")
    recipient_terms = body.get("recipient_terms")

    try:
        mediation_manager = MediationManager(context.profile)
        record, deny_request = await mediation_manager.deny_request(
            mediation_id=mediation_id,
            mediator_terms=mediator_terms,
            recipient_terms=recipient_terms,
        )
        result = record.serialize()
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except (MediationManagerError, StorageError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    # Deliver the denial over the connection the record belongs to.
    await outbound_handler(deny_request, connection_id=record.connection_id)

    return web.json_response(result, status=201)
@docs(
    tags=["mediation"],
    summary="Retrieve keylists by connection or role",
)
@querystring_schema(GetKeylistQuerySchema())
@response_schema(KeylistSchema(), 200)
async def get_keylist(request: web.BaseRequest):
    """Return route records, optionally filtered by connection and role.

    A filter entry is added only for query parameters that are present and
    non-empty; with neither given, the query runs with an empty filter.

    Args:
        request: aiohttp request object

    Returns:
        JSON response with the serialized route records under "results".

    Raises:
        web.HTTPBadRequest: on a storage or model error

    """
    context: AdminRequestContext = request["context"]
    candidates = (
        ("connection_id", request.query.get("conn_id")),
        ("role", request.query.get("role")),
    )
    tag_filter = {tag: value for tag, value in candidates if value}

    try:
        async with context.session() as session:
            route_records = await RouteRecord.query(session, tag_filter)
        results = [route.serialize() for route in route_records]
    except (StorageError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response({"results": results}, status=200)
@docs(
    tags=["mediation"],
    summary="Send keylist query to mediator",
)
@match_info_schema(MediationIdMatchInfoSchema())
@querystring_schema(KeylistQueryPaginateQuerySchema())
@request_schema(KeylistQueryFilterRequestSchema())
@response_schema(KeylistQuerySchema(), 201)
async def send_keylist_query(request: web.BaseRequest):
    """Send a keylist query to the mediator behind a mediation record.

    Args:
        request: aiohttp request object; ``mediation_id`` comes from the
            path, pagination from the query string and the optional filter
            from the JSON body

    Returns:
        JSON response (status 201) with the serialized keylist query message.

    Raises:
        web.HTTPNotFound: if the mediation record is not found
        web.HTTPBadRequest: on a storage or model error

    """
    context: AdminRequestContext = request["context"]
    outbound_handler = request["outbound_message_router"]
    mediation_id = request.match_info["mediation_id"]

    # The filter is optional, so the request may carry no body at all; only
    # parse JSON when one is present.
    body = await request.json() if request.body_exists else {}
    filter_ = body.get("filter")

    # NOTE(review): these are the raw query-string values (str, or None when
    # absent); the defaults declared on the query schema are not applied
    # here. Confirm that prepare_keylist_query tolerates both.
    paginate_limit = request.query.get("paginate_limit")
    paginate_offset = request.query.get("paginate_offset")

    try:
        async with context.session() as session:
            record = await MediationRecord.retrieve_by_id(session, mediation_id)
        mediation_manager = MediationManager(context.profile)
        keylist_query_request = await mediation_manager.prepare_keylist_query(
            filter_=filter_,
            paginate_limit=paginate_limit,
            paginate_offset=paginate_offset,
        )
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except (StorageError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    # Deliver the query over the connection the record belongs to.
    await outbound_handler(keylist_query_request, connection_id=record.connection_id)

    return web.json_response(keylist_query_request.serialize(), status=201)
@docs(tags=["mediation"], summary="Send keylist update to mediator")
@match_info_schema(MediationIdMatchInfoSchema())
@request_schema(KeylistUpdateRequestSchema())
@response_schema(KeylistUpdateSchema(), 201)
async def send_keylist_update(request: web.BaseRequest):
    """Send a keylist update to the mediator behind a mediation record.

    Each entry of ``updates`` is folded into a single keylist update message
    via the manager's ``add_key`` / ``remove_key`` according to its action.

    Args:
        request: aiohttp request object; ``mediation_id`` comes from the path
            and ``updates`` from the JSON body

    Returns:
        JSON response (status 201) with the serialized keylist update message.

    Raises:
        web.HTTPNotFound: if the mediation record is not found
        web.HTTPBadRequest: if ``updates`` is missing or empty, an entry has
            an unknown action, the mediation is not granted, or on a storage
            or model error

    """
    context: AdminRequestContext = request["context"]
    outbound_handler = request["outbound_message_router"]
    mediation_id = request.match_info["mediation_id"]

    # A request without a body is treated like one without updates, so the
    # caller gets the 400 below instead of an unhandled JSON decode error.
    body = await request.json() if request.body_exists else {}
    updates = body.get("updates")
    if not updates:
        raise web.HTTPBadRequest(reason="Updates cannot be empty.")

    mediation_mgr = MediationManager(context.profile)
    # Accumulator: each call receives the message built so far and returns
    # the one to pass to the next call.
    keylist_updates = None
    for update in updates:
        action = update.get("action")
        if action == KeylistUpdateRule.RULE_ADD:
            keylist_updates = await mediation_mgr.add_key(
                update.get("recipient_key"), keylist_updates
            )
        elif action == KeylistUpdateRule.RULE_REMOVE:
            keylist_updates = await mediation_mgr.remove_key(
                update.get("recipient_key"), keylist_updates
            )
        else:
            raise web.HTTPBadRequest(reason="Invalid action for keylist update.")

    try:
        async with context.session() as session:
            record = await MediationRecord.retrieve_by_id(session, mediation_id)
        if record.state != MediationRecord.STATE_GRANTED:
            raise web.HTTPBadRequest(reason="mediation is not granted.")
        results = keylist_updates.serialize()
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except (StorageError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    # Deliver the update over the connection the record belongs to.
    await outbound_handler(keylist_updates, connection_id=record.connection_id)

    return web.json_response(results, status=201)
@docs(tags=["mediation"], summary="Get default mediator")
@response_schema(MediationRecordSchema(), 200)
async def get_default_mediator(request: web.BaseRequest):
    """Return the default mediator record, or an empty object if none is set.

    Args:
        request: aiohttp request object

    Returns:
        JSON response with the serialized default mediation record, or ``{}``
        when the manager reports no default mediator.

    Raises:
        web.HTTPBadRequest: on a storage or model error

    """
    context: AdminRequestContext = request["context"]

    try:
        manager = MediationManager(context.profile)
        current_default = await manager.get_default_mediator()
        if current_default:
            results = current_default.serialize()
        else:
            results = {}
    except (StorageError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response(results, status=200)
@docs(tags=["mediation"], summary="Set default mediator")
@match_info_schema(MediationIdMatchInfoSchema())
@response_schema(MediationRecordSchema(), 201)
async def set_default_mediator(request: web.BaseRequest):
    """Make the given mediation record the default mediator.

    After setting the default, the handler reads it back from the manager
    and returns that record.

    Args:
        request: aiohttp request object; ``mediation_id`` comes from the path

    Returns:
        JSON response (status 201) with the serialized default mediator.

    Raises:
        web.HTTPBadRequest: on a storage or model error

    """
    context: AdminRequestContext = request["context"]
    record_id = request.match_info.get("mediation_id")

    try:
        manager = MediationManager(context.profile)
        await manager.set_default_mediator_by_id(mediation_id=record_id)
        new_default = await manager.get_default_mediator()
        results = new_default.serialize()
    except (StorageError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response(results, status=201)
@docs(tags=["mediation"], summary="Clear default mediator")
@response_schema(MediationRecordSchema(), 201)
async def clear_default_mediator(request: web.BaseRequest):
    """Clear the default mediator and return the record that was the default.

    Args:
        request: aiohttp request object

    Returns:
        JSON response (status 201) with the serialized record that was the
        default before clearing, or ``{}`` when no default was set.

    Raises:
        web.HTTPBadRequest: on a storage or model error

    """
    context: AdminRequestContext = request["context"]

    try:
        mediator_mgr = MediationManager(context.profile)
        # Read the current default first so it can be echoed back afterwards.
        default_mediator = await mediator_mgr.get_default_mediator()
        await mediator_mgr.clear_default_mediator()
        # The manager may report no default mediator; answer with an empty
        # object rather than failing on None.
        results = default_mediator.serialize() if default_mediator else {}
    except (StorageError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response(results, status=201)
async def register(app: web.Application):
    """Attach every mediation admin route to the application.

    Args:
        app: aiohttp application to register the routes on

    """
    # Mediation request records.
    request_routes = [
        web.get("/mediation/requests", list_mediation_requests, allow_head=False),
        web.get(
            "/mediation/requests/{mediation_id}",
            retrieve_mediation_request,
            allow_head=False,
        ),
        web.delete("/mediation/requests/{mediation_id}", delete_mediation_request),
        web.post(
            "/mediation/requests/{mediation_id}/grant",
            mediation_request_grant,
        ),
        web.post("/mediation/requests/{mediation_id}/deny", mediation_request_deny),
        web.post("/mediation/request/{conn_id}", request_mediation),
    ]
    # Keylist retrieval and keylist messages to the mediator.
    keylist_routes = [
        web.get("/mediation/keylists", get_keylist, allow_head=False),
        web.post(
            "/mediation/keylists/{mediation_id}/send-keylist-update",
            send_keylist_update,
        ),
        web.post(
            "/mediation/keylists/{mediation_id}/send-keylist-query",
            send_keylist_query,
        ),
    ]
    # Default mediator management.
    default_mediator_routes = [
        web.get("/mediation/default-mediator", get_default_mediator, allow_head=False),
        web.put("/mediation/{mediation_id}/default-mediator", set_default_mediator),
        web.delete("/mediation/default-mediator", clear_default_mediator),
    ]

    # Registered in a single call, in the same order as listed above.
    app.add_routes(request_routes + keylist_routes + default_mediator_routes)
def post_process_routes(app: web.Application):
    """Add the "mediation" tag description to the app's swagger document.

    Args:
        app: aiohttp application whose ``_state["swagger_dict"]`` is amended

    """
    swagger = app._state["swagger_dict"]
    mediation_tag = {
        "name": "mediation",
        "description": "Mediation management",
        "externalDocs": {"description": "Specification", "url": SPEC_URI},
    }
    # Create the top-level tag list on first use, then append to it.
    swagger.setdefault("tags", []).append(mediation_tag)