Source code for aries_cloudagent.anoncreds.events

"""Events fired by AnonCreds interface."""

import re
from typing import NamedTuple, Optional

from ..core.event_bus import Event
from .models.anoncreds_revocation import RevRegDef

CRED_DEF_FINISHED_EVENT = "anoncreds::credential-definition::finished"
REV_REG_DEF_FINISHED_EVENT = "anoncreds::revocation-registry-definition::finished"
REV_LIST_FINISHED_EVENT = "anoncreds::revocation-list::finished"

CRED_DEF_FINISHED_PATTERN = re.compile(CRED_DEF_FINISHED_EVENT)
REV_REG_DEF_FINISHED_PATTERN = re.compile(REV_REG_DEF_FINISHED_EVENT)
REV_LIST_FINISHED_PATTERN = re.compile(REV_LIST_FINISHED_EVENT)


[docs]class CredDefFinishedPayload(NamedTuple): """Payload of cred def finished event.""" schema_id: str cred_def_id: str issuer_id: str support_revocation: bool max_cred_num: int options: dict
[docs]class CredDefFinishedEvent(Event): """Event for cred def finished.""" def __init__( self, payload: CredDefFinishedPayload, ): """Initialize an instance. Args: payload: CredDefFinishedPayload TODO: update this docstring - Anoncreds-break. """ self._topic = CRED_DEF_FINISHED_EVENT self._payload = payload
[docs] @classmethod def with_payload( cls, schema_id: str, cred_def_id: str, issuer_id: str, support_revocation: bool, max_cred_num: int, options: Optional[dict] = None, ): """With payload.""" payload = CredDefFinishedPayload( schema_id, cred_def_id, issuer_id, support_revocation, max_cred_num, options ) return cls(payload)
@property def payload(self) -> CredDefFinishedPayload: """Return payload.""" return self._payload
[docs]class RevRegDefFinishedPayload(NamedTuple): """Payload of rev reg def finished event.""" rev_reg_def_id: str rev_reg_def: RevRegDef options: dict
[docs]class RevRegDefFinishedEvent(Event): """Event for rev reg def finished.""" def __init__(self, payload: RevRegDefFinishedPayload): """Initialize an instance. Args: payload: RevRegDefFinishedPayload TODO: update this docstring - Anoncreds-break. """ self._topic = REV_REG_DEF_FINISHED_EVENT self._payload = payload
[docs] @classmethod def with_payload( cls, rev_reg_def_id: str, rev_reg_def: RevRegDef, options: Optional[dict] = None, ): """With payload.""" payload = RevRegDefFinishedPayload(rev_reg_def_id, rev_reg_def, options) return cls(payload)
@property def payload(self) -> RevRegDefFinishedPayload: """Return payload.""" return self._payload
[docs]class RevListFinishedPayload(NamedTuple): """Payload of rev list finished event.""" rev_reg_id: str revoked: list options: dict
[docs]class RevListFinishedEvent(Event): """Event for rev list finished.""" def __init__(self, payload: RevListFinishedPayload): """Initialize an instance. Args: payload: RevListFinishedPayload TODO: update this docstring - Anoncreds-break. """ self._topic = REV_LIST_FINISHED_EVENT self._payload = payload
[docs] @classmethod def with_payload( cls, rev_reg_id: str, revoked: list, options: Optional[dict] = None, ): """With payload.""" payload = RevListFinishedPayload(rev_reg_id, revoked, options) return cls(payload)
@property def payload(self) -> RevListFinishedPayload: """Return payload.""" return self._payload