Source code for aries_cloudagent.anoncreds.holder

"""Indy holder implementation."""

import asyncio
import json
import logging
import re
import uuid
from typing import Dict, Optional, Sequence, Tuple, Union

from anoncreds import (
    AnoncredsError,
    Credential,
    CredentialRequest,
    CredentialRevocationState,
    Presentation,
    PresentCredentials,
    create_link_secret,
)
from aries_askar import AskarError, AskarErrorCode

from ..anoncreds.models.anoncreds_schema import AnonCredsSchema
from ..askar.profile_anon import AskarAnoncredsProfile
from ..core.error import BaseError
from ..core.profile import Profile
from ..ledger.base import BaseLedger
from ..wallet.error import WalletNotFoundError
from .error_messages import ANONCREDS_PROFILE_REQUIRED_MSG
from .models.anoncreds_cred_def import CredDef

LOGGER = logging.getLogger(__name__)

CATEGORY_CREDENTIAL = "credential"
CATEGORY_MASTER_SECRET = "master_secret"


def _make_cred_info(cred_id, cred: Credential):
    cred_info = cred.to_dict()  # not secure!
    rev_info = cred_info["signature"]["r_credential"]
    return {
        "referent": cred_id,
        "schema_id": cred_info["schema_id"],
        "cred_def_id": cred_info["cred_def_id"],
        "rev_reg_id": cred_info["rev_reg_id"],
        "cred_rev_id": str(rev_info["i"]) if rev_info else None,
        "attrs": {name: val["raw"] for (name, val) in cred_info["values"].items()},
    }


def _normalize_attr_name(name: str) -> str:
    return name.replace(" ", "")


[docs]class AnonCredsHolderError(BaseError): """Base class for holder exceptions."""
[docs]class AnonCredsHolder: """AnonCreds holder class.""" MASTER_SECRET_ID = "default" RECORD_TYPE_MIME_TYPES = "attribute-mime-types" def __init__(self, profile: Profile): """Initialize an AnonCredsHolder instance. Args: profile: The active profile instance """ self._profile = profile @property def profile(self) -> AskarAnoncredsProfile: """Accessor for the profile instance.""" if not isinstance(self._profile, AskarAnoncredsProfile): raise ValueError(ANONCREDS_PROFILE_REQUIRED_MSG) return self._profile
[docs] async def get_master_secret(self) -> str: """Get or create the default master secret.""" while True: async with self.profile.session() as session: try: record = await session.handle.fetch( CATEGORY_MASTER_SECRET, AnonCredsHolder.MASTER_SECRET_ID ) except AskarError as err: raise AnonCredsHolderError("Error fetching master secret") from err if record: try: # TODO should be able to use raw_value but memoryview # isn't accepted by cred.process secret = record.value.decode("ascii") except AnoncredsError as err: raise AnonCredsHolderError( "Error loading master secret" ) from err break else: try: secret = create_link_secret() except AnoncredsError as err: raise AnonCredsHolderError( "Error creating master secret" ) from err try: await session.handle.insert( CATEGORY_MASTER_SECRET, AnonCredsHolder.MASTER_SECRET_ID, secret, ) except AskarError as err: if err.code != AskarErrorCode.DUPLICATE: raise AnonCredsHolderError( "Error saving master secret" ) from err # else: lost race to create record, retry else: break return secret
[docs] async def create_credential_request( self, credential_offer: dict, credential_definition: CredDef, holder_did: str ) -> Tuple[str, str]: """Create a credential request for the given credential offer. Args: credential_offer: The credential offer to create request for credential_definition: The credential definition to create an offer for holder_did: the DID of the agent making the request (may not be a real DID) Returns: A tuple of the credential request and credential request metadata """ try: secret = await self.get_master_secret() ( cred_req, cred_req_metadata, ) = await asyncio.get_event_loop().run_in_executor( None, CredentialRequest.create, None, holder_did, credential_definition.to_native(), secret, AnonCredsHolder.MASTER_SECRET_ID, credential_offer, ) except AnoncredsError as err: raise AnonCredsHolderError("Error creating credential request") from err cred_req_json, cred_req_metadata_json = ( cred_req.to_json(), cred_req_metadata.to_json(), ) LOGGER.debug( "Created credential request. " "credential_request_json=%s credential_request_metadata_json=%s", cred_req_json, cred_req_metadata_json, ) return cred_req_json, cred_req_metadata_json
[docs] async def store_credential( self, credential_definition: dict, credential_data: dict, credential_request_metadata: dict, credential_attr_mime_types: dict = None, credential_id: str = None, rev_reg_def: dict = None, ) -> str: """Store a credential in the wallet. Args: credential_definition: Credential definition for this credential credential_data: Credential data generated by the issuer credential_request_metadata: credential request metadata generated by the issuer credential_attr_mime_types: dict mapping attribute names to (optional) MIME types to store as non-secret record, if specified credential_id: optionally override the stored credential id rev_reg_def: revocation registry definition in json Returns: the ID of the stored credential """ try: secret = await self.get_master_secret() cred = Credential.load(credential_data) cred_recvd = await asyncio.get_event_loop().run_in_executor( None, cred.process, credential_request_metadata, secret, credential_definition, rev_reg_def, ) except AnoncredsError as err: raise AnonCredsHolderError("Error processing received credential") from err schema_id = cred_recvd.schema_id schema_id_parts = re.match(r"^(\w+):2:([^:]+):([^:]+)$", schema_id) if not schema_id_parts: raise AnonCredsHolderError( f"Error parsing credential schema ID: {schema_id}" ) cred_def_id = cred_recvd.cred_def_id cdef_id_parts = re.match(r"^(\w+):3:CL:([^:]+):([^:]+)$", cred_def_id) if not cdef_id_parts: raise AnonCredsHolderError( f"Error parsing credential definition ID: {cred_def_id}" ) credential_id = credential_id or str(uuid.uuid4()) tags = { "schema_id": schema_id, "schema_issuer_did": schema_id_parts[1], "schema_name": schema_id_parts[2], "schema_version": schema_id_parts[3], "issuer_did": cdef_id_parts[1], "cred_def_id": cred_def_id, "rev_reg_id": cred_recvd.rev_reg_id or "None", } # FIXME - sdk has some special handling for fully qualified DIDs here mime_types = {} for k, attr_value in credential_data["values"].items(): attr_name = _normalize_attr_name(k) # tags[f"attr::{attr_name}::marker"] = "1" tags[f"attr::{attr_name}::value"] = attr_value["raw"] if credential_attr_mime_types and k in credential_attr_mime_types: mime_types[k] = credential_attr_mime_types[k] try: async with self.profile.transaction() as txn: await txn.handle.insert( CATEGORY_CREDENTIAL, credential_id, cred_recvd.to_json_buffer(), tags=tags, ) if mime_types: await txn.handle.insert( AnonCredsHolder.RECORD_TYPE_MIME_TYPES, credential_id, value_json=mime_types, ) await txn.commit() except AskarError as err: raise AnonCredsHolderError("Error storing credential") from err return credential_id
[docs] async def get_credentials(self, start: int, count: int, wql: dict): """Get credentials stored in the wallet. Args: start: Starting index count: Number of records to return wql: wql query dict """ result = [] try: rows = self.profile.store.scan( CATEGORY_CREDENTIAL, wql, start, count, self.profile.settings.get("wallet.askar_profile"), ) async for row in rows: cred = Credential.load(row.raw_value) result.append(_make_cred_info(row.name, cred)) except AskarError as err: raise AnonCredsHolderError("Error retrieving credentials") from err except AnoncredsError as err: raise AnonCredsHolderError("Error loading stored credential") from err return result
[docs] async def get_credentials_for_presentation_request_by_referent( self, presentation_request: dict, referents: Sequence[str], start: int, count: int, extra_query: Optional[dict] = None, ): """Get credentials stored in the wallet. Args: presentation_request: Valid presentation request from issuer referents: Presentation request referents to use to search for creds start: Starting index count: Maximum number of records to return extra_query: wql query dict """ if not referents: referents = ( *presentation_request["requested_attributes"], *presentation_request["requested_predicates"], ) extra_query = extra_query or {} creds = {} for reft in referents: names = set() if reft in presentation_request["requested_attributes"]: attr = presentation_request["requested_attributes"][reft] if "name" in attr: names.add(_normalize_attr_name(attr["name"])) elif "names" in attr: names.update(_normalize_attr_name(name) for name in attr["names"]) # for name in names: # tag_filter[f"attr::{_normalize_attr_name(name)}::marker"] = "1" restr = attr.get("restrictions") elif reft in presentation_request["requested_predicates"]: pred = presentation_request["requested_predicates"][reft] if "name" in pred: names.add(_normalize_attr_name(pred["name"])) # tag_filter[f"attr::{_normalize_attr_name(name)}::marker"] = "1" restr = pred.get("restrictions") else: raise AnonCredsHolderError( f"Unknown presentation request referent: {reft}" ) tag_filter = {"$exist": [f"attr::{name}::value" for name in names]} if restr: # FIXME check if restr is a list or dict? validate WQL format tag_filter = {"$and": [tag_filter] + restr} if extra_query: tag_filter = {"$and": [tag_filter, extra_query]} rows = self.profile.store.scan( CATEGORY_CREDENTIAL, tag_filter, start, count, self.profile.settings.get("wallet.askar_profile"), ) async for row in rows: if row.name in creds: creds[row.name]["presentation_referents"].add(reft) else: cred_info = _make_cred_info( row.name, Credential.load(row.raw_value) ) creds[row.name] = { "cred_info": cred_info, "interval": presentation_request.get("non_revoked"), "presentation_referents": {reft}, } for cred in creds.values(): cred["presentation_referents"] = list(cred["presentation_referents"]) return list(creds.values())
[docs] async def get_credential(self, credential_id: str) -> str: """Get a credential stored in the wallet. Args: credential_id: Credential id to retrieve """ cred = await self._get_credential(credential_id) return json.dumps(_make_cred_info(credential_id, cred))
async def _get_credential(self, credential_id: str) -> Credential: """Get an unencoded Credential instance from the store.""" try: async with self.profile.session() as session: cred = await session.handle.fetch(CATEGORY_CREDENTIAL, credential_id) except AskarError as err: raise AnonCredsHolderError("Error retrieving credential") from err if not cred: raise WalletNotFoundError( f"Credential {credential_id} not found in wallet {self.profile.name}" ) try: return Credential.load(cred.raw_value) except AnoncredsError as err: raise AnonCredsHolderError("Error loading requested credential") from err
[docs] async def credential_revoked( self, ledger: BaseLedger, credential_id: str, fro: int = None, to: int = None ) -> bool: """Check ledger for revocation status of credential by cred id. Args: credential_id: Credential id to check """ cred = await self._get_credential(credential_id) rev_reg_id = cred.rev_reg_id # TODO Use anoncreds registry # check if cred.rev_reg_id is returning None or 'None' if rev_reg_id: cred_rev_id = cred.rev_reg_index (rev_reg_delta, _) = await ledger.get_revoc_reg_delta( rev_reg_id, fro, to, ) return cred_rev_id in rev_reg_delta["value"].get("revoked", []) else: return False
[docs] async def delete_credential(self, credential_id: str): """Remove a credential stored in the wallet. Args: credential_id: Credential id to remove """ try: async with self.profile.session() as session: await session.handle.remove(CATEGORY_CREDENTIAL, credential_id) await session.handle.remove( AnonCredsHolder.RECORD_TYPE_MIME_TYPES, credential_id ) except AskarError as err: if err.code == AskarErrorCode.NOT_FOUND: pass else: raise AnonCredsHolderError("Error deleting credential") from err
[docs] async def get_mime_type( self, credential_id: str, attr: str = None ) -> Union[dict, str]: """Get MIME type per attribute (or for all attributes). Args: credential_id: credential id attr: attribute of interest or omit for all Returns: Attribute MIME type or dict mapping attribute names to MIME types attr_meta_json = all_meta.tags.get(attr) """ try: async with self.profile.session() as session: mime_types_record = await session.handle.fetch( AnonCredsHolder.RECORD_TYPE_MIME_TYPES, credential_id, ) except AskarError as err: raise AnonCredsHolderError( "Error retrieving credential mime types" ) from err values = mime_types_record and mime_types_record.value_json if values: return values.get(attr) if attr else values
[docs] async def create_presentation( self, presentation_request: dict, requested_credentials: dict, schemas: Dict[str, AnonCredsSchema], credential_definitions: Dict[str, CredDef], rev_states: dict = None, ) -> str: """Get credentials stored in the wallet. Args: presentation_request: Valid indy format presentation request requested_credentials: Indy format requested credentials schemas: Indy formatted schemas JSON credential_definitions: Indy formatted credential definitions JSON rev_states: Indy format revocation states JSON """ creds: Dict[str, Credential] = {} def get_rev_state(cred_id: str, detail: dict): cred = creds[cred_id] rev_reg_id = cred.rev_reg_id timestamp = detail.get("timestamp") if rev_reg_id else None rev_state = None if timestamp: if not rev_states or rev_reg_id not in rev_states: raise AnonCredsHolderError( f"No revocation states provided for credential '{cred_id}' " f"with rev_reg_id '{rev_reg_id}'" ) rev_state = rev_states[rev_reg_id].get(timestamp) if not rev_state: raise AnonCredsHolderError( f"No revocation states provided for credential '{cred_id}' " f"with rev_reg_id '{rev_reg_id}' at timestamp {timestamp}" ) return timestamp, rev_state self_attest = requested_credentials.get("self_attested_attributes") or {} present_creds = PresentCredentials() req_attrs = requested_credentials.get("requested_attributes") or {} for reft, detail in req_attrs.items(): cred_id = detail["cred_id"] if cred_id not in creds: # NOTE: could be optimized if multiple creds are requested creds[cred_id] = await self._get_credential(cred_id) timestamp, rev_state = get_rev_state(cred_id, detail) present_creds.add_attributes( creds[cred_id], reft, reveal=detail["revealed"], timestamp=timestamp, rev_state=rev_state, ) req_preds = requested_credentials.get("requested_predicates") or {} for reft, detail in req_preds.items(): cred_id = detail["cred_id"] if cred_id not in creds: # NOTE: could be optimized if multiple creds are requested creds[cred_id] = await self._get_credential(cred_id) timestamp, rev_state = get_rev_state(cred_id, detail) present_creds.add_predicates( creds[cred_id], reft, timestamp=timestamp, rev_state=rev_state, ) try: secret = await self.get_master_secret() presentation = await asyncio.get_event_loop().run_in_executor( None, Presentation.create, presentation_request, present_creds, self_attest, secret, { schema_id: schema.to_native() for schema_id, schema in schemas.items() }, { cred_def_id: cred_def.to_native() for cred_def_id, cred_def in credential_definitions.items() }, ) except AnoncredsError as err: raise AnonCredsHolderError("Error creating presentation") from err return presentation.to_json()
[docs] async def create_revocation_state( self, cred_rev_id: str, rev_reg_def: dict, rev_list: dict, tails_file_path: str, ) -> str: """Create current revocation state for a received credential. Args: cred_rev_id: credential revocation id in revocation registry rev_reg_def: revocation registry definition rev_reg_delta: revocation delta timestamp: delta timestamp Returns: the revocation state """ try: rev_state = await asyncio.get_event_loop().run_in_executor( None, CredentialRevocationState.create, rev_reg_def, rev_list, int(cred_rev_id), tails_file_path, ) except AnoncredsError as err: raise AnonCredsHolderError("Error creating revocation state") from err return rev_state.to_json()