"""
Class to provide some common utilities.
For Connection, DIDExchange and OutOfBand Manager.
"""
import logging
from typing import List, Sequence, Tuple
from pydid import (
BaseDIDDocument as ResolvedDocument,
DIDCommService,
VerificationMethod,
)
import pydid
from pydid.verification_method import Ed25519VerificationKey2018
from ..core.error import BaseError
from ..core.profile import ProfileSession
from ..did.did_key import DIDKey
from ..protocols.connections.v1_0.messages.connection_invitation import (
ConnectionInvitation,
)
from ..protocols.coordinate_mediation.v1_0.models.mediation_record import (
MediationRecord,
)
from ..resolver.base import ResolverError
from ..resolver.did_resolver import DIDResolver
from ..storage.base import BaseStorage
from ..storage.error import StorageNotFoundError
from ..storage.record import StorageRecord
from ..wallet.base import BaseWallet
from ..wallet.did_info import DIDInfo
from .models.conn_record import ConnRecord
from .models.connection_target import ConnectionTarget
from .models.diddoc import DIDDoc, PublicKey, PublicKeyType, Service
[docs]class BaseConnectionManagerError(BaseError):
"""BaseConnectionManager error."""
[docs]class BaseConnectionManager:
"""Class to provide utilities regarding connection_targets."""
RECORD_TYPE_DID_DOC = "did_doc"
RECORD_TYPE_DID_KEY = "did_key"
SUPPORTED_KEY_TYPES = (Ed25519VerificationKey2018,)
def __init__(self, session: ProfileSession):
"""
Initialize a BaseConnectionManager.
Args:
session: The profile session for this presentation
"""
self._logger = logging.getLogger(__name__)
self._session = session
[docs] async def create_did_document(
self,
did_info: DIDInfo,
inbound_connection_id: str = None,
svc_endpoints: Sequence[str] = None,
mediation_records: List[MediationRecord] = None,
) -> DIDDoc:
"""Create our DID doc for a given DID.
Args:
did_info: The DID information (DID and verkey) used in the connection
inbound_connection_id: The ID of the inbound routing connection to use
svc_endpoints: Custom endpoints for the DID Document
mediation_record: The record for mediation that contains routing_keys and
service endpoint
Returns:
The prepared `DIDDoc` instance
"""
did_doc = DIDDoc(did=did_info.did)
did_controller = did_info.did
did_key = did_info.verkey
pk = PublicKey(
did_info.did,
"1",
did_key,
PublicKeyType.ED25519_SIG_2018,
did_controller,
True,
)
did_doc.set(pk)
router_id = inbound_connection_id
routing_keys = []
router_idx = 1
while router_id:
# look up routing connection information
router = await ConnRecord.retrieve_by_id(self._session, router_id)
if ConnRecord.State.get(router.state) != ConnRecord.State.COMPLETED:
raise BaseConnectionManagerError(
f"Router connection not completed: {router_id}"
)
routing_doc, _ = await self.fetch_did_document(router.their_did)
if not routing_doc.service:
raise BaseConnectionManagerError(
f"No services defined by routing DIDDoc: {router_id}"
)
for service in routing_doc.service.values():
if not service.endpoint:
raise BaseConnectionManagerError(
"Routing DIDDoc service has no service endpoint"
)
if not service.recip_keys:
raise BaseConnectionManagerError(
"Routing DIDDoc service has no recipient key(s)"
)
rk = PublicKey(
did_info.did,
f"routing-{router_idx}",
service.recip_keys[0].value,
PublicKeyType.ED25519_SIG_2018,
did_controller,
True,
)
routing_keys.append(rk)
svc_endpoints = [service.endpoint]
break
router_id = router.inbound_connection_id
if mediation_records:
for mediation_record in mediation_records:
mediator_routing_keys = [
PublicKey(
did_info.did,
f"routing-{idx}",
key,
PublicKeyType.ED25519_SIG_2018,
did_controller, # TODO: get correct controller did_info
True, # TODO: should this be true?
)
for idx, key in enumerate(mediation_record.routing_keys)
]
routing_keys = [*routing_keys, *mediator_routing_keys]
svc_endpoints = [mediation_record.endpoint]
for (endpoint_index, svc_endpoint) in enumerate(svc_endpoints or []):
endpoint_ident = "indy" if endpoint_index == 0 else f"indy{endpoint_index}"
service = Service(
did_info.did,
endpoint_ident,
"IndyAgent",
[pk],
routing_keys,
svc_endpoint,
)
did_doc.set(service)
return did_doc
[docs] async def store_did_document(self, did_doc: DIDDoc):
"""Store a DID document.
Args:
did_doc: The `DIDDoc` instance to persist
"""
assert did_doc.did
storage: BaseStorage = self._session.inject(BaseStorage)
try:
stored_doc, record = await self.fetch_did_document(did_doc.did)
except StorageNotFoundError:
record = StorageRecord(
self.RECORD_TYPE_DID_DOC,
did_doc.to_json(),
{"did": did_doc.did},
)
await storage.add_record(record)
else:
await storage.update_record(record, did_doc.to_json(), {"did": did_doc.did})
await self.remove_keys_for_did(did_doc.did)
for key in did_doc.pubkey.values():
if key.controller == did_doc.did:
await self.add_key_for_did(did_doc.did, key.value)
[docs] async def add_key_for_did(self, did: str, key: str):
"""Store a verkey for lookup against a DID.
Args:
did: The DID to associate with this key
key: The verkey to be added
"""
record = StorageRecord(self.RECORD_TYPE_DID_KEY, key, {"did": did, "key": key})
storage = self._session.inject(BaseStorage)
await storage.add_record(record)
[docs] async def find_did_for_key(self, key: str) -> str:
"""Find the DID previously associated with a key.
Args:
key: The verkey to look up
"""
storage = self._session.inject(BaseStorage)
record = await storage.find_record(self.RECORD_TYPE_DID_KEY, {"key": key})
return record.tags["did"]
[docs] async def remove_keys_for_did(self, did: str):
"""Remove all keys associated with a DID.
Args:
did: The DID for which to remove keys
"""
storage = self._session.inject(BaseStorage)
await storage.delete_all_records(self.RECORD_TYPE_DID_KEY, {"did": did})
[docs] async def resolve_invitation(self, did: str):
"""
Resolve invitation with the DID Resolver.
Args:
did: Document ID to resolve
"""
if not did.startswith("did:"):
# DID is bare indy "nym"
# prefix with did:sov: for backwards compatibility
did = f"did:sov:{did}"
resolver = self._session.inject(DIDResolver)
try:
doc_dict: dict = await resolver.resolve(self._session.profile, did)
doc: ResolvedDocument = pydid.deserialize_document(doc_dict, strict=True)
except ResolverError as error:
raise BaseConnectionManagerError(
"Failed to resolve public DID in invitation"
) from error
if not doc.service:
raise BaseConnectionManagerError(
"Cannot connect via public DID that has no associated services"
)
didcomm_services = sorted(
[service for service in doc.service if isinstance(service, DIDCommService)],
key=lambda service: service.priority,
)
if not didcomm_services:
raise BaseConnectionManagerError(
"Cannot connect via public DID that has no associated DIDComm services"
)
first_didcomm_service, *_ = didcomm_services
endpoint = first_didcomm_service.service_endpoint
recipient_keys: List[VerificationMethod] = [
doc.dereference(url) for url in first_didcomm_service.recipient_keys
]
routing_keys: List[VerificationMethod] = [
doc.dereference(url) for url in first_didcomm_service.routing_keys
]
for key in [*recipient_keys, *routing_keys]:
if not isinstance(key, self.SUPPORTED_KEY_TYPES):
raise BaseConnectionManagerError(
f"Key type {key.type} is not supported"
)
return (
endpoint,
[key.material for key in recipient_keys],
[key.material for key in routing_keys],
)
[docs] async def fetch_connection_targets(
self, connection: ConnRecord
) -> Sequence[ConnectionTarget]:
"""Get a list of connection targets from a `ConnRecord`.
Args:
connection: The connection record (with associated `DIDDoc`)
used to generate the connection target
"""
if not connection.my_did:
self._logger.debug("No local DID associated with connection")
return None
wallet = self._session.inject(BaseWallet)
my_info = await wallet.get_local_did(connection.my_did)
results = None
if (
ConnRecord.State.get(connection.state)
in (ConnRecord.State.INVITATION, ConnRecord.State.REQUEST)
and ConnRecord.Role.get(connection.their_role) is ConnRecord.Role.RESPONDER
):
if (
connection.invitation_msg_id
or connection.invitation_key
or not connection.their_did
):
invitation = await connection.retrieve_invitation(self._session)
if isinstance(
invitation, ConnectionInvitation
): # conn protocol invitation
if invitation.did:
did = invitation.did
(
endpoint,
recipient_keys,
routing_keys,
) = await self.resolve_invitation(did)
else:
endpoint = invitation.endpoint
recipient_keys = invitation.recipient_keys
routing_keys = invitation.routing_keys
else: # out-of-band invitation
oob_service_item = invitation.services[0]
if isinstance(oob_service_item, str):
(
endpoint,
recipient_keys,
routing_keys,
) = await self.resolve_invitation(oob_service_item)
else:
endpoint = oob_service_item.service_endpoint
recipient_keys = [
DIDKey.from_did(k).public_key_b58
for k in oob_service_item.recipient_keys
]
routing_keys = [
DIDKey.from_did(k).public_key_b58
for k in oob_service_item.routing_keys
]
else:
if connection.their_did:
invitation = None
did = connection.their_did
(
endpoint,
recipient_keys,
routing_keys,
) = await self.resolve_invitation(did)
results = [
ConnectionTarget(
did=connection.their_did,
endpoint=endpoint,
label=invitation.label if invitation else None,
recipient_keys=recipient_keys,
routing_keys=routing_keys,
sender_key=my_info.verkey,
)
]
else:
if not connection.their_did:
self._logger.debug("No target DID associated with connection")
return None
did_doc, _ = await self.fetch_did_document(connection.their_did)
results = self.diddoc_connection_targets(
did_doc, my_info.verkey, connection.their_label
)
return results
[docs] def diddoc_connection_targets(
self, doc: DIDDoc, sender_verkey: str, their_label: str = None
) -> Sequence[ConnectionTarget]:
"""Get a list of connection targets from a DID Document.
Args:
doc: The DID Document to create the target from
sender_verkey: The verkey we are using
their_label: The connection label they are using
"""
if not doc:
raise BaseConnectionManagerError("No DIDDoc provided for connection target")
if not doc.did:
raise BaseConnectionManagerError("DIDDoc has no DID")
if not doc.service:
raise BaseConnectionManagerError("No services defined by DIDDoc")
targets = []
for service in doc.service.values():
if service.recip_keys:
targets.append(
ConnectionTarget(
did=doc.did,
endpoint=service.endpoint,
label=their_label,
recipient_keys=[
key.value for key in (service.recip_keys or ())
],
routing_keys=[
key.value for key in (service.routing_keys or ())
],
sender_key=sender_verkey,
)
)
return targets
[docs] async def fetch_did_document(self, did: str) -> Tuple[DIDDoc, StorageRecord]:
"""Retrieve a DID Document for a given DID.
Args:
did: The DID to search for
"""
storage = self._session.inject(BaseStorage)
record = await storage.find_record(self.RECORD_TYPE_DID_DOC, {"did": did})
return DIDDoc.from_json(record.value), record