"""Classes to manage connection establishment under RFC 23 (DID exchange)."""
import json
import logging
from ....connections.models.conn_record import ConnRecord
from ....connections.models.diddoc import DIDDoc
from ....connections.base_manager import BaseConnectionManager
from ....connections.util import mediation_record_if_id
from ....core.error import BaseError
from ....core.profile import ProfileSession
from ....messaging.decorators.attach_decorator import AttachDecorator
from ....messaging.responder import BaseResponder
from ....storage.error import StorageNotFoundError
from ....transport.inbound.receipt import MessageReceipt
from ....wallet.base import BaseWallet
from ....wallet.error import WalletError
from ....wallet.key_type import KeyType
from ....wallet.did_method import DIDMethod
from ....wallet.did_posture import DIDPosture
from ....did.did_key import DIDKey
from ....multitenant.manager import MultitenantManager
from ...coordinate_mediation.v1_0.manager import MediationManager
from ...out_of_band.v1_0.messages.invitation import (
InvitationMessage as OOBInvitationMessage,
)
from ...out_of_band.v1_0.messages.service import Service as OOBService
from .message_types import ARIES_PROTOCOL as DIDX_PROTO
from .messages.complete import DIDXComplete
from .messages.request import DIDXRequest
from .messages.response import DIDXResponse
from .messages.problem_report_reason import ProblemReportReason
class DIDXManagerError(BaseError):
    """Connection error raised by the DID exchange manager."""
class DIDXManager(BaseConnectionManager):
    """Manager for establishing connections under RFC 23 (DID exchange)."""
def __init__(self, session: ProfileSession):
    """
    Set up a DID exchange manager bound to a profile session.

    Args:
        session: The profile session for this did exchange manager

    """
    self._logger = logging.getLogger(__name__)
    self._session = session
    super().__init__(session)
@property
def session(self) -> ProfileSession:
    """
    Return the profile session this manager was constructed with.

    Returns:
        The profile session for this did exchange manager

    """
    return self._session
async def receive_invitation(
    self,
    invitation: OOBInvitationMessage,
    their_public_did: str = None,
    auto_accept: bool = None,
    alias: str = None,
    mediation_id: str = None,
) -> ConnRecord:  # leave in didexchange as it uses a responder: not out-of-band
    """
    Record a received invitation as a new connection.

    When the new record is marked for automatic acceptance, a DID exchange
    request is created right away and, if a responder is available, sent as
    a reply; the record then moves to the request state.

    Args:
        invitation: invitation to store
        their_public_did: their public DID
        auto_accept: set to auto-accept invitation (None to use config)
        alias: optional alias to set on record
        mediation_id: record id for mediation with routing_keys, service endpoint

    Returns:
        The new `ConnRecord` instance

    Raises:
        DIDXManagerError: If the invitation has no services, or any service
            block lacks recipient keys or a service endpoint

    """
    services = invitation.services
    if not services:
        raise DIDXManagerError(
            "Invitation must contain service blocks or service DIDs"
        )

    # Every inline service block must be usable, not just the first one
    has_incomplete_block = any(
        isinstance(svc, OOBService)
        and not (svc.recipient_keys and svc.service_endpoint)
        for svc in services
    )
    if has_incomplete_block:
        raise DIDXManagerError(
            "All service blocks in invitation with no service DIDs "
            "must contain recipient key(s) and service endpoint(s)"
        )

    # An explicit argument wins; None defers to the debug setting
    wants_auto = auto_accept
    if wants_auto is None:
        wants_auto = self._session.settings.get("debug.auto_accept_invites")
    accept_mode = ConnRecord.ACCEPT_AUTO if wants_auto else ConnRecord.ACCEPT_MANUAL

    # Only the first service entry contributes the invitation key
    first_service = services[0]
    if isinstance(first_service, OOBService):
        invitation_key = DIDKey.from_did(
            first_service.recipient_keys[0]
        ).public_key_b58
    else:
        invitation_key = None

    their_role = ConnRecord.Role.RESPONDER.rfc23
    conn_rec = ConnRecord(
        invitation_key=invitation_key,
        invitation_msg_id=invitation._id,
        their_label=invitation.label,
        their_role=their_role,
        state=ConnRecord.State.INVITATION.rfc23,
        accept=accept_mode,
        alias=alias,
        their_public_did=their_public_did,
        connection_protocol=DIDX_PROTO,
    )
    await conn_rec.save(
        self._session,
        reason="Created new connection record from invitation",
        log_params={"invitation": invitation, "their_role": their_role},
    )

    # Keep the invitation with the record for later processing
    await conn_rec.attach_invitation(self._session, invitation)

    if conn_rec.accept != ConnRecord.ACCEPT_AUTO:
        self._logger.debug("Connection invitation will await acceptance")
        return conn_rec

    request = await self.create_request(conn_rec, mediation_id=mediation_id)
    responder = self._session.inject(BaseResponder, required=False)
    if responder:
        await responder.send_reply(request, connection_id=conn_rec.connection_id)
        # State only advances once the request has actually been handed off
        conn_rec.state = ConnRecord.State.REQUEST.rfc23
        await conn_rec.save(self._session, reason="Sent connection request")
    return conn_rec
async def create_request_implicit(
    self,
    their_public_did: str,
    my_label: str = None,
    my_endpoint: str = None,
    mediation_id: str = None,
    use_public_did: bool = False,
) -> ConnRecord:
    """
    Request a connection to a public DID without an explicit invitation.

    Builds a fresh connection record, delegates to `create_request` to
    produce the request message, then sends it if a responder is available.

    Args:
        their_public_did: public DID to which to request a connection
        my_label: my label for request
        my_endpoint: my endpoint
        mediation_id: record id for mediation with routing_keys, service endpoint
        use_public_did: use my public DID for this connection

    Returns:
        The new `ConnRecord` instance

    Raises:
        WalletError: If `use_public_did` is set but the wallet has no public DID

    """
    # Left as None unless a public DID is requested; create_request then
    # fills it in when it creates a local DID
    my_did = None
    if use_public_did:
        public_info = await self._session.inject(BaseWallet).get_public_did()
        if not public_info:
            raise WalletError("No public DID configured")
        my_did = public_info.did

    conn_rec = ConnRecord(
        my_did=my_did,
        their_did=their_public_did,
        their_label=None,
        their_role=ConnRecord.Role.RESPONDER.rfc23,
        invitation_key=None,
        invitation_msg_id=None,
        accept=None,
        alias=my_label,
        their_public_did=their_public_did,
        connection_protocol=DIDX_PROTO,
    )

    # create_request saves and updates conn_rec
    request = await self.create_request(
        conn_rec=conn_rec,
        my_label=my_label,
        my_endpoint=my_endpoint,
        mediation_id=mediation_id,
    )

    conn_rec.request_id = request._id
    conn_rec.state = ConnRecord.State.REQUEST.rfc23
    await conn_rec.save(self._session, reason="Created connection request")

    responder = self._session.inject(BaseResponder, required=False)
    if responder:
        await responder.send(request, connection_id=conn_rec.connection_id)

    return conn_rec
async def create_request(
    self,
    conn_rec: ConnRecord,
    my_label: str = None,
    my_endpoint: str = None,
    mediation_id: str = None,
) -> DIDXRequest:
    """
    Create a new connection request for a previously-received invitation.

    Uses the record's existing DID, or creates a new local DID when the
    record has none, builds and signs a DID Doc attachment, saves the record
    in the request state, and forwards any resulting keylist updates to the
    mediator.

    Args:
        conn_rec: The `ConnRecord` representing the invitation to accept
        my_label: My label for request (falls back to the `default_label`
            setting)
        my_endpoint: My endpoint (falls back to the `default_endpoint` and
            `additional_endpoints` settings)
        mediation_id: The record id for mediation that contains routing_keys and
            service endpoint

    Returns:
        A new `DIDXRequest` message to send to the other agent

    """
    # Mediation Support
    mediation_mgr = MediationManager(self._session.profile)
    keylist_updates = None
    mediation_record = await mediation_record_if_id(
        self._session,
        mediation_id,
        or_default=True,
    )
    base_mediation_record = None

    # Multitenancy setup
    multitenant_mgr = self._session.inject(MultitenantManager, required=False)
    wallet_id = self._session.settings.get("wallet.id")
    if multitenant_mgr and wallet_id:
        base_mediation_record = await multitenant_mgr.get_default_mediator()

    wallet = self._session.inject(BaseWallet)
    if conn_rec.my_did:
        my_info = await wallet.get_local_did(conn_rec.my_did)
    else:
        # Create new DID for connection
        my_info = await wallet.create_local_did(
            method=DIDMethod.SOV,
            key_type=KeyType.ED25519,
        )
        conn_rec.my_did = my_info.did
        keylist_updates = await mediation_mgr.add_key(
            my_info.verkey, keylist_updates
        )
        # Add mapping for multitenant relay
        if multitenant_mgr and wallet_id:
            await multitenant_mgr.add_key(wallet_id, my_info.verkey)

    # Create connection request message
    if my_endpoint:
        my_endpoints = [my_endpoint]
    else:
        my_endpoints = []
        default_endpoint = self._session.settings.get("default_endpoint")
        if default_endpoint:
            my_endpoints.append(default_endpoint)
        my_endpoints.extend(self._session.settings.get("additional_endpoints", []))

    did_doc = await self.create_did_document(
        my_info,
        conn_rec.inbound_connection_id,
        my_endpoints,
        mediation_records=list(
            filter(None, [base_mediation_record, mediation_record])
        ),
    )

    # Parent thread is the invitation message when there is one, otherwise
    # the (qualified) public DID the request is addressed to.
    # NOTE(review): if the record has neither an invitation message id nor a
    # public DID, pthid becomes "did:sov:None" -- confirm callers always
    # supply one of the two.
    if (
        conn_rec.their_public_did is not None
        and conn_rec.their_public_did.startswith("did:")
    ):
        qualified_did = conn_rec.their_public_did
    else:
        qualified_did = f"did:sov:{conn_rec.their_public_did}"
    pthid = conn_rec.invitation_msg_id or qualified_did

    attach = AttachDecorator.data_base64(did_doc.serialize())
    await attach.data.sign(my_info.verkey, wallet)

    if not my_label:
        my_label = self._session.settings.get("default_label")
    request = DIDXRequest(
        label=my_label,
        did=conn_rec.my_did,
        did_doc_attach=attach,
    )
    request.assign_thread_id(thid=request._id, pthid=pthid)

    # Update connection state
    conn_rec.request_id = request._id
    conn_rec.state = ConnRecord.State.REQUEST.rfc23
    await conn_rec.save(self._session, reason="Created connection request")

    # Notify Mediator
    if keylist_updates and mediation_record:
        # The responder is optional here, so it must be checked before use
        responder = self._session.inject(BaseResponder, required=False)
        if responder:
            await responder.send(
                keylist_updates, connection_id=mediation_record.connection_id
            )
        else:
            self._logger.warning(
                "No responder available: keylist update for mediator "
                "connection %s was not sent",
                mediation_record.connection_id,
            )

    return request
async def receive_request(
    self,
    request: DIDXRequest,
    recipient_did: str,
    recipient_verkey: str = None,
    my_endpoint: str = None,
    alias: str = None,
    auto_accept_implicit: bool = None,
    mediation_id: str = None,
) -> ConnRecord:
    """
    Receive and store a connection request.

    The request is matched to an explicit invitation by key when one exists;
    otherwise it is treated as a request against an implicit invitation on a
    public DID. The requester's signed DID Doc is verified and stored, the
    request is attached to the connection record, and a response is created
    and sent when the connection is set to auto-accept.

    Args:
        request: The `DIDXRequest` to accept
        recipient_did: The (unqualified) recipient DID
        recipient_verkey: The recipient verkey: None for public recipient DID
        my_endpoint: My endpoint
        alias: Alias for the connection
        auto_accept_implicit: Auto-accept request against implicit invitation
            (None to use the `debug.auto_accept_requests` setting)
        mediation_id: The record id for mediation that contains routing_keys and
            service endpoint

    Returns:
        The new or updated `ConnRecord` instance

    Raises:
        DIDXManagerError: If the request targets a public DID while public
            invitations are disabled or the DID is not public; if a pairwise
            request matches no explicit invitation; or if the DID Doc
            attachment is missing, fails signature verification, or does
            not match the request DID

    """
    ConnRecord.log_state(
        "Receiving connection request",
        {"request": request},
        settings=self._session.settings,
    )

    mediation_mgr = MediationManager(self._session.profile)
    keylist_updates = None
    conn_rec = None
    connection_key = None
    my_info = None
    wallet = self._session.inject(BaseWallet)

    # Multitenancy setup
    multitenant_mgr = self._session.inject(MultitenantManager, required=False)
    wallet_id = self._session.settings.get("wallet.id")

    # Determine what key will need to sign the response
    if recipient_verkey:  # peer DID
        connection_key = recipient_verkey
    else:
        if not self._session.settings.get("public_invites"):
            raise DIDXManagerError(
                "Public invitations are not enabled: connection request refused"
            )
        my_info = await wallet.get_local_did(recipient_did)
        if DIDPosture.get(my_info.metadata) not in (
            DIDPosture.PUBLIC,
            DIDPosture.POSTED,
        ):
            raise DIDXManagerError(f"Request DID {recipient_did} is not public")
        connection_key = my_info.verkey

    try:
        conn_rec = await ConnRecord.retrieve_by_invitation_key(
            session=self._session,
            invitation_key=connection_key,
            their_role=ConnRecord.Role.REQUESTER.rfc23,
        )
    except StorageNotFoundError:
        # A missing record is only an error for pairwise requests; against a
        # public DID it means the invitation is implicit
        if recipient_verkey:
            raise DIDXManagerError(
                "No explicit invitation found for pairwise connection "
                f"in state {ConnRecord.State.INVITATION.rfc23}: "
                "a prior connection request may have updated the connection state"
            )

    if conn_rec:  # invitation was explicit
        connection_key = conn_rec.invitation_key
        if conn_rec.is_multiuse_invitation:
            # Leave the multi-use invitation record as it is and track this
            # requester on a new record with its own DID
            my_info = await wallet.create_local_did(
                method=DIDMethod.SOV,
                key_type=KeyType.ED25519,
            )
            keylist_updates = await mediation_mgr.add_key(
                my_info.verkey, keylist_updates
            )

            new_conn_rec = ConnRecord(
                invitation_key=connection_key,
                my_did=my_info.did,
                state=ConnRecord.State.REQUEST.rfc23,
                accept=conn_rec.accept,
                their_role=conn_rec.their_role,
                connection_protocol=DIDX_PROTO,
            )
            await new_conn_rec.save(
                self._session,
                reason="Received connection request from multi-use invitation DID",
            )

            # Transfer metadata from multi-use to new connection
            # Must come after save so there's an ID to associate with metadata
            for key, value in (
                await conn_rec.metadata_get_all(self._session)
            ).items():
                await new_conn_rec.metadata_set(self._session, key, value)

            conn_rec = new_conn_rec

            # Add mapping for multitenant relay
            if multitenant_mgr and wallet_id:
                await multitenant_mgr.add_key(wallet_id, my_info.verkey)
        else:
            # Single-use invitation: queue removal of its key from the
            # mediator keylist
            keylist_updates = await mediation_mgr.remove_key(
                connection_key, keylist_updates
            )

    # request DID doc describes requester DID
    if not (request.did_doc_attach and request.did_doc_attach.data):
        raise DIDXManagerError(
            "DID Doc attachment missing or has no data: "
            "cannot connect to public DID"
        )
    if not await request.did_doc_attach.data.verify(wallet):
        raise DIDXManagerError("DID Doc signature failed verification")
    conn_did_doc = DIDDoc.from_json(request.did_doc_attach.data.signed.decode())
    if request.did != conn_did_doc.did:
        raise DIDXManagerError(
            (
                f"Connection DID {request.did} does not match "
                f"DID Doc id {conn_did_doc.did}"
            ),
            error_code=ProblemReportReason.REQUEST_NOT_ACCEPTED.value,
        )
    await self.store_did_document(conn_did_doc)

    if conn_rec:  # request is against explicit invitation
        auto_accept = (
            conn_rec.accept == ConnRecord.ACCEPT_AUTO
        )  # null=manual; oob-manager calculated at conn rec creation

        conn_rec.their_label = request.label
        if alias:
            conn_rec.alias = alias
        conn_rec.their_did = request.did
        conn_rec.state = ConnRecord.State.REQUEST.rfc23
        conn_rec.request_id = request._id
        await conn_rec.save(
            self._session, reason="Received connection request from invitation"
        )
    else:
        # request is against implicit invitation on public DID
        my_info = await wallet.create_local_did(
            method=DIDMethod.SOV,
            key_type=KeyType.ED25519,
        )
        keylist_updates = await mediation_mgr.add_key(
            my_info.verkey, keylist_updates
        )

        # Add mapping for multitenant relay
        if multitenant_mgr and wallet_id:
            await multitenant_mgr.add_key(wallet_id, my_info.verkey)

        auto_accept = bool(
            auto_accept_implicit
            or (
                auto_accept_implicit is None
                and self._session.settings.get("debug.auto_accept_requests", False)
            )
        )

        conn_rec = ConnRecord(
            my_did=my_info.did,
            accept=(
                ConnRecord.ACCEPT_AUTO if auto_accept else ConnRecord.ACCEPT_MANUAL
            ),
            their_did=request.did,
            their_label=request.label,
            alias=alias,
            their_role=ConnRecord.Role.REQUESTER.rfc23,
            invitation_key=connection_key,
            invitation_msg_id=None,
            request_id=request._id,
            state=ConnRecord.State.REQUEST.rfc23,
            connection_protocol=DIDX_PROTO,
        )
        await conn_rec.save(
            self._session, reason="Received connection request from public DID"
        )

    # Attach the connection request so it can be found and responded to
    await conn_rec.attach_request(self._session, request)

    # Send keylist updates to mediator
    mediation_record = await mediation_record_if_id(self._session, mediation_id)
    if keylist_updates and mediation_record:
        # The responder is optional here, so it must be checked before use
        responder = self._session.inject(BaseResponder, required=False)
        if responder:
            await responder.send(
                keylist_updates, connection_id=mediation_record.connection_id
            )
        else:
            self._logger.warning(
                "No responder available: keylist update for mediator "
                "connection %s was not sent",
                mediation_record.connection_id,
            )

    if auto_accept:
        response = await self.create_response(
            conn_rec,
            my_endpoint,
            mediation_id=mediation_id,
        )
        responder = self._session.inject(BaseResponder, required=False)
        if responder:
            await responder.send_reply(
                response, connection_id=conn_rec.connection_id
            )
            conn_rec.state = ConnRecord.State.RESPONSE.rfc23
            await conn_rec.save(self._session, reason="Sent connection response")
    else:
        self._logger.debug("DID exchange request will await acceptance")

    return conn_rec
async def create_response(
    self,
    conn_rec: ConnRecord,
    my_endpoint: str = None,
    mediation_id: str = None,
) -> DIDXResponse:
    """
    Create a connection response for a received connection request.

    Builds a DID Doc for this side of the connection, signs it with the
    record's invitation key, saves the record in the response state, and
    forwards any resulting keylist updates to the mediator. If the record
    carries the `MediationManager.SEND_REQ_AFTER_CONNECTION` metadata flag,
    a mediation request is also sent over this connection.

    Args:
        conn_rec: The `ConnRecord` with a pending connection request
        my_endpoint: Current agent endpoint (falls back to the
            `default_endpoint` and `additional_endpoints` settings)
        mediation_id: The record id for mediation that contains routing_keys and
            service endpoint

    Returns:
        New `DIDXResponse` message

    Raises:
        DIDXManagerError: If the connection record is not in the request state

    """
    ConnRecord.log_state(
        "Creating connection response",
        {"connection_id": conn_rec.connection_id},
        settings=self._session.settings,
    )

    mediation_mgr = MediationManager(self._session.profile)
    keylist_updates = None
    mediation_record = await mediation_record_if_id(self._session, mediation_id)
    base_mediation_record = None

    # Multitenancy setup
    multitenant_mgr = self._session.inject(MultitenantManager, required=False)
    wallet_id = self._session.settings.get("wallet.id")
    if multitenant_mgr and wallet_id:
        base_mediation_record = await multitenant_mgr.get_default_mediator()

    if ConnRecord.State.get(conn_rec.state) is not ConnRecord.State.REQUEST:
        raise DIDXManagerError(
            f"Connection not in state {ConnRecord.State.REQUEST.rfc23}"
        )

    request = await conn_rec.retrieve_request(self._session)

    wallet = self._session.inject(BaseWallet)
    if conn_rec.my_did:
        my_info = await wallet.get_local_did(conn_rec.my_did)
    else:
        my_info = await wallet.create_local_did(
            method=DIDMethod.SOV,
            key_type=KeyType.ED25519,
        )
        conn_rec.my_did = my_info.did
        keylist_updates = await mediation_mgr.add_key(
            my_info.verkey, keylist_updates
        )
        # Add mapping for multitenant relay
        if multitenant_mgr and wallet_id:
            await multitenant_mgr.add_key(wallet_id, my_info.verkey)

    # Create connection response message
    if my_endpoint:
        my_endpoints = [my_endpoint]
    else:
        my_endpoints = []
        default_endpoint = self._session.settings.get("default_endpoint")
        if default_endpoint:
            my_endpoints.append(default_endpoint)
        my_endpoints.extend(self._session.settings.get("additional_endpoints", []))

    did_doc = await self.create_did_document(
        my_info,
        conn_rec.inbound_connection_id,
        my_endpoints,
        mediation_records=list(
            filter(None, [base_mediation_record, mediation_record])
        ),
    )
    attach = AttachDecorator.data_base64(did_doc.serialize())
    # Signed with the invitation key rather than the new DID's verkey
    await attach.data.sign(conn_rec.invitation_key, wallet)
    response = DIDXResponse(did=my_info.did, did_doc_attach=attach)

    # Assign thread information
    response.assign_thread_from(request)
    response.assign_trace_from(request)

    # Update connection state
    conn_rec.state = ConnRecord.State.RESPONSE.rfc23
    await conn_rec.save(
        self._session,
        reason="Created connection response",
        log_params={"response": response},
    )

    # Update Mediator if necessary
    if keylist_updates and mediation_record:
        # The responder is optional here, so it must be checked before use
        responder = self._session.inject(BaseResponder, required=False)
        if responder:
            await responder.send(
                keylist_updates, connection_id=mediation_record.connection_id
            )
        else:
            self._logger.warning(
                "No responder available: keylist update for mediator "
                "connection %s was not sent",
                mediation_record.connection_id,
            )

    send_mediation_request = await conn_rec.metadata_get(
        self._session, MediationManager.SEND_REQ_AFTER_CONNECTION
    )
    if send_mediation_request:
        temp_mediation_mgr = MediationManager(self._session.profile)
        # Named distinctly so the DID exchange request above is not shadowed
        _record, mediation_request = await temp_mediation_mgr.prepare_request(
            conn_rec.connection_id
        )
        responder = self._session.inject(BaseResponder)
        await responder.send(
            mediation_request, connection_id=conn_rec.connection_id
        )

    return response
async def accept_response(
    self,
    response: DIDXResponse,
    receipt: MessageReceipt,
) -> ConnRecord:
    """
    Accept a connection response under RFC 23 (DID exchange).

    Locate the connection record for a `DIDXResponse` (by thread ID first,
    then by the sender/recipient DIDs on the receipt), verify and store the
    responder's DID Doc, and reply with a `DIDXComplete` message when a
    responder is available.

    Args:
        response: The `DIDXResponse` to accept
        receipt: The message receipt

    Returns:
        The updated `ConnRecord` representing the connection

    Raises:
        DIDXManagerError: If no matching connection record is found
        DIDXManagerError: If the corresponding connection is not
            in the request-sent state
        DIDXManagerError: If the response carries no DID Doc attachment, or
            the attached DID Doc does not match the response DID

    """
    wallet = self._session.inject(BaseWallet)

    conn_rec = None
    if response._thread:
        # Preferred lookup: the thread ID identifies the request
        try:
            conn_rec = await ConnRecord.retrieve_by_request_id(
                self._session, response._thread_id
            )
        except StorageNotFoundError:
            pass

    if not conn_rec and receipt.sender_did:
        # Fallback lookup: the DID pair recorded on the message receipt
        try:
            conn_rec = await ConnRecord.retrieve_by_did(
                session=self._session,
                their_did=receipt.sender_did,
                my_did=receipt.recipient_did,
                their_role=ConnRecord.Role.RESPONDER.rfc23,
            )
        except StorageNotFoundError:
            pass

    if not conn_rec:
        raise DIDXManagerError(
            "No corresponding connection request found",
            error_code=ProblemReportReason.RESPONSE_NOT_ACCEPTED.value,
        )

    current_state = ConnRecord.State.get(conn_rec.state)
    if current_state is not ConnRecord.State.REQUEST:
        raise DIDXManagerError(
            "Cannot accept connection response for connection"
            f" in state: {conn_rec.state}"
        )

    their_did = response.did
    did_doc_attach = response.did_doc_attach
    if not did_doc_attach:
        raise DIDXManagerError("No DIDDoc attached; cannot connect to public DID")
    conn_did_doc = await self.verify_diddoc(wallet, did_doc_attach)
    if their_did != conn_did_doc.did:
        raise DIDXManagerError(
            f"Connection DID {their_did} "
            f"does not match DID doc id {conn_did_doc.did}"
        )
    await self.store_did_document(conn_did_doc)

    conn_rec.their_did = their_did
    conn_rec.state = ConnRecord.State.RESPONSE.rfc23
    await conn_rec.save(self._session, reason="Accepted connection response")

    # Optionally follow up with a mediation request over this connection
    wants_mediation = await conn_rec.metadata_get(
        self._session, MediationManager.SEND_REQ_AFTER_CONNECTION
    )
    if wants_mediation:
        mediation_mgr = MediationManager(self._session.profile)
        _, mediation_request = await mediation_mgr.prepare_request(
            conn_rec.connection_id
        )
        mediation_responder = self._session.inject(BaseResponder)
        await mediation_responder.send(
            mediation_request, connection_id=conn_rec.connection_id
        )

    # create and send connection-complete message
    complete = DIDXComplete()
    complete.assign_thread_from(response)
    responder = self._session.inject(BaseResponder, required=False)
    if responder:
        await responder.send_reply(complete, connection_id=conn_rec.connection_id)
        # Only mark completed once the complete message has been handed off
        conn_rec.state = ConnRecord.State.COMPLETED.rfc23
        await conn_rec.save(self._session, reason="Sent connection complete")

    return conn_rec
async def accept_complete(
    self,
    complete: DIDXComplete,
    receipt: MessageReceipt,
) -> ConnRecord:
    """
    Accept a connection complete message under RFC 23 (DID exchange).

    Look up the connection record whose request ID equals the thread ID of
    the `DIDXComplete` message and mark it completed. The record's current
    state is not checked before it is updated.

    Args:
        complete: The `DIDXComplete` to accept
        receipt: The message receipt

    Returns:
        The updated `ConnRecord` representing the connection

    Raises:
        DIDXManagerError: If the corresponding connection does not exist

    """
    # identify the request by the thread ID
    thread_id = complete._thread_id
    try:
        conn_rec = await ConnRecord.retrieve_by_request_id(self._session, thread_id)
    except StorageNotFoundError:
        raise DIDXManagerError(
            "No corresponding connection request found",
            error_code=ProblemReportReason.COMPLETE_NOT_ACCEPTED.value,
        )

    conn_rec.state = ConnRecord.State.COMPLETED.rfc23
    await conn_rec.save(self._session, reason="Received connection complete")
    return conn_rec
async def verify_diddoc(
    self,
    wallet: BaseWallet,
    attached: AttachDecorator,
) -> DIDDoc:
    """
    Verify a signed DID Doc attachment and return the DID Doc it contains.

    Args:
        wallet: Wallet passed to the attachment's signature verification
        attached: Attachment carrying the signed DID Doc

    Returns:
        The `DIDDoc` deserialized from the attachment's signed payload

    Raises:
        DIDXManagerError: If the attachment has no signed payload or its
            signature fails verification

    """
    payload = attached.data.signed
    if not payload:
        raise DIDXManagerError("DID doc attachment is not signed.")

    signature_ok = await attached.data.verify(wallet)
    if not signature_ok:
        raise DIDXManagerError("DID doc attachment signature failed verification")

    doc_json = json.loads(payload.decode())
    return DIDDoc.deserialize(doc_json)