Source code for aries_cloudagent.protocols.discovery.v1_0.manager

"""Classes to manage discover features."""

import asyncio
import logging

from typing import Optional

from ....core.error import BaseError
from ....core.profile import Profile
from ....core.protocol_registry import ProtocolRegistry
from ....storage.error import StorageNotFoundError
from ....messaging.responder import BaseResponder

from .messages.disclose import Disclose
from .messages.query import Query
from .models.discovery_record import V10DiscoveryExchangeRecord


[docs]class V10DiscoveryMgrError(BaseError): """Discover feature v1_0 error."""
[docs]class V10DiscoveryMgr: """Class for discover feature v1_0 under RFC 31.""" def __init__(self, profile: Profile): """ Initialize a V10DiscoveryMgr. Args: profile: The profile for this manager """ self._profile = profile self._logger = logging.getLogger(__name__) @property def profile(self) -> Profile: """ Accessor for the current Profile. Returns: The Profile for this manager """ return self._profile
[docs] async def receive_disclose( self, disclose_msg: Disclose, connection_id: str ) -> V10DiscoveryExchangeRecord: """Receive Disclose message and return updated V10DiscoveryExchangeRecord.""" if disclose_msg._thread: thread_id = disclose_msg._thread.thid try: async with self._profile.session() as session: discover_exch_rec = await V10DiscoveryExchangeRecord.retrieve_by_id( session=session, record_id=thread_id ) except StorageNotFoundError: discover_exch_rec = await self.lookup_exchange_rec_by_connection( connection_id ) if not discover_exch_rec: discover_exch_rec = V10DiscoveryExchangeRecord() else: discover_exch_rec = await self.lookup_exchange_rec_by_connection( connection_id ) if not discover_exch_rec: discover_exch_rec = V10DiscoveryExchangeRecord() async with self._profile.session() as session: discover_exch_rec.connection_id = connection_id discover_exch_rec.disclose = disclose_msg await discover_exch_rec.save(session) return discover_exch_rec
[docs] async def lookup_exchange_rec_by_connection( self, connection_id: str ) -> Optional[V10DiscoveryExchangeRecord]: """Retrieve V20DiscoveryExchangeRecord by connection_id.""" async with self._profile.session() as session: if await V10DiscoveryExchangeRecord.exists_for_connection_id( session=session, connection_id=connection_id ): return await V10DiscoveryExchangeRecord.retrieve_by_connection_id( session=session, connection_id=connection_id ) else: return None
[docs] async def receive_query(self, query_msg: Query) -> Disclose: """Process query and return the corresponding disclose message.""" registry = self._profile.context.inject_or(ProtocolRegistry) query_str = query_msg.query published_results = [] protocols = registry.protocols_matching_query(query_str) results = await registry.prepare_disclosed(self._profile.context, protocols) async with self._profile.session() as session: to_publish_protocols = None if ( session.settings.get("disclose_protocol_list") and len(session.settings.get("disclose_protocol_list")) > 0 ): to_publish_protocols = session.settings.get("disclose_protocol_list") for result in results: to_publish_result = {} if "pid" in result: if ( to_publish_protocols and result.get("pid") not in to_publish_protocols ): continue to_publish_result["pid"] = result.get("pid") else: continue if "roles" in result: to_publish_result["roles"] = result.get("roles") published_results.append(to_publish_result) disclose_msg = Disclose(protocols=published_results) # Check if query message has a thid # If disclosing this agents feature if query_msg._thread: disclose_msg.assign_thread_id(query_msg._thread.thid) return disclose_msg
[docs] async def check_if_disclosure_received( self, record_id: str ) -> V10DiscoveryExchangeRecord: """Check if disclosures has been received.""" while True: async with self._profile.session() as session: ex_rec = await V10DiscoveryExchangeRecord.retrieve_by_id( session=session, record_id=record_id ) if ex_rec.disclose: return ex_rec await asyncio.sleep(0.5)
[docs] async def create_and_send_query( self, query: str, comment: str = None, connection_id: str = None ) -> V10DiscoveryExchangeRecord: """Create and send a Query message.""" query_msg = Query(query=query, comment=comment) if connection_id: async with self._profile.session() as session: # If existing record exists for a connection_id if await V10DiscoveryExchangeRecord.exists_for_connection_id( session=session, connection_id=connection_id ): discovery_ex_rec = ( await V10DiscoveryExchangeRecord.retrieve_by_connection_id( session=session, connection_id=connection_id ) ) discovery_ex_rec.disclose = None await discovery_ex_rec.save(session) else: discovery_ex_rec = V10DiscoveryExchangeRecord() discovery_ex_rec.query_msg = query_msg discovery_ex_rec.connection_id = connection_id await discovery_ex_rec.save(session) query_msg.assign_thread_id(discovery_ex_rec.discovery_exchange_id) responder = self._profile.inject_or(BaseResponder) if responder: await responder.send(query_msg, connection_id=connection_id) else: self._logger.exception( "Unable to send discover-features v1 query message" ": BaseResponder unavailable" ) try: return await asyncio.wait_for( self.check_if_disclosure_received( record_id=discovery_ex_rec.discovery_exchange_id, ), 5, ) except asyncio.TimeoutError: return discovery_ex_rec else: # Disclose this agent's features and/or goal codes discovery_ex_rec = V10DiscoveryExchangeRecord() discovery_ex_rec.query_msg = query_msg disclose = await self.receive_query(query_msg=query_msg) discovery_ex_rec.disclose = disclose return discovery_ex_rec