"""Classes to manage connection establishment under RFC 23 (DID exchange)."""
import json
import logging
from typing import Optional
import pydid
from pydid import BaseDIDDocument as ResolvedDocument
from pydid import DIDCommService
from ....connections.base_manager import BaseConnectionManager
from ....connections.models.conn_record import ConnRecord
from ....connections.models.diddoc import DIDDoc
from ....core.error import BaseError
from ....core.oob_processor import OobMessageProcessor
from ....core.profile import Profile
from ....did.did_key import DIDKey
from ....messaging.decorators.attach_decorator import AttachDecorator
from ....messaging.responder import BaseResponder
from ....resolver.base import ResolverError
from ....resolver.did_resolver import DIDResolver
from ....storage.error import StorageNotFoundError
from ....transport.inbound.receipt import MessageReceipt
from ....wallet.base import BaseWallet
from ....wallet.did_method import SOV
from ....wallet.did_posture import DIDPosture
from ....wallet.error import WalletError
from ....wallet.key_type import ED25519
from ...coordinate_mediation.v1_0.manager import MediationManager
from ...discovery.v2_0.manager import V20DiscoveryMgr
from ...out_of_band.v1_0.messages.invitation import (
InvitationMessage as OOBInvitationMessage,
)
from ...out_of_band.v1_0.messages.service import Service as OOBService
from .message_types import ARIES_PROTOCOL as DIDX_PROTO
from .messages.complete import DIDXComplete
from .messages.problem_report import DIDXProblemReport, ProblemReportReason
from .messages.request import DIDXRequest
from .messages.response import DIDXResponse
[docs]class DIDXManagerError(BaseError):
"""Connection error."""
[docs]class DIDXManager(BaseConnectionManager):
"""Class for managing connections under RFC 23 (DID exchange)."""
def __init__(self, profile: Profile):
"""Initialize a DIDXManager.
Args:
profile: The profile for this did exchange manager
"""
self._profile = profile
self._logger = logging.getLogger(__name__)
super().__init__(self._profile)
@property
def profile(self) -> Profile:
"""Accessor for the current profile.
Returns:
The profile for this did exchange manager
"""
return self._profile
[docs] async def receive_invitation(
self,
invitation: OOBInvitationMessage,
their_public_did: Optional[str] = None,
auto_accept: Optional[bool] = None,
alias: Optional[str] = None,
mediation_id: Optional[str] = None,
) -> ConnRecord: # leave in didexchange as it uses a responder: not out-of-band
"""Create a new connection record to track a received invitation.
Args:
invitation: invitation to store
their_public_did: their public DID
auto_accept: set to auto-accept invitation (None to use config)
alias: optional alias to set on record
mediation_id: record id for mediation with routing_keys, service endpoint
Returns:
The new `ConnRecord` instance
"""
if not invitation.services:
raise DIDXManagerError(
"Invitation must contain service blocks or service DIDs"
)
else:
for s in invitation.services:
if isinstance(s, OOBService):
if not s.recipient_keys or not s.service_endpoint:
raise DIDXManagerError(
"All service blocks in invitation with no service DIDs "
"must contain recipient key(s) and service endpoint(s)"
)
accept = (
ConnRecord.ACCEPT_AUTO
if (
auto_accept
or (
auto_accept is None
and self.profile.settings.get("debug.auto_accept_invites")
)
)
else ConnRecord.ACCEPT_MANUAL
)
service_item = invitation.services[0]
# Create connection record
conn_rec = ConnRecord(
invitation_key=(
DIDKey.from_did(service_item.recipient_keys[0]).public_key_b58
if isinstance(service_item, OOBService)
else None
),
invitation_msg_id=invitation._id,
their_label=invitation.label,
their_role=ConnRecord.Role.RESPONDER.rfc23,
state=ConnRecord.State.INVITATION.rfc160,
accept=accept,
alias=alias,
their_public_did=their_public_did,
connection_protocol=DIDX_PROTO,
)
async with self.profile.session() as session:
await conn_rec.save(
session,
reason="Created new connection record from invitation",
log_params={
"invitation": invitation,
"their_role": ConnRecord.Role.RESPONDER.rfc23,
},
)
# Save the invitation for later processing
await conn_rec.attach_invitation(session, invitation)
if not conn_rec.invitation_key and conn_rec.their_public_did:
did_document = await self.get_resolved_did_document(
conn_rec.their_public_did
)
conn_rec.invitation_key = did_document.verification_method[
0
].public_key_base58
await self._route_manager.save_mediator_for_connection(
self.profile, conn_rec, mediation_id=mediation_id
)
if conn_rec.accept == ConnRecord.ACCEPT_AUTO:
request = await self.create_request(conn_rec, mediation_id=mediation_id)
responder = self.profile.inject_or(BaseResponder)
if responder:
await responder.send_reply(
request,
connection_id=conn_rec.connection_id,
)
conn_rec.state = ConnRecord.State.REQUEST.rfc160
async with self.profile.session() as session:
await conn_rec.save(session, reason="Sent connection request")
else:
self._logger.debug("Connection invitation will await acceptance")
return conn_rec
[docs] async def create_request_implicit(
self,
their_public_did: str,
my_label: str = None,
my_endpoint: str = None,
mediation_id: str = None,
use_public_did: bool = False,
alias: str = None,
goal_code: str = None,
goal: str = None,
) -> ConnRecord:
"""Create and send a request against a public DID only (no explicit invitation).
Args:
their_public_did: public DID to which to request a connection
my_label: my label for request
my_endpoint: my endpoint
mediation_id: record id for mediation with routing_keys, service endpoint
use_public_did: use my public DID for this connection
goal_code: Optional self-attested code for sharing intent of connection
goal: Optional self-attested string for sharing intent of connection
Returns:
The new `ConnRecord` instance
"""
my_public_info = None
if use_public_did:
async with self.profile.session() as session:
wallet = session.inject(BaseWallet)
my_public_info = await wallet.get_public_did()
if not my_public_info:
raise WalletError("No public DID configured")
if (
my_public_info.did == their_public_did
or f"did:sov:{my_public_info.did}" == their_public_did
):
raise DIDXManagerError(
"Cannot connect to yourself through public DID"
)
try:
await ConnRecord.retrieve_by_did(
session,
their_did=their_public_did,
my_did=my_public_info.did,
)
raise DIDXManagerError(
"Connection already exists for their_did "
f"{their_public_did} and my_did {my_public_info.did}"
)
except StorageNotFoundError:
pass
conn_rec = ConnRecord(
my_did=my_public_info.did
if my_public_info
else None, # create-request will fill in on local DID creation
their_did=their_public_did,
their_label=None,
their_role=ConnRecord.Role.RESPONDER.rfc23,
invitation_key=None,
invitation_msg_id=None,
accept=None,
alias=alias,
their_public_did=their_public_did,
connection_protocol=DIDX_PROTO,
)
request = await self.create_request( # saves and updates conn_rec
conn_rec=conn_rec,
my_label=my_label,
my_endpoint=my_endpoint,
mediation_id=mediation_id,
goal_code=goal_code,
goal=goal,
use_public_did=bool(my_public_info),
)
conn_rec.request_id = request._id
conn_rec.state = ConnRecord.State.REQUEST.rfc160
async with self.profile.session() as session:
await conn_rec.save(session, reason="Created connection request")
responder = self.profile.inject_or(BaseResponder)
if responder:
await responder.send(request, connection_id=conn_rec.connection_id)
return conn_rec
[docs] async def create_request(
self,
conn_rec: ConnRecord,
my_label: Optional[str] = None,
my_endpoint: Optional[str] = None,
mediation_id: Optional[str] = None,
goal_code: Optional[str] = None,
goal: Optional[str] = None,
use_public_did: bool = False,
) -> DIDXRequest:
"""Create a new connection request for a previously-received invitation.
Args:
conn_rec: The `ConnRecord` representing the invitation to accept
my_label: My label for request
my_endpoint: My endpoint
mediation_id: The record id for mediation that contains routing_keys and
service endpoint
goal_code: Optional self-attested code for sharing intent of connection
goal: Optional self-attested string for sharing intent of connection
use_public_did: Flag whether to use public DID and omit DID Doc
attachment on request
Returns:
A new `DIDXRequest` message to send to the other agent
"""
# Mediation Support
mediation_records = await self._route_manager.mediation_records_for_connection(
self.profile,
conn_rec,
mediation_id,
or_default=True,
)
my_info = None
if conn_rec.my_did:
async with self.profile.session() as session:
wallet = session.inject(BaseWallet)
my_info = await wallet.get_local_did(conn_rec.my_did)
else:
# Create new DID for connection
async with self.profile.session() as session:
wallet = session.inject(BaseWallet)
my_info = await wallet.create_local_did(
method=SOV,
key_type=ED25519,
)
conn_rec.my_did = my_info.did
# Create connection request message
if my_endpoint:
my_endpoints = [my_endpoint]
else:
my_endpoints = []
default_endpoint = self.profile.settings.get("default_endpoint")
if default_endpoint:
my_endpoints.append(default_endpoint)
my_endpoints.extend(self.profile.settings.get("additional_endpoints", []))
if use_public_did:
# Omit DID Doc attachment if we're using a public DID
did_doc = None
attach = None
did = conn_rec.my_did
if not did.startswith("did:"):
did = f"did:sov:{did}"
else:
did_doc = await self.create_did_document(
my_info,
my_endpoints,
mediation_records=mediation_records,
)
attach = AttachDecorator.data_base64(did_doc.serialize())
async with self.profile.session() as session:
wallet = session.inject(BaseWallet)
await attach.data.sign(my_info.verkey, wallet)
did = conn_rec.my_did
if conn_rec.their_public_did is not None:
qualified_did = conn_rec.their_public_did
did_document = await self.get_resolved_did_document(qualified_did)
did_url = await self.get_first_applicable_didcomm_service(did_document)
else:
did_url = None
pthid = conn_rec.invitation_msg_id or did_url
if not my_label:
my_label = self.profile.settings.get("default_label")
request = DIDXRequest(
label=my_label,
did=did,
did_doc_attach=attach,
goal_code=goal_code,
goal=goal,
)
request.assign_thread_id(thid=request._id, pthid=pthid)
# Update connection state
conn_rec.request_id = request._id
conn_rec.state = ConnRecord.State.REQUEST.rfc160
async with self.profile.session() as session:
await conn_rec.save(session, reason="Created connection request")
# Idempotent; if routing has already been set up, no action taken
await self._route_manager.route_connection_as_invitee(
self.profile, conn_rec, mediation_records
)
return request
[docs] async def receive_request(
self,
request: DIDXRequest,
recipient_did: str,
recipient_verkey: Optional[str] = None,
my_endpoint: Optional[str] = None,
alias: Optional[str] = None,
auto_accept_implicit: Optional[bool] = None,
) -> ConnRecord:
"""Receive and store a connection request.
Args:
request: The `DIDXRequest` to accept
recipient_did: The (unqualified) recipient DID
recipient_verkey: The recipient verkey: None for public recipient DID
my_endpoint: My endpoint
alias: Alias for the connection
auto_accept: Auto-accept request against implicit invitation
Returns:
The new or updated `ConnRecord` instance
"""
ConnRecord.log_state(
"Receiving connection request",
{"request": request},
settings=self.profile.settings,
)
conn_rec = None
connection_key = None
my_info = None
# Determine what key will need to sign the response
if recipient_verkey: # peer DID
connection_key = recipient_verkey
try:
async with self.profile.session() as session:
conn_rec = await ConnRecord.retrieve_by_invitation_key(
session=session,
invitation_key=connection_key,
their_role=ConnRecord.Role.REQUESTER.rfc23,
)
except StorageNotFoundError:
if recipient_verkey:
raise DIDXManagerError(
"No explicit invitation found for pairwise connection "
f"in state {ConnRecord.State.INVITATION.rfc23}: "
"a prior connection request may have updated the connection state"
)
else:
if not self.profile.settings.get("public_invites"):
raise DIDXManagerError(
"Public invitations are not enabled: connection request refused"
)
async with self.profile.session() as session:
wallet = session.inject(BaseWallet)
my_info = await wallet.get_local_did(recipient_did)
if DIDPosture.get(my_info.metadata) not in (
DIDPosture.PUBLIC,
DIDPosture.POSTED,
):
raise DIDXManagerError(f"Request DID {recipient_did} is not public")
connection_key = my_info.verkey
async with self.profile.session() as session:
conn_rec = await ConnRecord.retrieve_by_invitation_msg_id(
session=session,
invitation_msg_id=request._thread.pthid,
their_role=ConnRecord.Role.REQUESTER.rfc23,
)
if conn_rec: # invitation was explicit
connection_key = conn_rec.invitation_key
if conn_rec.is_multiuse_invitation:
async with self.profile.session() as session:
wallet = session.inject(BaseWallet)
my_info = await wallet.create_local_did(
method=SOV,
key_type=ED25519,
)
new_conn_rec = ConnRecord(
invitation_key=connection_key,
my_did=my_info.did,
state=ConnRecord.State.REQUEST.rfc160,
accept=conn_rec.accept,
their_role=conn_rec.their_role,
connection_protocol=DIDX_PROTO,
)
async with self.profile.session() as session:
await new_conn_rec.save(
session,
reason=(
"Received connection request from multi-use invitation DID"
),
)
# Transfer metadata from multi-use to new connection
# Must come after save so there's an ID to associate with metadata
async with self.profile.session() as session:
for key, value in (
await conn_rec.metadata_get_all(session)
).items():
await new_conn_rec.metadata_set(session, key, value)
conn_rec = new_conn_rec
# request DID doc describes requester DID
if request.did_doc_attach and request.did_doc_attach.data:
self._logger.debug("Received DID Doc attachment in request")
async with self.profile.session() as session:
wallet = session.inject(BaseWallet)
conn_did_doc = await self.verify_diddoc(wallet, request.did_doc_attach)
await self.store_did_document(conn_did_doc)
if request.did != conn_did_doc.did:
raise DIDXManagerError(
(
f"Connection DID {request.did} does not match "
f"DID Doc id {conn_did_doc.did}"
),
error_code=ProblemReportReason.REQUEST_NOT_ACCEPTED.value,
)
else:
if request.did is None:
raise DIDXManagerError("No DID in request")
self._logger.debug(
"No DID Doc attachment in request; doc will be resolved from DID"
)
await self.record_did(request.did)
if conn_rec: # request is against explicit invitation
auto_accept = (
conn_rec.accept == ConnRecord.ACCEPT_AUTO
) # null=manual; oob-manager calculated at conn rec creation
conn_rec.their_label = request.label
if alias:
conn_rec.alias = alias
conn_rec.their_did = request.did
conn_rec.state = ConnRecord.State.REQUEST.rfc160
conn_rec.request_id = request._id
async with self.profile.session() as session:
await conn_rec.save(
session, reason="Received connection request from invitation"
)
else:
# request is against implicit invitation on public DID
if not self.profile.settings.get("requests_through_public_did"):
raise DIDXManagerError(
"Unsolicited connection requests to public DID is not enabled"
)
auto_accept = bool(
auto_accept_implicit
or (
auto_accept_implicit is None
and self.profile.settings.get("debug.auto_accept_requests", False)
)
)
conn_rec = ConnRecord(
my_did=None, # Defer DID creation until create_response
accept=(
ConnRecord.ACCEPT_AUTO if auto_accept else ConnRecord.ACCEPT_MANUAL
),
their_did=request.did,
their_label=request.label,
alias=alias,
their_role=ConnRecord.Role.REQUESTER.rfc23,
invitation_key=connection_key,
invitation_msg_id=None,
request_id=request._id,
state=ConnRecord.State.REQUEST.rfc160,
connection_protocol=DIDX_PROTO,
)
async with self.profile.session() as session:
await conn_rec.save(
session, reason="Received connection request from public DID"
)
async with self.profile.session() as session:
# Attach the connection request so it can be found and responded to
await conn_rec.attach_request(session, request)
# Clean associated oob record if not needed anymore
oob_processor = self.profile.inject(OobMessageProcessor)
await oob_processor.clean_finished_oob_record(self.profile, request)
return conn_rec
[docs] async def create_response(
self,
conn_rec: ConnRecord,
my_endpoint: Optional[str] = None,
mediation_id: Optional[str] = None,
use_public_did: Optional[bool] = None,
) -> DIDXResponse:
"""Create a connection response for a received connection request.
Args:
conn_rec: The `ConnRecord` with a pending connection request
my_endpoint: Current agent endpoint
mediation_id: The record id for mediation that contains routing_keys and
service endpoint
Returns:
New `DIDXResponse` message
"""
ConnRecord.log_state(
"Creating connection response",
{"connection_id": conn_rec.connection_id},
settings=self.profile.settings,
)
mediation_records = await self._route_manager.mediation_records_for_connection(
self.profile, conn_rec, mediation_id
)
if ConnRecord.State.get(conn_rec.state) is not ConnRecord.State.REQUEST:
raise DIDXManagerError(
f"Connection not in state {ConnRecord.State.REQUEST.rfc23}"
)
async with self.profile.session() as session:
request = await conn_rec.retrieve_request(session)
if conn_rec.my_did:
async with self.profile.session() as session:
wallet = session.inject(BaseWallet)
my_info = await wallet.get_local_did(conn_rec.my_did)
did = my_info.did
elif use_public_did:
async with self.profile.session() as session:
wallet = session.inject(BaseWallet)
my_info = await wallet.get_public_did()
if not my_info:
raise DIDXManagerError("No public DID configured")
conn_rec.my_did = my_info.did
did = my_info.did
if not did.startswith("did:"):
did = f"did:sov:{did}"
else:
async with self.profile.session() as session:
wallet = session.inject(BaseWallet)
my_info = await wallet.create_local_did(
method=SOV,
key_type=ED25519,
)
conn_rec.my_did = my_info.did
did = my_info.did
# Idempotent; if routing has already been set up, no action taken
await self._route_manager.route_connection_as_inviter(
self.profile, conn_rec, mediation_records
)
# Create connection response message
if my_endpoint:
my_endpoints = [my_endpoint]
else:
my_endpoints = []
default_endpoint = self.profile.settings.get("default_endpoint")
if default_endpoint:
my_endpoints.append(default_endpoint)
my_endpoints.extend(self.profile.settings.get("additional_endpoints", []))
if use_public_did:
# Omit DID Doc attachment if we're using a public DID
did_doc = None
attach = None
else:
did_doc = await self.create_did_document(
my_info,
my_endpoints,
mediation_records=mediation_records,
)
attach = AttachDecorator.data_base64(did_doc.serialize())
async with self.profile.session() as session:
wallet = session.inject(BaseWallet)
await attach.data.sign(conn_rec.invitation_key, wallet)
response = DIDXResponse(did=did, did_doc_attach=attach)
# Assign thread information
response.assign_thread_from(request)
response.assign_trace_from(request)
# Update connection state
conn_rec.state = ConnRecord.State.RESPONSE.rfc23
async with self.profile.session() as session:
await conn_rec.save(
session,
reason="Created connection response",
log_params={"response": response},
)
async with self.profile.session() as session:
send_mediation_request = await conn_rec.metadata_get(
session, MediationManager.SEND_REQ_AFTER_CONNECTION
)
if send_mediation_request:
temp_mediation_mgr = MediationManager(self.profile)
_record, request = await temp_mediation_mgr.prepare_request(
conn_rec.connection_id
)
responder = self.profile.inject(BaseResponder)
await responder.send(request, connection_id=conn_rec.connection_id)
return response
[docs] async def accept_response(
self,
response: DIDXResponse,
receipt: MessageReceipt,
) -> ConnRecord:
"""Accept a connection response under RFC 23 (DID exchange).
Process a `DIDXResponse` message by looking up
the connection request and setting up the pairwise connection.
Args:
response: The `DIDXResponse` to accept
receipt: The message receipt
Returns:
The updated `ConnRecord` representing the connection
Raises:
DIDXManagerError: If there is no DID associated with the
connection response
DIDXManagerError: If the corresponding connection is not
in the request-sent state
"""
conn_rec = None
if response._thread:
# identify the request by the thread ID
async with self.profile.session() as session:
try:
conn_rec = await ConnRecord.retrieve_by_request_id(
session,
response._thread_id,
their_role=ConnRecord.Role.RESPONDER.rfc23,
)
except StorageNotFoundError:
pass
if not conn_rec:
try:
conn_rec = await ConnRecord.retrieve_by_request_id(
session,
response._thread_id,
their_role=ConnRecord.Role.RESPONDER.rfc160,
)
except StorageNotFoundError:
pass
if not conn_rec and receipt.sender_did:
# identify connection by the DID they used for us
try:
async with self.profile.session() as session:
conn_rec = await ConnRecord.retrieve_by_did(
session=session,
their_did=receipt.sender_did,
my_did=receipt.recipient_did,
their_role=ConnRecord.Role.RESPONDER.rfc23,
)
except StorageNotFoundError:
pass
if not conn_rec:
raise DIDXManagerError(
"No corresponding connection request found",
error_code=ProblemReportReason.RESPONSE_NOT_ACCEPTED.value,
)
if ConnRecord.State.get(conn_rec.state) is not ConnRecord.State.REQUEST:
raise DIDXManagerError(
"Cannot accept connection response for connection"
f" in state: {conn_rec.state}"
)
their_did = response.did
if response.did_doc_attach:
async with self.profile.session() as session:
wallet = session.inject(BaseWallet)
conn_did_doc = await self.verify_diddoc(
wallet, response.did_doc_attach, conn_rec.invitation_key
)
if their_did != conn_did_doc.did:
raise DIDXManagerError(
f"Connection DID {their_did} "
f"does not match DID doc id {conn_did_doc.did}"
)
await self.store_did_document(conn_did_doc)
else:
if response.did is None:
raise DIDXManagerError("No DID in response")
self._logger.debug(
"No DID Doc attachment in response; doc will be resolved from DID"
)
await self.record_did(response.did)
conn_rec.their_did = their_did
conn_rec.state = ConnRecord.State.RESPONSE.rfc160
async with self.profile.session() as session:
await conn_rec.save(session, reason="Accepted connection response")
async with self.profile.session() as session:
send_mediation_request = await conn_rec.metadata_get(
session, MediationManager.SEND_REQ_AFTER_CONNECTION
)
if send_mediation_request:
temp_mediation_mgr = MediationManager(self.profile)
_record, request = await temp_mediation_mgr.prepare_request(
conn_rec.connection_id
)
responder = self.profile.inject(BaseResponder)
await responder.send(request, connection_id=conn_rec.connection_id)
# create and send connection-complete message
complete = DIDXComplete()
complete.assign_thread_from(response)
responder = self.profile.inject_or(BaseResponder)
if responder:
await responder.send_reply(complete, connection_id=conn_rec.connection_id)
conn_rec.state = ConnRecord.State.COMPLETED.rfc160
async with self.profile.session() as session:
await conn_rec.save(session, reason="Sent connection complete")
if session.settings.get("auto_disclose_features"):
discovery_mgr = V20DiscoveryMgr(self._profile)
await discovery_mgr.proactive_disclose_features(
connection_id=conn_rec.connection_id
)
return conn_rec
[docs] async def accept_complete(
self,
complete: DIDXComplete,
receipt: MessageReceipt,
) -> ConnRecord:
"""Accept a connection complete message under RFC 23 (DID exchange).
Process a `DIDXComplete` message by looking up
the connection record and marking the exchange complete.
Args:
complete: The `DIDXComplete` to accept
receipt: The message receipt
Returns:
The updated `ConnRecord` representing the connection
Raises:
DIDXManagerError: If the corresponding connection does not exist
or is not in the response-sent state
"""
conn_rec = None
# identify the request by the thread ID
async with self.profile.session() as session:
try:
conn_rec = await ConnRecord.retrieve_by_request_id(
session,
complete._thread_id,
their_role=ConnRecord.Role.REQUESTER.rfc23,
)
except StorageNotFoundError:
pass
if not conn_rec:
try:
conn_rec = await ConnRecord.retrieve_by_request_id(
session,
complete._thread_id,
their_role=ConnRecord.Role.REQUESTER.rfc160,
)
except StorageNotFoundError:
pass
if not conn_rec:
raise DIDXManagerError(
"No corresponding connection request found",
error_code=ProblemReportReason.COMPLETE_NOT_ACCEPTED.value,
)
conn_rec.state = ConnRecord.State.COMPLETED.rfc160
async with self.profile.session() as session:
await conn_rec.save(session, reason="Received connection complete")
if session.settings.get("auto_disclose_features"):
discovery_mgr = V20DiscoveryMgr(self._profile)
await discovery_mgr.proactive_disclose_features(
connection_id=conn_rec.connection_id
)
return conn_rec
[docs] async def reject(
self,
conn_rec: ConnRecord,
*,
reason: Optional[str] = None,
) -> DIDXProblemReport:
"""Abandon an existing DID exchange."""
state_to_reject_code = {
ConnRecord.State.INVITATION.rfc23
+ "-received": ProblemReportReason.INVITATION_NOT_ACCEPTED,
ConnRecord.State.REQUEST.rfc23
+ "-received": ProblemReportReason.REQUEST_NOT_ACCEPTED,
}
code = state_to_reject_code.get(conn_rec.rfc23_state)
if not code:
raise DIDXManagerError(
f"Cannot reject connection in state: {conn_rec.rfc23_state}"
)
async with self.profile.session() as session:
await conn_rec.abandon(session, reason=reason)
report = DIDXProblemReport(
description={
"code": code.value,
"en": reason or "DID exchange rejected",
},
)
# TODO Delete the record?
return report
[docs] async def receive_problem_report(
self,
conn_rec: ConnRecord,
report: DIDXProblemReport,
):
"""Receive problem report."""
if not report.description:
raise DIDXManagerError("Missing description in problem report")
if report.description.get("code") in {
reason.value for reason in ProblemReportReason
}:
self._logger.info("Problem report indicates connection is abandoned")
async with self.profile.session() as session:
await conn_rec.abandon(
session,
reason=report.description.get("en"),
)
else:
raise DIDXManagerError(
f"Received unrecognized problem report: {report.description}"
)
[docs] async def verify_diddoc(
self,
wallet: BaseWallet,
attached: AttachDecorator,
invi_key: str = None,
) -> DIDDoc:
"""Verify DIDDoc attachment and return signed data."""
signed_diddoc_bytes = attached.data.signed
if not signed_diddoc_bytes:
raise DIDXManagerError("DID doc attachment is not signed.")
if not await attached.data.verify(wallet, invi_key):
raise DIDXManagerError("DID doc attachment signature failed verification")
return DIDDoc.deserialize(json.loads(signed_diddoc_bytes.decode()))
[docs] async def get_resolved_did_document(self, qualified_did: str) -> ResolvedDocument:
"""Return resolved DID document."""
resolver = self._profile.inject(DIDResolver)
if not qualified_did.startswith("did:"):
qualified_did = f"did:sov:{qualified_did}"
try:
doc_dict: dict = await resolver.resolve(self._profile, qualified_did)
doc = pydid.deserialize_document(doc_dict, strict=True)
return doc
except ResolverError as error:
raise DIDXManagerError(
"Failed to resolve public DID in invitation"
) from error
[docs] async def get_first_applicable_didcomm_service(
self, did_doc: ResolvedDocument
) -> str:
"""Return first applicable DIDComm service url with highest priority."""
if not did_doc.service:
raise DIDXManagerError(
"Cannot connect via public DID that has no associated services"
)
didcomm_services = sorted(
[
service
for service in did_doc.service
if isinstance(service, DIDCommService)
],
key=lambda service: service.priority,
)
if not didcomm_services:
raise DIDXManagerError(
"Cannot connect via public DID that has no associated DIDComm services"
)
first_didcomm_service, *_ = didcomm_services
return first_didcomm_service.id