Source code for acapy_agent.revocation.models.revocation_registry

"""Classes for managing a revocation registry."""

import hashlib
import http
import logging
import os
import re
from os.path import join
from pathlib import Path
from typing import Optional

import base58
from requests import Session
from requests.exceptions import RequestException

from ...indy.util import indy_client_dir
from ..error import RevocationError

LOGGER = logging.getLogger(__name__)


class RevocationRegistry:
    """Manage a revocation registry and tails file.

    An instance holds the identifying fields of a revocation registry
    (registry ID, credential definition ID, issuer DID, tag, type, maximum
    credential count) together with the location of its tails file: a local
    filesystem path, a public URI, or both. It can download the tails file
    from the public URI and verify it against the expected hash.
    """

    # NOTE(review): not referenced inside this class; presumably the bounds on
    # registry size enforced by callers -- confirm against usage elsewhere.
    MIN_SIZE = 4
    MAX_SIZE = 32768

    def __init__(
        self,
        registry_id: Optional[str] = None,
        *,
        cred_def_id: Optional[str] = None,
        issuer_did: Optional[str] = None,
        max_creds: Optional[int] = None,
        reg_def_type: Optional[str] = None,
        tag: Optional[str] = None,
        tails_local_path: Optional[str] = None,
        tails_public_uri: Optional[str] = None,
        tails_hash: Optional[str] = None,
        reg_def: Optional[dict] = None,
    ):
        """Initialize the revocation registry instance.

        Every argument is optional and is stored unchanged on the instance.

        Args:
            registry_id: The revocation registry ID.
            cred_def_id: The credential definition ID.
            issuer_did: The issuer DID.
            max_creds: The maximum number of issued credentials.
            reg_def_type: The revocation registry type.
            tag: The tag part of the revocation registry ID.
            tails_local_path: Local filesystem path of the tails file.
            tails_public_uri: Public URI the tails file can be fetched from.
            tails_hash: Expected hash of the tails file; `retrieve_tails`
                compares it with the base58-encoded SHA-256 of the download.
            reg_def: The full revocation registry definition.

        """
        self._cred_def_id = cred_def_id
        self._issuer_did = issuer_did
        self._max_creds = max_creds
        self._reg_def_type = reg_def_type
        self._registry_id = registry_id
        self._tag = tag
        self._tails_local_path = tails_local_path
        self._tails_public_uri = tails_public_uri
        self._tails_hash = tails_hash
        self._reg_def = reg_def

    @classmethod
    def from_definition(
        cls, revoc_reg_def: dict, public_def: bool
    ) -> "RevocationRegistry":
        """Initialize a revocation registry instance from a definition.

        Args:
            revoc_reg_def: The registry definition. The keys read are `id`,
                `credDefId`, `revocDefType`, `tag`, and, under `value`,
                `tailsLocation`, `maxCredNum` and `tailsHash`.
            public_def: When true, `tailsLocation` is taken as the public URI
                and the local path is derived from it via
                `get_receiving_tails_local_path`. When false,
                `tailsLocation` is taken as the local path.

        Returns:
            The new registry instance.

        Raises:
            KeyError: If a required key is missing from the definition.

        """
        reg_id = revoc_reg_def["id"]
        tails_location = revoc_reg_def["value"]["tailsLocation"]

        # The issuer DID is the segment immediately preceding ":3:CL:" in the
        # credential definition ID; it stays None if the ID has another shape.
        issuer_did_match = re.match(r"^.*?([^:]*):3:CL:.*", revoc_reg_def["credDefId"])
        issuer_did = issuer_did_match.group(1) if issuer_did_match else None

        # Ignores def ver, issuance type, public keys.
        init = {
            "cred_def_id": revoc_reg_def["credDefId"],
            "issuer_did": issuer_did,
            "reg_def_type": revoc_reg_def["revocDefType"],
            "max_creds": revoc_reg_def["value"]["maxCredNum"],
            "tag": revoc_reg_def["tag"],
            "tails_hash": revoc_reg_def["value"]["tailsHash"],
            "reg_def": revoc_reg_def,
        }

        if public_def:
            init["tails_public_uri"] = tails_location
            rev_reg = cls(reg_id, **init)
            rev_reg.tails_local_path = rev_reg.get_receiving_tails_local_path()
        else:
            init["tails_local_path"] = tails_location
            rev_reg = cls(reg_id, **init)

        return rev_reg

    @property
    def cred_def_id(self) -> Optional[str]:
        """Accessor for the credential definition ID."""
        return self._cred_def_id

    @property
    def issuer_did(self) -> Optional[str]:
        """Accessor for the issuer DID."""
        return self._issuer_did

    @property
    def max_creds(self) -> Optional[int]:
        """Accessor for the maximum number of issued credentials."""
        return self._max_creds

    @property
    def reg_def_type(self) -> Optional[str]:
        """Accessor for the revocation registry type."""
        return self._reg_def_type

    @property
    def reg_def(self) -> Optional[dict]:
        """Accessor for the revocation registry definition."""
        return self._reg_def

    @property
    def registry_id(self) -> Optional[str]:
        """Accessor for the revocation registry ID."""
        return self._registry_id

    @property
    def tag(self) -> Optional[str]:
        """Accessor for the tag part of the revoc. reg. ID."""
        return self._tag

    @property
    def tails_hash(self) -> Optional[str]:
        """Accessor for the tails file hash."""
        return self._tails_hash

    @property
    def tails_local_path(self) -> Optional[str]:
        """Accessor for the tails file local path."""
        return self._tails_local_path

    @tails_local_path.setter
    def tails_local_path(self, new_path: str):
        """Setter for the tails file local path."""
        self._tails_local_path = new_path

    @property
    def tails_public_uri(self) -> Optional[str]:
        """Accessor for the tails file public URI."""
        return self._tails_public_uri

    @tails_public_uri.setter
    def tails_public_uri(self, new_uri: str):
        """Setter for the tails file public URI."""
        self._tails_public_uri = new_uri

    def get_receiving_tails_local_path(self) -> str:
        """Make the local path to the tails file we download from remote URI.

        Returns:
            The already-configured local path if one is set; otherwise the
            tails hash joined onto the directory `indy_client_dir` reports
            for `tails/<registry_id>` (requested with `create=False`).

        """
        if self._tails_local_path:
            return self._tails_local_path

        tails_dir = indy_client_dir(join("tails", self.registry_id), create=False)
        return join(tails_dir, self._tails_hash)

    def has_local_tails_file(self) -> bool:
        """Test if the tails file exists locally.

        Only the existence of a regular file is checked; the file content is
        not verified against the tails hash.
        """
        return Path(self.get_receiving_tails_local_path()).is_file()

    @staticmethod
    def _discard_tails_file(tails_file_path: Path) -> None:
        """Best-effort removal of an unusable tails file and its directory.

        Failures are logged rather than raised so that the caller can still
        report the original download problem. `rmdir` only succeeds on an
        empty directory, so unrelated files are never removed.
        """
        try:
            os.remove(tails_file_path)
            tails_file_path.parent.rmdir()
        except OSError as err:
            LOGGER.warning("Could not delete invalid tails file: %s", err)

    async def retrieve_tails(self) -> str:
        """Fetch the tails file from the public URI.

        The response is streamed to the receiving local path while a SHA-256
        digest is computed; the base58-encoded digest must equal
        `tails_hash`. On success the local path is stored on the instance.

        Returns:
            The local path of the verified tails file.

        Raises:
            RevocationError: If no public URI is set, if the HTTP request
                fails, or if the downloaded content does not match the
                expected hash. In the latter two cases the partial or
                invalid file is removed so that later existence checks do
                not mistake it for a valid tails file.

        """
        if not self._tails_public_uri:
            raise RevocationError("Tails file public URI is empty")

        LOGGER.info(
            "Downloading the tails file for the revocation registry: %s",
            self.registry_id,
        )

        tails_file_path = Path(self.get_receiving_tails_local_path())
        tails_file_dir = tails_file_path.parent
        # exist_ok avoids the check-then-create race of a separate exists().
        tails_file_dir.mkdir(parents=True, exist_ok=True)

        buffer_size = 65536  # should be multiple of 32 bytes for sha256
        file_hasher = hashlib.sha256()

        # NOTE(review): `requests` is synchronous and nothing here is awaited,
        # so the download runs without yielding to the event loop -- confirm
        # that callers tolerate this.
        try:
            with open(tails_file_path, "wb", buffer_size) as tails_file:
                with Session() as req_session:
                    resp = req_session.get(self._tails_public_uri, stream=True)
                    # Should this directly raise an Error?
                    # A non-OK body is still written; the hash check below
                    # rejects it and removes the file.
                    if resp.status_code != http.HTTPStatus.OK:
                        LOGGER.warning(
                            "Unexpected status code for tails file: %s",
                            resp.status_code,
                        )
                    for buf in resp.iter_content(chunk_size=buffer_size):
                        tails_file.write(buf)
                        file_hasher.update(buf)
        except RequestException as rx:
            # The file handle is closed by now; drop the empty/partial file.
            self._discard_tails_file(tails_file_path)
            raise RevocationError(f"Error retrieving tails file: {rx}") from rx

        download_tails_hash = base58.b58encode(file_hasher.digest()).decode("utf-8")
        if download_tails_hash != self.tails_hash:
            self._discard_tails_file(tails_file_path)
            raise RevocationError(
                "The hash of the downloaded tails file does not match."
            )

        self.tails_local_path = str(tails_file_path)
        return self.tails_local_path

    async def get_or_fetch_local_tails_path(self) -> str:
        """Get the local tails path, retrieving from the remote if necessary.

        Returns:
            The receiving local path if a file already exists there,
            otherwise the result of `retrieve_tails`.

        """
        tails_file_path = self.get_receiving_tails_local_path()
        if Path(tails_file_path).is_file():
            return tails_file_path
        return await self.retrieve_tails()

    def __repr__(self) -> str:
        """Return a human readable representation of this class."""
        items = ("{}={}".format(k, repr(v)) for k, v in self.__dict__.items())
        return "<{}({})>".format(self.__class__.__name__, ", ".join(items))