"""Multitenant admin routes."""
from marshmallow import fields, validate, validates_schema, ValidationError
from aiohttp import web
from aiohttp_apispec import (
docs,
request_schema,
match_info_schema,
response_schema,
querystring_schema,
)
from ...admin.request_context import AdminRequestContext
from ...messaging.valid import JSONWebToken, UUIDFour
from ...messaging.models.base import BaseModelError
from ...messaging.models.openapi import OpenAPISchema
from ...storage.error import StorageError, StorageNotFoundError
from ...wallet.models.wallet_record import WalletRecord, WalletRecordSchema
from ...wallet.error import WalletSettingsError
from ...core.error import BaseError
from ...core.profile import ProfileManagerProvider
from ..manager import MultitenantManager
from ..error import WalletKeyMissingError
[docs]class MultitenantModuleResponseSchema(OpenAPISchema):
"""Response schema for multitenant module."""
[docs]class WalletIdMatchInfoSchema(OpenAPISchema):
"""Path parameters and validators for request taking wallet id."""
wallet_id = fields.Str(
description="Subwallet identifier", required=True, example=UUIDFour.EXAMPLE
)
[docs]class CreateWalletRequestSchema(OpenAPISchema):
"""Request schema for adding a new wallet which will be registered by the agent."""
wallet_name = fields.Str(description="Wallet name", example="MyNewWallet")
wallet_key = fields.Str(
description="Master key used for key derivation.", example="MySecretKey123"
)
wallet_type = fields.Str(
description="Type of the wallet to create",
example="indy",
default="in_memory",
validate=validate.OneOf(
[wallet_type for wallet_type in ProfileManagerProvider.MANAGER_TYPES]
),
)
wallet_dispatch_type = fields.Str(
description="Webhook target dispatch type for this wallet. \
default - Dispatch only to webhooks associated with this wallet. \
base - Dispatch only to webhooks associated with the base wallet. \
both - Dispatch to both webhook targets.",
example="default",
default="default",
validate=validate.OneOf(["default", "both", "base"]),
)
wallet_webhook_urls = fields.List(
fields.Str(
description="Optional webhook URL to receive webhook messages",
example="http://localhost:8022/webhooks",
),
required=False,
description="List of Webhook URLs associated with this subwallet",
)
label = fields.Str(
description="Label for this wallet. This label is publicized\
(self-attested) to other agents as part of forming a connection.",
example="Alice",
)
image_url = fields.Str(
description="Image url for this wallet. This image url is publicized\
(self-attested) to other agents as part of forming a connection.",
example="https://aries.ca/images/sample.png",
)
key_management_mode = fields.Str(
description="Key management method to use for this wallet.",
example=WalletRecord.MODE_MANAGED,
default=WalletRecord.MODE_MANAGED,
# MTODO: add unmanaged mode once implemented
validate=validate.OneOf((WalletRecord.MODE_MANAGED,)),
)
[docs] @validates_schema
def validate_fields(self, data, **kwargs):
"""
Validate schema fields.
Args:
data: The data to validate
Raises:
ValidationError: If any of the fields do not validate
"""
if data.get("wallet_type") == "indy":
for field in ("wallet_key", "wallet_name"):
if field not in data:
raise ValidationError("Missing required field", field)
[docs]class UpdateWalletRequestSchema(OpenAPISchema):
"""Request schema for updating a existing wallet."""
wallet_dispatch_type = fields.Str(
description="Webhook target dispatch type for this wallet. \
default - Dispatch only to webhooks associated with this wallet. \
base - Dispatch only to webhooks associated with the base wallet. \
both - Dispatch to both webhook targets.",
example="default",
default="default",
validate=validate.OneOf(["default", "both", "base"]),
)
wallet_webhook_urls = fields.List(
fields.Str(
description="Optional webhook URL to receive webhook messages",
example="http://localhost:8022/webhooks",
),
required=False,
description="List of Webhook URLs associated with this subwallet",
)
label = fields.Str(
description="Label for this wallet. This label is publicized\
(self-attested) to other agents as part of forming a connection.",
example="Alice",
)
image_url = fields.Str(
description="Image url for this wallet. This image url is publicized\
(self-attested) to other agents as part of forming a connection.",
example="https://aries.ca/images/sample.png",
)
[docs]class CreateWalletResponseSchema(WalletRecordSchema):
"""Response schema for creating a wallet."""
token = fields.Str(
description="Authorization token to authenticate wallet requests",
example=JSONWebToken.EXAMPLE,
)
[docs]class RemoveWalletRequestSchema(OpenAPISchema):
"""Request schema for removing a wallet."""
wallet_key = fields.Str(
description="Master key used for key derivation. Only required for \
unmanaged wallets.",
example="MySecretKey123",
)
[docs]class CreateWalletTokenRequestSchema(OpenAPISchema):
"""Request schema for creating a wallet token."""
wallet_key = fields.Str(
description="Master key used for key derivation. Only required for \
unamanged wallets.",
example="MySecretKey123",
)
[docs]class CreateWalletTokenResponseSchema(OpenAPISchema):
"""Response schema for creating a wallet token."""
token = fields.Str(
description="Authorization token to authenticate wallet requests",
example=JSONWebToken.EXAMPLE,
)
[docs]class WalletListSchema(OpenAPISchema):
"""Result schema for wallet list."""
results = fields.List(
fields.Nested(WalletRecordSchema()),
description="List of wallet records",
)
[docs]class WalletListQueryStringSchema(OpenAPISchema):
"""Parameters and validators for wallet list request query string."""
wallet_name = fields.Str(description="Wallet name", example="MyNewWallet")
[docs]@docs(tags=["multitenancy"], summary="Query subwallets")
@querystring_schema(WalletListQueryStringSchema())
@response_schema(WalletListSchema(), 200, description="")
async def wallets_list(request: web.BaseRequest):
"""
Request handler for listing all internal subwallets.
Args:
request: aiohttp request object
"""
context: AdminRequestContext = request["context"]
query = {}
wallet_name = request.query.get("wallet_name")
if wallet_name:
query["wallet_name"] = wallet_name
async with context.session() as session:
try:
records = await WalletRecord.query(session, tag_filter=query)
results = [format_wallet_record(record) for record in records]
results.sort(key=lambda w: w["created_at"])
except (StorageError, BaseModelError) as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
return web.json_response({"results": results})
[docs]@docs(tags=["multitenancy"], summary="Get a single subwallet")
@match_info_schema(WalletIdMatchInfoSchema())
@response_schema(WalletRecordSchema(), 200, description="")
async def wallet_get(request: web.BaseRequest):
"""
Request handler for getting a single subwallet.
Args:
request: aiohttp request object
Raises:
HTTPNotFound: if wallet_id does not match any known wallets
"""
context: AdminRequestContext = request["context"]
wallet_id = request.match_info["wallet_id"]
async with context.session() as session:
try:
wallet_record = await WalletRecord.retrieve_by_id(session, wallet_id)
result = format_wallet_record(wallet_record)
except StorageNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
except BaseModelError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
return web.json_response(result)
[docs]@docs(tags=["multitenancy"], summary="Create a subwallet")
@request_schema(CreateWalletRequestSchema)
@response_schema(CreateWalletResponseSchema(), 200, description="")
async def wallet_create(request: web.BaseRequest):
"""
Request handler for adding a new subwallet for handling by the agent.
Args:
request: aiohttp request object
"""
context: AdminRequestContext = request["context"]
body = await request.json()
key_management_mode = body.get("key_management_mode") or WalletRecord.MODE_MANAGED
wallet_key = body.get("wallet_key")
wallet_webhook_urls = body.get("wallet_webhook_urls") or []
wallet_dispatch_type = body.get("wallet_dispatch_type") or "default"
# If no webhooks specified, then dispatch only to base webhook targets
if wallet_webhook_urls == []:
wallet_dispatch_type = "base"
settings = {
"wallet.type": body.get("wallet_type") or "in_memory",
"wallet.name": body.get("wallet_name"),
"wallet.key": wallet_key,
"wallet.webhook_urls": wallet_webhook_urls,
"wallet.dispatch_type": wallet_dispatch_type,
}
label = body.get("label")
image_url = body.get("image_url")
if label:
settings["default_label"] = label
if image_url:
settings["image_url"] = image_url
async with context.session() as session:
try:
multitenant_mgr = session.inject(MultitenantManager)
wallet_record = await multitenant_mgr.create_wallet(
settings, key_management_mode
)
token = multitenant_mgr.create_auth_token(wallet_record, wallet_key)
except BaseError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
result = {
**format_wallet_record(wallet_record),
"token": token,
}
return web.json_response(result)
[docs]@docs(tags=["multitenancy"], summary="Update a subwallet")
@match_info_schema(WalletIdMatchInfoSchema())
@request_schema(UpdateWalletRequestSchema)
@response_schema(WalletRecordSchema(), 200, description="")
async def wallet_update(request: web.BaseRequest):
"""
Request handler for updating a existing subwallet for handling by the agent.
Args:
request: aiohttp request object
"""
context: AdminRequestContext = request["context"]
wallet_id = request.match_info["wallet_id"]
body = await request.json()
wallet_webhook_urls = body.get("wallet_webhook_urls")
wallet_dispatch_type = body.get("wallet_dispatch_type")
label = body.get("label")
image_url = body.get("image_url")
if all(
v is None for v in (wallet_webhook_urls, wallet_dispatch_type, label, image_url)
):
raise web.HTTPBadRequest(reason="At least one parameter is required.")
# adjust wallet_dispatch_type according to wallet_webhook_urls
if wallet_webhook_urls and wallet_dispatch_type is None:
wallet_dispatch_type = "default"
if wallet_webhook_urls == []:
wallet_dispatch_type = "base"
# only parameters that are not none are updated
settings = {}
if wallet_webhook_urls is not None:
settings["wallet.webhook_urls"] = wallet_webhook_urls
if wallet_dispatch_type is not None:
settings["wallet.dispatch_type"] = wallet_dispatch_type
if label is not None:
settings["default_label"] = label
if image_url is not None:
settings["image_url"] = image_url
async with context.session() as session:
try:
multitenant_mgr = session.inject(MultitenantManager)
wallet_record = await multitenant_mgr.update_wallet(wallet_id, settings)
result = format_wallet_record(wallet_record)
except StorageNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
except WalletSettingsError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
return web.json_response(result)
[docs]@docs(tags=["multitenancy"], summary="Get auth token for a subwallet")
@request_schema(CreateWalletTokenRequestSchema)
@response_schema(CreateWalletTokenResponseSchema(), 200, description="")
async def wallet_create_token(request: web.BaseRequest):
"""
Request handler for creating an authorization token for a specific subwallet.
Args:
request: aiohttp request object
"""
context: AdminRequestContext = request["context"]
wallet_id = request.match_info["wallet_id"]
wallet_key = None
if request.has_body:
body = await request.json()
wallet_key = body.get("wallet_key")
async with context.session() as session:
try:
multitenant_mgr = session.inject(MultitenantManager)
wallet_record = await WalletRecord.retrieve_by_id(session, wallet_id)
if (not wallet_record.requires_external_key) and wallet_key:
raise web.HTTPBadRequest(
reason=f"Wallet {wallet_id} doesn't require"
" the wallet key to be provided"
)
token = multitenant_mgr.create_auth_token(wallet_record, wallet_key)
except StorageNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
except WalletKeyMissingError as err:
raise web.HTTPUnauthorized(reason=err.roll_up) from err
return web.json_response({"token": token})
[docs]@docs(
tags=["multitenancy"],
summary="Remove a subwallet",
)
@match_info_schema(WalletIdMatchInfoSchema())
@request_schema(RemoveWalletRequestSchema)
@response_schema(MultitenantModuleResponseSchema(), 200, description="")
async def wallet_remove(request: web.BaseRequest):
"""
Request handler to remove a subwallet from agent and storage.
Args:
request: aiohttp request object.
"""
context: AdminRequestContext = request["context"]
wallet_id = request.match_info["wallet_id"]
wallet_key = None
if request.has_body:
body = await request.json()
wallet_key = body.get("wallet_key")
async with context.session() as session:
try:
multitenant_mgr = session.inject(MultitenantManager)
wallet_record = await WalletRecord.retrieve_by_id(session, wallet_id)
if (not wallet_record.requires_external_key) and wallet_key:
raise web.HTTPBadRequest(
reason=f"Wallet {wallet_id} doesn't require"
" the wallet key to be provided"
)
await multitenant_mgr.remove_wallet(wallet_id, wallet_key)
except StorageNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
except WalletKeyMissingError as err:
raise web.HTTPUnauthorized(reason=err.roll_up) from err
return web.json_response({})
# MTODO: add wallet import route
# MTODO: add wallet export route
# MTODO: add rotate wallet key route
[docs]async def register(app: web.Application):
"""Register routes."""
app.add_routes(
[
web.get("/multitenancy/wallets", wallets_list, allow_head=False),
web.post("/multitenancy/wallet", wallet_create),
web.get("/multitenancy/wallet/{wallet_id}", wallet_get, allow_head=False),
web.put("/multitenancy/wallet/{wallet_id}", wallet_update),
web.post("/multitenancy/wallet/{wallet_id}/token", wallet_create_token),
web.post("/multitenancy/wallet/{wallet_id}/remove", wallet_remove),
]
)
[docs]def post_process_routes(app: web.Application):
"""Amend swagger API."""
# Add top-level tags description
if "tags" not in app._state["swagger_dict"]:
app._state["swagger_dict"]["tags"] = []
app._state["swagger_dict"]["tags"].append(
{"name": "multitenancy", "description": "Multitenant wallet management"}
)