Source code for aries_cloudagent.protocols.connections.v1_0.manager

"""Classes to manage connections."""

import logging
from typing import Optional, Sequence, Tuple, cast


from ....core.oob_processor import OobMessageProcessor
from ....connections.base_manager import BaseConnectionManager
from ....connections.models.conn_record import ConnRecord
from ....core.error import BaseError
from ....core.profile import Profile
from ....messaging.responder import BaseResponder
from ....messaging.valid import IndyDID
from ....storage.error import StorageNotFoundError
from ....transport.inbound.receipt import MessageReceipt
from ....wallet.base import BaseWallet
from ....wallet.did_method import SOV
from ....wallet.key_type import ED25519
from ...coordinate_mediation.v1_0.manager import MediationManager
from .message_types import ARIES_PROTOCOL as CONN_PROTO
from .messages.connection_invitation import ConnectionInvitation
from .messages.connection_request import ConnectionRequest
from .messages.connection_response import ConnectionResponse
from .messages.problem_report import ProblemReportReason
from .models.connection_detail import ConnectionDetail


class ConnectionManagerError(BaseError):
    """Raised by the connection manager when a connection operation cannot proceed."""
class ConnectionManager(BaseConnectionManager):
    """Manage the connections/1.0 handshake: invitation, request and response."""

    def __init__(self, profile: Profile):
        """Initialize a ConnectionManager.

        Args:
            profile: The profile for this connection manager

        """
        self._profile = profile
        self._logger = logging.getLogger(__name__)
        super().__init__(self._profile)

    @property
    def profile(self) -> Profile:
        """Accessor for the current profile.

        Returns:
            The profile for this connection manager

        """
        return self._profile

    def _accept_mode(self, auto_accept: Optional[bool], config_key: str):
        """Pick the accept mode for a new connection record.

        Args:
            auto_accept: explicit caller choice; None defers to the settings
            config_key: settings key consulted only when `auto_accept` is None

        Returns:
            `ConnRecord.ACCEPT_AUTO` or `ConnRecord.ACCEPT_MANUAL`

        """
        if auto_accept or (
            auto_accept is None and self.profile.settings.get(config_key)
        ):
            return ConnRecord.ACCEPT_AUTO
        return ConnRecord.ACCEPT_MANUAL

    def _resolve_my_endpoints(self, my_endpoint: Optional[str]) -> list:
        """Build the endpoint list to advertise in a DID document.

        An explicit `my_endpoint` wins and is used alone; otherwise the list is
        the configured `default_endpoint` (if any) followed by any
        `additional_endpoints` from the profile settings.

        Args:
            my_endpoint: endpoint supplied by the caller, if any

        Returns:
            The list of endpoints (possibly empty)

        """
        if my_endpoint:
            return [my_endpoint]
        my_endpoints = []
        default_endpoint = self.profile.settings.get("default_endpoint")
        if default_endpoint:
            my_endpoints.append(default_endpoint)
        my_endpoints.extend(self.profile.settings.get("additional_endpoints", []))
        return my_endpoints

    async def _get_or_create_my_did(self, connection: ConnRecord):
        """Return the local DID info for a connection, creating a DID if needed.

        When the record has no `my_did` yet, a new SOV/ED25519 local DID is
        created in the wallet and assigned to `connection.my_did`. The record
        is not saved here; callers save it as part of their state update.

        Args:
            connection: the connection record to resolve a local DID for

        Returns:
            The wallet's DID info for the (existing or new) local DID

        """
        async with self.profile.session() as session:
            wallet = session.inject(BaseWallet)
            if connection.my_did:
                return await wallet.get_local_did(connection.my_did)
            # Create new DID for connection
            my_info = await wallet.create_local_did(SOV, ED25519)
        connection.my_did = my_info.did
        return my_info

    async def _send_mediation_request(self, connection: ConnRecord):
        """Prepare and send a mediation request over the given connection.

        Args:
            connection: the connection to request mediation on

        """
        mgr = MediationManager(self.profile)
        _record, mediation_request = await mgr.prepare_request(
            connection.connection_id
        )
        responder = self.profile.inject(BaseResponder)
        await responder.send(
            mediation_request, connection_id=connection.connection_id
        )

    async def create_invitation(
        self,
        my_label: Optional[str] = None,
        my_endpoint: Optional[str] = None,
        auto_accept: Optional[bool] = None,
        public: bool = False,
        multi_use: bool = False,
        alias: Optional[str] = None,
        routing_keys: Optional[Sequence[str]] = None,
        recipient_keys: Optional[Sequence[str]] = None,
        metadata: Optional[dict] = None,
        mediation_id: Optional[str] = None,
    ) -> Tuple[ConnRecord, ConnectionInvitation]:
        """Generate new connection invitation.

        This interaction represents an out-of-band communication channel. In the
        future and in practice, these sort of invitations will be received over
        any number of channels such as SMS, Email, QR Code, NFC, etc.

        Structure of an invite message:

        ::

            {
                "@type": "https://didcomm.org/connections/1.0/invitation",
                "label": "Alice",
                "did": "did:sov:QmWbsNYhMrjHiqZDTUTEJs"
            }

        Or, in the case of a peer DID:

        ::

            {
                "@type": "https://didcomm.org/connections/1.0/invitation",
                "label": "Alice",
                "did": "did:peer:oiSqsNYhMrjHiqZDTUthsw",
                "recipient_keys": ["8HH5gYEeNc3z7PYXmd54d4x6qAfCNrqQqEB3nS7Zfu7K"],
                "service_endpoint": "https://example.com/endpoint",
                "routing_keys": ["9EH5gYEeNc3z7PYXmd53d5x6qAfCNrqQqEB4nS7Zfu6K"]
            }

        Args:
            my_label: label for this connection (defaults to the
                `default_label` setting)
            my_endpoint: endpoint where other party can reach me; a routing
                endpoint from the route manager takes precedence, and the
                `default_endpoint` setting is the final fallback
            auto_accept: auto-accept a corresponding connection request
                (None to use config)
            public: set to create an invitation from the public DID
            multi_use: set to True to create an invitation for multiple use
            alias: optional alias to apply to connection for later use
            routing_keys: not used as input; for non-public invitations the
                value is replaced by the routing keys reported by the route
                manager
            recipient_keys: if given, the first key is used as the invitation
                key; otherwise a new ED25519 signing key is created in the
                wallet
            metadata: optional key/value pairs to store on the new connection
                record
            mediation_id: optional mediation record id; when omitted the
                default mediation record (if any) is used

        Returns:
            A tuple of the new `ConnRecord` and `ConnectionInvitation` instances

        Raises:
            ConnectionManagerError: if `public` is set but public invitations
                are not enabled or the wallet has no public DID

        """
        # Mediation Record can still be None after this operation if no
        # mediation id passed and no default
        mediation_record = await self._route_manager.mediation_record_if_id(
            self.profile,
            mediation_id,
            or_default=True,
        )
        image_url = self.profile.context.settings.get("image_url")

        invitation_mode = (
            ConnRecord.INVITATION_MODE_MULTI
            if multi_use
            else ConnRecord.INVITATION_MODE_ONCE
        )

        if not my_label:
            my_label = self.profile.settings.get("default_label")

        accept = self._accept_mode(auto_accept, "debug.auto_accept_requests")

        if recipient_keys:
            # TODO: register recipient keys for relay
            # TODO: check that recipient keys are in wallet
            invitation_key = recipient_keys[0]  # TODO first key appropriate?
        else:
            # Create and store new invitation key
            async with self.profile.session() as session:
                wallet = session.inject(BaseWallet)
                invitation_signing_key = await wallet.create_signing_key(
                    key_type=ED25519
                )
            invitation_key = invitation_signing_key.verkey
            recipient_keys = [invitation_key]

        if public:
            if not self.profile.settings.get("public_invites"):
                raise ConnectionManagerError("Public invitations are not enabled")

            async with self.profile.session() as session:
                wallet = session.inject(BaseWallet)
                public_did = await wallet.get_public_did()
            if not public_did:
                raise ConnectionManagerError(
                    "Cannot create public invitation with no public DID"
                )

            # FIXME - allow ledger instance to format public DID with prefix?
            public_did_did = public_did.did
            if bool(IndyDID.PATTERN.match(public_did_did)):
                public_did_did = f"did:sov:{public_did.did}"

            invitation = ConnectionInvitation(
                label=my_label, did=public_did_did, image_url=image_url
            )

            # Use the RFC 0160 role/state names like every other record built
            # by this manager; receive_request looks records up by the rfc160
            # role name.
            connection = ConnRecord(  # create connection record
                invitation_key=public_did.verkey,
                invitation_msg_id=invitation._id,
                invitation_mode=invitation_mode,
                their_role=ConnRecord.Role.REQUESTER.rfc160,
                state=ConnRecord.State.INVITATION.rfc160,
                accept=accept,
                alias=alias,
                connection_protocol=CONN_PROTO,
            )

            async with self.profile.session() as session:
                await connection.save(session, reason="Created new invitation")

            # Add mapping for multitenant relaying.
            # Mediation of public keys is not supported yet
            await self._route_manager.route_verkey(self.profile, public_did.verkey)

        else:
            # Create connection record
            connection = ConnRecord(
                invitation_key=invitation_key,  # TODO: determine correct key to use
                their_role=ConnRecord.Role.REQUESTER.rfc160,
                state=ConnRecord.State.INVITATION.rfc160,
                accept=accept,
                invitation_mode=invitation_mode,
                alias=alias,
                connection_protocol=CONN_PROTO,
            )
            async with self.profile.session() as session:
                await connection.save(session, reason="Created new invitation")

            await self._route_manager.route_invitation(
                self.profile, connection, mediation_record
            )
            routing_keys, routing_endpoint = await self._route_manager.routing_info(
                self.profile,
                mediation_record,
            )
            my_endpoint = (
                routing_endpoint
                or my_endpoint
                or cast(str, self.profile.settings.get("default_endpoint"))
            )

            # Create connection invitation message
            # Note: Need to split this into two stages
            # to support inbound routing of invites
            # Would want to reuse create_did_document and convert the result
            invitation = ConnectionInvitation(
                label=my_label,
                recipient_keys=recipient_keys,
                routing_keys=routing_keys,
                endpoint=my_endpoint,
                image_url=image_url,
            )

        async with self.profile.session() as session:
            await connection.attach_invitation(session, invitation)

            if metadata:
                for key, value in metadata.items():
                    await connection.metadata_set(session, key, value)

        return connection, invitation

    async def receive_invitation(
        self,
        invitation: ConnectionInvitation,
        their_public_did: Optional[str] = None,
        auto_accept: Optional[bool] = None,
        alias: Optional[str] = None,
        mediation_id: Optional[str] = None,
    ) -> ConnRecord:
        """Create a new connection record to track a received invitation.

        When the resulting record is set to auto-accept, a connection request
        is created immediately and, if a responder is available, sent; the
        record is then re-read from storage so the returned state is current.

        Args:
            invitation: The `ConnectionInvitation` to store
            their_public_did: optional public DID of the inviter to store on
                the record
            auto_accept: set to auto-accept the invitation (None to use config)
            alias: optional alias to set on the record
            mediation_id: optional mediation record id to associate with the
                connection

        Returns:
            The new `ConnRecord` instance

        Raises:
            ConnectionManagerError: if the invitation carries no DID and is
                missing recipient keys or an endpoint

        """
        if not invitation.did:
            if not invitation.recipient_keys:
                raise ConnectionManagerError(
                    "Invitation must contain recipient key(s)",
                    error_code="missing-recipient-keys",
                )
            if not invitation.endpoint:
                raise ConnectionManagerError(
                    "Invitation must contain an endpoint",
                    error_code="missing-endpoint",
                )

        accept = self._accept_mode(auto_accept, "debug.auto_accept_invites")

        # Create connection record
        connection = ConnRecord(
            invitation_key=invitation.recipient_keys and invitation.recipient_keys[0],
            their_label=invitation.label,
            invitation_msg_id=invitation._id,
            their_role=ConnRecord.Role.RESPONDER.rfc160,
            state=ConnRecord.State.INVITATION.rfc160,
            accept=accept,
            alias=alias,
            their_public_did=their_public_did,
            connection_protocol=CONN_PROTO,
        )

        async with self.profile.session() as session:
            await connection.save(
                session,
                reason="Created new connection record from invitation",
                log_params={"invitation": invitation, "their_label": invitation.label},
            )

            # Save the invitation for later processing
            await connection.attach_invitation(session, invitation)

        await self._route_manager.save_mediator_for_connection(
            self.profile, connection, mediation_id=mediation_id
        )

        if connection.accept == ConnRecord.ACCEPT_AUTO:
            request = await self.create_request(connection, mediation_id=mediation_id)
            responder = self.profile.inject_or(BaseResponder)
            if responder:
                await responder.send(request, connection_id=connection.connection_id)
                # refetch connection for accurate state
                async with self.profile.session() as session:
                    connection = await ConnRecord.retrieve_by_id(
                        session, connection.connection_id
                    )
        else:
            self._logger.debug("Connection invitation will await acceptance")

        return connection

    async def create_request(
        self,
        connection: ConnRecord,
        my_label: Optional[str] = None,
        my_endpoint: Optional[str] = None,
        mediation_id: Optional[str] = None,
    ) -> ConnectionRequest:
        """Create a new connection request for a previously-received invitation.

        The connection record is moved to the request state and saved.

        Args:
            connection: The `ConnRecord` representing the invitation to accept
            my_label: My label (defaults to the `default_label` setting)
            my_endpoint: My endpoint; when omitted, the `default_endpoint` and
                `additional_endpoints` settings are used
            mediation_id: optional mediation record id to use for routing

        Returns:
            A new `ConnectionRequest` message to send to the other agent

        """
        mediation_records = await self._route_manager.mediation_records_for_connection(
            self.profile,
            connection,
            mediation_id,
            or_default=True,
        )

        my_info = await self._get_or_create_my_did(connection)

        # Idempotent; if routing has already been set up, no action taken
        await self._route_manager.route_connection_as_invitee(
            self.profile, connection, mediation_records
        )

        # Create connection request message
        my_endpoints = self._resolve_my_endpoints(my_endpoint)

        did_doc = await self.create_did_document(
            my_info,
            my_endpoints,
            mediation_records=mediation_records,
        )

        if not my_label:
            my_label = self.profile.settings.get("default_label")
        request = ConnectionRequest(
            label=my_label,
            connection=ConnectionDetail(did=connection.my_did, did_doc=did_doc),
            image_url=self.profile.settings.get("image_url"),
        )
        request.assign_thread_id(thid=request._id, pthid=connection.invitation_msg_id)

        # Update connection state
        connection.request_id = request._id
        connection.state = ConnRecord.State.REQUEST.rfc160

        async with self.profile.session() as session:
            await connection.save(session, reason="Created connection request")

        return request

    async def receive_request(
        self,
        request: ConnectionRequest,
        receipt: MessageReceipt,
    ) -> ConnRecord:
        """Receive and store a connection request.

        Requests addressed to an invitation key are matched to the invitation's
        connection record (a fresh record is spun off for multi-use
        invitations). Requests addressed to the public DID are matched by the
        parent thread id when one is present, otherwise treated as unsolicited.

        Args:
            request: The `ConnectionRequest` to accept
            receipt: The message receipt

        Returns:
            The new or updated `ConnRecord` instance

        Raises:
            ConnectionManagerError: if no invitation matches the recipient key,
                the request has no DID document or its DID does not match the
                document, public invitations are disabled, or an unsolicited
                request arrives while `requests_through_public_did` is off

        """
        ConnRecord.log_state(
            "Receiving connection request",
            {"request": request},
            settings=self.profile.settings,
        )

        connection = None
        connection_key = None
        my_info = None

        # Determine what key will need to sign the response
        if receipt.recipient_did_public:
            async with self.profile.session() as session:
                wallet = session.inject(BaseWallet)
                my_info = await wallet.get_local_did(receipt.recipient_did)
            connection_key = my_info.verkey
        else:
            connection_key = receipt.recipient_verkey
            try:
                async with self.profile.session() as session:
                    connection = await ConnRecord.retrieve_by_invitation_key(
                        session=session,
                        invitation_key=connection_key,
                        their_role=ConnRecord.Role.REQUESTER.rfc160,
                    )
            except StorageNotFoundError as err:
                raise ConnectionManagerError(
                    "No invitation found for pairwise connection "
                    f"in state {ConnRecord.State.INVITATION.rfc160}: "
                    "a prior connection request may have updated the connection state"
                ) from err

        invitation = None
        if connection:
            async with self.profile.session() as session:
                invitation = await connection.retrieve_invitation(session)
            connection_key = connection.invitation_key
            ConnRecord.log_state(
                "Found invitation",
                {"invitation": invitation},
                settings=self.profile.settings,
            )

            if connection.is_multiuse_invitation:
                # Keep the invitation record reusable: each request gets its
                # own connection record with a fresh local DID.
                async with self.profile.session() as session:
                    wallet = session.inject(BaseWallet)
                    my_info = await wallet.create_local_did(SOV, ED25519)

                new_connection = ConnRecord(
                    invitation_key=connection_key,
                    my_did=my_info.did,
                    state=ConnRecord.State.REQUEST.rfc160,
                    accept=connection.accept,
                    their_role=connection.their_role,
                    connection_protocol=CONN_PROTO,
                )
                async with self.profile.session() as session:
                    await new_connection.save(
                        session,
                        reason=(
                            "Received connection request from multi-use invitation DID"
                        ),
                        event=False,
                    )

                # Transfer metadata from multi-use to new connection
                # Must come after save so there's an ID to associate with metadata
                async with self.profile.session() as session:
                    for key, value in (
                        await connection.metadata_get_all(session)
                    ).items():
                        await new_connection.metadata_set(session, key, value)

                connection = new_connection

        conn_did_doc = request.connection.did_doc
        if not conn_did_doc:
            raise ConnectionManagerError(
                "No DIDDoc provided; cannot connect to public DID"
            )
        if request.connection.did != conn_did_doc.did:
            raise ConnectionManagerError(
                "Connection DID does not match DIDDoc id",
                error_code=ProblemReportReason.REQUEST_NOT_ACCEPTED.value,
            )
        await self.store_did_document(conn_did_doc)

        if connection:
            connection.their_label = request.label
            connection.their_did = request.connection.did
            connection.state = ConnRecord.State.REQUEST.rfc160
            async with self.profile.session() as session:
                # force emitting event that would be ignored for multi-use invitations
                # since the record is not new, and the state was not updated
                await connection.save(
                    session,
                    reason="Received connection request from invitation",
                    event=True,
                )
        elif not self.profile.settings.get("public_invites"):
            raise ConnectionManagerError("Public invitations are not enabled")
        else:  # request from public did
            async with self.profile.session() as session:
                wallet = session.inject(BaseWallet)
                my_info = await wallet.create_local_did(SOV, ED25519)

            # A request without a thread decorator cannot reference an
            # invitation; skip the lookup so it is handled as unsolicited
            # instead of failing on a missing attribute.
            thread = request._thread
            if thread:
                async with self.profile.session() as session:
                    connection = await ConnRecord.retrieve_by_invitation_msg_id(
                        session=session,
                        invitation_msg_id=thread.pthid,
                        their_role=ConnRecord.Role.REQUESTER.rfc160,
                    )
            if not connection:
                if not self.profile.settings.get("requests_through_public_did"):
                    raise ConnectionManagerError(
                        "Unsolicited connection requests to "
                        "public DID is not enabled"
                    )
                connection = ConnRecord()
            connection.invitation_key = connection_key
            connection.my_did = my_info.did
            # NOTE(review): the invitation-key path keeps their_role as
            # REQUESTER; RESPONDER here is preserved from the original code —
            # confirm this is intended.
            connection.their_role = ConnRecord.Role.RESPONDER.rfc160
            connection.their_did = request.connection.did
            connection.their_label = request.label
            connection.accept = (
                ConnRecord.ACCEPT_AUTO
                if self.profile.settings.get("debug.auto_accept_requests")
                else ConnRecord.ACCEPT_MANUAL
            )
            connection.state = ConnRecord.State.REQUEST.rfc160
            connection.connection_protocol = CONN_PROTO
            async with self.profile.session() as session:
                await connection.save(
                    session, reason="Received connection request from public DID"
                )

        async with self.profile.session() as session:
            # Attach the connection request so it can be found and responded to
            await connection.attach_request(session, request)

        # Clean associated oob record if not needed anymore
        oob_processor = self.profile.inject(OobMessageProcessor)
        await oob_processor.clean_finished_oob_record(self.profile, request)

        return connection

    async def create_response(
        self,
        connection: ConnRecord,
        my_endpoint: Optional[str] = None,
        mediation_id: Optional[str] = None,
    ) -> ConnectionResponse:
        """Create a connection response for a received connection request.

        The response's `connection` field is signed with the invitation key,
        and the connection record is moved to the response state and saved.
        If the record's metadata asks for it, a mediation request is sent on
        the connection afterwards.

        Args:
            connection: The `ConnRecord` with a pending connection request
            my_endpoint: The endpoint I can be reached at
            mediation_id: The record id for mediation that contains routing_keys and
                service endpoint

        Returns:
            The new `ConnectionResponse` message

        Raises:
            ConnectionManagerError: if the connection is not in the request or
                response state

        """
        ConnRecord.log_state(
            "Creating connection response",
            {"connection_id": connection.connection_id},
            settings=self.profile.settings,
        )

        mediation_records = await self._route_manager.mediation_records_for_connection(
            self.profile, connection, mediation_id
        )

        if ConnRecord.State.get(connection.state) not in (
            ConnRecord.State.REQUEST,
            ConnRecord.State.RESPONSE,
        ):
            raise ConnectionManagerError(
                "Connection is not in the request or response state"
            )

        async with self.profile.session() as session:
            request = await connection.retrieve_request(session)

        my_info = await self._get_or_create_my_did(connection)

        # Idempotent; if routing has already been set up, no action taken
        await self._route_manager.route_connection_as_inviter(
            self.profile, connection, mediation_records
        )

        # Create connection response message
        my_endpoints = self._resolve_my_endpoints(my_endpoint)

        did_doc = await self.create_did_document(
            my_info,
            my_endpoints,
            mediation_records=mediation_records,
        )

        response = ConnectionResponse(
            connection=ConnectionDetail(did=my_info.did, did_doc=did_doc)
        )

        # Assign thread information
        response.assign_thread_from(request)
        response.assign_trace_from(request)

        # Sign connection field using the invitation key
        async with self.profile.session() as session:
            wallet = session.inject(BaseWallet)
            await response.sign_field("connection", connection.invitation_key, wallet)

            # Update connection state
            connection.state = ConnRecord.State.RESPONSE.rfc160

            await connection.save(
                session,
                reason="Created connection response",
                log_params={"response": response},
            )

        # TODO It's possible the mediation request sent here might arrive
        # before the connection response. This would result in an error condition
        # difficult to accomodate for without modifying handlers for trust ping
        # to ensure the connection is active.
        async with self.profile.session() as session:
            send_mediation_request = await connection.metadata_get(
                session, MediationManager.SEND_REQ_AFTER_CONNECTION
            )
        if send_mediation_request:
            await self._send_mediation_request(connection)

        return response

    async def accept_response(
        self, response: ConnectionResponse, receipt: MessageReceipt
    ) -> ConnRecord:
        """Accept a connection response.

        Process a ConnectionResponse message by looking up
        the connection request and setting up the pairwise connection.
        The connection is located by the response's thread id first, then by
        the sender/recipient DID pair from the receipt.

        Args:
            response: The `ConnectionResponse` to accept
            receipt: The message receipt

        Returns:
            The updated `ConnRecord` representing the connection

        Raises:
            ConnectionManagerError: If no corresponding connection request is
                found
            ConnectionManagerError: If the corresponding connection is not
                at the request or response stage
            ConnectionManagerError: If the response carries no DID document or
                its DID does not match the document
            ConnectionManagerError: If the signed connection field fails
                verification against the invitation key

        """
        connection = None
        if response._thread:
            # identify the request by the thread ID
            try:
                async with self.profile.session() as session:
                    connection = await ConnRecord.retrieve_by_request_id(
                        session, response._thread_id
                    )
            except StorageNotFoundError:
                # deliberate best-effort: fall through to the DID-based lookup
                pass

        if not connection and receipt.sender_did:
            # identify connection by the DID they used for us
            try:
                async with self.profile.session() as session:
                    connection = await ConnRecord.retrieve_by_did(
                        session, receipt.sender_did, receipt.recipient_did
                    )
            except StorageNotFoundError:
                # handled by the `not connection` check below
                pass

        if not connection:
            raise ConnectionManagerError(
                "No corresponding connection request found",
                error_code=ProblemReportReason.RESPONSE_NOT_ACCEPTED.value,
            )

        if ConnRecord.State.get(connection.state) not in (
            ConnRecord.State.REQUEST,
            ConnRecord.State.RESPONSE,
        ):
            raise ConnectionManagerError(
                f"Cannot accept connection response for connection"
                f" in state: {connection.state}"
            )

        their_did = response.connection.did
        conn_did_doc = response.connection.did_doc
        if not conn_did_doc:
            raise ConnectionManagerError(
                "No DIDDoc provided; cannot connect to public DID"
            )
        if their_did != conn_did_doc.did:
            raise ConnectionManagerError("Connection DID does not match DIDDoc id")

        # Verify connection response using connection field
        async with self.profile.session() as session:
            wallet = session.inject(BaseWallet)
            try:
                await response.verify_signed_field(
                    "connection", wallet, connection.invitation_key
                )
            except ValueError as err:
                raise ConnectionManagerError(
                    "connection field verification using invitation_key failed"
                ) from err

        await self.store_did_document(conn_did_doc)

        connection.their_did = their_did
        connection.state = ConnRecord.State.RESPONSE.rfc160
        async with self.profile.session() as session:
            await connection.save(session, reason="Accepted connection response")

            send_mediation_request = await connection.metadata_get(
                session, MediationManager.SEND_REQ_AFTER_CONNECTION
            )
        if send_mediation_request:
            await self._send_mediation_request(connection)

        return connection