Source code for aries_cloudagent.protocols.discovery.v2_0.manager

"""Classes to manage discover features."""

import asyncio
import logging

from typing import Tuple, Optional, Sequence

from ....core.error import BaseError
from ....core.profile import Profile
from ....core.protocol_registry import ProtocolRegistry
from ....core.goal_code_registry import GoalCodeRegistry
from ....storage.error import StorageNotFoundError
from ....messaging.responder import BaseResponder

from .messages.disclosures import Disclosures
from .messages.queries import QueryItem, Queries
from .models.discovery_record import V20DiscoveryExchangeRecord


class V20DiscoveryMgrError(BaseError):
    """Error raised by the discover-features v2_0 manager."""
[docs]class V20DiscoveryMgr: """Class for discover feature v1_0 under RFC 31.""" def __init__(self, profile: Profile): """ Initialize a V20DiscoveryMgr. Args: profile: The profile for this manager """ self._profile = profile self._logger = logging.getLogger(__name__) @property def profile(self) -> Profile: """ Accessor for the current Profile. Returns: The Profile for this manager """ return self._profile
[docs] async def receive_disclose( self, disclose_msg: Disclosures, connection_id: str = None ) -> V20DiscoveryExchangeRecord: """Receive Disclose message and return updated V20DiscoveryExchangeRecord.""" if disclose_msg._thread: thread_id = disclose_msg._thread.thid try: async with self._profile.session() as session: discover_exch_rec = await V20DiscoveryExchangeRecord.retrieve_by_id( session=session, record_id=thread_id ) except StorageNotFoundError: discover_exch_rec = await self.lookup_exchange_rec_by_connection( connection_id ) if not discover_exch_rec: discover_exch_rec = V20DiscoveryExchangeRecord() else: discover_exch_rec = await self.lookup_exchange_rec_by_connection( connection_id ) if not discover_exch_rec: discover_exch_rec = V20DiscoveryExchangeRecord() async with self._profile.session() as session: discover_exch_rec.disclosures = disclose_msg discover_exch_rec.connection_id = connection_id await discover_exch_rec.save(session) return discover_exch_rec
[docs] async def lookup_exchange_rec_by_connection( self, connection_id: str ) -> Optional[V20DiscoveryExchangeRecord]: """Retrieve V20DiscoveryExchangeRecord by connection_id.""" async with self._profile.session() as session: if await V20DiscoveryExchangeRecord.exists_for_connection_id( session=session, connection_id=connection_id ): return await V20DiscoveryExchangeRecord.retrieve_by_connection_id( session=session, connection_id=connection_id ) else: return None
[docs] async def proactive_disclose_features(self, connection_id: str): """Proactively dislose features on active connection setup.""" queries_msg = Queries( queries=[ QueryItem(feature_type="protocol", match="*"), QueryItem(feature_type="goal-code", match="*"), ] ) disclosures = await self.receive_query(queries_msg=queries_msg) responder = self.profile.inject_or(BaseResponder) if responder: await responder.send(disclosures, connection_id=connection_id) else: self._logger.exception( "Unable to send discover-features v2 disclosures message" ": BaseResponder unavailable" )
[docs] async def return_to_publish_features( self, ) -> Tuple[Optional[Sequence[str]], Optional[Sequence[str]]]: """Return to_publish features filter, if specified.""" to_publish_protocols = None to_publish_goal_codes = None async with self._profile.session() as session: if ( session.settings.get("disclose_protocol_list") and len(session.settings.get("disclose_protocol_list")) > 0 ): to_publish_protocols = session.settings.get("disclose_protocol_list") if ( session.settings.get("disclose_goal_code_list") and len(session.settings.get("disclose_goal_code_list")) > 0 ): to_publish_goal_codes = session.settings.get("disclose_goal_code_list") return (to_publish_protocols, to_publish_goal_codes)
[docs] async def execute_protocol_query(self, match: str): """Execute protocol match query.""" protocol_registry = self._profile.context.inject_or(ProtocolRegistry) protocols = protocol_registry.protocols_matching_query(match) results = await protocol_registry.prepare_disclosed( self._profile.context, protocols ) return results
[docs] async def execute_goal_code_query(self, match: str): """Execute goal code match query.""" goal_code_registry = self._profile.context.inject_or(GoalCodeRegistry) results = goal_code_registry.goal_codes_matching_query(match) return results
[docs] async def receive_query(self, queries_msg: Queries) -> Disclosures: """Process query and return the corresponding disclose message.""" disclosures = Disclosures(disclosures=[]) published_results = [] ( to_publish_protocols, to_publish_goal_codes, ) = await self.return_to_publish_features() for query_item in queries_msg.queries: assert isinstance(query_item, QueryItem) if query_item.feature_type == "protocol": results = await self.execute_protocol_query(query_item.match) for result in results: to_publish_result = {"feature-type": "protocol"} if "pid" in result: if ( to_publish_protocols and result.get("pid") not in to_publish_protocols ): continue to_publish_result["id"] = result.get("pid") else: continue if "roles" in result: to_publish_result["roles"] = result.get("roles") published_results.append(to_publish_result) elif query_item.feature_type == "goal-code": results = await self.execute_goal_code_query(query_item.match) for result in results: to_publish_result = {"feature-type": "goal-code"} if to_publish_goal_codes and result not in to_publish_goal_codes: continue to_publish_result["id"] = result published_results.append(to_publish_result) disclosures.disclosures = published_results # Check if query message has a thid # If disclosing this agents feature if queries_msg._thread: disclosures.assign_thread_id(queries_msg._thread.thid) return disclosures
[docs] async def check_if_disclosure_received( self, record_id: str ) -> V20DiscoveryExchangeRecord: """Check if disclosures has been received.""" while True: async with self._profile.session() as session: ex_rec = await V20DiscoveryExchangeRecord.retrieve_by_id( session=session, record_id=record_id ) if ex_rec.disclosures: return ex_rec await asyncio.sleep(0.5)
[docs] async def create_and_send_query( self, connection_id: str = None, query_protocol: str = None, query_goal_code: str = None, ) -> V20DiscoveryExchangeRecord: """Create and send a Query message.""" queries = [] if not query_goal_code and not query_protocol: raise V20DiscoveryMgrError( "Atleast one protocol or goal-code feature-type query is required." ) if query_protocol: queries.append(QueryItem(feature_type="protocol", match=query_protocol)) if query_goal_code: queries.append(QueryItem(feature_type="goal-code", match=query_goal_code)) queries_msg = Queries(queries=queries) if connection_id: async with self._profile.session() as session: # If existing record exists for a connection_id if await V20DiscoveryExchangeRecord.exists_for_connection_id( session=session, connection_id=connection_id ): discovery_ex_rec = ( await V20DiscoveryExchangeRecord.retrieve_by_connection_id( session=session, connection_id=connection_id ) ) discovery_ex_rec.disclosures = None await discovery_ex_rec.save(session) else: discovery_ex_rec = V20DiscoveryExchangeRecord() discovery_ex_rec.queries_msg = queries_msg discovery_ex_rec.connection_id = connection_id await discovery_ex_rec.save(session) queries_msg.assign_thread_id(discovery_ex_rec.discovery_exchange_id) responder = self.profile.inject_or(BaseResponder) if responder: await responder.send(queries_msg, connection_id=connection_id) else: self._logger.exception( "Unable to send discover-features v2 query message" ": BaseResponder unavailable" ) try: return await asyncio.wait_for( self.check_if_disclosure_received( record_id=discovery_ex_rec.discovery_exchange_id, ), 5, ) except asyncio.TimeoutError: return discovery_ex_rec else: # Disclose this agent's features and/or goal codes discovery_ex_rec = V20DiscoveryExchangeRecord() discovery_ex_rec.queries_msg = queries_msg disclosures = await self.receive_query(queries_msg=queries_msg) discovery_ex_rec.disclosures = disclosures return discovery_ex_rec