Source code for aries_cloudagent.protocols.introduction.demo_service

"""Introduction service demo classes."""

import json
import logging

from ...connections.models.connection_record import ConnectionRecord
from ...storage.base import BaseStorage, StorageRecord, StorageNotFoundError

from .base_service import BaseIntroductionService, IntroductionError
from .messages.forward_invitation import ForwardInvitation
from .messages.invitation import Invitation
from .messages.invitation_request import InvitationRequest

LOGGER = logging.getLogger(__name__)


[docs]class DemoIntroductionService(BaseIntroductionService): """Service handler for allowing connections to exchange invitations.""" RECORD_TYPE = "introduction_record"
[docs] async def start_introduction( self, init_connection_id: str, target_connection_id: str, message: str, outbound_handler, ): """ Start the introduction process between two connections. Args: init_connection_id: The connection initiating the request target_connection_id: The connection which is asked for an invitation outbound_handler: The outbound handler coroutine for sending a message message: The message to use when requesting the invitation """ try: init_connection = await ConnectionRecord.retrieve_by_id( self._context, init_connection_id ) except StorageNotFoundError: raise IntroductionError("Initiator connection not found") if init_connection.state != "active": raise IntroductionError("Initiator connection is not active") try: target_connection = await ConnectionRecord.retrieve_by_id( self._context, target_connection_id ) except StorageNotFoundError: raise IntroductionError("Target connection not found") if target_connection.state != "active": raise IntroductionError("Target connection is not active") msg = InvitationRequest(responder=init_connection.their_label, message=message) record = StorageRecord( type=DemoIntroductionService.RECORD_TYPE, value=json.dumps({"thread_id": msg._id, "state": "pending"}), tags={ "init_connection_id": init_connection_id, "target_connection_id": target_connection_id, }, ) storage: BaseStorage = await self._context.inject(BaseStorage) await storage.add_record(record) await outbound_handler(msg, connection_id=target_connection_id)
[docs] async def return_invitation( self, target_connection_id: str, invitation: Invitation, outbound_handler ): """ Handle the forwarding of an invitation to the responder. Args: target_connection_id: The ID of the connection sending the Invitation invitation: The received Invitation message outbound_handler: The outbound handler coroutine for sending a message """ thread_id = invitation._thread_id tag_filter = {"target_connection_id": target_connection_id} storage: BaseStorage = await self._context.inject(BaseStorage) records = await storage.search_records( DemoIntroductionService.RECORD_TYPE, tag_filter, ).fetch_all() found = False for row in records: value = json.loads(row.value) if value["thread_id"] == thread_id and value["state"] == "pending": msg = ForwardInvitation( invitation=invitation.invitation, message=invitation.message ) msg.assign_thread_from(invitation) value["state"] = "complete" await storage.update_record_value(row, json.dumps(value)) init_connection_id = row.tags["init_connection_id"] await outbound_handler(msg, connection_id=init_connection_id) found = True LOGGER.info("Forwarded invitation to %s", init_connection_id) break if not found: LOGGER.error("Could not forward invitation, no pending introduction found")