Source code for aries_cloudagent.verifier.indy

"""Indy verifier implementation."""

from enum import Enum
import json
import logging

import indy.anoncreds
from indy.error import IndyError

from ..messaging.util import canon, encode
from .base import BaseVerifier

# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)


class PreVerifyResult(Enum):
    """Outcome categories reported by ``IndyVerifier.pre_verify``.

    Each member's value is a short human-readable description that is
    suitable for inclusion in log messages.
    """

    # The presentation passed every pre-verification check.
    OK = "ok"
    # A required part of the presentation is absent.
    INCOMPLETE = "missing essential components"
    # An encoded value in the presentation disagrees with the proof or
    # with the encoding of its raw value.
    ENCODING_MISMATCH = "demonstrates tampering with raw values"
class IndyVerifier(BaseVerifier):
    """Indy verifier class.

    Performs a structural pre-check of a presentation against its
    presentation request and then delegates cryptographic verification to
    ``indy.anoncreds.verifier_verify_proof``.
    """

    def __init__(self, wallet):
        """
        Initialize an IndyVerifier instance.

        Args:
            wallet: IndyWallet instance

        """
        self.wallet = wallet

    @staticmethod
    def pre_verify(pres_req: dict, pres: dict) -> (PreVerifyResult, str):
        """
        Check for essential components and tampering in presentation.

        Visit encoded attribute values against raw, and predicate bounds,
        in presentation, cross-reference against presentation request.

        Malformed presentations (missing keys, wrong container types,
        out-of-range sub-proof indices) are reported as
        `PreVerifyResult.INCOMPLETE` rather than raising.

        Args:
            pres_req: presentation request
            pres: corresponding presentation

        Returns:
            A 2-tuple `(result, message)`: `result` is a `PreVerifyResult`
            and `message` is a string describing the first problem found,
            or None when `result` is `PreVerifyResult.OK`.

        """
        if not pres:
            return (PreVerifyResult.INCOMPLETE, "No proof provided")
        if "requested_proof" not in pres:
            return (PreVerifyResult.INCOMPLETE, "Missing 'requested_proof'")
        if "proof" not in pres:
            return (PreVerifyResult.INCOMPLETE, "Missing 'proof'")

        # Every requested predicate must have a matching ge_proof, in the
        # sub-proof the presentation points at, with the requested bound.
        for (uuid, req_pred) in pres_req["requested_predicates"].items():
            canon_attr = canon(req_pred["name"])
            try:
                sub_proof_index = pres["requested_proof"]["predicates"][uuid][
                    "sub_proof_index"
                ]
                ge_proofs = pres["proof"]["proofs"][sub_proof_index][
                    "primary_proof"
                ]["ge_proofs"]
                for ge_proof in ge_proofs:
                    pred = ge_proof["predicate"]
                    if pred["attr_name"] == canon_attr:
                        if pred["value"] != req_pred["p_value"]:
                            return (
                                PreVerifyResult.INCOMPLETE,
                                f"Predicate value != p_value: {pred['attr_name']}",
                            )
                        break
                else:
                    # Loop finished without finding the attribute.
                    return (
                        PreVerifyResult.INCOMPLETE,
                        f"Missing requested predicate '{uuid}'",
                    )
            except (KeyError, TypeError, IndexError):
                # IndexError: sub_proof_index points past the end of "proofs".
                return (
                    PreVerifyResult.INCOMPLETE,
                    f"Missing requested predicate '{uuid}'",
                )

        revealed_attrs = pres["requested_proof"].get("revealed_attrs", {})
        revealed_groups = pres["requested_proof"].get("revealed_attr_groups", {})
        self_attested = pres["requested_proof"].get("self_attested_attrs", {})

        for (uuid, req_attr) in pres_req["requested_attributes"].items():
            # Build {attribute name: spec}, where each spec is expected to
            # carry "sub_proof_index", "raw" and "encoded".
            if "name" in req_attr:
                if uuid in revealed_attrs:
                    pres_req_attr_spec = {req_attr["name"]: revealed_attrs[uuid]}
                elif uuid in self_attested:
                    if not req_attr.get("restrictions"):
                        # Unrestricted self-attested value: nothing to check.
                        continue
                    return (
                        PreVerifyResult.INCOMPLETE,
                        "Attribute with restrictions cannot be self-attested "
                        f"'{req_attr['name']}'",
                    )
                else:
                    return (
                        PreVerifyResult.INCOMPLETE,
                        f"Missing requested attribute '{req_attr['name']}'",
                    )
            elif "names" in req_attr:
                group_spec = revealed_groups.get(uuid)
                if (
                    not isinstance(group_spec, dict)
                    or "sub_proof_index" not in group_spec
                    or "values" not in group_spec
                ):
                    return (
                        PreVerifyResult.INCOMPLETE,
                        f"Missing requested attribute group '{uuid}'",
                    )
                group_values = group_spec["values"]
                pres_req_attr_spec = {}
                for attr in req_attr["names"]:
                    attr_spec = (
                        group_values.get(attr)
                        if isinstance(group_values, dict)
                        else None
                    )
                    if not isinstance(attr_spec, dict):
                        # A requested group member is absent from the
                        # presentation; unpacking it would raise TypeError.
                        return (
                            PreVerifyResult.INCOMPLETE,
                            f"Missing revealed attribute: '{attr}'",
                        )
                    pres_req_attr_spec[attr] = {
                        "sub_proof_index": group_spec["sub_proof_index"],
                        **attr_spec,
                    }
            else:
                return (
                    PreVerifyResult.INCOMPLETE,
                    f"Request attribute missing 'name' and 'names': '{uuid}'",
                )

            for (attr, spec) in pres_req_attr_spec.items():
                try:
                    primary_enco = pres["proof"]["proofs"][spec["sub_proof_index"]][
                        "primary_proof"
                    ]["eq_proof"]["revealed_attrs"][canon(attr)]
                    claimed_enco = spec["encoded"]
                    raw_value = spec["raw"]
                except (KeyError, TypeError, IndexError):
                    return (
                        PreVerifyResult.INCOMPLETE,
                        f"Missing revealed attribute: '{attr}'",
                    )
                # The encoding inside the proof must agree both with the
                # encoding claimed alongside the raw value and with the
                # encoding recomputed from that raw value.
                if primary_enco != claimed_enco or primary_enco != encode(raw_value):
                    return (
                        PreVerifyResult.ENCODING_MISMATCH,
                        f"Encoded representation mismatch for '{attr}'",
                    )

        return (PreVerifyResult.OK, None)

    async def verify_presentation(
        self,
        presentation_request,
        presentation,
        schemas,
        credential_definitions,
        rev_reg_defs,
        rev_reg_entries,
    ) -> bool:
        """
        Verify a presentation.

        Runs `pre_verify` first; if that does not return
        `PreVerifyResult.OK` the failure is logged and the presentation is
        rejected without calling into indy.

        Args:
            presentation_request: Presentation request data
            presentation: Presentation data
            schemas: Schema data
            credential_definitions: credential definition data
            rev_reg_defs: revocation registry definitions
            rev_reg_entries: revocation registry entries

        Returns:
            The result of `indy.anoncreds.verifier_verify_proof`, or False
            when pre-verification fails or indy raises `IndyError`.

        """
        (pv_result, pv_msg) = self.pre_verify(presentation_request, presentation)
        if pv_result != PreVerifyResult.OK:
            LOGGER.error(
                f"Presentation on nonce={presentation_request['nonce']} "
                f"cannot be validated: {pv_result.value} [{pv_msg}]"
            )
            return False

        try:
            verified = await indy.anoncreds.verifier_verify_proof(
                json.dumps(presentation_request),
                json.dumps(presentation),
                json.dumps(schemas),
                json.dumps(credential_definitions),
                json.dumps(rev_reg_defs),
                json.dumps(rev_reg_entries),
            )
        except IndyError:
            LOGGER.exception(
                f"Validation of presentation on nonce={presentation_request['nonce']} "
                "failed with error"
            )
            verified = False

        return verified