"""Base Indy Verifier class."""
import logging
from abc import ABC, ABCMeta, abstractmethod
from time import time
from typing import Mapping
from ..ledger.base import BaseLedger
from ..messaging.util import canon, encode
from .models.xform import indy_proof_req2non_revoc_intervals
LOGGER = logging.getLogger(__name__)
[docs]class IndyVerifier(ABC, metaclass=ABCMeta):
"""Base class for Indy Verifier."""
def __repr__(self) -> str:
"""
Return a human readable representation of this class.
Returns:
A human readable string for this class
"""
return "<{}>".format(self.__class__.__name__)
[docs] def non_revoc_intervals(self, pres_req: dict, pres: dict):
"""
Remove superfluous non-revocation intervals in presentation request.
Irrevocable credentials constitute proof of non-revocation, but
indy rejects proof requests with non-revocation intervals lining up
with non-revocable credentials in proof: seek and remove.
Args:
pres_req: presentation request
pres: corresponding presentation
"""
for (req_proof_key, pres_key) in {
"revealed_attrs": "requested_attributes",
"revealed_attr_groups": "requested_attributes",
"predicates": "requested_predicates",
}.items():
for (uuid, spec) in pres["requested_proof"].get(req_proof_key, {}).items():
if (
pres["identifiers"][spec["sub_proof_index"]].get("timestamp")
is None
):
if pres_req[pres_key][uuid].pop("non_revoked", None):
LOGGER.info(
(
"Amended presentation request (nonce=%s): removed "
"non-revocation interval at %s referent "
"%s; corresponding credential in proof is irrevocable"
),
pres_req["nonce"],
pres_key,
uuid,
)
if all(spec.get("timestamp") is None for spec in pres["identifiers"]):
pres_req.pop("non_revoked", None)
LOGGER.warning(
(
"Amended presentation request (nonce=%s); removed global "
"non-revocation interval; no revocable credentials in proof"
),
pres_req["nonce"],
)
[docs] async def check_timestamps(
self,
ledger: BaseLedger,
pres_req: Mapping,
pres: Mapping,
rev_reg_defs: Mapping,
):
"""
Check for suspicious, missing, and superfluous timestamps.
Raises ValueError on timestamp in the future, prior to rev reg creation,
superfluous or missing.
Args:
ledger: the base ledger for retrieving revocation registry definitions
pres_req: indy proof request
pres: indy proof request
rev_reg_defs: rev reg defs by rev reg id, augmented with transaction times
"""
now = int(time())
non_revoc_intervals = indy_proof_req2non_revoc_intervals(pres_req)
# timestamp for irrevocable credential
async with ledger:
for (index, ident) in enumerate(pres["identifiers"]):
if ident.get("timestamp"):
cred_def_id = ident["cred_def_id"]
cred_def = await ledger.get_credential_definition(cred_def_id)
if not cred_def["value"].get("revocation"):
raise ValueError(
f"Timestamp in presentation identifier #{index} "
f"for irrevocable cred def id {cred_def_id}"
)
# timestamp in the future too far in the past
for ident in pres["identifiers"]:
timestamp = ident.get("timestamp")
rev_reg_id = ident.get("rev_reg_id")
if not timestamp:
continue
if timestamp > now + 300: # allow 5 min for clock skew
raise ValueError(f"Timestamp {timestamp} is in the future")
if timestamp < rev_reg_defs[rev_reg_id]["txnTime"]:
raise ValueError(
f"Timestamp {timestamp} predates rev reg {rev_reg_id} creation"
)
# timestamp superfluous, missing, or outside non-revocation interval
revealed_attrs = pres["requested_proof"].get("revealed_attrs", {})
revealed_groups = pres["requested_proof"].get("revealed_attr_groups", {})
self_attested = pres["requested_proof"].get("self_attested_attrs", {})
preds = pres["requested_proof"].get("predicates", {})
for (uuid, req_attr) in pres_req["requested_attributes"].items():
if "name" in req_attr:
if uuid in revealed_attrs:
index = revealed_attrs[uuid]["sub_proof_index"]
timestamp = pres["identifiers"][index].get("timestamp")
if (timestamp is not None) ^ bool(non_revoc_intervals.get(uuid)):
raise ValueError(
f"Timestamp on sub-proof #{index} "
f"is {'superfluous' if timestamp else 'missing'} "
f"vs. requested attribute {uuid}"
)
if non_revoc_intervals.get(uuid) and not (
non_revoc_intervals[uuid].get("from", 0)
< timestamp
< non_revoc_intervals[uuid].get("to", now)
):
LOGGER.info(
f"Timestamp {timestamp} from ledger for item"
f"{uuid} falls outside non-revocation interval "
f"{non_revoc_intervals[uuid]}"
)
elif uuid not in self_attested:
raise ValueError(
f"Presentation attributes mismatch requested attribute {uuid}"
)
elif "names" in req_attr:
group_spec = revealed_groups.get(uuid)
if (
group_spec is None
or "sub_proof_index" not in group_spec
or "values" not in group_spec
):
raise ValueError(f"Missing requested attribute group {uuid}")
index = group_spec["sub_proof_index"]
timestamp = pres["identifiers"][index].get("timestamp")
if (timestamp is not None) ^ bool(non_revoc_intervals.get(uuid)):
raise ValueError(
f"Timestamp on sub-proof #{index} "
f"is {'superfluous' if timestamp else 'missing'} "
f"vs. requested attribute group {uuid}"
)
if non_revoc_intervals.get(uuid) and not (
non_revoc_intervals[uuid].get("from", 0)
< timestamp
< non_revoc_intervals[uuid].get("to", now)
):
LOGGER.warning(
f"Timestamp {timestamp} from ledger for item"
f"{uuid} falls outside non-revocation interval "
f"{non_revoc_intervals[uuid]}"
)
for (uuid, req_pred) in pres_req["requested_predicates"].items():
pred_spec = preds.get(uuid)
if pred_spec is None or "sub_proof_index" not in pred_spec:
f"Presentation predicates mismatch requested predicate {uuid}"
index = pred_spec["sub_proof_index"]
timestamp = pres["identifiers"][index].get("timestamp")
if (timestamp is not None) ^ bool(non_revoc_intervals.get(uuid)):
raise ValueError(
f"Timestamp on sub-proof #{index} "
f"is {'superfluous' if timestamp else 'missing'} "
f"vs. requested predicate {uuid}"
)
if non_revoc_intervals.get(uuid) and not (
non_revoc_intervals[uuid].get("from", 0)
< timestamp
< non_revoc_intervals[uuid].get("to", now)
):
LOGGER.warning(
f"Best-effort timestamp {timestamp} "
"from ledger falls outside non-revocation interval "
f"{non_revoc_intervals[uuid]}"
)
[docs] async def pre_verify(self, pres_req: dict, pres: dict):
"""
Check for essential components and tampering in presentation.
Visit encoded attribute values against raw, and predicate bounds,
in presentation, cross-reference against presentation request.
Args:
pres_req: presentation request
pres: corresponding presentation
"""
if not (
pres_req
and "requested_predicates" in pres_req
and "requested_attributes" in pres_req
):
raise ValueError("Incomplete or missing proof request")
if not pres:
raise ValueError("No proof provided")
if "requested_proof" not in pres:
raise ValueError("Presentation missing 'requested_proof'")
if "proof" not in pres:
raise ValueError("Presentation missing 'proof'")
for (uuid, req_pred) in pres_req["requested_predicates"].items():
try:
canon_attr = canon(req_pred["name"])
for ge_proof in pres["proof"]["proofs"][
pres["requested_proof"]["predicates"][uuid]["sub_proof_index"]
]["primary_proof"]["ge_proofs"]:
pred = ge_proof["predicate"]
if pred["attr_name"] == canon_attr:
if pred["value"] != req_pred["p_value"]:
raise ValueError(
f"Predicate value != p_value: {pred['attr_name']}"
)
break
else:
raise ValueError(f"Missing requested predicate '{uuid}'")
except (KeyError, TypeError):
raise ValueError(f"Missing requested predicate '{uuid}'")
revealed_attrs = pres["requested_proof"].get("revealed_attrs", {})
revealed_groups = pres["requested_proof"].get("revealed_attr_groups", {})
self_attested = pres["requested_proof"].get("self_attested_attrs", {})
for (uuid, req_attr) in pres_req["requested_attributes"].items():
if "name" in req_attr:
if uuid in revealed_attrs:
pres_req_attr_spec = {req_attr["name"]: revealed_attrs[uuid]}
elif uuid in self_attested:
if not req_attr.get("restrictions"):
continue
raise ValueError(
"Attribute with restrictions cannot be self-attested: "
f"'{req_attr['name']}'"
)
else:
raise ValueError(
f"Missing requested attribute '{req_attr['name']}'"
)
elif "names" in req_attr:
group_spec = revealed_groups[uuid]
pres_req_attr_spec = {
attr: {
"sub_proof_index": group_spec["sub_proof_index"],
**group_spec["values"].get(attr),
}
for attr in req_attr["names"]
}
else:
raise ValueError(
f"Request attribute missing 'name' and 'names': '{uuid}'"
)
for (attr, spec) in pres_req_attr_spec.items():
try:
primary_enco = pres["proof"]["proofs"][spec["sub_proof_index"]][
"primary_proof"
]["eq_proof"]["revealed_attrs"][canon(attr)]
except (KeyError, TypeError):
raise ValueError(f"Missing revealed attribute: '{attr}'")
if primary_enco != spec["encoded"]:
raise ValueError(f"Encoded representation mismatch for '{attr}'")
if primary_enco != encode(spec["raw"]):
raise ValueError(f"Encoded representation mismatch for '{attr}'")
[docs] @abstractmethod
def verify_presentation(
self,
presentation_request,
presentation,
schemas,
credential_definitions,
rev_reg_defs,
rev_reg_entries,
):
"""
Verify a presentation.
Args:
presentation_request: Presentation request data
presentation: Presentation data
schemas: Schema data
credential_definitions: credential definition data
rev_reg_defs: revocation registry definitions
rev_reg_entries: revocation registry entries
"""