"""Credential definition admin routes."""
from asyncio import ensure_future, shield
from aiohttp import web
from aiohttp_apispec import (
docs,
match_info_schema,
querystring_schema,
request_schema,
response_schema,
)
from marshmallow import fields
from ...admin.request_context import AdminRequestContext
from ...indy.issuer import IndyIssuer
from ...ledger.base import BaseLedger
from ...storage.base import BaseStorage
from ...tails.base import BaseTailsServer
from ..models.openapi import OpenAPISchema
from ..valid import INDY_CRED_DEF_ID, INDY_REV_REG_SIZE, INDY_SCHEMA_ID, INDY_VERSION
from ...revocation.error import RevocationError, RevocationNotSupportedError
from ...revocation.indy import IndyRevocation
from ...ledger.error import LedgerError
from .util import CredDefQueryStringSchema, CRED_DEF_TAGS, CRED_DEF_SENT_RECORD_TYPE
[docs]class CredentialDefinitionSendRequestSchema(OpenAPISchema):
"""Request schema for schema send request."""
schema_id = fields.Str(description="Schema identifier", **INDY_SCHEMA_ID)
support_revocation = fields.Boolean(
required=False, description="Revocation supported flag"
)
revocation_registry_size = fields.Int(
description="Revocation registry size",
required=False,
strict=True,
**INDY_REV_REG_SIZE,
)
tag = fields.Str(
required=False,
description="Credential definition identifier tag",
default="default",
example="default",
)
[docs]class CredentialDefinitionSendResultsSchema(OpenAPISchema):
"""Results schema for schema send request."""
credential_definition_id = fields.Str(
description="Credential definition identifier", **INDY_CRED_DEF_ID
)
[docs]class CredentialDefinitionSchema(OpenAPISchema):
"""Credential definition schema."""
ver = fields.Str(description="Node protocol version", **INDY_VERSION)
ident = fields.Str(
description="Credential definition identifier",
data_key="id",
**INDY_CRED_DEF_ID,
)
schemaId = fields.Str(
description="Schema identifier within credential definition identifier",
example=":".join(INDY_CRED_DEF_ID["example"].split(":")[3:-1]), # long or short
)
typ = fields.Constant(
constant="CL",
description="Signature type: CL for Camenisch-Lysyanskaya",
data_key="type",
example="CL",
)
tag = fields.Str(
description="Tag within credential definition identifier",
example=INDY_CRED_DEF_ID["example"].split(":")[-1],
)
value = fields.Dict(
description="Credential definition primary and revocation values"
)
[docs]class CredentialDefinitionGetResultsSchema(OpenAPISchema):
"""Results schema for schema get request."""
credential_definition = fields.Nested(CredentialDefinitionSchema)
[docs]class CredentialDefinitionsCreatedResultsSchema(OpenAPISchema):
"""Results schema for cred-defs-created request."""
credential_definition_ids = fields.List(
fields.Str(description="Credential definition identifiers", **INDY_CRED_DEF_ID)
)
[docs]class CredDefIdMatchInfoSchema(OpenAPISchema):
"""Path parameters and validators for request taking cred def id."""
cred_def_id = fields.Str(
description="Credential definition identifier",
required=True,
**INDY_CRED_DEF_ID,
)
[docs]@docs(
tags=["credential-definition"],
summary="Sends a credential definition to the ledger",
)
@request_schema(CredentialDefinitionSendRequestSchema())
@response_schema(CredentialDefinitionSendResultsSchema(), 200, description="")
async def credential_definitions_send_credential_definition(request: web.BaseRequest):
"""
Request handler for sending a credential definition to the ledger.
Args:
request: aiohttp request object
Returns:
The credential definition identifier
"""
context: AdminRequestContext = request["context"]
body = await request.json()
schema_id = body.get("schema_id")
support_revocation = bool(body.get("support_revocation"))
tag = body.get("tag")
rev_reg_size = body.get("revocation_registry_size")
ledger = context.inject(BaseLedger, required=False)
if not ledger:
reason = "No ledger available"
if not context.settings.get_value("wallet.type"):
reason += ": missing wallet-type?"
raise web.HTTPForbidden(reason=reason)
issuer = context.inject(IndyIssuer)
try: # even if in wallet, send it and raise if erroneously so
async with ledger:
(cred_def_id, cred_def, novel) = await shield(
ledger.create_and_send_credential_definition(
issuer,
schema_id,
signature_type=None,
tag=tag,
support_revocation=support_revocation,
)
)
except LedgerError as e:
raise web.HTTPBadRequest(reason=e.message) from e
# If revocation is requested and cred def is novel, create revocation registry
if support_revocation and novel:
profile = context.profile
tails_base_url = profile.settings.get("tails_server_base_url")
if not tails_base_url:
raise web.HTTPBadRequest(reason="tails_server_base_url not configured")
try:
# Create registry
revoc = IndyRevocation(profile)
registry_record = await revoc.init_issuer_registry(
cred_def_id,
max_cred_num=rev_reg_size,
)
except RevocationNotSupportedError as e:
raise web.HTTPBadRequest(reason=e.message) from e
await shield(registry_record.generate_registry(profile))
try:
await registry_record.set_tails_file_public_uri(
profile, f"{tails_base_url}/{registry_record.revoc_reg_id}"
)
await registry_record.send_def(profile)
await registry_record.send_entry(profile)
# stage pending registry independent of whether tails server is OK
pending_registry_record = await revoc.init_issuer_registry(
registry_record.cred_def_id,
max_cred_num=registry_record.max_cred_num,
)
ensure_future(
pending_registry_record.stage_pending_registry(profile, max_attempts=16)
)
tails_server = profile.inject(BaseTailsServer)
(upload_success, reason) = await tails_server.upload_tails_file(
profile,
registry_record.revoc_reg_id,
registry_record.tails_local_path,
interval=0.8,
backoff=-0.5,
max_attempts=5, # heuristic: respect HTTP timeout
)
if not upload_success:
raise web.HTTPInternalServerError(
reason=(
f"Tails file for rev reg {registry_record.revoc_reg_id} "
f"failed to upload: {reason}"
)
)
except RevocationError as e:
raise web.HTTPBadRequest(reason=e.message) from e
return web.json_response({"credential_definition_id": cred_def_id})
[docs]@docs(
tags=["credential-definition"],
summary="Search for matching credential definitions that agent originated",
)
@querystring_schema(CredDefQueryStringSchema())
@response_schema(CredentialDefinitionsCreatedResultsSchema(), 200, description="")
async def credential_definitions_created(request: web.BaseRequest):
"""
Request handler for retrieving credential definitions that current agent created.
Args:
request: aiohttp request object
Returns:
The identifiers of matching credential definitions.
"""
context: AdminRequestContext = request["context"]
session = await context.session()
storage = session.inject(BaseStorage)
found = await storage.find_all_records(
type_filter=CRED_DEF_SENT_RECORD_TYPE,
tag_query={
tag: request.query[tag] for tag in CRED_DEF_TAGS if tag in request.query
},
)
return web.json_response(
{"credential_definition_ids": [record.value for record in found]}
)
[docs]@docs(
tags=["credential-definition"],
summary="Gets a credential definition from the ledger",
)
@match_info_schema(CredDefIdMatchInfoSchema())
@response_schema(CredentialDefinitionGetResultsSchema(), 200, description="")
async def credential_definitions_get_credential_definition(request: web.BaseRequest):
"""
Request handler for getting a credential definition from the ledger.
Args:
request: aiohttp request object
Returns:
The credential definition details.
"""
context: AdminRequestContext = request["context"]
cred_def_id = request.match_info["cred_def_id"]
ledger = context.inject(BaseLedger, required=False)
if not ledger:
reason = "No ledger available"
if not context.settings.get_value("wallet.type"):
reason += ": missing wallet-type?"
raise web.HTTPForbidden(reason=reason)
async with ledger:
cred_def = await ledger.get_credential_definition(cred_def_id)
return web.json_response({"credential_definition": cred_def})
[docs]async def register(app: web.Application):
"""Register routes."""
app.add_routes(
[
web.post(
"/credential-definitions",
credential_definitions_send_credential_definition,
),
web.get(
"/credential-definitions/created",
credential_definitions_created,
allow_head=False,
),
web.get(
"/credential-definitions/{cred_def_id}",
credential_definitions_get_credential_definition,
allow_head=False,
),
]
)
[docs]def post_process_routes(app: web.Application):
"""Amend swagger API."""
# Add top-level tags description
if "tags" not in app._state["swagger_dict"]:
app._state["swagger_dict"]["tags"] = []
app._state["swagger_dict"]["tags"].append(
{
"name": "credential-definition",
"description": "Credential definition operations",
"externalDocs": {
"description": "Specification",
"url": (
"https://github.com/hyperledger/indy-node/blob/master/"
"design/anoncreds.md#cred_def"
),
},
}
)