"""Wallet admin routes."""
import json
import logging
from typing import List
from aiohttp import web
from aiohttp_apispec import docs, querystring_schema, request_schema, response_schema
from marshmallow import fields, validate
from ..admin.request_context import AdminRequestContext
from ..connections.models.conn_record import ConnRecord
from ..core.event_bus import Event, EventBus
from ..core.profile import Profile
from ..ledger.base import BaseLedger
from ..ledger.endpoint_type import EndpointType
from ..ledger.error import LedgerConfigError, LedgerError
from ..messaging.models.base import BaseModelError
from ..messaging.models.openapi import OpenAPISchema
from ..messaging.responder import BaseResponder
from ..messaging.valid import (
DID_POSTURE,
ENDPOINT,
ENDPOINT_TYPE,
INDY_DID,
INDY_RAW_PUBLIC_KEY,
GENERIC_DID,
)
from ..protocols.coordinate_mediation.v1_0.route_manager import RouteManager
from ..protocols.endorse_transaction.v1_0.manager import (
TransactionManager,
TransactionManagerError,
)
from ..protocols.endorse_transaction.v1_0.util import (
get_endorser_connection_id,
is_author_role,
)
from ..storage.error import StorageError, StorageNotFoundError
from .base import BaseWallet
from .did_info import DIDInfo
from .did_method import SOV, KEY, DIDMethod, DIDMethods, HolderDefinedDid
from .did_posture import DIDPosture
from .error import WalletError, WalletNotFoundError
from .key_type import BLS12381G2, ED25519, KeyTypes
from .util import EVENT_LISTENER_PATTERN
LOGGER = logging.getLogger(__name__)
[docs]class WalletModuleResponseSchema(OpenAPISchema):
"""Response schema for Wallet Module."""
[docs]class DIDSchema(OpenAPISchema):
"""Result schema for a DID."""
did = fields.Str(description="DID of interest", **GENERIC_DID)
verkey = fields.Str(description="Public verification key", **INDY_RAW_PUBLIC_KEY)
posture = fields.Str(
description=(
"Whether DID is current public DID, "
"posted to ledger but not current public DID, "
"or local to the wallet"
),
**DID_POSTURE,
)
method = fields.Str(
description="Did method associated with the DID", example=SOV.method_name
)
key_type = fields.Str(
description="Key type associated with the DID",
example=ED25519.key_type,
validate=validate.OneOf([ED25519.key_type, BLS12381G2.key_type]),
)
[docs]class DIDResultSchema(OpenAPISchema):
"""Result schema for a DID."""
result = fields.Nested(DIDSchema())
[docs]class DIDListSchema(OpenAPISchema):
"""Result schema for connection list."""
results = fields.List(fields.Nested(DIDSchema()), description="DID list")
[docs]class DIDEndpointWithTypeSchema(OpenAPISchema):
"""Request schema to set DID endpoint of particular type."""
did = fields.Str(description="DID of interest", required=True, **INDY_DID)
endpoint = fields.Str(
description="Endpoint to set (omit to delete)", required=False, **ENDPOINT
)
endpoint_type = fields.Str(
description=(
f"Endpoint type to set (default '{EndpointType.ENDPOINT.w3c}'); "
"affects only public or posted DIDs"
),
required=False,
**ENDPOINT_TYPE,
)
[docs]class DIDEndpointSchema(OpenAPISchema):
"""Request schema to set DID endpoint; response schema to get DID endpoint."""
did = fields.Str(description="DID of interest", required=True, **INDY_DID)
endpoint = fields.Str(
description="Endpoint to set (omit to delete)", required=False, **ENDPOINT
)
[docs]class DIDListQueryStringSchema(OpenAPISchema):
"""Parameters and validators for DID list request query string."""
did = fields.Str(description="DID of interest", required=False, **GENERIC_DID)
verkey = fields.Str(
description="Verification key of interest",
required=False,
**INDY_RAW_PUBLIC_KEY,
)
posture = fields.Str(
description=(
"Whether DID is current public DID, "
"posted to ledger but current public DID, "
"or local to the wallet"
),
required=False,
**DID_POSTURE,
)
method = fields.Str(
required=False,
example=KEY.method_name,
validate=validate.OneOf([KEY.method_name, SOV.method_name]),
description="DID method to query for. e.g. sov to only fetch indy/sov DIDs",
)
key_type = fields.Str(
required=False,
example=ED25519.key_type,
validate=validate.OneOf([ED25519.key_type, BLS12381G2.key_type]),
description="Key type to query for.",
)
[docs]class DIDQueryStringSchema(OpenAPISchema):
"""Parameters and validators for set public DID request query string."""
did = fields.Str(description="DID of interest", required=True, **INDY_DID)
[docs]class DIDCreateOptionsSchema(OpenAPISchema):
"""Parameters and validators for create DID options."""
key_type = fields.Str(
required=True,
example=ED25519.key_type,
description="Key type to use for the DID keypair. "
+ "Validated with the chosen DID method's supported key types.",
validate=validate.OneOf([ED25519.key_type, BLS12381G2.key_type]),
)
did = fields.Str(
required=False,
description="Specify final value of the did (including did:<method>: prefix)"
+ "if the method supports or requires so.",
**GENERIC_DID,
)
[docs]class DIDCreateSchema(OpenAPISchema):
"""Parameters and validators for create DID endpoint."""
method = fields.Str(
required=False,
default=SOV.method_name,
example=SOV.method_name,
description="Method for the requested DID."
+ "Supported methods are 'key', 'sov', and any other registered method.",
)
options = fields.Nested(
DIDCreateOptionsSchema,
required=False,
description="To define a key type and/or a did depending on chosen DID method.",
)
seed = fields.Str(
required=False,
description=(
"Optional seed to use for DID, Must be"
"enabled in configuration before use."
),
example="000000000000000000000000Trustee1",
)
[docs]class CreateAttribTxnForEndorserOptionSchema(OpenAPISchema):
"""Class for user to input whether to create a transaction for endorser or not."""
create_transaction_for_endorser = fields.Boolean(
description="Create Transaction For Endorser's signature",
required=False,
)
[docs]class AttribConnIdMatchInfoSchema(OpenAPISchema):
"""Path parameters and validators for request taking connection id."""
conn_id = fields.Str(description="Connection identifier", required=False)
@docs(tags=["wallet"], summary="List wallet DIDs")
@querystring_schema(DIDListQueryStringSchema())
@response_schema(DIDListSchema, 200, description="")
async def wallet_did_list(request: web.BaseRequest):
"""
Request handler for searching wallet DIDs.
Args:
request: aiohttp request object
Returns:
The DID list response
"""
context: AdminRequestContext = request["context"]
filter_did = request.query.get("did")
filter_verkey = request.query.get("verkey")
filter_posture = DIDPosture.get(request.query.get("posture"))
results = []
async with context.session() as session:
did_methods: DIDMethods = session.inject(DIDMethods)
filter_method: DIDMethod | None = did_methods.from_method(
request.query.get("method")
)
key_types = session.inject(KeyTypes)
filter_key_type = key_types.from_key_type(request.query.get("key_type", ""))
wallet: BaseWallet | None = session.inject_or(BaseWallet)
if not wallet:
raise web.HTTPForbidden(reason="No wallet available")
if filter_posture is DIDPosture.PUBLIC:
public_did_info = await wallet.get_public_did()
if (
public_did_info
and (not filter_verkey or public_did_info.verkey == filter_verkey)
and (not filter_did or public_did_info.did == filter_did)
and (not filter_method or public_did_info.method == filter_method)
and (not filter_key_type or public_did_info.key_type == filter_key_type)
):
results.append(format_did_info(public_did_info))
elif filter_posture is DIDPosture.POSTED:
results = []
posted_did_infos = await wallet.get_posted_dids()
for info in posted_did_infos:
if (
(not filter_verkey or info.verkey == filter_verkey)
and (not filter_did or info.did == filter_did)
and (not filter_method or info.method == filter_method)
and (not filter_key_type or info.key_type == filter_key_type)
):
results.append(format_did_info(info))
elif filter_did:
try:
info = await wallet.get_local_did(filter_did)
except WalletError:
# badly formatted DID or record not found
info = None
if (
info
and (not filter_verkey or info.verkey == filter_verkey)
and (not filter_method or info.method == filter_method)
and (not filter_key_type or info.key_type == filter_key_type)
and (
filter_posture is None
or (
filter_posture is DIDPosture.WALLET_ONLY
and not info.metadata.get("posted")
)
)
):
results.append(format_did_info(info))
elif filter_verkey:
try:
info = await wallet.get_local_did_for_verkey(filter_verkey)
except WalletError:
info = None
if (
info
and (not filter_method or info.method == filter_method)
and (not filter_key_type or info.key_type == filter_key_type)
and (
filter_posture is None
or (
filter_posture is DID_POSTURE.WALLET_ONLY
and not info.metadata.get("posted")
)
)
):
results.append(format_did_info(info))
else:
dids = await wallet.get_local_dids()
results = [
format_did_info(info)
for info in dids
if (
filter_posture is None
or DIDPosture.get(info.metadata) is DIDPosture.WALLET_ONLY
)
and (not filter_method or info.method == filter_method)
and (not filter_key_type or info.key_type == filter_key_type)
]
results.sort(
key=lambda info: (DIDPosture.get(info["posture"]).ordinal, info["did"])
)
return web.json_response({"results": results})
@docs(tags=["wallet"], summary="Create a local DID")
@request_schema(DIDCreateSchema())
@response_schema(DIDResultSchema, 200, description="")
async def wallet_create_did(request: web.BaseRequest):
"""
Request handler for creating a new local DID in the wallet.
Args:
request: aiohttp request object
Returns:
The DID info
"""
context: AdminRequestContext = request["context"]
try:
body = await request.json()
except Exception:
body = {}
# set default method and key type for backwards compat
seed = body.get("seed") or None
if seed and not context.settings.get("wallet.allow_insecure_seed"):
raise web.HTTPBadRequest(reason="Seed support is not enabled")
info = None
async with context.session() as session:
did_methods = session.inject(DIDMethods)
method = did_methods.from_method(body.get("method", "")) or SOV
key_types = session.inject(KeyTypes)
# set default method and key type for backwards compat
key_type = (
key_types.from_key_type(body.get("options", {}).get("key_type", ""))
or ED25519
)
if not method.supports_key_type(key_type):
raise web.HTTPForbidden(
reason=(
f"method {method.method_name} does not"
f" support key type {key_type.key_type}"
)
)
did = body.get("options", {}).get("did")
if method.holder_defined_did() == HolderDefinedDid.NO and did:
raise web.HTTPForbidden(
reason=(
f"method {method.method_name} does not"
f" support user-defined DIDs"
)
)
elif method.holder_defined_did() == HolderDefinedDid.REQUIRED and not did:
raise web.HTTPBadRequest(
reason=f"method {method.method_name} requires a user-defined DIDs"
)
wallet = session.inject_or(BaseWallet)
if not wallet:
raise web.HTTPForbidden(reason="No wallet available")
try:
info = await wallet.create_local_did(
method=method, key_type=key_type, seed=seed, did=did
)
except WalletError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
return web.json_response({"result": format_did_info(info)})
@docs(tags=["wallet"], summary="Fetch the current public DID")
@response_schema(DIDResultSchema, 200, description="")
async def wallet_get_public_did(request: web.BaseRequest):
"""
Request handler for fetching the current public DID.
Args:
request: aiohttp request object
Returns:
The DID info
"""
context: AdminRequestContext = request["context"]
info = None
async with context.session() as session:
wallet = session.inject_or(BaseWallet)
if not wallet:
raise web.HTTPForbidden(reason="No wallet available")
try:
info = await wallet.get_public_did()
except WalletError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
return web.json_response({"result": format_did_info(info)})
@docs(tags=["wallet"], summary="Assign the current public DID")
@querystring_schema(DIDQueryStringSchema())
@querystring_schema(CreateAttribTxnForEndorserOptionSchema())
@querystring_schema(AttribConnIdMatchInfoSchema())
@querystring_schema(MediationIDSchema())
@response_schema(DIDResultSchema, 200, description="")
async def wallet_set_public_did(request: web.BaseRequest):
"""
Request handler for setting the current public DID.
Args:
request: aiohttp request object
Returns:
The updated DID info
"""
context: AdminRequestContext = request["context"]
session = await context.session()
outbound_handler = request["outbound_message_router"]
create_transaction_for_endorser = json.loads(
request.query.get("create_transaction_for_endorser", "false")
)
write_ledger = not create_transaction_for_endorser
connection_id = request.query.get("conn_id")
attrib_def = None
# check if we need to endorse
if is_author_role(context.profile):
# authors cannot write to the ledger
write_ledger = False
create_transaction_for_endorser = True
if not connection_id:
# author has not provided a connection id, so determine which to use
connection_id = await get_endorser_connection_id(context.profile)
if not connection_id:
raise web.HTTPBadRequest(reason="No endorser connection found")
wallet = session.inject_or(BaseWallet)
if not wallet:
raise web.HTTPForbidden(reason="No wallet available")
did = request.query.get("did")
if not did:
raise web.HTTPBadRequest(reason="Request query must include DID")
info: DIDInfo = None
mediation_id = request.query.get("mediation_id")
profile = context.profile
route_manager = profile.inject(RouteManager)
mediation_record = await route_manager.mediation_record_if_id(
profile=profile, mediation_id=mediation_id, or_default=True
)
routing_keys = None
mediator_endpoint = None
if mediation_record:
routing_keys = mediation_record.routing_keys
mediator_endpoint = mediation_record.endpoint
try:
info, attrib_def = await promote_wallet_public_did(
context.profile,
context,
context.session,
did,
write_ledger=write_ledger,
connection_id=connection_id,
routing_keys=routing_keys,
mediator_endpoint=mediator_endpoint,
)
except LookupError as err:
raise web.HTTPNotFound(reason=str(err)) from err
except PermissionError as err:
raise web.HTTPForbidden(reason=str(err)) from err
except WalletNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
except (LedgerError, WalletError) as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
if not create_transaction_for_endorser:
return web.json_response({"result": format_did_info(info)})
else:
transaction_mgr = TransactionManager(context.profile)
try:
transaction = await transaction_mgr.create_record(
messages_attach=attrib_def["signed_txn"], connection_id=connection_id
)
except StorageError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
# if auto-request, send the request to the endorser
if context.settings.get_value("endorser.auto_request"):
try:
transaction, transaction_request = await transaction_mgr.create_request(
transaction=transaction,
# TODO see if we need to parameterize these params
# expires_time=expires_time,
# endorser_write_txn=endorser_write_txn,
)
except (StorageError, TransactionManagerError) as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
await outbound_handler(transaction_request, connection_id=connection_id)
return web.json_response({"txn": transaction.serialize()})
@docs(
tags=["wallet"], summary="Update endpoint in wallet and on ledger if posted to it"
)
@request_schema(DIDEndpointWithTypeSchema)
@querystring_schema(CreateAttribTxnForEndorserOptionSchema())
@querystring_schema(AttribConnIdMatchInfoSchema())
@response_schema(WalletModuleResponseSchema(), description="")
async def wallet_set_did_endpoint(request: web.BaseRequest):
"""
Request handler for setting an endpoint for a DID.
Args:
request: aiohttp request object
"""
context: AdminRequestContext = request["context"]
outbound_handler = request["outbound_message_router"]
body = await request.json()
did = body["did"]
endpoint = body.get("endpoint")
endpoint_type = EndpointType.get(
body.get("endpoint_type", EndpointType.ENDPOINT.w3c)
)
create_transaction_for_endorser = json.loads(
request.query.get("create_transaction_for_endorser", "false")
)
write_ledger = not create_transaction_for_endorser
endorser_did = None
connection_id = request.query.get("conn_id")
attrib_def = None
# check if we need to endorse
if is_author_role(context.profile):
# authors cannot write to the ledger
write_ledger = False
create_transaction_for_endorser = True
if not connection_id:
# author has not provided a connection id, so determine which to use
connection_id = await get_endorser_connection_id(context.profile)
if not connection_id:
raise web.HTTPBadRequest(reason="No endorser connection found")
if not write_ledger:
try:
async with context.session() as session:
connection_record = await ConnRecord.retrieve_by_id(
session, connection_id
)
except StorageNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
except BaseModelError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
async with context.session() as session:
endorser_info = await connection_record.metadata_get(
session, "endorser_info"
)
if not endorser_info:
raise web.HTTPForbidden(
reason="Endorser Info is not set up in "
"connection metadata for this connection record"
)
if "endorser_did" not in endorser_info.keys():
raise web.HTTPForbidden(
reason=' "endorser_did" is not set in "endorser_info"'
" in connection metadata for this connection record"
)
endorser_did = endorser_info["endorser_did"]
async with context.session() as session:
wallet = session.inject_or(BaseWallet)
if not wallet:
raise web.HTTPForbidden(reason="No wallet available")
try:
ledger = context.profile.inject_or(BaseLedger)
attrib_def = await wallet.set_did_endpoint(
did,
endpoint,
ledger,
endpoint_type,
write_ledger=write_ledger,
endorser_did=endorser_did,
)
except WalletNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
except LedgerConfigError as err:
raise web.HTTPForbidden(reason=err.roll_up) from err
except (LedgerError, WalletError) as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
if not create_transaction_for_endorser:
return web.json_response({})
else:
transaction_mgr = TransactionManager(context.profile)
try:
transaction = await transaction_mgr.create_record(
messages_attach=attrib_def["signed_txn"], connection_id=connection_id
)
except StorageError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
# if auto-request, send the request to the endorser
if context.settings.get_value("endorser.auto_request"):
try:
transaction, transaction_request = await transaction_mgr.create_request(
transaction=transaction,
# TODO see if we need to parameterize these params
# expires_time=expires_time,
# endorser_write_txn=endorser_write_txn,
)
except (StorageError, TransactionManagerError) as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
await outbound_handler(transaction_request, connection_id=connection_id)
return web.json_response({"txn": transaction.serialize()})
@docs(tags=["wallet"], summary="Query DID endpoint in wallet")
@querystring_schema(DIDQueryStringSchema())
@response_schema(DIDEndpointSchema, 200, description="")
async def wallet_get_did_endpoint(request: web.BaseRequest):
"""
Request handler for getting the current DID endpoint from the wallet.
Args:
request: aiohttp request object
Returns:
The updated DID info
"""
context: AdminRequestContext = request["context"]
async with context.session() as session:
wallet = session.inject_or(BaseWallet)
if not wallet:
raise web.HTTPForbidden(reason="No wallet available")
did = request.query.get("did")
if not did:
raise web.HTTPBadRequest(reason="Request query must include DID")
try:
did_info = await wallet.get_local_did(did)
endpoint = did_info.metadata.get("endpoint")
except WalletNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
except WalletError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
return web.json_response({"did": did, "endpoint": endpoint})
@docs(tags=["wallet"], summary="Rotate keypair for a DID not posted to the ledger")
@querystring_schema(DIDQueryStringSchema())
@response_schema(WalletModuleResponseSchema(), description="")
async def wallet_rotate_did_keypair(request: web.BaseRequest):
"""
Request handler for rotating local DID keypair.
Args:
request: aiohttp request object
Returns:
An empty JSON response
"""
context: AdminRequestContext = request["context"]
did = request.query.get("did")
if not did:
raise web.HTTPBadRequest(reason="Request query must include DID")
async with context.session() as session:
wallet = session.inject_or(BaseWallet)
if not wallet:
raise web.HTTPForbidden(reason="No wallet available")
try:
did_info: DIDInfo = None
did_info = await wallet.get_local_did(did)
if did_info.metadata.get("posted", False):
# call from ledger API instead to propagate through ledger NYM transaction
raise web.HTTPBadRequest(reason=f"DID {did} is posted to the ledger")
await wallet.rotate_did_keypair_start(did) # do not take seed over the wire
await wallet.rotate_did_keypair_apply(did)
except WalletNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
except WalletError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
return web.json_response({})
[docs]def register_events(event_bus: EventBus):
"""Subscribe to any events we need to support."""
event_bus.subscribe(EVENT_LISTENER_PATTERN, on_register_nym_event)
[docs]async def on_register_nym_event(profile: Profile, event: Event):
"""Handle any events we need to support."""
# after the nym record is written, promote to wallet public DID
if is_author_role(profile) and profile.context.settings.get_value(
"endorser.auto_promote_author_did"
):
did = event.payload["did"]
connection_id = event.payload.get("connection_id")
try:
info, attrib_def = await promote_wallet_public_did(
profile, profile.context, profile.session, did, connection_id
)
except Exception as err:
# log the error, but continue
LOGGER.exception(
"Error promoting to public DID: %s",
err,
)
return
transaction_mgr = TransactionManager(profile)
try:
transaction = await transaction_mgr.create_record(
messages_attach=attrib_def["signed_txn"], connection_id=connection_id
)
except StorageError as err:
# log the error, but continue
LOGGER.exception(
"Error accepting endorser invitation/configuring endorser connection: %s",
err,
)
return
# if auto-request, send the request to the endorser
if profile.settings.get_value("endorser.auto_request"):
try:
transaction, transaction_request = await transaction_mgr.create_request(
transaction=transaction,
# TODO see if we need to parameterize these params
# expires_time=expires_time,
# endorser_write_txn=endorser_write_txn,
)
except (StorageError, TransactionManagerError) as err:
# log the error, but continue
LOGGER.exception(
"Error creating endorser transaction request: %s",
err,
)
# TODO not sure how to get outbound_handler in an event ...
# await outbound_handler(transaction_request, connection_id=connection_id)
responder = profile.inject_or(BaseResponder)
if responder:
await responder.send(
transaction_request,
connection_id=connection_id,
)
else:
LOGGER.warning(
"Configuration has no BaseResponder: cannot update "
"ATTRIB record on DID: %s",
did,
)
[docs]async def register(app: web.Application):
"""Register routes."""
app.add_routes(
[
web.get("/wallet/did", wallet_did_list, allow_head=False),
web.post("/wallet/did/create", wallet_create_did),
web.get("/wallet/did/public", wallet_get_public_did, allow_head=False),
web.post("/wallet/did/public", wallet_set_public_did),
web.post("/wallet/set-did-endpoint", wallet_set_did_endpoint),
web.get(
"/wallet/get-did-endpoint", wallet_get_did_endpoint, allow_head=False
),
web.patch("/wallet/did/local/rotate-keypair", wallet_rotate_did_keypair),
]
)
[docs]def post_process_routes(app: web.Application):
"""Amend swagger API."""
# Add top-level tags description
if "tags" not in app._state["swagger_dict"]:
app._state["swagger_dict"]["tags"] = []
app._state["swagger_dict"]["tags"].append(
{
"name": "wallet",
"description": "DID and tag policy management",
"externalDocs": {
"description": "Design",
"url": (
"https://github.com/hyperledger/indy-sdk/tree/"
"master/docs/design/003-wallet-storage"
),
},
}
)