Source code for aries_cloudagent.holder.routes

"""Holder admin routes."""

import json

from aiohttp import web
from aiohttp_apispec import (
    docs,
    match_info_schema,
    querystring_schema,
    request_schema,
    response_schema,
)
from marshmallow import fields

from ..admin.request_context import AdminRequestContext
from ..indy.holder import IndyHolder, IndyHolderError
from ..indy.models.cred_precis import IndyCredInfoSchema
from ..ledger.base import BaseLedger
from ..ledger.error import LedgerError
from ..messaging.models.openapi import OpenAPISchema
from ..messaging.valid import (
    ENDPOINT,
    INDY_WQL,
    NUM_STR_NATURAL,
    NUM_STR_WHOLE,
    UUIDFour,
)
from ..storage.error import StorageError, StorageNotFoundError
from ..storage.vc_holder.base import VCHolder
from ..storage.vc_holder.vc_record import VCRecordSchema
from ..wallet.error import WalletNotFoundError


[docs]class HolderModuleResponseSchema(OpenAPISchema): """Response schema for Holder Module."""
[docs]class AttributeMimeTypesResultSchema(OpenAPISchema): """Result schema for credential attribute MIME type.""" results = fields.Dict( keys=fields.Str(description="Attribute name"), values=fields.Str(description="MIME type"), allow_none=True, )
[docs]class CredInfoListSchema(OpenAPISchema): """Result schema for credential query.""" results = fields.List(fields.Nested(IndyCredInfoSchema()))
[docs]class CredentialsListQueryStringSchema(OpenAPISchema): """Parameters and validators for query string in credentials list query.""" start = fields.Str( description="Start index", required=False, **NUM_STR_WHOLE, ) count = fields.Str( description="Maximum number to retrieve", required=False, **NUM_STR_NATURAL, ) wql = fields.Str( description="(JSON) WQL query", required=False, **INDY_WQL, )
[docs]class W3CCredentialsListRequestSchema(OpenAPISchema): """Parameters and validators for W3C credentials request.""" contexts = fields.List( fields.Str( description="Credential context to match", **ENDPOINT, ), required=False, ) types = fields.List( fields.Str( description="Credential type to match", **ENDPOINT, ), required=False, ) schema_ids = fields.List( fields.Str( description="Credential schema identifier", **ENDPOINT, ), description="Schema identifiers, all of which to match", required=False, ) issuer_id = fields.Str( required=False, description="Credential issuer identifier to match", ) subject_ids = fields.List( fields.Str(description="Subject identifier"), description="Subject identifiers, all of which to match", required=False, ) proof_types = fields.List( fields.Str( description="Signature suite used for proof", example="Ed25519Signature2018" ) ) given_id = fields.Str(required=False, description="Given credential id to match") tag_query = fields.Dict( keys=fields.Str(description="Tag name"), values=fields.Str(description="Tag value"), required=False, description="Tag filter", ) max_results = fields.Int( strict=True, description="Maximum number of results to return", required=False )
[docs]class VCRecordListSchema(OpenAPISchema): """Result schema for W3C credential query.""" results = fields.List(fields.Nested(VCRecordSchema()))
[docs]class HolderCredIdMatchInfoSchema(OpenAPISchema): """Path parameters and validators for request taking credential id.""" credential_id = fields.Str( description="Credential identifier", required=True, example=UUIDFour.EXAMPLE )
[docs]class CredRevokedQueryStringSchema(OpenAPISchema): """Path parameters and validators for request seeking cred revocation status.""" fro = fields.Str( data_key="from", description="Earliest epoch of revocation status interval of interest", required=False, **NUM_STR_WHOLE, ) to = fields.Str( description="Latest epoch of revocation status interval of interest", required=False, **NUM_STR_WHOLE, )
[docs]class CredRevokedResultSchema(OpenAPISchema): """Result schema for credential revoked request.""" revoked = fields.Bool(description="Whether credential is revoked on the ledger")
@docs(tags=["credentials"], summary="Fetch credential from wallet by id") @match_info_schema(HolderCredIdMatchInfoSchema()) @response_schema(IndyCredInfoSchema(), 200, description="") async def credentials_get(request: web.BaseRequest): """ Request handler for retrieving credential. Args: request: aiohttp request object Returns: The credential info """ context: AdminRequestContext = request["context"] credential_id = request.match_info["credential_id"] holder = context.profile.inject(IndyHolder) try: credential = await holder.get_credential(credential_id) except WalletNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err credential_json = json.loads(credential) return web.json_response(credential_json) @docs(tags=["credentials"], summary="Query credential revocation status by id") @match_info_schema(HolderCredIdMatchInfoSchema()) @querystring_schema(CredRevokedQueryStringSchema()) @response_schema(CredRevokedResultSchema(), 200, description="") async def credentials_revoked(request: web.BaseRequest): """ Request handler for querying revocation status of credential. Args: request: aiohttp request object Returns: Empty production """ context: AdminRequestContext = request["context"] credential_id = request.match_info["credential_id"] fro = request.query.get("from") to = request.query.get("to") async with context.profile.session() as session: ledger = session.inject_or(BaseLedger) if not ledger: reason = "No ledger available" if not context.settings.get_value("wallet.type"): reason += ": missing wallet-type?" raise web.HTTPForbidden(reason=reason) async with ledger: try: holder = session.inject(IndyHolder) revoked = await holder.credential_revoked( ledger, credential_id, int(fro) if fro else None, int(to) if to else None, ) except WalletNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except LedgerError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response({"revoked": revoked}) @docs(tags=["credentials"], summary="Get attribute MIME types from wallet") @match_info_schema(HolderCredIdMatchInfoSchema()) @response_schema(AttributeMimeTypesResultSchema(), 200, description="") async def credentials_attr_mime_types_get(request: web.BaseRequest): """ Request handler for getting credential attribute MIME types. Args: request: aiohttp request object Returns: The MIME types response """ context: AdminRequestContext = request["context"] credential_id = request.match_info["credential_id"] async with context.profile.session() as session: holder = session.inject(IndyHolder) mime_types = await holder.get_mime_type(credential_id) return web.json_response({"results": mime_types}) @docs(tags=["credentials"], summary="Remove credential from wallet by id") @match_info_schema(HolderCredIdMatchInfoSchema()) @response_schema(HolderModuleResponseSchema(), description="") async def credentials_remove(request: web.BaseRequest): """ Request handler for searching connection records. Args: request: aiohttp request object Returns: Empty production """ context: AdminRequestContext = request["context"] credential_id = request.match_info["credential_id"] try: async with context.profile.session() as session: holder = session.inject(IndyHolder) await holder.delete_credential(credential_id) topic = "acapy::record::credential::delete" await context.profile.notify(topic, {"id": credential_id, "state": "deleted"}) except WalletNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err return web.json_response({}) @docs( tags=["credentials"], summary="Fetch credentials from wallet", ) @querystring_schema(CredentialsListQueryStringSchema()) @response_schema(CredInfoListSchema(), 200, description="") async def credentials_list(request: web.BaseRequest): """ Request handler for searching credential records. Args: request: aiohttp request object Returns: The credential info list response """ context: AdminRequestContext = request["context"] start = request.query.get("start") count = request.query.get("count") # url encoded json wql encoded_wql = request.query.get("wql") or "{}" wql = json.loads(encoded_wql) # defaults start = int(start) if isinstance(start, str) else 0 count = int(count) if isinstance(count, str) else 10 async with context.profile.session() as session: holder = session.inject(IndyHolder) try: credentials = await holder.get_credentials(start, count, wql) except IndyHolderError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response({"results": credentials}) @docs( tags=["credentials"], summary="Fetch W3C credential from wallet by id", ) @match_info_schema(HolderCredIdMatchInfoSchema()) @response_schema(VCRecordSchema(), 200, description="") async def w3c_cred_get(request: web.BaseRequest): """ Request handler for retrieving W3C credential. Args: request: aiohttp request object Returns: Verifiable credential record """ context: AdminRequestContext = request["context"] credential_id = request.match_info["credential_id"] async with context.profile.session() as session: holder = session.inject(VCHolder) try: vc_record = await holder.retrieve_credential_by_id(credential_id) except StorageNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except StorageError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response(vc_record.serialize()) @docs( tags=["credentials"], summary="Remove W3C credential from wallet by id", ) @match_info_schema(HolderCredIdMatchInfoSchema()) @response_schema(HolderModuleResponseSchema(), 200, description="") async def w3c_cred_remove(request: web.BaseRequest): """ Request handler for deleting W3C credential. Args: request: aiohttp request object Returns: Empty production """ context: AdminRequestContext = request["context"] credential_id = request.match_info["credential_id"] async with context.profile.session() as session: holder = session.inject(VCHolder) try: vc_record = await holder.retrieve_credential_by_id(credential_id) await holder.delete_credential(vc_record) topic = "acapy::record::w3c_credential::delete" await session.profile.notify( topic, {"id": credential_id, "state": "deleted"} ) except StorageNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except StorageError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response({}) @docs( tags=["credentials"], summary="Fetch W3C credentials from wallet", ) @request_schema(W3CCredentialsListRequestSchema()) @querystring_schema(CredentialsListQueryStringSchema()) @response_schema(VCRecordListSchema(), 200, description="") async def w3c_creds_list(request: web.BaseRequest): """ Request handler for searching W3C credential records. Args: request: aiohttp request object Returns: The credential record list response """ context: AdminRequestContext = request["context"] body = await request.json() contexts = body.get("contexts") types = body.get("types") schema_ids = body.get("schema_ids") issuer_id = body.get("issuer_id") subject_ids = body.get("subject_ids") proof_types = body.get("proof_types") given_id = body.get("given_id") tag_query = body.get("tag_query") max_results = body.get("max_results") async with context.profile.session() as session: holder = session.inject(VCHolder) try: search = holder.search_credentials( contexts=contexts, types=types, schema_ids=schema_ids, issuer_id=issuer_id, subject_ids=subject_ids, proof_types=proof_types, given_id=given_id, tag_query=tag_query, ) records = await search.fetch(max_results) except StorageNotFoundError as err: raise web.HTTPNotFound(reason=err.roll_up) from err except StorageError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response({"results": [record.serialize() for record in records]})
[docs]async def register(app: web.Application): """Register routes.""" app.add_routes( [ web.get("/credential/{credential_id}", credentials_get, allow_head=False), web.get( "/credential/revoked/{credential_id}", credentials_revoked, allow_head=False, ), web.get( "/credential/mime-types/{credential_id}", credentials_attr_mime_types_get, allow_head=False, ), web.delete("/credential/{credential_id}", credentials_remove), web.get("/credentials", credentials_list, allow_head=False), web.get( "/credential/w3c/{credential_id}", w3c_cred_get, allow_head=False, ), web.delete("/credential/w3c/{credential_id}", w3c_cred_remove), web.post("/credentials/w3c", w3c_creds_list), ] )
[docs]def post_process_routes(app: web.Application): """Amend swagger API.""" # Add top-level tags description if "tags" not in app._state["swagger_dict"]: app._state["swagger_dict"]["tags"] = [] app._state["swagger_dict"]["tags"].append( { "name": "credentials", "description": "Holder credential management", "externalDocs": { "description": "Overview", "url": ("https://w3c.github.io/vc-data-model/#credentials"), }, } )