"""Holder admin routes."""
import json
from aiohttp import web
from aiohttp_apispec import docs, match_info_schema, querystring_schema, response_schema
from marshmallow import fields
from ..admin.request_context import AdminRequestContext
from ..indy.holder import IndyHolder, IndyHolderError
from ..ledger.base import BaseLedger
from ..ledger.error import LedgerError
from ..messaging.models.openapi import OpenAPISchema
from ..messaging.valid import (
INDY_CRED_DEF_ID,
INDY_CRED_REV_ID,
INDY_REV_REG_ID,
INDY_SCHEMA_ID,
INDY_WQL,
NUM_STR_NATURAL,
NUM_STR_WHOLE,
UUIDFour,
)
from ..wallet.error import WalletNotFoundError
[docs]class HolderModuleResponseSchema(OpenAPISchema):
"""Response schema for Holder Module."""
[docs]class AttributeMimeTypesResultSchema(OpenAPISchema):
"""Result schema for credential attribute MIME type."""
[docs]class CredBriefSchema(OpenAPISchema):
"""Result schema with credential brief for a credential query."""
referent = fields.Str(description="Credential referent", example=UUIDFour.EXAMPLE)
attrs = fields.Dict(
keys=fields.Str(description="Attribute name"),
values=fields.Str(description="Attribute value"),
description="Attribute names mapped to their raw values",
)
schema_id = fields.Str(description="Schema identifier", **INDY_SCHEMA_ID)
cred_def_id = fields.Str(
description="Credential definition identifier", **INDY_CRED_DEF_ID
)
rev_reg_id = fields.Str(
description="Revocation registry identifier", **INDY_REV_REG_ID
)
cred_rev_id = fields.Str(
description="Credential revocation identifier", **INDY_CRED_REV_ID
)
[docs]class CredBriefListSchema(OpenAPISchema):
"""Result schema for a credential query."""
results = fields.List(fields.Nested(CredBriefSchema()))
[docs]class CredentialsListQueryStringSchema(OpenAPISchema):
"""Parameters and validators for query string in credentials list query."""
start = fields.Str(
description="Start index",
required=False,
**NUM_STR_WHOLE,
)
count = fields.Str(
description="Maximum number to retrieve",
required=False,
**NUM_STR_NATURAL,
)
wql = fields.Str(
description="(JSON) WQL query",
required=False,
**INDY_WQL,
)
[docs]class CredIdMatchInfoSchema(OpenAPISchema):
"""Path parameters and validators for request taking credential id."""
credential_id = fields.Str(
description="Credential identifier", required=True, example=UUIDFour.EXAMPLE
)
[docs]class CredRevokedQueryStringSchema(OpenAPISchema):
"""Path parameters and validators for request seeking cred revocation status."""
fro = fields.Str(
data_key="from",
description="Earliest epoch of revocation status interval of interest",
required=False,
**NUM_STR_WHOLE,
)
to = fields.Str(
description="Latest epoch of revocation status interval of interest",
required=False,
**NUM_STR_WHOLE,
)
[docs]class CredRevokedResultSchema(OpenAPISchema):
"""Result schema for credential revoked request."""
revoked = fields.Bool(description="Whether credential is revoked on the ledger")
[docs]@docs(tags=["credentials"], summary="Fetch a credential from wallet by id")
@match_info_schema(CredIdMatchInfoSchema())
@response_schema(CredBriefSchema(), 200, description="")
async def credentials_get(request: web.BaseRequest):
"""
Request handler for retrieving a credential.
Args:
request: aiohttp request object
Returns:
The credential response
"""
context: AdminRequestContext = request["context"]
credential_id = request.match_info["credential_id"]
session = await context.session()
holder = session.inject(IndyHolder)
try:
credential = await holder.get_credential(credential_id)
except WalletNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
credential_json = json.loads(credential)
return web.json_response(credential_json)
[docs]@docs(tags=["credentials"], summary="Query credential revocation status by id")
@match_info_schema(CredIdMatchInfoSchema())
@querystring_schema(CredRevokedQueryStringSchema())
@response_schema(CredRevokedResultSchema(), 200, description="")
async def credentials_revoked(request: web.BaseRequest):
"""
Request handler for querying revocation status of credential.
Args:
request: aiohttp request object
Returns:
The credential response
"""
context: AdminRequestContext = request["context"]
session = await context.session()
credential_id = request.match_info["credential_id"]
fro = request.query.get("from")
to = request.query.get("to")
ledger = session.inject(BaseLedger, required=False)
if not ledger:
reason = "No ledger available"
if not context.settings.get_value("wallet.type"):
reason += ": missing wallet-type?"
raise web.HTTPForbidden(reason=reason)
async with ledger:
try:
holder = session.inject(IndyHolder)
revoked = await holder.credential_revoked(
ledger,
credential_id,
int(fro) if fro else None,
int(to) if to else None,
)
except WalletNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
except LedgerError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
return web.json_response({"revoked": revoked})
[docs]@docs(tags=["credentials"], summary="Get attribute MIME types from wallet")
@match_info_schema(CredIdMatchInfoSchema())
@response_schema(AttributeMimeTypesResultSchema(), 200, description="")
async def credentials_attr_mime_types_get(request: web.BaseRequest):
"""
Request handler for getting credential attribute MIME types.
Args:
request: aiohttp request object
Returns:
The MIME types response
"""
context: AdminRequestContext = request["context"]
session = await context.session()
credential_id = request.match_info["credential_id"]
holder = session.inject(IndyHolder)
return web.json_response(await holder.get_mime_type(credential_id))
[docs]@docs(tags=["credentials"], summary="Remove a credential from the wallet by id")
@match_info_schema(CredIdMatchInfoSchema())
@response_schema(HolderModuleResponseSchema(), description="")
async def credentials_remove(request: web.BaseRequest):
"""
Request handler for searching connection records.
Args:
request: aiohttp request object
Returns:
The connection list response
"""
context: AdminRequestContext = request["context"]
credential_id = request.match_info["credential_id"]
session = await context.session()
holder = session.inject(IndyHolder)
try:
await holder.delete_credential(credential_id)
except WalletNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
return web.json_response({})
[docs]@docs(
tags=["credentials"],
summary="Fetch credentials from wallet",
)
@querystring_schema(CredentialsListQueryStringSchema())
@response_schema(CredBriefListSchema(), 200, description="")
async def credentials_list(request: web.BaseRequest):
"""
Request handler for searching credential records.
Args:
request: aiohttp request object
Returns:
The credential list response
"""
context: AdminRequestContext = request["context"]
session = await context.session()
start = request.query.get("start")
count = request.query.get("count")
# url encoded json wql
encoded_wql = request.query.get("wql") or "{}"
wql = json.loads(encoded_wql)
# defaults
start = int(start) if isinstance(start, str) else 0
count = int(count) if isinstance(count, str) else 10
holder = session.inject(IndyHolder)
try:
credentials = await holder.get_credentials(start, count, wql)
except IndyHolderError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
return web.json_response({"results": credentials})
[docs]async def register(app: web.Application):
"""Register routes."""
app.add_routes(
[
web.get("/credential/{credential_id}", credentials_get, allow_head=False),
web.get(
"/credential/revoked/{credential_id}",
credentials_revoked,
allow_head=False,
),
web.get(
"/credential/mime-types/{credential_id}",
credentials_attr_mime_types_get,
allow_head=False,
),
web.delete("/credential/{credential_id}", credentials_remove),
web.get("/credentials", credentials_list, allow_head=False),
]
)
[docs]def post_process_routes(app: web.Application):
"""Amend swagger API."""
# Add top-level tags description
if "tags" not in app._state["swagger_dict"]:
app._state["swagger_dict"]["tags"] = []
app._state["swagger_dict"]["tags"].append(
{
"name": "credentials",
"description": "Holder credential management",
"externalDocs": {
"description": "Overview",
"url": ("https://w3c.github.io/vc-data-model/#credentials"),
},
}
)