Source code for aries_cloudagent.holder.routes

"""Holder admin routes."""

import json

from aiohttp import web
from aiohttp_apispec import (
    docs,
    match_info_schema,
    querystring_schema,
    request_schema,
    response_schema,
)

from marshmallow import fields

from ..admin.request_context import AdminRequestContext
from ..indy.holder import IndyHolder, IndyHolderError
from ..indy.models.cred_precis import IndyCredInfoSchema
from ..ledger.base import BaseLedger
from ..ledger.error import LedgerError
from ..messaging.models.openapi import OpenAPISchema
from ..messaging.valid import (
    ENDPOINT_EXAMPLE,
    ENDPOINT_VALIDATE,
    INDY_WQL_EXAMPLE,
    INDY_WQL_VALIDATE,
    NUM_STR_NATURAL_EXAMPLE,
    NUM_STR_NATURAL_VALIDATE,
    NUM_STR_WHOLE_EXAMPLE,
    NUM_STR_WHOLE_VALIDATE,
    UUID4_EXAMPLE,
)
from ..storage.error import StorageError, StorageNotFoundError
from ..storage.vc_holder.base import VCHolder
from ..storage.vc_holder.vc_record import VCRecordSchema
from ..wallet.error import WalletNotFoundError


[docs]class HolderModuleResponseSchema(OpenAPISchema): """Response schema for Holder Module."""
[docs]class AttributeMimeTypesResultSchema(OpenAPISchema): """Result schema for credential attribute MIME type.""" results = fields.Dict( keys=fields.Str(metadata={"description": "Attribute name"}), values=fields.Str(metadata={"description": "MIME type"}), allow_none=True, )
[docs]class CredInfoListSchema(OpenAPISchema): """Result schema for credential query.""" results = fields.List(fields.Nested(IndyCredInfoSchema()))
[docs]class CredentialsListQueryStringSchema(OpenAPISchema): """Parameters and validators for query string in credentials list query.""" start = fields.Str( required=False, validate=NUM_STR_WHOLE_VALIDATE, metadata={"description": "Start index", "example": NUM_STR_WHOLE_EXAMPLE}, ) count = fields.Str( required=False, validate=NUM_STR_NATURAL_VALIDATE, metadata={ "description": "Maximum number to retrieve", "example": NUM_STR_NATURAL_EXAMPLE, }, ) wql = fields.Str( required=False, validate=INDY_WQL_VALIDATE, metadata={"description": "(JSON) WQL query", "example": INDY_WQL_EXAMPLE}, )
[docs]class W3CCredentialsListRequestSchema(OpenAPISchema): """Parameters and validators for W3C credentials request.""" contexts = fields.List( fields.Str( validate=ENDPOINT_VALIDATE, metadata={ "description": "Credential context to match", "example": ENDPOINT_EXAMPLE, }, ), required=False, ) types = fields.List( fields.Str( validate=ENDPOINT_VALIDATE, metadata={ "description": "Credential type to match", "example": ENDPOINT_EXAMPLE, }, ), required=False, ) schema_ids = fields.List( fields.Str( validate=ENDPOINT_VALIDATE, metadata={ "description": "Credential schema identifier", "example": ENDPOINT_EXAMPLE, }, ), required=False, metadata={"description": "Schema identifiers, all of which to match"}, ) issuer_id = fields.Str( required=False, metadata={"description": "Credential issuer identifier to match"}, ) subject_ids = fields.List( fields.Str(metadata={"description": "Subject identifier"}), required=False, metadata={"description": "Subject identifiers, all of which to match"}, ) proof_types = fields.List( fields.Str( metadata={ "description": "Signature suite used for proof", "example": "Ed25519Signature2018", } ) ) given_id = fields.Str( required=False, metadata={"description": "Given credential id to match"} ) tag_query = fields.Dict( keys=fields.Str(metadata={"description": "Tag name"}), values=fields.Str(metadata={"description": "Tag value"}), required=False, metadata={"description": "Tag filter"}, ) max_results = fields.Int( required=False, metadata={"strict": True, "description": "Maximum number of results to return"}, )
[docs]class VCRecordListSchema(OpenAPISchema): """Result schema for W3C credential query.""" results = fields.List(fields.Nested(VCRecordSchema()))
[docs]class HolderCredIdMatchInfoSchema(OpenAPISchema): """Path parameters and validators for request taking credential id.""" credential_id = fields.Str( required=True, metadata={"description": "Credential identifier", "example": UUID4_EXAMPLE}, )
[docs]class CredRevokedQueryStringSchema(OpenAPISchema): """Path parameters and validators for request seeking cred revocation status.""" fro = fields.Str( data_key="from", required=False, validate=NUM_STR_WHOLE_VALIDATE, metadata={ "description": "Earliest epoch of revocation status interval of interest", "example": NUM_STR_WHOLE_EXAMPLE, }, ) to = fields.Str( required=False, validate=NUM_STR_WHOLE_VALIDATE, metadata={ "description": "Latest epoch of revocation status interval of interest", "example": NUM_STR_WHOLE_EXAMPLE, }, )
[docs]class CredRevokedResultSchema(OpenAPISchema): """Result schema for credential revoked request.""" revoked = fields.Bool( metadata={"description": "Whether credential is revoked on the ledger"} )
@docs(tags=["credentials"], summary="Fetch credential from wallet by id") @match_info_schema(HolderCredIdMatchInfoSchema()) @response_schema(IndyCredInfoSchema(), 200, description="") async def credentials_get(request: web.BaseRequest): """Request handler for retrieving credential. Args: request: aiohttp request object Returns: The credential info """ context: AdminRequestContext = request["context"] credential_id = request.match_info["credential_id"] holder = context.profile.inject(IndyHolder) try: credential = await holder.get_credential(credential_id) except WalletNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err credential_json = json.loads(credential) return web.json_response(credential_json) @docs(tags=["credentials"], summary="Query credential revocation status by id") @match_info_schema(HolderCredIdMatchInfoSchema()) @querystring_schema(CredRevokedQueryStringSchema()) @response_schema(CredRevokedResultSchema(), 200, description="") async def credentials_revoked(request: web.BaseRequest): """Request handler for querying revocation status of credential. Args: request: aiohttp request object Returns: Empty production """ context: AdminRequestContext = request["context"] credential_id = request.match_info["credential_id"] fro = request.query.get("from") to = request.query.get("to") async with context.profile.session() as session: ledger = session.inject_or(BaseLedger) if not ledger: reason = "No ledger available" if not context.settings.get_value("wallet.type"): reason += ": missing wallet-type?" raise web.HTTPForbidden(reason=reason) async with ledger: try: holder = session.inject(IndyHolder) revoked = await holder.credential_revoked( ledger, credential_id, int(fro) if fro else None, int(to) if to else None, ) except WalletNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except LedgerError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response({"revoked": revoked}) @docs(tags=["credentials"], summary="Get attribute MIME types from wallet") @match_info_schema(HolderCredIdMatchInfoSchema()) @response_schema(AttributeMimeTypesResultSchema(), 200, description="") async def credentials_attr_mime_types_get(request: web.BaseRequest): """Request handler for getting credential attribute MIME types. Args: request: aiohttp request object Returns: The MIME types response """ context: AdminRequestContext = request["context"] credential_id = request.match_info["credential_id"] async with context.profile.session() as session: holder = session.inject(IndyHolder) mime_types = await holder.get_mime_type(credential_id) return web.json_response({"results": mime_types}) @docs(tags=["credentials"], summary="Remove credential from wallet by id") @match_info_schema(HolderCredIdMatchInfoSchema()) @response_schema(HolderModuleResponseSchema(), description="") async def credentials_remove(request: web.BaseRequest): """Request handler for searching connection records. Args: request: aiohttp request object Returns: Empty production """ context: AdminRequestContext = request["context"] credential_id = request.match_info["credential_id"] try: async with context.profile.session() as session: holder = session.inject(IndyHolder) await holder.delete_credential(credential_id) topic = "acapy::record::credential::delete" await context.profile.notify(topic, {"id": credential_id, "state": "deleted"}) except WalletNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err return web.json_response({}) @docs( tags=["credentials"], summary="Fetch credentials from wallet", ) @querystring_schema(CredentialsListQueryStringSchema()) @response_schema(CredInfoListSchema(), 200, description="") async def credentials_list(request: web.BaseRequest): """Request handler for searching credential records. Args: request: aiohttp request object Returns: The credential info list response """ context: AdminRequestContext = request["context"] start = request.query.get("start") count = request.query.get("count") # url encoded json wql encoded_wql = request.query.get("wql") or "{}" wql = json.loads(encoded_wql) # defaults start = int(start) if isinstance(start, str) else 0 count = int(count) if isinstance(count, str) else 10 async with context.profile.session() as session: holder = session.inject(IndyHolder) try: credentials = await holder.get_credentials(start, count, wql) except IndyHolderError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response({"results": credentials}) @docs( tags=["credentials"], summary="Fetch W3C credential from wallet by id", ) @match_info_schema(HolderCredIdMatchInfoSchema()) @response_schema(VCRecordSchema(), 200, description="") async def w3c_cred_get(request: web.BaseRequest): """Request handler for retrieving W3C credential. Args: request: aiohttp request object Returns: Verifiable credential record """ context: AdminRequestContext = request["context"] credential_id = request.match_info["credential_id"] async with context.profile.session() as session: holder = session.inject(VCHolder) try: vc_record = await holder.retrieve_credential_by_id(credential_id) except StorageNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except StorageError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response(vc_record.serialize()) @docs( tags=["credentials"], summary="Remove W3C credential from wallet by id", ) @match_info_schema(HolderCredIdMatchInfoSchema()) @response_schema(HolderModuleResponseSchema(), 200, description="") async def w3c_cred_remove(request: web.BaseRequest): """Request handler for deleting W3C credential. Args: request: aiohttp request object Returns: Empty production """ context: AdminRequestContext = request["context"] credential_id = request.match_info["credential_id"] async with context.profile.session() as session: holder = session.inject(VCHolder) try: vc_record = await holder.retrieve_credential_by_id(credential_id) await holder.delete_credential(vc_record) topic = "acapy::record::w3c_credential::delete" await session.profile.notify( topic, {"id": credential_id, "state": "deleted"} ) except StorageNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except StorageError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response({}) @docs( tags=["credentials"], summary="Fetch W3C credentials from wallet", ) @request_schema(W3CCredentialsListRequestSchema()) @querystring_schema(CredentialsListQueryStringSchema()) @response_schema(VCRecordListSchema(), 200, description="") async def w3c_creds_list(request: web.BaseRequest): """Request handler for searching W3C credential records. Args: request: aiohttp request object Returns: The credential record list response """ context: AdminRequestContext = request["context"] body = await request.json() contexts = body.get("contexts") types = body.get("types") schema_ids = body.get("schema_ids") issuer_id = body.get("issuer_id") subject_ids = body.get("subject_ids") proof_types = body.get("proof_types") given_id = body.get("given_id") tag_query = body.get("tag_query") max_results = body.get("max_results") async with context.profile.session() as session: holder = session.inject(VCHolder) try: search = holder.search_credentials( contexts=contexts, types=types, schema_ids=schema_ids, issuer_id=issuer_id, subject_ids=subject_ids, proof_types=proof_types, given_id=given_id, tag_query=tag_query, ) records = await search.fetch(max_results) except StorageNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except StorageError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response({"results": [record.serialize() for record in records]})
[docs]async def register(app: web.Application): """Register routes.""" app.add_routes( [ web.get("/credential/{credential_id}", credentials_get, allow_head=False), web.get( "/credential/revoked/{credential_id}", credentials_revoked, allow_head=False, ), web.get( "/credential/mime-types/{credential_id}", credentials_attr_mime_types_get, allow_head=False, ), web.delete("/credential/{credential_id}", credentials_remove), web.get("/credentials", credentials_list, allow_head=False), web.get( "/credential/w3c/{credential_id}", w3c_cred_get, allow_head=False, ), web.delete("/credential/w3c/{credential_id}", w3c_cred_remove), web.post("/credentials/w3c", w3c_creds_list), ] )
[docs]def post_process_routes(app: web.Application): """Amend swagger API.""" # Add top-level tags description if "tags" not in app._state["swagger_dict"]: app._state["swagger_dict"]["tags"] = [] app._state["swagger_dict"]["tags"].append( { "name": "credentials", "description": "Holder credential management", "externalDocs": { "description": "Overview", "url": "https://w3c.github.io/vc-data-model/#credentials", }, } )