Source code for aries_cloudagent.protocols.coordinate_mediation.mediation_invite_store

"""Storage management for configuration-provided mediation invite.

Handle storage and retrieval of mediation invites provided through arguments.
Enables having the mediation invite config be the same
for `provision` and `starting` commands.
"""

import json
from typing import NamedTuple, Optional

from aries_cloudagent.storage.base import BaseStorage
from aries_cloudagent.storage.error import StorageNotFoundError
from aries_cloudagent.storage.record import StorageRecord


[docs]class MediationInviteRecord(NamedTuple): """A record to store mediation invites and their freshness.""" invite: str used: bool
[docs] def to_json(self) -> str: """:return: The current record serialized into a json string.""" return json.dumps({"invite": self.invite, "used": self.used})
[docs] @staticmethod def from_json(json_invite_record: str) -> "MediationInviteRecord": """:return: a mediation invite record deserialized from a json string.""" return MediationInviteRecord(**json.loads(json_invite_record))
[docs] @staticmethod def unused(invite: str) -> "MediationInviteRecord": """:param invite: invite string as provided by the mediator. :return: An unused mediation invitation for the given invite string """ return MediationInviteRecord(invite, False)
[docs]class NoDefaultMediationInviteException(Exception): """Raised if trying to mark a default invite as used when none exist."""
[docs]class MediationInviteStore: """Store and retrieve mediation invite configuration.""" INVITE_RECORD_CATEGORY = "config" MEDIATION_INVITE_ID = "mediation_invite" def __init__(self, storage: BaseStorage): """:param storage: storage facility to be used to store mediation invitation.""" self.__storage = storage async def __retrieve_record(self, key: str) -> Optional[StorageRecord]: try: return await self.__storage.get_record(self.INVITE_RECORD_CATEGORY, key) except StorageNotFoundError: return None
[docs] async def store( self, mediation_invite: MediationInviteRecord ) -> MediationInviteRecord: """Store the mediator's invite for further use when starting the agent. Update the currently stored invite if one already exists. This assumes a new invite and as such, marks it as unused. :param mediation_invite: mediation invite url :return: stored mediation invite """ current_invite_record = await self.__retrieve_record(self.MEDIATION_INVITE_ID) if current_invite_record is None: await self.__storage.add_record( StorageRecord( type=self.INVITE_RECORD_CATEGORY, id=self.MEDIATION_INVITE_ID, value=mediation_invite.to_json(), ) ) else: await self.__storage.update_record( current_invite_record, mediation_invite.to_json(), tags=current_invite_record.tags, ) return mediation_invite
async def __retrieve(self) -> Optional[MediationInviteRecord]: """:return: the currently stored mediation invite url.""" invite_record = await self.__retrieve_record(self.MEDIATION_INVITE_ID) return ( MediationInviteRecord.from_json(invite_record.value) if invite_record is not None else None ) async def __update_mediation_record( self, provided_mediation_invitation: str ) -> MediationInviteRecord: """Update the stored invitation when a new invitation is provided. Stored value is only updated if `provided_mediation_invitation` has changed. Updated record is marked as unused. :param provided_mediation_invitation: mediation invite provided by user :return: stored mediation invite """ default_invite = await self.__retrieve() if default_invite != provided_mediation_invitation: default_invite = await self.store( MediationInviteRecord.unused(provided_mediation_invitation) ) return default_invite
[docs] async def mark_default_invite_as_used(self): """Mark the currently stored invitation as used if one exists. :raises NoDefaultMediationInviteException: if trying to mark invite as used when there is no invite stored. """ record = await self.__retrieve() if not record: raise NoDefaultMediationInviteException( "No default mediation invite: cannot mark it as used." ) updated_record = MediationInviteRecord(record.invite, used=True) await self.store(updated_record) return updated_record
[docs] async def get_mediation_invite_record( self, provided_mediation_invitation: Optional[str] ) -> Optional[MediationInviteRecord]: """Provide the MediationInviteRecord to use/that was used for mediation. Returned record may have been used already. Stored record is updated if `provided_mediation_invitation` has changed. Updated record is marked as unused. :param provided_mediation_invitation: mediation invite provided by user :return: mediation invite to use/that was used to connect to the mediator. None if no invitation was provided/provisioned. """ stored_invite = await self.__retrieve() if stored_invite is None and provided_mediation_invitation is None: return None elif stored_invite is None and provided_mediation_invitation is not None: return await self.store( MediationInviteRecord.unused(provided_mediation_invitation) ) elif stored_invite is not None and provided_mediation_invitation is None: return stored_invite elif stored_invite is not None and provided_mediation_invitation is not None: return await self.__update_mediation_record(provided_mediation_invitation)