Source code for aries_cloudagent.protocols.revocation_notification.v2_0.routes

"""Routes for revocation notification."""

import logging
import re

from ....core.event_bus import Event, EventBus
from ....core.profile import Profile
from ....messaging.responder import BaseResponder
from ....revocation.util import (
    REVOCATION_CLEAR_PENDING_EVENT,
    REVOCATION_EVENT_PREFIX,
    REVOCATION_PUBLISHED_EVENT,
)
from ....storage.error import StorageError, StorageNotFoundError
from .models.rev_notification_record import RevNotificationRecord

LOGGER = logging.getLogger(__name__)


[docs]def register_events(event_bus: EventBus): """Register to handle events.""" event_bus.subscribe( re.compile(f"^{REVOCATION_EVENT_PREFIX}{REVOCATION_PUBLISHED_EVENT}.*"), on_revocation_published, ) event_bus.subscribe( re.compile(f"^{REVOCATION_EVENT_PREFIX}{REVOCATION_CLEAR_PENDING_EVENT}.*"), on_pending_cleared, )
[docs]async def on_revocation_published(profile: Profile, event: Event): """Handle issuer revoke event.""" LOGGER.debug("Received notification of revocation publication: %s", event.payload) should_notify = profile.settings.get("revocation.notify", False) responder = profile.inject(BaseResponder) crids = event.payload.get("crids") or [] # Allow for crids to be integers if crids and isinstance(crids[0], int): crids = [str(crid) for crid in crids] try: async with profile.session() as session: records = await RevNotificationRecord.query_by_rev_reg_id( session, rev_reg_id=event.payload["rev_reg_id"], ) records = [record for record in records if record.cred_rev_id in crids] for record in records: await record.delete_record(session) if should_notify: await responder.send( record.to_message(), connection_id=record.connection_id ) LOGGER.info( "Sent revocation notification for credential to %s", record.connection_id, ) except StorageNotFoundError: LOGGER.info( "No revocation notification record found for revoked credential; " "no notification will be sent" ) except StorageError: LOGGER.exception("Failed to retrieve revocation notification record")
[docs]async def on_pending_cleared(profile: Profile, event: Event): """Handle pending cleared event.""" # Query by rev reg ID async with profile.session() as session: notifications = await RevNotificationRecord.query_by_rev_reg_id( session, event.payload["rev_reg_id"] ) # Delete async with profile.transaction() as txn: for notification in notifications: await notification.delete_record(txn) await txn.commit()