Source code for aries_cloudagent.messaging.schemas.routes

"""Credential schema admin routes."""

import json

from asyncio import shield

from aiohttp import web
from aiohttp_apispec import (
    docs,
    match_info_schema,
    querystring_schema,
    request_schema,
    response_schema,
)

from marshmallow import fields
from marshmallow.validate import Regexp

from ...admin.request_context import AdminRequestContext
from ...indy.issuer import IndyIssuer, IndyIssuerError
from ...indy.models.schema import SchemaSchema
from ...ledger.base import BaseLedger
from ...ledger.error import LedgerError
from ...protocols.endorse_transaction.v1_0.manager import TransactionManager
from ...protocols.endorse_transaction.v1_0.models.transaction_record import (
    TransactionRecordSchema,
)
from ...storage.base import BaseStorage
from ...storage.error import StorageError

from ..models.openapi import OpenAPISchema
from ..valid import B58, INDY_SCHEMA_ID, INDY_VERSION

from .util import SchemaQueryStringSchema, SCHEMA_SENT_RECORD_TYPE, SCHEMA_TAGS


from ..valid import UUIDFour
from ...connections.models.conn_record import ConnRecord
from ...storage.error import StorageNotFoundError
from ..models.base import BaseModelError


class SchemaSendRequestSchema(OpenAPISchema):
    """Request body schema for sending a schema: name, version and attributes."""

    # Human-readable schema name.
    schema_name = fields.Str(
        required=True,
        description="Schema name",
        example="prefs",
    )

    # Version string; validator and example are supplied by INDY_VERSION.
    schema_version = fields.Str(
        required=True,
        description="Schema version",
        **INDY_VERSION
    )

    # Names of the attributes that make up the schema.
    attributes = fields.List(
        fields.Str(description="attribute name", example="score"),
        required=True,
        description="List of schema attributes",
    )
class SchemaSendResultSchema(OpenAPISchema):
    """Content of a schema send result when the request is auto-endorsed."""

    # Identifier of the schema; validator and example come from INDY_SCHEMA_ID.
    schema_id = fields.Str(
        description="Schema identifier",
        required=True,
        **INDY_SCHEMA_ID
    )

    # Nested schema definition.
    schema = fields.Nested(SchemaSchema(), description="Schema definition")
class TxnOrSchemaSendResultSchema(OpenAPISchema):
    """Response schema for a schema send request.

    Both fields are optional: ``sent`` carries the content that was sent and
    ``txn`` carries a transaction record awaiting endorsement.
    """

    # Content sent to the ledger.
    sent = fields.Nested(
        SchemaSendResultSchema(),
        required=False,
        description="Content sent",
    )

    # Transaction record to be endorsed.
    txn = fields.Nested(
        TransactionRecordSchema(),
        required=False,
        description="Schema transaction to endorse",
    )
class SchemaGetResultSchema(OpenAPISchema):
    """Response schema for a schema get request: a single nested schema."""

    # The schema definition returned to the caller.
    schema = fields.Nested(SchemaSchema())
class SchemasCreatedResultSchema(OpenAPISchema):
    """Response schema for a schemas-created request: a list of schema ids."""

    # Each entry is validated/exemplified by INDY_SCHEMA_ID.
    schema_ids = fields.List(
        fields.Str(
            description="Schema identifiers",
            **INDY_SCHEMA_ID
        )
    )
class SchemaIdMatchInfoSchema(OpenAPISchema):
    """Path parameters and validators for request taking schema id.

    The ``schema_id`` path parameter accepts either a positive integer
    (presumably a ledger sequence number -- confirm against the ledger API)
    or a full identifier of the form ``<base58 DID>:2:<name>:<version>``.
    """

    schema_id = fields.Str(
        description="Schema identifier",
        required=True,
        # The alternation must be grouped: in "^A|B$" the "|" binds looser
        # than the anchors, so "A" would only be anchored at the start and
        # "B" only at the end.  Regexp validates with re.match, which let any
        # value beginning with a digit 1-9 (e.g. "1garbage") pass.
        validate=Regexp(rf"^(?:[1-9][0-9]*|[{B58}]{{21,22}}:2:.+:[0-9.]+)$"),
        example=INDY_SCHEMA_ID["example"],
    )
class CreateSchemaTxnForEndorserOptionSchema(OpenAPISchema):
    """Optional flag selecting whether to create a transaction for an endorser."""

    # Optional boolean flag.
    create_transaction_for_endorser = fields.Boolean(
        description="Create Transaction For Endorser's signature",
        required=False,
    )
class SchemaConnIdMatchInfoSchema(OpenAPISchema):
    """Parameters and validators for a request taking a connection id."""

    # Optional connection identifier.
    conn_id = fields.Str(
        description="Connection identifier",
        required=False,
        example=UUIDFour.EXAMPLE,
    )
@docs(tags=["schema"], summary="Sends a schema to the ledger")
@request_schema(SchemaSendRequestSchema())
@querystring_schema(CreateSchemaTxnForEndorserOptionSchema())
@querystring_schema(SchemaConnIdMatchInfoSchema())
@response_schema(TxnOrSchemaSendResultSchema(), 200, description="")
async def schemas_send_schema(request: web.BaseRequest):
    """
    Request handler for sending a schema to the ledger.

    When the ``create_transaction_for_endorser`` query flag is truthy, the
    schema is not written to the ledger; a transaction record is created for
    the connection given by ``conn_id`` instead.

    Args:
        request: aiohttp request object

    Returns:
        A JSON response holding the schema id and schema definition, or
        the serialized transaction record when an endorser transaction was
        requested

    Raises:
        web.HTTPBadRequest: on a malformed query flag, or on model, issuer,
            ledger or storage errors
        web.HTTPNotFound: if the connection record cannot be found
        web.HTTPForbidden: if endorser info is missing from the connection
            metadata, or no ledger is available

    """
    context: AdminRequestContext = request["context"]

    # The flag arrives as raw query text; a value that is not valid JSON
    # (e.g. "yes") must yield a client error rather than an unhandled
    # exception.
    try:
        create_transaction_for_endorser = json.loads(
            request.query.get("create_transaction_for_endorser", "false")
        )
    except ValueError as err:
        raise web.HTTPBadRequest(
            reason="Invalid value for create_transaction_for_endorser"
        ) from err

    write_ledger = not create_transaction_for_endorser
    endorser_did = None
    connection_id = request.query.get("conn_id")

    body = await request.json()

    schema_name = body.get("schema_name")
    schema_version = body.get("schema_version")
    attributes = body.get("attributes")

    if not write_ledger:
        try:
            async with context.session() as session:
                connection_record = await ConnRecord.retrieve_by_id(
                    session, connection_id
                )
        except StorageNotFoundError as err:
            raise web.HTTPNotFound(reason=err.roll_up) from err
        except BaseModelError as err:
            raise web.HTTPBadRequest(reason=err.roll_up) from err

        # Use a context-managed session so it is released after the lookup.
        # It is kept outside the try block above so that errors raised here
        # are not remapped by those handlers.
        async with context.session() as session:
            endorser_info = await connection_record.metadata_get(
                session, "endorser_info"
            )
        if not endorser_info:
            raise web.HTTPForbidden(
                reason="Endorser Info is not set up in "
                "connection metadata for this connection record"
            )
        if "endorser_did" not in endorser_info:
            raise web.HTTPForbidden(
                reason=' "endorser_did" is not set in "endorser_info"'
                " in connection metadata for this connection record"
            )
        endorser_did = endorser_info["endorser_did"]

    ledger = context.inject(BaseLedger, required=False)
    if not ledger:
        reason = "No ledger available"
        if not context.settings.get_value("wallet.type"):
            reason += ": missing wallet-type?"
        raise web.HTTPForbidden(reason=reason)

    issuer = context.inject(IndyIssuer)
    async with ledger:
        try:
            # if create_transaction_for_endorser, then the returned "schema_def"
            # is actually the signed transaction
            schema_id, schema_def = await shield(
                ledger.create_and_send_schema(
                    issuer,
                    schema_name,
                    schema_version,
                    attributes,
                    write_ledger=write_ledger,
                    endorser_did=endorser_did,
                )
            )
        except (IndyIssuerError, LedgerError) as err:
            raise web.HTTPBadRequest(reason=err.roll_up) from err

    if not create_transaction_for_endorser:
        return web.json_response({"schema_id": schema_id, "schema": schema_def})

    # Endorser flow: store the signed transaction as a transaction record.
    # The session is context-managed so it is released once the record exists.
    async with context.session() as session:
        transaction_mgr = TransactionManager(session)
        try:
            transaction = await transaction_mgr.create_record(
                messages_attach=schema_def["signed_txn"],
                connection_id=connection_id,
            )
        except StorageError as err:
            raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response({"txn": transaction.serialize()})
@docs(
    tags=["schema"],
    summary="Search for matching schema that agent originated",
)
@querystring_schema(SchemaQueryStringSchema())
@response_schema(SchemasCreatedResultSchema(), 200, description="")
async def schemas_created(request: web.BaseRequest):
    """
    Request handler for retrieving schemas that current agent created.

    Args:
        request: aiohttp request object

    Returns:
        The identifiers of matching schemas

    """
    context: AdminRequestContext = request["context"]

    # Only the recognised schema tags present in the query are used as filters.
    tag_query = {
        tag: request.query[tag] for tag in SCHEMA_TAGS if tag in request.query
    }

    # Context-managed session so it is released once the search completes
    # (a bare ``await context.session()`` was never closed).
    async with context.session() as session:
        storage = session.inject(BaseStorage)
        found = await storage.find_all_records(
            type_filter=SCHEMA_SENT_RECORD_TYPE,
            tag_query=tag_query,
        )

    return web.json_response({"schema_ids": [record.value for record in found]})
@docs(tags=["schema"], summary="Gets a schema from the ledger")
@match_info_schema(SchemaIdMatchInfoSchema())
@response_schema(SchemaGetResultSchema(), 200, description="")
async def schemas_get_schema(request: web.BaseRequest):
    """
    Request handler for fetching a schema from the ledger.

    Args:
        request: aiohttp request object

    Returns:
        The schema details.

    Raises:
        web.HTTPForbidden: if no ledger is available
        web.HTTPBadRequest: if the ledger lookup fails

    """
    context: AdminRequestContext = request["context"]
    requested_id = request.match_info["schema_id"]

    ledger = context.inject(BaseLedger, required=False)
    if not ledger:
        # Add a hint when the missing ledger may be due to an unset wallet type.
        if context.settings.get_value("wallet.type"):
            hint = ""
        else:
            hint = ": missing wallet-type?"
        raise web.HTTPForbidden(reason="No ledger available" + hint)

    async with ledger:
        try:
            ledger_schema = await ledger.get_schema(requested_id)
        except LedgerError as ledger_err:
            raise web.HTTPBadRequest(reason=ledger_err.roll_up) from ledger_err

    payload = {"schema": ledger_schema}
    return web.json_response(payload)
async def register(app: web.Application):
    """Attach the schema admin routes to the given application."""
    # "/schemas/created" is listed before "/schemas/{schema_id}", as in the
    # original registration order.
    schema_routes = [
        web.post("/schemas", schemas_send_schema),
        web.get("/schemas/created", schemas_created, allow_head=False),
        web.get("/schemas/{schema_id}", schemas_get_schema, allow_head=False),
    ]
    app.add_routes(schema_routes)
def post_process_routes(app: web.Application):
    """Amend swagger API by appending the top-level "schema" tag description."""
    swagger = app._state["swagger_dict"]

    # Create the top-level tag list on first use.
    if "tags" not in swagger:
        swagger["tags"] = []

    schema_tag = {
        "name": "schema",
        "description": "Schema operations",
        "externalDocs": {
            "description": "Specification",
            "url": (
                "https://github.com/hyperledger/indy-node/blob/master/"
                "design/anoncreds.md#schema"
            ),
        },
    }
    swagger["tags"].append(schema_tag)