"""Multitenant admin routes."""
from aiohttp import web
from aiohttp_apispec import (
docs,
match_info_schema,
querystring_schema,
request_schema,
response_schema,
)
from marshmallow import ValidationError, fields, validate, validates_schema
from ...admin.request_context import AdminRequestContext
from ...core.error import BaseError
from ...core.profile import ProfileManagerProvider
from ...messaging.models.base import BaseModelError
from ...messaging.models.openapi import OpenAPISchema
from ...messaging.valid import UUID4_EXAMPLE, JSONWebToken
from ...multitenant.base import BaseMultitenantManager
from ...storage.error import StorageError, StorageNotFoundError
from ...wallet.error import WalletSettingsError
from ...wallet.models.wallet_record import WalletRecord, WalletRecordSchema
from ..error import WalletKeyMissingError
ACAPY_LIFECYCLE_CONFIG_FLAG_MAP = {
"ACAPY_LOG_LEVEL": "log.level",
"ACAPY_INVITE_PUBLIC": "debug.invite_public",
"ACAPY_PUBLIC_INVITES": "public_invites",
"ACAPY_AUTO_ACCEPT_INVITES": "debug.auto_accept_invites",
"ACAPY_AUTO_ACCEPT_REQUESTS": "debug.auto_accept_requests",
"ACAPY_AUTO_PING_CONNECTION": "auto_ping_connection",
"ACAPY_MONITOR_PING": "debug.monitor_ping",
"ACAPY_AUTO_RESPOND_MESSAGES": "debug.auto_respond_messages",
"ACAPY_AUTO_RESPOND_CREDENTIAL_OFFER": "debug.auto_respond_credential_offer",
"ACAPY_AUTO_RESPOND_CREDENTIAL_REQUEST": "debug.auto_respond_credential_request",
"ACAPY_AUTO_VERIFY_PRESENTATION": "debug.auto_verify_presentation",
"ACAPY_NOTIFY_REVOCATION": "revocation.notify",
"ACAPY_AUTO_REQUEST_ENDORSEMENT": "endorser.auto_request",
"ACAPY_AUTO_WRITE_TRANSACTIONS": "endorser.auto_write",
"ACAPY_CREATE_REVOCATION_TRANSACTIONS": "endorser.auto_create_rev_reg",
"ACAPY_ENDORSER_ROLE": "endorser.protocol_role",
}
ACAPY_LIFECYCLE_CONFIG_FLAG_ARGS_MAP = {
"log-level": "log.level",
"invite-public": "debug.invite_public",
"public-invites": "public_invites",
"auto-accept-invites": "debug.auto_accept_invites",
"auto-accept-requests": "debug.auto_accept_requests",
"auto-ping-connection": "auto_ping_connection",
"monitor-ping": "debug.monitor_ping",
"auto-respond-messages": "debug.auto_respond_messages",
"auto-respond-credential-offer": "debug.auto_respond_credential_offer",
"auto-respond-credential-request": "debug.auto_respond_credential_request",
"auto-verify-presentation": "debug.auto_verify_presentation",
"notify-revocation": "revocation.notify",
"auto-request-endorsement": "endorser.auto_request",
"auto-write-transactions": "endorser.auto_write",
"auto-create-revocation-transactions": "endorser.auto_create_rev_reg",
"endorser-protocol-role": "endorser.protocol_role",
}
ACAPY_ENDORSER_FLAGS_DEPENDENT_ON_AUTHOR_ROLE = [
"ACAPY_AUTO_REQUEST_ENDORSEMENT",
"ACAPY_AUTO_WRITE_TRANSACTIONS",
"ACAPY_CREATE_REVOCATION_TRANSACTIONS",
"auto-request-endorsement",
"auto-write-transactions",
"auto-create-revocation-transactions",
]
[docs]class MultitenantModuleResponseSchema(OpenAPISchema):
"""Response schema for multitenant module."""
[docs]class WalletIdMatchInfoSchema(OpenAPISchema):
"""Path parameters and validators for request taking wallet id."""
wallet_id = fields.Str(
required=True,
metadata={"description": "Subwallet identifier", "example": UUID4_EXAMPLE},
)
[docs]class CreateWalletRequestSchema(OpenAPISchema):
"""Request schema for adding a new wallet which will be registered by the agent."""
wallet_name = fields.Str(
metadata={"description": "Wallet name", "example": "MyNewWallet"}
)
wallet_key = fields.Str(
metadata={
"description": "Master key used for key derivation.",
"example": "MySecretKey123",
}
)
extra_settings = fields.Dict(
required=False, metadata={"description": "Agent config key-value pairs"}
)
wallet_key_derivation = fields.Str(
required=False,
validate=validate.OneOf(["ARGON2I_MOD", "ARGON2I_INT", "RAW"]),
metadata={"description": "Key derivation", "example": "RAW"},
)
wallet_type = fields.Str(
dump_default="in_memory",
validate=validate.OneOf(list(ProfileManagerProvider.MANAGER_TYPES)),
metadata={"description": "Type of the wallet to create", "example": "indy"},
)
wallet_dispatch_type = fields.Str(
dump_default="default",
validate=validate.OneOf(["default", "both", "base"]),
metadata={
"description": (
"Webhook target dispatch type for this wallet. default -"
" Dispatch only to webhooks associated with this wallet. base -"
" Dispatch only to webhooks associated with the base wallet. "
" both - Dispatch to both webhook targets."
),
"example": "default",
},
)
wallet_webhook_urls = fields.List(
fields.Str(
metadata={
"description": "Optional webhook URL to receive webhook messages",
"example": "http://localhost:8022/webhooks",
}
),
required=False,
metadata={"description": "List of Webhook URLs associated with this subwallet"},
)
label = fields.Str(
metadata={
"description": (
"Label for this wallet. This label is publicized (self-attested)"
" to other agents as part of forming a connection."
),
"example": "Alice",
}
)
image_url = fields.Str(
metadata={
"description": (
"Image url for this wallet. This image url is publicized "
" (self-attested) to other agents as part of forming a connection."
),
"example": "https://aries.ca/images/sample.png",
}
)
key_management_mode = fields.Str(
dump_default=WalletRecord.MODE_MANAGED,
validate=validate.OneOf((WalletRecord.MODE_MANAGED,)),
metadata={
"description": "Key management method to use for this wallet.",
"example": WalletRecord.MODE_MANAGED,
},
)
@validates_schema
def validate_fields(self, data, **kwargs):
"""Validate schema fields.
Args:
data: The data to validate
Raises:
ValidationError: If any of the fields do not validate
"""
if data.get("wallet_type") == "indy":
for field in ("wallet_key", "wallet_name"):
if field not in data:
raise ValidationError("Missing required field", field)
[docs]class UpdateWalletRequestSchema(OpenAPISchema):
"""Request schema for updating a existing wallet."""
wallet_dispatch_type = fields.Str(
dump_default="default",
validate=validate.OneOf(["default", "both", "base"]),
metadata={
"description": (
"Webhook target dispatch type for this wallet. default -"
" Dispatch only to webhooks associated with this wallet. base -"
" Dispatch only to webhooks associated with the base wallet. "
" both - Dispatch to both webhook targets."
),
"example": "default",
},
)
extra_settings = fields.Dict(
required=False, metadata={"description": "Agent config key-value pairs"}
)
wallet_webhook_urls = fields.List(
fields.Str(
metadata={
"description": "Optional webhook URL to receive webhook messages",
"example": "http://localhost:8022/webhooks",
}
),
required=False,
metadata={"description": "List of Webhook URLs associated with this subwallet"},
)
label = fields.Str(
metadata={
"description": (
"Label for this wallet. This label is publicized (self-attested)"
" to other agents as part of forming a connection."
),
"example": "Alice",
}
)
image_url = fields.Str(
metadata={
"description": (
"Image url for this wallet. This image url is publicized "
" (self-attested) to other agents as part of forming a connection."
),
"example": "https://aries.ca/images/sample.png",
}
)
[docs]class CreateWalletResponseSchema(WalletRecordSchema):
"""Response schema for creating a wallet."""
token = fields.Str(
metadata={
"description": "Authorization token to authenticate wallet requests",
"example": JSONWebToken.EXAMPLE,
}
)
[docs]class RemoveWalletRequestSchema(OpenAPISchema):
"""Request schema for removing a wallet."""
wallet_key = fields.Str(
metadata={
"description": (
"Master key used for key derivation. Only required for "
" unmanaged wallets."
),
"example": "MySecretKey123",
}
)
[docs]class CreateWalletTokenRequestSchema(OpenAPISchema):
"""Request schema for creating a wallet token."""
wallet_key = fields.Str(
metadata={
"description": (
"Master key used for key derivation. Only required for "
" unamanged wallets."
),
"example": "MySecretKey123",
}
)
[docs]class CreateWalletTokenResponseSchema(OpenAPISchema):
"""Response schema for creating a wallet token."""
token = fields.Str(
metadata={
"description": "Authorization token to authenticate wallet requests",
"example": JSONWebToken.EXAMPLE,
}
)
[docs]class WalletListSchema(OpenAPISchema):
"""Result schema for wallet list."""
results = fields.List(
fields.Nested(WalletRecordSchema()),
metadata={"description": "List of wallet records"},
)
[docs]class WalletListQueryStringSchema(OpenAPISchema):
"""Parameters and validators for wallet list request query string."""
wallet_name = fields.Str(
metadata={"description": "Wallet name", "example": "MyNewWallet"}
)
@docs(tags=["multitenancy"], summary="Query subwallets")
@querystring_schema(WalletListQueryStringSchema())
@response_schema(WalletListSchema(), 200, description="")
async def wallets_list(request: web.BaseRequest):
"""Request handler for listing all internal subwallets.
Args:
request: aiohttp request object
"""
context: AdminRequestContext = request["context"]
profile = context.profile
query = {}
wallet_name = request.query.get("wallet_name")
if wallet_name:
query["wallet_name"] = wallet_name
try:
async with profile.session() as session:
records = await WalletRecord.query(session, tag_filter=query)
results = [format_wallet_record(record) for record in records]
results.sort(key=lambda w: w["created_at"])
except (StorageError, BaseModelError) as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
return web.json_response({"results": results})
@docs(tags=["multitenancy"], summary="Get a single subwallet")
@match_info_schema(WalletIdMatchInfoSchema())
@response_schema(WalletRecordSchema(), 200, description="")
async def wallet_get(request: web.BaseRequest):
"""Request handler for getting a single subwallet.
Args:
request: aiohttp request object
Raises:
HTTPNotFound: if wallet_id does not match any known wallets
"""
context: AdminRequestContext = request["context"]
profile = context.profile
wallet_id = request.match_info["wallet_id"]
try:
async with profile.session() as session:
wallet_record = await WalletRecord.retrieve_by_id(session, wallet_id)
result = format_wallet_record(wallet_record)
except StorageNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
except BaseModelError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
return web.json_response(result)
@docs(tags=["multitenancy"], summary="Create a subwallet")
@request_schema(CreateWalletRequestSchema)
@response_schema(CreateWalletResponseSchema(), 200, description="")
async def wallet_create(request: web.BaseRequest):
"""Request handler for adding a new subwallet for handling by the agent.
Args:
request: aiohttp request object
"""
context: AdminRequestContext = request["context"]
body = await request.json()
key_management_mode = body.get("key_management_mode") or WalletRecord.MODE_MANAGED
wallet_key = body.get("wallet_key")
wallet_webhook_urls = body.get("wallet_webhook_urls") or []
wallet_dispatch_type = body.get("wallet_dispatch_type") or "default"
extra_settings = body.get("extra_settings") or {}
# If no webhooks specified, then dispatch only to base webhook targets
if wallet_webhook_urls == []:
wallet_dispatch_type = "base"
settings = {
"wallet.type": body.get("wallet_type") or "in_memory",
"wallet.name": body.get("wallet_name"),
"wallet.key": wallet_key,
"wallet.webhook_urls": wallet_webhook_urls,
"wallet.dispatch_type": wallet_dispatch_type,
}
extra_subwallet_setting = get_extra_settings_dict_per_tenant(extra_settings)
settings.update(extra_subwallet_setting)
label = body.get("label")
image_url = body.get("image_url")
key_derivation = body.get("wallet_key_derivation")
if label:
settings["default_label"] = label
if image_url:
settings["image_url"] = image_url
if key_derivation: # allow lower levels to handle default
settings["wallet.key_derivation_method"] = key_derivation
try:
multitenant_mgr = context.profile.inject(BaseMultitenantManager)
wallet_record = await multitenant_mgr.create_wallet(
settings, key_management_mode
)
token = await multitenant_mgr.create_auth_token(wallet_record, wallet_key)
except BaseError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
result = {
**format_wallet_record(wallet_record),
"token": token,
}
return web.json_response(result)
@docs(tags=["multitenancy"], summary="Update a subwallet")
@match_info_schema(WalletIdMatchInfoSchema())
@request_schema(UpdateWalletRequestSchema)
@response_schema(WalletRecordSchema(), 200, description="")
async def wallet_update(request: web.BaseRequest):
"""Request handler for updating a existing subwallet for handling by the agent.
Args:
request: aiohttp request object
"""
context: AdminRequestContext = request["context"]
wallet_id = request.match_info["wallet_id"]
body = await request.json()
wallet_webhook_urls = body.get("wallet_webhook_urls")
wallet_dispatch_type = body.get("wallet_dispatch_type")
label = body.get("label")
image_url = body.get("image_url")
extra_settings = body.get("extra_settings")
if all(
v is None
for v in (
wallet_webhook_urls,
wallet_dispatch_type,
label,
image_url,
extra_settings,
)
):
raise web.HTTPBadRequest(reason="At least one parameter is required.")
# adjust wallet_dispatch_type according to wallet_webhook_urls
if wallet_webhook_urls and wallet_dispatch_type is None:
wallet_dispatch_type = "default"
if wallet_webhook_urls == []:
wallet_dispatch_type = "base"
# only parameters that are not none are updated
settings = {}
if wallet_webhook_urls is not None:
settings["wallet.webhook_urls"] = wallet_webhook_urls
if wallet_dispatch_type is not None:
settings["wallet.dispatch_type"] = wallet_dispatch_type
if label is not None:
settings["default_label"] = label
if image_url is not None:
settings["image_url"] = image_url
extra_subwallet_setting = get_extra_settings_dict_per_tenant(extra_settings or {})
settings.update(extra_subwallet_setting)
try:
multitenant_mgr = context.profile.inject(BaseMultitenantManager)
wallet_record = await multitenant_mgr.update_wallet(wallet_id, settings)
result = format_wallet_record(wallet_record)
except StorageNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
except WalletSettingsError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
return web.json_response(result)
@docs(tags=["multitenancy"], summary="Get auth token for a subwallet")
@request_schema(CreateWalletTokenRequestSchema)
@response_schema(CreateWalletTokenResponseSchema(), 200, description="")
async def wallet_create_token(request: web.BaseRequest):
"""Request handler for creating an authorization token for a specific subwallet.
Args:
request: aiohttp request object
"""
context: AdminRequestContext = request["context"]
wallet_id = request.match_info["wallet_id"]
wallet_key = None
if request.has_body:
body = await request.json()
wallet_key = body.get("wallet_key")
profile = context.profile
try:
multitenant_mgr = profile.inject(BaseMultitenantManager)
async with profile.session() as session:
wallet_record = await WalletRecord.retrieve_by_id(session, wallet_id)
if (not wallet_record.requires_external_key) and wallet_key:
raise web.HTTPBadRequest(
reason=(
f"Wallet {wallet_id} doesn't require the wallet key to be provided"
)
)
token = await multitenant_mgr.create_auth_token(wallet_record, wallet_key)
except StorageNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
except WalletKeyMissingError as err:
raise web.HTTPUnauthorized(reason=err.roll_up) from err
return web.json_response({"token": token})
@docs(
tags=["multitenancy"],
summary="Remove a subwallet",
)
@match_info_schema(WalletIdMatchInfoSchema())
@request_schema(RemoveWalletRequestSchema)
@response_schema(MultitenantModuleResponseSchema(), 200, description="")
async def wallet_remove(request: web.BaseRequest):
"""Request handler to remove a subwallet from agent and storage.
Args:
request: aiohttp request object.
"""
context: AdminRequestContext = request["context"]
wallet_id = request.match_info["wallet_id"]
wallet_key = None
if request.has_body:
body = await request.json()
wallet_key = body.get("wallet_key")
profile = context.profile
try:
multitenant_mgr = profile.inject(BaseMultitenantManager)
async with profile.session() as session:
wallet_record = await WalletRecord.retrieve_by_id(session, wallet_id)
if (not wallet_record.requires_external_key) and wallet_key:
raise web.HTTPBadRequest(
reason=(
f"Wallet {wallet_id} doesn't require the wallet key to be provided"
)
)
await multitenant_mgr.remove_wallet(wallet_id, wallet_key)
except StorageNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
except WalletKeyMissingError as err:
raise web.HTTPUnauthorized(reason=err.roll_up) from err
return web.json_response({})
# MTODO: add wallet import route
# MTODO: add wallet export route
# MTODO: add rotate wallet key route
[docs]async def register(app: web.Application):
"""Register routes."""
app.add_routes(
[
web.get("/multitenancy/wallets", wallets_list, allow_head=False),
web.post("/multitenancy/wallet", wallet_create),
web.get("/multitenancy/wallet/{wallet_id}", wallet_get, allow_head=False),
web.put("/multitenancy/wallet/{wallet_id}", wallet_update),
web.post("/multitenancy/wallet/{wallet_id}/token", wallet_create_token),
web.post("/multitenancy/wallet/{wallet_id}/remove", wallet_remove),
]
)
[docs]def post_process_routes(app: web.Application):
"""Amend swagger API."""
# Add top-level tags description
if "tags" not in app._state["swagger_dict"]:
app._state["swagger_dict"]["tags"] = []
app._state["swagger_dict"]["tags"].append(
{"name": "multitenancy", "description": "Multitenant wallet management"}
)