
"""
Utilities for DIF presentation exchange attachment.

General Flow:
create_vp ->
make_requirement [create a Requirement from SubmissionRequirements and Descriptors] ->
apply_requirement [filter credentials] ->
merge [return applicable credential list and descriptor_map for presentation_submission]
returns VerifiablePresentation
"""
import pytz
import re
import logging

from datetime import datetime
from dateutil.parser import parse as dateutil_parser
from dateutil.parser import ParserError
from jsonpath_ng import parse
from pyld import jsonld
from pyld.jsonld import JsonLdProcessor
from typing import Sequence, Optional, Tuple, Union, Dict, List
from unflatten import unflatten
from uuid import uuid4

from ....core.error import BaseError
from ....core.profile import Profile
from ....did.did_key import DIDKey
from ....storage.vc_holder.vc_record import VCRecord
from ....vc.ld_proofs import (
    Ed25519Signature2018,
    BbsBlsSignature2020,
    BbsBlsSignatureProof2020,
    WalletKeyPair,
    DocumentLoader,
)
from ....vc.ld_proofs.constants import (
    SECURITY_CONTEXT_BBS_URL,
    EXPANDED_TYPE_CREDENTIALS_CONTEXT_V1_VC_TYPE,
)
from ....vc.vc_ld.prove import sign_presentation, create_presentation, derive_credential
from ....wallet.base import BaseWallet, DIDInfo
from ....wallet.error import WalletError, WalletNotFoundError
from ....wallet.key_type import BLS12381G2, ED25519

from .pres_exch import (
    PresentationDefinition,
    InputDescriptors,
    DIFField,
    Filter,
    Constraints,
    SubmissionRequirements,
    Requirement,
    SchemaInputDescriptor,
    SchemasInputDescriptorFilter,
    InputDescriptorMapping,
    PresentationSubmission,
)

PRESENTATION_SUBMISSION_JSONLD_CONTEXT = (
    "https://identity.foundation/presentation-exchange/submission/v1"
)
PRESENTATION_SUBMISSION_JSONLD_TYPE = "PresentationSubmission"
PYTZ_TIMEZONE_PATTERN = re.compile(r"(([a-zA-Z]+)(?:\/)([a-zA-Z]+))")
LIST_INDEX_PATTERN = re.compile(r"\[(\W+)\]|\[(\d+)\]")
LOGGER = logging.getLogger(__name__)
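
# Illustrative usage sketch (not part of the module): assuming an initialized
# ``Profile`` with a ``DocumentLoader`` bound, a list of ``VCRecord``s, and a
# parsed ``PresentationDefinition`` already in hand, the general flow from the
# module docstring reduces to a single ``create_vp`` call:
#
#     handler = DIFPresExchHandler(profile, proof_type="Ed25519Signature2018")
#     vp = await handler.create_vp(
#         credentials=vc_records,          # Sequence[VCRecord]
#         pd=presentation_definition,      # PresentationDefinition
#         challenge="3fa85f64-5717-4562-b3fc-2c963f66afa6",
#     )
#     # vp carries a "presentation_submission" property with the descriptor_map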




class DIFPresExchError(BaseError):
    """Base class for DIF Presentation Exchange related errors."""


class DIFPresExchHandler:
    """Base Presentation Exchange Handler."""

    ISSUE_SIGNATURE_SUITE_KEY_TYPE_MAPPING = {
        Ed25519Signature2018: ED25519,
    }

    if BbsBlsSignature2020.BBS_SUPPORTED:
        ISSUE_SIGNATURE_SUITE_KEY_TYPE_MAPPING[BbsBlsSignature2020] = BLS12381G2

    DERIVE_SIGNATURE_SUITE_KEY_TYPE_MAPPING = {
        BbsBlsSignatureProof2020: BLS12381G2,
    }

    PROOF_TYPE_SIGNATURE_SUITE_MAPPING = {
        suite.signature_type: suite
        for suite, key_type in ISSUE_SIGNATURE_SUITE_KEY_TYPE_MAPPING.items()
    }

    DERIVED_PROOF_TYPE_SIGNATURE_SUITE_MAPPING = {
        suite.signature_type: suite
        for suite, key_type in DERIVE_SIGNATURE_SUITE_KEY_TYPE_MAPPING.items()
    }

    def __init__(
        self,
        profile: Profile,
        pres_signing_did: str = None,
        proof_type: str = None,
        reveal_doc: dict = None,
    ):
        """Initialize PresExchange Handler."""
        super().__init__()
        self.profile = profile
        self.pres_signing_did = pres_signing_did
        if not proof_type:
            self.proof_type = Ed25519Signature2018.signature_type
        else:
            self.proof_type = proof_type
        self.is_holder = False
        self.reveal_doc_frame = reveal_doc

    async def _get_issue_suite(
        self,
        *,
        wallet: BaseWallet,
        issuer_id: str,
    ):
        """Get signature suite for signing presentation."""
        did_info = await self._did_info_for_did(issuer_id)
        verification_method = self._get_verification_method(issuer_id)

        # Get signature class based on proof type
        SignatureClass = self.PROOF_TYPE_SIGNATURE_SUITE_MAPPING[self.proof_type]

        # Generically create signature class
        return SignatureClass(
            verification_method=verification_method,
            key_pair=WalletKeyPair(
                wallet=wallet,
                key_type=self.ISSUE_SIGNATURE_SUITE_KEY_TYPE_MAPPING[SignatureClass],
                public_key_base58=did_info.verkey if did_info else None,
            ),
        )

    async def _get_derive_suite(
        self,
        *,
        wallet: BaseWallet,
    ):
        """Get signature suite for deriving credentials."""
        # Get signature class based on proof type
        SignatureClass = self.DERIVED_PROOF_TYPE_SIGNATURE_SUITE_MAPPING[
            "BbsBlsSignatureProof2020"
        ]

        # Generically create signature class
        return SignatureClass(
            key_pair=WalletKeyPair(
                wallet=wallet,
                key_type=self.DERIVE_SIGNATURE_SUITE_KEY_TYPE_MAPPING[SignatureClass],
            ),
        )

    def _get_verification_method(self, did: str):
        """Get the verification method for a did."""
        if did.startswith("did:key:"):
            return DIDKey.from_did(did).key_id
        elif did.startswith("did:sov:"):
            # key-1 is what uniresolver uses for key id
            return did + "#key-1"
        else:
            raise DIFPresExchError(
                f"Unable to retrieve verification method for did {did}"
            )

    async def _did_info_for_did(self, did: str) -> DIDInfo:
        """
        Get the did info for specified did.

        If the did starts with did:sov it will remove the prefix for
        backwards compatibility with not fully qualified did.

        Args:
            did (str): The did to retrieve from the wallet.

        Raises:
            WalletNotFoundError: If the did is not found in the wallet.

        Returns:
            DIDInfo: did information

        """
        async with self.profile.session() as session:
            wallet = session.inject(BaseWallet)

            # If the did starts with did:sov we need to query without
            if did.startswith("did:sov:"):
                return await wallet.get_local_did(did.replace("did:sov:", ""))

            # All other methods we can just query
            return await wallet.get_local_did(did)

    async def get_sign_key_credential_subject_id(
        self, applicable_creds: Sequence[VCRecord]
    ) -> Tuple[Optional[str], Sequence[dict]]:
        """Get the issuer_id and filtered_creds from enclosed credentials subject_ids."""
        issuer_id = None
        filtered_creds_list = []
        if self.proof_type == BbsBlsSignature2020.signature_type:
            reqd_key_type = BLS12381G2
        else:
            reqd_key_type = ED25519
        for cred in applicable_creds:
            if cred.subject_ids and len(cred.subject_ids) > 0:
                if not issuer_id:
                    for cred_subject_id in cred.subject_ids:
                        if not cred_subject_id.startswith("urn:"):
                            did_info = await self._did_info_for_did(cred_subject_id)
                            if did_info.key_type == reqd_key_type:
                                issuer_id = cred_subject_id
                                filtered_creds_list.append(cred.cred_value)
                                break
                else:
                    if issuer_id in cred.subject_ids:
                        filtered_creds_list.append(cred.cred_value)
                    else:
                        raise DIFPresExchError(
                            "Applicable credentials have different credentialSubject.id, "
                            "multiple proofs are not supported currently"
                        )
        return (issuer_id, filtered_creds_list)

    async def to_requirement(
        self, sr: SubmissionRequirements, descriptors: Sequence[InputDescriptors]
    ) -> Requirement:
        """
        Return Requirement.

        Args:
            sr: submission_requirement
            descriptors: list of input_descriptors

        Raises:
            DIFPresExchError: If not able to create requirement

        """
        input_descriptors = []
        nested = []
        total_count = 0

        if sr._from:
            if sr._from != "":
                for descriptor in descriptors:
                    if self.contains(descriptor.groups, sr._from):
                        input_descriptors.append(descriptor)
                total_count = len(input_descriptors)
                if total_count == 0:
                    raise DIFPresExchError(f"No descriptors for from: {sr._from}")
        else:
            for submission_requirement in sr.from_nested:
                try:
                    # recursion logic
                    requirement = await self.to_requirement(
                        submission_requirement, descriptors
                    )
                    nested.append(requirement)
                except Exception as err:
                    raise DIFPresExchError(
                        (
                            "Error creating requirement from "
                            f"nested submission_requirements, {err}"
                        )
                    )
            total_count = len(nested)
        count = sr.count
        if sr.rule == "all":
            count = total_count
        requirement = Requirement(
            count=count,
            maximum=sr.maximum,
            minimum=sr.minimum,
            input_descriptors=input_descriptors,
            nested_req=nested,
        )
        return requirement

    async def make_requirement(
        self,
        srs: Sequence[SubmissionRequirements] = None,
        descriptors: Sequence[InputDescriptors] = None,
    ) -> Requirement:
        """
        Return Requirement.

        Creates and returns a Requirement, with nesting if required,
        using to_requirement()

        Args:
            srs: list of submission_requirements
            descriptors: list of input_descriptors

        Raises:
            DIFPresExchError: If not able to create requirement

        """
        if not srs:
            srs = []
        if not descriptors:
            descriptors = []
        if len(srs) == 0:
            requirement = Requirement(
                count=len(descriptors),
                input_descriptors=descriptors,
            )
            return requirement
        requirement = Requirement(
            count=len(srs),
            nested_req=[],
        )
        for submission_requirement in srs:
            try:
                requirement.nested_req.append(
                    await self.to_requirement(submission_requirement, descriptors)
                )
            except Exception as err:
                raise DIFPresExchError(
                    f"Error creating requirement inside to_requirement function, {err}"
                )
        return requirement

    def is_len_applicable(self, req: Requirement, val: int) -> bool:
        """
        Check and validate requirement minimum, maximum and count.

        Args:
            req: Requirement
            val: int value to check

        Return:
            bool

        """
        if req.count:
            if req.count > 0 and val != req.count:
                return False
        if req.minimum:
            if req.minimum > 0 and req.minimum > val:
                return False
        if req.maximum:
            if req.maximum > 0 and req.maximum < val:
                return False
        return True

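    # For example (illustrative, assuming ``handler`` is an initialized
    # DIFPresExchHandler): a Requirement(count=2) is satisfied only by exactly
    # two matches, while minimum/maximum bound an inclusive range:
    #
    #     req = Requirement(count=2)
    #     handler.is_len_applicable(req, 2)   # True
    #     handler.is_len_applicable(req, 3)   # False
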
    def contains(self, data: Sequence[str], e: str) -> bool:
        """
        Check for e in data.

        Returns True if e exists in data, else False

        Args:
            data: Sequence of str
            e: str value to check

        Return:
            bool

        """
        data_list = list(data) if data else []
        for k in data_list:
            if e == k:
                return True
        return False

    async def filter_constraints(
        self,
        constraints: Constraints,
        credentials: Sequence[VCRecord],
    ) -> Sequence[VCRecord]:
        """
        Return list of applicable VCRecords after applying filtering.

        Args:
            constraints: Constraints
            credentials: Sequence of credentials to apply filtering on

        Return:
            Sequence of applicable VCRecords

        """
        document_loader = self.profile.inject(DocumentLoader)

        result = []
        for credential in credentials:
            if constraints.subject_issuer == "required" and not self.subject_is_issuer(
                credential=credential
            ):
                continue

            applicable = False
            is_holder_field_ids = self.field_ids_for_is_holder(constraints)
            for field in constraints._fields:
                applicable = await self.filter_by_field(field, credential)
                # all fields in the constraint should be satisfied
                if not applicable:
                    break
                # is_holder with required directive requested for this field
                if applicable and field.id and field.id in is_holder_field_ids:
                    # Missing credentialSubject.id - cannot verify that
                    # holder of claim is same as subject
                    credential_subject_ids = credential.subject_ids
                    if not credential_subject_ids or len(credential_subject_ids) == 0:
                        applicable = False
                        break
                    # Holder of claim is not same as the subject
                    is_holder_same_as_subject = await self.process_constraint_holders(
                        subject_ids=credential_subject_ids
                    )
                    if not is_holder_same_as_subject:
                        applicable = False
                        break
            if not applicable:
                continue

            if constraints.limit_disclosure == "required":
                credential_dict = credential.cred_value
                new_credential_dict = self.reveal_doc(
                    credential_dict=credential_dict, constraints=constraints
                )
                async with self.profile.session() as session:
                    wallet = session.inject(BaseWallet)
                    derive_suite = await self._get_derive_suite(
                        wallet=wallet,
                    )
                    signed_new_credential_dict = await derive_credential(
                        credential=credential_dict,
                        reveal_document=new_credential_dict,
                        suite=derive_suite,
                        document_loader=document_loader,
                    )
                    credential = self.create_vcrecord(signed_new_credential_dict)
            result.append(credential)
        return result

    def field_ids_for_is_holder(self, constraints: Constraints) -> Sequence[str]:
        """Return list of field ids for which subject-holder verification is requested."""
        reqd_field_ids = set()
        if not constraints.holders:
            return []
        for holder in constraints.holders:
            if holder.directive == "required" or holder.directive == "preferred":
                reqd_field_ids = set.union(reqd_field_ids, set(holder.field_ids))
        return list(reqd_field_ids)

    async def process_constraint_holders(
        self,
        subject_ids: Sequence[str],
    ) -> bool:
        """Check if holder or subject of claim still controls the identifier."""
        async with self.profile.session() as session:
            wallet = session.inject(BaseWallet)
            try:
                for subject_id in subject_ids:
                    await wallet.get_local_did(subject_id.replace("did:sov:", ""))
                self.is_holder = True
                return True
            except (WalletError, WalletNotFoundError):
                return False

    def create_vcrecord(self, cred_dict: dict) -> VCRecord:
        """Return VCRecord from a credential dict."""
        proofs = cred_dict.get("proof") or []
        proof_types = None
        if type(proofs) is dict:
            proofs = [proofs]
        if proofs:
            proof_types = [proof.get("type") for proof in proofs]
        contexts = [ctx for ctx in cred_dict.get("@context") if type(ctx) is str]
        if "@graph" in cred_dict:
            for enclosed_data in cred_dict.get("@graph"):
                if (
                    enclosed_data["id"].startswith("urn:")
                    and "credentialSubject" in enclosed_data
                ):
                    cred_dict.update(enclosed_data)
                    del cred_dict["@graph"]
                    break
        given_id = cred_dict.get("id")
        if given_id and self.check_if_cred_id_derived(given_id):
            given_id = str(uuid4())
        # issuer
        issuer = cred_dict.get("issuer")
        if type(issuer) is dict:
            issuer = issuer.get("id")
        # subjects
        subject_ids = None
        subjects = cred_dict.get("credentialSubject")
        if subjects:
            if type(subjects) is dict:
                subjects = [subjects]
            subject_ids = [
                subject.get("id") for subject in subjects if ("id" in subject)
            ]
        else:
            cred_dict["credentialSubject"] = {}
        # Schemas
        schemas = cred_dict.get("credentialSchema", [])
        if type(schemas) is dict:
            schemas = [schemas]
        schema_ids = [schema.get("id") for schema in schemas]
        expanded = jsonld.expand(cred_dict)
        types = JsonLdProcessor.get_values(
            expanded[0],
            "@type",
        )
        return VCRecord(
            contexts=contexts,
            expanded_types=types,
            issuer_id=issuer,
            subject_ids=subject_ids,
            proof_types=proof_types,
            given_id=given_id,
            cred_value=cred_dict,
            schema_ids=schema_ids,
        )

    def reveal_doc(self, credential_dict: dict, constraints: Constraints):
        """Generate reveal_doc dict for deriving credential."""
        if not self.reveal_doc_frame:
            derived = {
                "@context": credential_dict.get("@context"),
                "type": credential_dict.get("type"),
                "@explicit": True,
                "@requireAll": True,
                "issuanceDate": {},
                "issuer": {},
            }
            unflatten_dict = {}
            for field in constraints._fields:
                for path in field.paths:
                    jsonpath = parse(path)
                    match = jsonpath.find(credential_dict)
                    if len(match) == 0:
                        continue
                    for match_item in match:
                        full_path = str(match_item.full_path)
                        if bool(LIST_INDEX_PATTERN.search(full_path)):
                            full_path = re.sub(
                                r"\[(\W+)\]|\[(\d+)\]", "[0]", full_path
                            )
                            full_path = full_path.replace(".[", "[")
                        unflatten_dict[full_path] = {}
                        explicit_key_path = None
                        key_list = full_path.split(".")[:-1]
                        for key in key_list:
                            if not explicit_key_path:
                                explicit_key_path = key
                            else:
                                explicit_key_path = explicit_key_path + "." + key
                            unflatten_dict[explicit_key_path + ".@explicit"] = True
                            unflatten_dict[explicit_key_path + ".@requireAll"] = True
            derived = self.new_credential_builder(derived, unflatten_dict)
            # Fix issue related to credentialSubject type property
            if "credentialSubject" in derived.keys():
                if "type" in credential_dict.get("credentialSubject"):
                    derived["credentialSubject"]["type"] = credential_dict[
                        "credentialSubject"
                    ].get("type")
            return derived
        else:
            return self.reveal_doc_frame

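    # For a constraint field path of "$.credentialSubject.givenName", the
    # generated frame looks roughly like the following (illustrative; the
    # actual output depends on the credential's context and types):
    #
    #     {
    #         "@context": [...],
    #         "type": [...],
    #         "@explicit": True,
    #         "@requireAll": True,
    #         "issuanceDate": {},
    #         "issuer": {},
    #         "credentialSubject": {
    #             "@explicit": True,
    #             "@requireAll": True,
    #             "givenName": {},
    #             "type": [...],
    #         },
    #     }
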
    def new_credential_builder(
        self, new_credential: dict, unflatten_dict: dict
    ) -> dict:
        """
        Update and return the new_credential.

        Args:
            new_credential: credential dict to be updated and returned
            unflatten_dict: dict with traversal path as key and match_value as value

        Return:
            dict

        """
        new_credential.update(unflatten(unflatten_dict))
        return new_credential

    async def filter_by_field(self, field: DIFField, credential: VCRecord) -> bool:
        """
        Apply filter on VCRecord.

        Checks if a credential is applicable

        Args:
            field: Field contains filtering spec
            credential: credential to apply filtering on

        Return:
            bool

        """
        credential_dict = credential.cred_value
        for path in field.paths:
            if "$.proof." in path:
                raise DIFPresExchError(
                    "JSON Path expression matching on proof object "
                    "is not currently supported"
                )
            try:
                jsonpath = parse(path)
                match = jsonpath.find(credential_dict)
            except KeyError:
                continue
            if len(match) == 0:
                continue
            for match_item in match:
                # No filter in constraint
                if not field._filter:
                    return True
                if (
                    isinstance(match_item.value, dict)
                    and "type" in match_item.value
                    and "@value" in match_item.value
                ):
                    to_filter_type = match_item.value.get("type")
                    to_filter_value = match_item.value.get("@value")
                    if "integer" in to_filter_type:
                        to_filter_value = int(to_filter_value)
                    elif "dateTime" in to_filter_type or "date" in to_filter_type:
                        to_filter_value = self.string_to_timezone_aware_datetime(
                            to_filter_value
                        )
                    elif "boolean" in to_filter_type:
                        to_filter_value = bool(to_filter_value)
                    elif "double" in to_filter_type or "decimal" in to_filter_type:
                        to_filter_value = float(to_filter_value)
                else:
                    to_filter_value = match_item.value
                if self.validate_patch(to_filter_value, field._filter):
                    return True
        return False

    def string_to_timezone_aware_datetime(self, datetime_str: str) -> datetime:
        """Convert string with PYTZ timezone to datetime for comparison."""
        if PYTZ_TIMEZONE_PATTERN.search(datetime_str):
            result = PYTZ_TIMEZONE_PATTERN.search(datetime_str).group(1)
            datetime_str = datetime_str.replace(result, "")
            return dateutil_parser(datetime_str).replace(tzinfo=pytz.timezone(result))
        else:
            utc = pytz.UTC
            return dateutil_parser(datetime_str).replace(tzinfo=utc)

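    # Illustrative behavior: a pytz zone name embedded in the string is
    # stripped out and re-applied as tzinfo; strings without a zone name are
    # made UTC-aware:
    #
    #     handler.string_to_timezone_aware_datetime(
    #         "2021-03-29T05:30:00 America/Chicago"
    #     )
    #     # -> tzinfo from pytz.timezone("America/Chicago")
    #     handler.string_to_timezone_aware_datetime("2019-12-11T03:50:55Z")
    #     # -> datetime(2019, 12, 11, 3, 50, 55, tzinfo=<UTC>)
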
    def validate_patch(self, to_check: any, _filter: Filter) -> bool:
        """
        Apply filter on match_value.

        Utility function used in applying filtering to a cred
        by triggering checks according to filter specification

        Args:
            to_check: value to check, extracted from match
            _filter: Filter

        Return:
            bool

        """
        return_val = False
        if _filter._type:
            if self.check_filter_only_type_enforced(_filter):
                if _filter._type == "number":
                    if isinstance(to_check, (int, float)):
                        return True
                elif _filter._type == "string":
                    if isinstance(to_check, str):
                        if _filter.fmt == "date" or _filter.fmt == "date-time":
                            try:
                                to_compare_date = (
                                    self.string_to_timezone_aware_datetime(to_check)
                                )
                                if isinstance(to_compare_date, datetime):
                                    return True
                            except (ParserError, TypeError):
                                return False
                        else:
                            return True
            else:
                if _filter._type == "number":
                    return_val = self.process_numeric_val(to_check, _filter)
                elif _filter._type == "string":
                    return_val = self.process_string_val(to_check, _filter)
        else:
            if _filter.enums:
                return_val = self.enum_check(val=to_check, _filter=_filter)
            if _filter.const:
                return_val = self.const_check(val=to_check, _filter=_filter)

        if _filter._not:
            return not return_val
        return return_val

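    # Illustrative check (assumes a ``Filter`` built from an input_descriptor
    # JSON Schema filter such as {"type": "string", "pattern": "did:example:.*"}):
    #
    #     _filter = Filter(_type="string", pattern="did:example:.*")
    #     handler.validate_patch("did:example:489398593", _filter)  # True
    #     handler.validate_patch("did:other:123", _filter)          # False
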
    def check_filter_only_type_enforced(self, _filter: Filter) -> bool:
        """
        Check if only type is specified in filter.

        Args:
            _filter: Filter

        Return:
            bool

        """
        if (
            _filter.pattern is None
            and _filter.minimum is None
            and _filter.maximum is None
            and _filter.min_length is None
            and _filter.max_length is None
            and _filter.exclusive_min is None
            and _filter.exclusive_max is None
            and _filter.const is None
            and _filter.enums is None
        ):
            return True
        else:
            return False

    def process_numeric_val(self, val: any, _filter: Filter) -> bool:
        """
        Trigger Filter checks.

        Trigger appropriate check for a number type filter,
        according to _filter spec.

        Args:
            val: value to check, extracted from match
            _filter: Filter

        Return:
            bool

        """
        if _filter.exclusive_max:
            return self.exclusive_maximum_check(val, _filter)
        elif _filter.exclusive_min:
            return self.exclusive_minimum_check(val, _filter)
        elif _filter.minimum:
            return self.minimum_check(val, _filter)
        elif _filter.maximum:
            return self.maximum_check(val, _filter)
        elif _filter.const:
            return self.const_check(val, _filter)
        elif _filter.enums:
            return self.enum_check(val, _filter)
        else:
            return False

    def process_string_val(self, val: any, _filter: Filter) -> bool:
        """
        Trigger Filter checks.

        Trigger appropriate check for a string type filter,
        according to _filter spec.

        Args:
            val: value to check, extracted from match
            _filter: Filter

        Return:
            bool

        """
        if _filter.min_length or _filter.max_length:
            return self.length_check(val, _filter)
        elif _filter.pattern:
            return self.pattern_check(val, _filter)
        elif _filter.enums:
            return self.enum_check(val, _filter)
        elif _filter.exclusive_max and _filter.fmt:
            return self.exclusive_maximum_check(val, _filter)
        elif _filter.exclusive_min and _filter.fmt:
            return self.exclusive_minimum_check(val, _filter)
        elif _filter.minimum and _filter.fmt:
            return self.minimum_check(val, _filter)
        elif _filter.maximum and _filter.fmt:
            return self.maximum_check(val, _filter)
        elif _filter.const:
            return self.const_check(val, _filter)
        else:
            return False

    def exclusive_minimum_check(self, val: any, _filter: Filter) -> bool:
        """
        exclusiveMinimum check.

        Returns True if value is greater than the filter-specified bound

        Args:
            val: value to check, extracted from match
            _filter: Filter

        Return:
            bool

        """
        try:
            if _filter.fmt:
                if _filter.fmt == "date" or _filter.fmt == "date-time":
                    to_compare_date = self.string_to_timezone_aware_datetime(
                        _filter.exclusive_min
                    )
                    given_date = self.string_to_timezone_aware_datetime(str(val))
                    return given_date > to_compare_date
            else:
                try:
                    val = self.is_numeric(val)
                    return val > _filter.exclusive_min
                except DIFPresExchError as err:
                    LOGGER.error(err)
            return False
        except (TypeError, ValueError, ParserError):
            return False

    def exclusive_maximum_check(self, val: any, _filter: Filter) -> bool:
        """
        exclusiveMaximum check.

        Returns True if value is less than the filter-specified bound

        Args:
            val: value to check, extracted from match
            _filter: Filter

        Return:
            bool

        """
        try:
            if _filter.fmt:
                if _filter.fmt == "date" or _filter.fmt == "date-time":
                    to_compare_date = self.string_to_timezone_aware_datetime(
                        _filter.exclusive_max
                    )
                    given_date = self.string_to_timezone_aware_datetime(str(val))
                    return given_date < to_compare_date
            else:
                try:
                    val = self.is_numeric(val)
                    return val < _filter.exclusive_max
                except DIFPresExchError as err:
                    LOGGER.error(err)
            return False
        except (TypeError, ValueError, ParserError):
            return False

    def maximum_check(self, val: any, _filter: Filter) -> bool:
        """
        Maximum check.

        Returns True if value is less than or equal to the filter-specified bound

        Args:
            val: value to check, extracted from match
            _filter: Filter

        Return:
            bool

        """
        try:
            if _filter.fmt:
                if _filter.fmt == "date" or _filter.fmt == "date-time":
                    to_compare_date = self.string_to_timezone_aware_datetime(
                        _filter.maximum
                    )
                    given_date = self.string_to_timezone_aware_datetime(str(val))
                    return given_date <= to_compare_date
            else:
                try:
                    val = self.is_numeric(val)
                    return val <= _filter.maximum
                except DIFPresExchError as err:
                    LOGGER.error(err)
            return False
        except (TypeError, ValueError, ParserError):
            return False

    def minimum_check(self, val: any, _filter: Filter) -> bool:
        """
        Minimum check.

        Returns True if value is greater than or equal to the filter-specified bound

        Args:
            val: value to check, extracted from match
            _filter: Filter

        Return:
            bool

        """
        try:
            if _filter.fmt:
                if _filter.fmt == "date" or _filter.fmt == "date-time":
                    to_compare_date = self.string_to_timezone_aware_datetime(
                        _filter.minimum
                    )
                    given_date = self.string_to_timezone_aware_datetime(str(val))
                    return given_date >= to_compare_date
            else:
                try:
                    val = self.is_numeric(val)
                    return val >= _filter.minimum
                except DIFPresExchError as err:
                    LOGGER.error(err)
            return False
        except (TypeError, ValueError, ParserError):
            return False

    def length_check(self, val: any, _filter: Filter) -> bool:
        """
        Length check.

        Returns True if the length of the value string meets
        the minLength and maxLength specs

        Args:
            val: value to check, extracted from match
            _filter: Filter

        Return:
            bool

        """
        given_len = len(str(val))
        if _filter.max_length and _filter.min_length:
            if given_len <= _filter.max_length and given_len >= _filter.min_length:
                return True
        elif _filter.max_length and not _filter.min_length:
            if given_len <= _filter.max_length:
                return True
        elif not _filter.max_length and _filter.min_length:
            if given_len >= _filter.min_length:
                return True
        return False

    def pattern_check(self, val: any, _filter: Filter) -> bool:
        """
        Pattern check.

        Returns True if value string matches the specified pattern

        Args:
            val: value to check, extracted from match
            _filter: Filter

        Return:
            bool

        """
        if _filter.pattern:
            return bool(re.search(pattern=_filter.pattern, string=str(val)))
        return False

    def const_check(self, val: any, _filter: Filter) -> bool:
        """
        Const check.

        Returns True if value is equal to the filter-specified const

        Args:
            val: value to check, extracted from match
            _filter: Filter

        Return:
            bool

        """
        if val == _filter.const:
            return True
        return False

    def enum_check(self, val: any, _filter: Filter) -> bool:
        """
        Enum check.

        Returns True if value is contained in the filter-specified list

        Args:
            val: value to check, extracted from match
            _filter: Filter

        Return:
            bool

        """
        if val in _filter.enums:
            return True
        return False

    def subject_is_issuer(self, credential: VCRecord) -> bool:
        """
        subject_is_issuer check.

        Returns True if cred issuer_id is in subject_ids

        Args:
            credential: VCRecord

        Return:
            bool

        """
        subject_ids = credential.subject_ids
        for subject_id in subject_ids:
            issuer_id = credential.issuer_id
            if subject_id != "" and subject_id == issuer_id:
                return True
        return False

    async def _credential_match_schema_filter_helper(
        self, credential: VCRecord, filter: Sequence[Sequence[SchemaInputDescriptor]]
    ) -> bool:
        """credential_match_schema for SchemasInputDescriptorFilter.uri_groups."""
        applicable = False
        for uri_group in filter:
            if applicable:
                return True
            for schema in uri_group:
                applicable = self.credential_match_schema(
                    credential=credential, schema_id=schema.uri
                )
                if schema.required and not applicable:
                    break
                if applicable:
                    if schema.uri in [
                        EXPANDED_TYPE_CREDENTIALS_CONTEXT_V1_VC_TYPE,
                    ]:
                        continue
                    else:
                        break
        return applicable

    async def filter_schema(
        self,
        credentials: Sequence[VCRecord],
        schemas: SchemasInputDescriptorFilter,
    ) -> Sequence[VCRecord]:
        """
        Filter by schema.

        Returns list of credentials where credentialSchema.id or types
        matched with input_descriptors.schema.uri

        Args:
            credentials: list of VCRecords to check
            schemas: list of schemas from the input_descriptors

        Return:
            Sequence of filtered VCRecord

        """
        result = []
        for credential in credentials:
            applicable = False
            applicable = await self._credential_match_schema_filter_helper(
                credential=credential, filter=schemas.uri_groups
            )
            if applicable:
                result.append(credential)
        return result

    def credential_match_schema(self, credential: VCRecord, schema_id: str) -> bool:
        """
        Credential matching by schema.

        Used by filter_schema to check if credential.schema_ids or
        credential.types matched with schema_id

        Args:
            credential: VCRecord to check
            schema_id: schema uri to check

        Return:
            bool

        """
        if schema_id in credential.schema_ids:
            return True
        if schema_id in credential.expanded_types:
            return True
        return False

    async def filter_creds_record_id(
        self, credentials: Sequence[VCRecord], records_list: Sequence[str]
    ) -> Sequence[VCRecord]:
        """Return filtered list of credentials using records_list."""
        filtered_cred = []
        for credential in credentials:
            if credential.record_id in records_list:
                filtered_cred.append(credential)
        return filtered_cred

    async def apply_requirements(
        self,
        req: Requirement,
        credentials: Sequence[VCRecord],
        records_filter: dict = None,
    ) -> dict:
        """
        Apply Requirement.

        Args:
            req: Requirement
            credentials: Sequence of credentials to check against

        Return:
            dict of input_descriptor ID key to list of credential_json

        """
        # Dict for storing descriptor_id keys and list of applicable
        # credentials values
        result = {}
        # Get all input_descriptors attached to the PresentationDefinition
        descriptor_list = req.input_descriptors or []
        for descriptor in descriptor_list:
            # Filter credentials to apply filtering
            # upon by matching each credentialSchema.id
            # or expanded types on each InputDescriptor's schema URIs
            if records_filter and (descriptor.id in records_filter):
                filtered_creds_by_descriptor_id = await self.filter_creds_record_id(
                    credentials, records_filter.get(descriptor.id)
                )
                filtered_by_schema = await self.filter_schema(
                    credentials=filtered_creds_by_descriptor_id,
                    schemas=descriptor.schemas,
                )
            else:
                filtered_by_schema = await self.filter_schema(
                    credentials=credentials,
                    schemas=descriptor.schemas,
                )
            # Filter credentials based upon path expressions specified in constraints
            filtered = await self.filter_constraints(
                constraints=descriptor.constraint,
                credentials=filtered_by_schema,
            )
            if len(filtered) != 0:
                result[descriptor.id] = filtered

        if len(descriptor_list) != 0:
            # Applies min, max or count attributes of submission_requirement
            if self.is_len_applicable(req, len(result)):
                return result
            return {}

        nested_result = []
        cred_uid_descriptors = {}
        # recursion logic for nested requirements
        for requirement in req.nested_req:
            # recursive call
            result = await self.apply_requirements(
                requirement, credentials, records_filter
            )
            if result == {}:
                continue
            # cred_uid_descriptors maps applicable credentials
            # to their respective descriptor.
            # Structure: {cred.given_id: {
            #    desc_id_1: {}
            #   },
            #   ......
            # }
            # This will be used to construct exclude dict.
            for descriptor_id in result.keys():
                credential_list = result.get(descriptor_id)
                for credential in credential_list:
                    cred_id = credential.given_id or credential.record_id
                    if cred_id:
                        cred_uid_descriptors.setdefault(cred_id, {})[descriptor_id] = {}

            if len(result.keys()) != 0:
                nested_result.append(result)

        exclude = {}
        for uid in cred_uid_descriptors.keys():
            # Check if number of applicable credentials
            # does not meet requirement specification
            if not self.is_len_applicable(req, len(cred_uid_descriptors[uid])):
                for descriptor_id in cred_uid_descriptors[uid]:
                    # Add to exclude dict
                    # with cred_uid + descriptor_id as key
                    exclude[descriptor_id + uid] = {}
        # merging credentials and excluding credentials that don't satisfy the requirement
        return await self.merge_nested_results(
            nested_result=nested_result, exclude=exclude
        )

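    # The returned mapping is keyed by input_descriptor id, e.g. (illustrative,
    # descriptor ids are assumptions):
    #
    #     {
    #         "citizenship_input_1": [<VCRecord>, ...],
    #         "citizenship_input_2": [<VCRecord>, ...],
    #     }
    #
    # An empty dict signals that the requirement could not be satisfied.
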
    def is_numeric(self, val: any):
        """
        Check if val is an int or float.

        Args:
            val: to check

        Return:
            numeric value

        Raises:
            DIFPresExchError: Provided value has invalid/incompatible type

        """
        if isinstance(val, float) or isinstance(val, int):
            return val
        elif isinstance(val, str):
            if val.isdigit():
                return int(val)
            else:
                try:
                    return float(val)
                except ValueError:
                    pass
        raise DIFPresExchError(
            "Invalid type provided for comparison/numeric operation."
        )

    async def merge_nested_results(
        self, nested_result: Sequence[dict], exclude: dict
    ) -> dict:
        """
        Merge nested results with merged credentials.

        Args:
            nested_result: Sequence of dict containing input_descriptor.id as keys
                and list of creds as values
            exclude: dict containing info about credentials to exclude

        Return:
            dict with input_descriptor.id as keys and merged_credentials_list as values

        """
        result = {}
        for res in nested_result:
            for key in res.keys():
                credentials = res[key]
                uid_dict = {}
                merged_credentials = []

                if key in result:
                    for credential in result[key]:
                        cred_id = credential.given_id or credential.record_id
                        if cred_id and cred_id not in uid_dict:
                            merged_credentials.append(credential)
                            uid_dict[cred_id] = {}

                for credential in credentials:
                    cred_id = credential.given_id or credential.record_id
                    if cred_id and cred_id not in uid_dict:
                        if (key + cred_id) not in exclude:
                            merged_credentials.append(credential)
                            uid_dict[cred_id] = {}
                result[key] = merged_credentials
        return result

    async def create_vp(
        self,
        credentials: Sequence[VCRecord],
        pd: PresentationDefinition,
        challenge: str = None,
        domain: str = None,
        records_filter: dict = None,
    ) -> Union[Sequence[dict], dict]:
        """
        Create VerifiablePresentation.

        Args:
            credentials: Sequence of VCRecords
            pd: PresentationDefinition

        Return:
            VerifiablePresentation

        """
        document_loader = self.profile.inject(DocumentLoader)
        req = await self.make_requirement(
            srs=pd.submission_requirements, descriptors=pd.input_descriptors
        )
        result = []
        if req.nested_req:
            for nested_req in req.nested_req:
                res = await self.apply_requirements(
                    req=nested_req,
                    credentials=credentials,
                    records_filter=records_filter,
                )
                result.append(res)
        else:
            res = await self.apply_requirements(
                req=req, credentials=credentials, records_filter=records_filter
            )
            result.append(res)
        result_vp = []
        for res in result:
            applicable_creds, descriptor_maps = await self.merge(res)
            applicable_creds_list = []
            for credential in applicable_creds:
                applicable_creds_list.append(credential.cred_value)
            if (
                not self.profile.settings.get(
                    "debug.auto_respond_presentation_request"
                )
                and not records_filter
                and len(applicable_creds_list) > 1
            ):
                raise DIFPresExchError(
                    "Multiple credentials are applicable for presentation_definition "
                    f"{pd.id} and --auto-respond-presentation-request setting is not "
                    "enabled. Please specify which credentials should be applied to "
                    "which input_descriptors using record_ids filter."
                )
            # submission_property
            submission_property = PresentationSubmission(
                id=str(uuid4()), definition_id=pd.id, descriptor_maps=descriptor_maps
            )
            if self.is_holder:
                (
                    issuer_id,
                    filtered_creds_list,
                ) = await self.get_sign_key_credential_subject_id(
                    applicable_creds=applicable_creds
                )
                if not issuer_id and len(filtered_creds_list) == 0:
                    vp = await create_presentation(credentials=applicable_creds_list)
                    vp["presentation_submission"] = submission_property.serialize()
                    # compare proof type by value, not identity
                    if self.proof_type == BbsBlsSignature2020.signature_type:
                        vp["@context"].append(SECURITY_CONTEXT_BBS_URL)
                    result_vp.append(vp)
                    continue
                else:
                    vp = await create_presentation(credentials=filtered_creds_list)
            else:
                if not self.pres_signing_did:
                    (
                        issuer_id,
                        filtered_creds_list,
                    ) = await self.get_sign_key_credential_subject_id(
                        applicable_creds=applicable_creds
                    )
                    if not issuer_id:
                        vp = await create_presentation(
                            credentials=applicable_creds_list
                        )
                        vp["presentation_submission"] = submission_property.serialize()
                        if self.proof_type == BbsBlsSignature2020.signature_type:
                            vp["@context"].append(SECURITY_CONTEXT_BBS_URL)
                        result_vp.append(vp)
                        continue
                    else:
                        vp = await create_presentation(
                            credentials=filtered_creds_list
                        )
                else:
                    issuer_id = self.pres_signing_did
                    vp = await create_presentation(credentials=applicable_creds_list)
            vp["presentation_submission"] = submission_property.serialize()
            if self.proof_type == BbsBlsSignature2020.signature_type:
                vp["@context"].append(SECURITY_CONTEXT_BBS_URL)
            async with self.profile.session() as session:
                wallet = session.inject(BaseWallet)
                issue_suite = await self._get_issue_suite(
                    wallet=wallet,
                    issuer_id=issuer_id,
                )
                signed_vp = await sign_presentation(
                    presentation=vp,
                    suite=issue_suite,
                    challenge=challenge,
                    document_loader=document_loader,
                )
                result_vp.append(signed_vp)
        if len(result_vp) == 1:
            return result_vp[0]
        return result_vp

    def check_if_cred_id_derived(self, id: str) -> bool:
        """Check if credential or credentialSubject id is derived."""
        if id.startswith("urn:bnid:_:c14n"):
            return True
        return False

    async def merge(
        self,
        dict_descriptor_creds: dict,
    ) -> Tuple[Sequence[VCRecord], Sequence[InputDescriptorMapping]]:
        """
        Return applicable credentials and descriptor_map for attachment.

        Used for generating the presentation_submission property with the
        descriptor_map, maintaining the order in which applicable credential
        list is returned.

        Args:
            dict_descriptor_creds: dict with input_descriptor.id as keys
                and merged_credentials_list as values

        Return:
            Tuple of applicable credential list and descriptor map

        """
        dict_of_creds = {}
        dict_of_descriptors = {}
        result = []
        descriptors = []
        sorted_desc_keys = sorted(list(dict_descriptor_creds.keys()))
        for desc_id in sorted_desc_keys:
            credentials = dict_descriptor_creds.get(desc_id)
            for cred in credentials:
                cred_id = cred.given_id or cred.record_id
                if cred_id:
                    if cred_id not in dict_of_creds:
                        result.append(cred)
                        dict_of_creds[cred_id] = len(descriptors)
                    if f"{cred_id}-{cred_id}" not in dict_of_descriptors:
                        descriptor_map = InputDescriptorMapping(
                            id=desc_id,
                            fmt="ldp_vc",
                            path=(f"$.verifiableCredential[{dict_of_creds[cred_id]}]"),
                        )
                        descriptors.append(descriptor_map)

        descriptors = sorted(descriptors, key=lambda i: i.id)
        return (result, descriptors)

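    # Each InputDescriptorMapping serializes into a descriptor_map entry of
    # the presentation_submission, e.g. (illustrative; the id value is an
    # assumption):
    #
    #     {
    #         "id": "citizenship_input_1",
    #         "format": "ldp_vc",
    #         "path": "$.verifiableCredential[0]"
    #     }
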
    async def verify_received_pres(
        self,
        pd: PresentationDefinition,
        pres: Union[Sequence[dict], dict],
    ):
        """
        Verify credentials received in presentation.

        Args:
            pres: received VerifiablePresentation
            pd: PresentationDefinition

        """
        input_descriptors = pd.input_descriptors
        if isinstance(pres, Sequence):
            for pr in pres:
                descriptor_map_list = pr["presentation_submission"].get(
                    "descriptor_map"
                )
                await self.__verify_desc_map_list(
                    descriptor_map_list, pr, input_descriptors
                )
        else:
            descriptor_map_list = pres["presentation_submission"].get("descriptor_map")
            await self.__verify_desc_map_list(
                descriptor_map_list, pres, input_descriptors
            )

    async def __verify_desc_map_list(
        self, descriptor_map_list, pres, input_descriptors
    ):
        inp_desc_id_constraint_map = {}
        inp_desc_id_schema_one_of_filter = set()
        inp_desc_id_schemas_map = {}
        for input_descriptor in input_descriptors:
            inp_desc_id_constraint_map[input_descriptor.id] = (
                input_descriptor.constraint
            )
            inp_desc_id_schemas_map[input_descriptor.id] = input_descriptor.schemas
            if input_descriptor.schemas.oneof_filter:
                inp_desc_id_schema_one_of_filter.add(input_descriptor.id)
        for desc_map_item in descriptor_map_list:
            desc_map_item_id = desc_map_item.get("id")
            constraint = inp_desc_id_constraint_map.get(desc_map_item_id)
            schema_filter = inp_desc_id_schemas_map.get(desc_map_item_id)
            desc_map_item_path = desc_map_item.get("path")
            jsonpath = parse(desc_map_item_path)
            match = jsonpath.find(pres)
            if len(match) == 0:
                raise DIFPresExchError(
                    f"{desc_map_item_path} path in descriptor_map not applicable"
                )
            for match_item in match:
                if not await self.apply_constraint_received_cred(
                    constraint, match_item.value
                ):
                    raise DIFPresExchError(
                        f"Constraint specified for {desc_map_item_id} does not "
                        f"apply to the enclosed credential in {desc_map_item_path}"
                    )
                if (
                    not len(
                        await self.filter_schema(
                            credentials=[
                                self.create_vcrecord(cred_dict=match_item.value)
                            ],
                            schemas=schema_filter,
                        )
                    )
                    == 1
                ):
                    raise DIFPresExchError(
                        f"Schema filtering specified in {desc_map_item_id} does not "
                        f"match with the enclosed credential in {desc_map_item_path}"
                    )

    async def restrict_field_paths_one_of_filter(
        self, field_paths: Sequence[str], cred_dict: dict
    ) -> Sequence[str]:
        """Return field_paths that are applicable to oneof_filter."""
        applied_field_paths = []
        for path in field_paths:
            jsonpath = parse(path)
            match = jsonpath.find(cred_dict)
            if len(match) > 0:
                applied_field_paths.append(path)
        return applied_field_paths

    async def apply_constraint_received_cred(
        self, constraint: Constraints, cred_dict: dict
    ) -> bool:
        """Evaluate constraint from the request against received credential."""
        fields = constraint._fields
        field_paths = []
        credential = self.create_vcrecord(cred_dict)
        is_limit_disclosure = constraint.limit_disclosure == "required"
        for field in fields:
            if is_limit_disclosure:
                field = await self.get_updated_field(field, cred_dict)
            if not await self.filter_by_field(field, credential):
                return False
            field_paths = field_paths + (
                await self.restrict_field_paths_one_of_filter(
                    field_paths=field.paths, cred_dict=cred_dict
                )
            )
        # Selective Disclosure check
        if is_limit_disclosure:
            field_paths = set([path.replace("$.", "") for path in field_paths])
            mandatory_paths = {
                "@context",
                "type",
                "issuanceDate",
                "issuer",
                "proof",
                "credentialSubject",
                "id",
            }
            to_remove_from_field_paths = set()
            nested_field_paths = {"credentialSubject": {"id", "type"}}
            for field_path in field_paths:
                if field_path.count(".") >= 1:
                    split_field_path = field_path.split(".")
                    key = ".".join(split_field_path[:-1])
                    value = split_field_path[-1]
                    nested_field_paths = self.build_nested_paths_dict(
                        key, value, nested_field_paths, cred_dict
                    )
                    to_remove_from_field_paths.add(field_path)
            for to_remove_path in to_remove_from_field_paths:
                field_paths.remove(to_remove_path)

            field_paths = set.union(mandatory_paths, field_paths)

            for attrs in cred_dict.keys():
                if attrs not in field_paths:
                    return False
            for nested_attr_key in nested_field_paths:
                nested_attr_values = nested_field_paths[nested_attr_key]
                extracted = self.nested_get(cred_dict, nested_attr_key)
                if isinstance(extracted, dict):
                    if not self.check_attr_in_extracted_dict(
                        extracted, nested_attr_values
                    ):
                        return False
                elif isinstance(extracted, list):
                    for extracted_dict in extracted:
                        if not self.check_attr_in_extracted_dict(
                            extracted_dict, nested_attr_values
                        ):
                            return False
        return True

    def check_attr_in_extracted_dict(
        self, extracted_dict: dict, nested_attr_values: dict
    ) -> bool:
        """Check if keys of extracted_dict exists in nested_attr_values."""
        for attrs in extracted_dict.keys():
            if attrs not in nested_attr_values:
                return False
        return True

    def get_dict_keys_from_path(self, derived_cred_dict: dict, path: str) -> List:
        """Return additional_attrs to build nested_field_paths."""
        additional_attrs = []
        mandatory_paths = ["@id", "@type"]
        match_item = self.nested_get(derived_cred_dict, path)
        if isinstance(match_item, dict):
            for key in match_item.keys():
                if key in mandatory_paths:
                    additional_attrs.append(key)
        elif isinstance(match_item, list):
            for key in match_item[0].keys():
                if key in mandatory_paths:
                    additional_attrs.append(key)
        return additional_attrs

    async def get_updated_field(self, field: DIFField, cred: dict) -> DIFField:
        """Return field with updated json path, if necessary."""
        new_paths = []
        for path in field.paths:
            new_paths.append(await self.get_updated_path(cred, path))
        field.paths = new_paths
        return field

    async def get_updated_path(self, cred_dict: dict, json_path: str) -> str:
        """Return updated json path, if necessary."""

        def update_path_recursive_call(path: str, level: int = 0):
            path_split_array = path.split(".", level)
            to_check = path_split_array[-1]
            if "." not in to_check:
                return path
            split_by_index = re.split(r"\[(\d+)\]", to_check, 1)
            if len(split_by_index) > 1:
                jsonpath = parse(split_by_index[0])
                match = jsonpath.find(cred_dict)
                if len(match) > 0:
                    if isinstance(match[0].value, dict):
                        new_path = split_by_index[0] + split_by_index[2]
                        return update_path_recursive_call(new_path, level + 1)
            return update_path_recursive_call(path, level + 1)

        return update_path_recursive_call(json_path)

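    # Illustrative rewrite (path is an assumption): if a derived credential
    # collapses a one-element list into a dict, a request path such as
    # "$.credentialSubject.degree[0].name" no longer matches; the recursive
    # helper drops the stale list index, yielding
    # "$.credentialSubject.degree.name".
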
    def nested_get(self, input_dict: dict, path: str) -> Union[Dict, List]:
        """Return dict or list from nested dict at the given path."""
        jsonpath = parse(path)
        match = jsonpath.find(input_dict)
        if len(match) > 1:
            return_list = []
            for match_item in match:
                return_list.append(match_item.value)
            return return_list
        else:
            return match[0].value

    def build_nested_paths_dict(
        self,
        key: str,
        value: str,
        nested_field_paths: dict,
        cred_dict: dict,
    ) -> dict:
        """Build and return nested_field_paths dict."""
        additional_attrs = self.get_dict_keys_from_path(cred_dict, key)
        value = re.sub(r"\[(\W+)\]|\[(\d+)\]", "", value)
        if key in nested_field_paths.keys():
            nested_field_paths[key].add(value)
        else:
            nested_field_paths[key] = {value}
        if len(additional_attrs) > 0:
            for attr in additional_attrs:
                nested_field_paths[key].add(attr)
        split_key = key.split(".")
        if len(split_key) > 1:
            nested_field_paths.update(
                self.build_nested_paths_dict(
                    ".".join(split_key[:-1]),
                    split_key[-1],
                    nested_field_paths,
                    cred_dict,
                )
            )
        return nested_field_paths