Source code for aries_cloudagent.revocation.manager

"""Classes to manage credential revocation."""

import json
import logging
from typing import Mapping, Sequence, Text

from ..core.error import BaseError
from ..core.profile import Profile
from ..indy.issuer import IndyIssuer
from ..storage.error import StorageNotFoundError
from .indy import IndyRevocation
from .models.issuer_cred_rev_record import IssuerCredRevRecord
from .models.issuer_rev_reg_record import IssuerRevRegRecord


[docs]class RevocationManagerError(BaseError): """Revocation manager error."""
[docs]class RevocationManager: """Class for managing revocation operations.""" def __init__(self, profile: Profile): """ Initialize a RevocationManager. Args: context: The context for this revocation manager """ self._profile = profile self._logger = logging.getLogger(__name__)
[docs] async def revoke_credential_by_cred_ex_id( self, cred_ex_id: str, publish: bool = False ): """ Revoke a credential by its credential exchange identifier at issue. Optionally, publish the corresponding revocation registry delta to the ledger. Args: cred_ex_id: credential exchange identifier publish: whether to publish the resulting revocation registry delta, along with any revocations pending against it """ try: async with self._profile.session() as session: rec = await IssuerCredRevRecord.retrieve_by_cred_ex_id( session, cred_ex_id, ) except StorageNotFoundError as err: raise RevocationManagerError( "No issuer credential revocation record found for " f"credential exchange id {cred_ex_id}" ) from err return await self.revoke_credential( rev_reg_id=rec.rev_reg_id, cred_rev_id=rec.cred_rev_id, publish=publish )
[docs] async def revoke_credential( self, rev_reg_id: str, cred_rev_id: str, publish: bool = False, ): """ Revoke a credential. Optionally, publish the corresponding revocation registry delta to the ledger. Args: rev_reg_id: revocation registry id cred_rev_id: credential revocation id publish: whether to publish the resulting revocation registry delta, along with any revocations pending against it """ issuer = self._profile.inject(IndyIssuer) revoc = IndyRevocation(self._profile) issuer_rr_rec = await revoc.get_issuer_rev_reg_record(rev_reg_id) if not issuer_rr_rec: raise RevocationManagerError( f"No revocation registry record found for id {rev_reg_id}" ) if publish: rev_reg = await revoc.get_ledger_registry(rev_reg_id) await rev_reg.get_or_fetch_local_tails_path() # pick up pending revocations on input revocation registry crids = list(set(issuer_rr_rec.pending_pub + [cred_rev_id])) (delta_json, _) = await issuer.revoke_credentials( issuer_rr_rec.revoc_reg_id, issuer_rr_rec.tails_local_path, crids ) if delta_json: issuer_rr_rec.revoc_reg_entry = json.loads(delta_json) await issuer_rr_rec.send_entry(self._profile) async with self._profile.session() as session: await issuer_rr_rec.clear_pending(session) else: async with self._profile.session() as session: await issuer_rr_rec.mark_pending(session, cred_rev_id)
[docs] async def publish_pending_revocations( self, rrid2crid: Mapping[Text, Sequence[Text]] = None, write_ledger: bool = True, endorser_did: str = None, ) -> Mapping[Text, Sequence[Text]]: """ Publish pending revocations to the ledger. Args: rrid2crid: Mapping from revocation registry identifiers to all credential revocation identifiers within each to publish. Specify null/empty map for all revocation registries. Specify empty sequence per revocation registry identifier for all pending within the revocation registry; e.g., {} - publish all pending revocations from all revocation registries { "R17v42T4pk...:4:R17v42T4pk...:3:CL:19:tag:CL_ACCUM:0": [], "R17v42T4pk...:4:R17v42T4pk...:3:CL:19:tag:CL_ACCUM:1": ["1", "2"] } - publish: - all pending revocations from all revocation registry tagged 0 - pending ["1", "2"] from revocation registry tagged 1 - no pending revocations from any other revocation registries. write_ledger: wether to write the transaction to the ledger, or prepare a transaction to be endorsed endorser_did: the did of the endorser, if endorsing the transaction Returns: mapping from each revocation registry id to its cred rev ids published. """ result = {} issuer = self._profile.inject(IndyIssuer) txn = await self._profile.transaction() issuer_rr_recs = await IssuerRevRegRecord.query_by_pending(txn) for issuer_rr_rec in issuer_rr_recs: rrid = issuer_rr_rec.revoc_reg_id crids = [] if not rrid2crid: crids = issuer_rr_rec.pending_pub elif rrid in rrid2crid: crids = [ crid for crid in issuer_rr_rec.pending_pub if crid in (rrid2crid[rrid] or []) or not rrid2crid[rrid] ] if crids: # FIXME - must use the same transaction (delta_json, failed_crids) = await issuer.revoke_credentials( issuer_rr_rec.revoc_reg_id, issuer_rr_rec.tails_local_path, crids, ) issuer_rr_rec.revoc_reg_entry = json.loads(delta_json) send_entry_result = await issuer_rr_rec.send_entry( self._profile, write_ledger=write_ledger, endorser_did=endorser_did ) if endorser_did and not write_ledger: return send_entry_result published = [crid for crid in crids if crid not in failed_crids] result[issuer_rr_rec.revoc_reg_id] = published await issuer_rr_rec.clear_pending(txn, published) await txn.commit() return result
[docs] async def clear_pending_revocations( self, purge: Mapping[Text, Sequence[Text]] = None ) -> Mapping[Text, Sequence[Text]]: """ Clear pending revocation publications. Args: purge: Mapping from revocation registry identifiers to all credential revocation identifiers within each to clear. Specify null/empty map for all revocation registries. Specify empty sequence per revocation registry identifier for all pending within the revocation registry; e.g., {} - clear all pending revocations from all revocation registries { "R17v42T4pk...:4:R17v42T4pk...:3:CL:19:tag:CL_ACCUM:0": [], "R17v42T4pk...:4:R17v42T4pk...:3:CL:19:tag:CL_ACCUM:1": ["1", "2"] } - clear - all pending revocations from all revocation registry tagged 0 - pending ["1", "2"] from revocation registry tagged 1 - no pending revocations from any other revocation registries. Returns: mapping from revocation registry id to its remaining cred rev ids still marked pending, omitting revocation registries with no remaining pending publications. """ result = {} async with self._profile.transaction() as txn: issuer_rr_recs = await IssuerRevRegRecord.query_by_pending(txn) for issuer_rr_rec in issuer_rr_recs: rrid = issuer_rr_rec.revoc_reg_id await issuer_rr_rec.clear_pending(txn, (purge or {}).get(rrid)) if issuer_rr_rec.pending_pub: result[rrid] = issuer_rr_rec.pending_pub await txn.commit() return result