"""
Class to provide some common utilities.
For Connection, DIDExchange and OutOfBand Manager.
"""
import logging
from typing import Sequence, Tuple, List
from ..core.error import BaseError
from ..core.profile import ProfileSession
from ..ledger.base import BaseLedger
from ..protocols.connections.v1_0.messages.connection_invitation import (
ConnectionInvitation,
)
from ..protocols.coordinate_mediation.v1_0.models.mediation_record import (
MediationRecord,
)
from ..storage.base import BaseStorage
from ..storage.error import StorageNotFoundError
from ..storage.record import StorageRecord
from ..wallet.base import BaseWallet, DIDInfo
from ..wallet.util import did_key_to_naked
from .models.conn_record import ConnRecord
from .models.connection_target import ConnectionTarget
from .models.diddoc import (
DIDDoc,
PublicKey,
PublicKeyType,
Service,
)
[docs]class BaseConnectionManagerError(BaseError):
"""BaseConnectionManager error."""
[docs]class BaseConnectionManager:
"""Class to provide utilities regarding connection_targets."""
RECORD_TYPE_DID_DOC = "did_doc"
RECORD_TYPE_DID_KEY = "did_key"
def __init__(self, session: ProfileSession):
"""
Initialize a BaseConnectionManager.
Args:
session: The profile session for this presentation
"""
self._logger = logging.getLogger(__name__)
self._session = session
[docs] async def create_did_document(
self,
did_info: DIDInfo,
inbound_connection_id: str = None,
svc_endpoints: Sequence[str] = None,
mediation_records: List[MediationRecord] = None,
) -> DIDDoc:
"""Create our DID doc for a given DID.
Args:
did_info: The DID information (DID and verkey) used in the connection
inbound_connection_id: The ID of the inbound routing connection to use
svc_endpoints: Custom endpoints for the DID Document
mediation_record: The record for mediation that contains routing_keys and
service endpoint
Returns:
The prepared `DIDDoc` instance
"""
did_doc = DIDDoc(did=did_info.did)
did_controller = did_info.did
did_key = did_info.verkey
pk = PublicKey(
did_info.did,
"1",
did_key,
PublicKeyType.ED25519_SIG_2018,
did_controller,
True,
)
did_doc.set(pk)
router_id = inbound_connection_id
routing_keys = []
router_idx = 1
while router_id:
# look up routing connection information
router = await ConnRecord.retrieve_by_id(self._session, router_id)
if ConnRecord.State.get(router.state) != ConnRecord.State.COMPLETED:
raise BaseConnectionManagerError(
f"Router connection not completed: {router_id}"
)
routing_doc, _ = await self.fetch_did_document(router.their_did)
if not routing_doc.service:
raise BaseConnectionManagerError(
f"No services defined by routing DIDDoc: {router_id}"
)
for service in routing_doc.service.values():
if not service.endpoint:
raise BaseConnectionManagerError(
"Routing DIDDoc service has no service endpoint"
)
if not service.recip_keys:
raise BaseConnectionManagerError(
"Routing DIDDoc service has no recipient key(s)"
)
rk = PublicKey(
did_info.did,
f"routing-{router_idx}",
service.recip_keys[0].value,
PublicKeyType.ED25519_SIG_2018,
did_controller,
True,
)
routing_keys.append(rk)
svc_endpoints = [service.endpoint]
break
router_id = router.inbound_connection_id
if mediation_records:
for mediation_record in mediation_records:
mediator_routing_keys = [
PublicKey(
did_info.did,
f"routing-{idx}",
key,
PublicKeyType.ED25519_SIG_2018,
did_controller, # TODO: get correct controller did_info
True, # TODO: should this be true?
)
for idx, key in enumerate(mediation_record.routing_keys)
]
routing_keys = [*routing_keys, *mediator_routing_keys]
svc_endpoints = [mediation_record.endpoint]
for (endpoint_index, svc_endpoint) in enumerate(svc_endpoints or []):
endpoint_ident = "indy" if endpoint_index == 0 else f"indy{endpoint_index}"
service = Service(
did_info.did,
endpoint_ident,
"IndyAgent",
[pk],
routing_keys,
svc_endpoint,
)
did_doc.set(service)
return did_doc
[docs] async def store_did_document(self, did_doc: DIDDoc):
"""Store a DID document.
Args:
did_doc: The `DIDDoc` instance to persist
"""
assert did_doc.did
storage: BaseStorage = self._session.inject(BaseStorage)
try:
stored_doc, record = await self.fetch_did_document(did_doc.did)
except StorageNotFoundError:
record = StorageRecord(
self.RECORD_TYPE_DID_DOC,
did_doc.to_json(),
{"did": did_doc.did},
)
await storage.add_record(record)
else:
await storage.update_record(record, did_doc.to_json(), {"did": did_doc.did})
await self.remove_keys_for_did(did_doc.did)
for key in did_doc.pubkey.values():
if key.controller == did_doc.did:
await self.add_key_for_did(did_doc.did, key.value)
[docs] async def add_key_for_did(self, did: str, key: str):
"""Store a verkey for lookup against a DID.
Args:
did: The DID to associate with this key
key: The verkey to be added
"""
record = StorageRecord(self.RECORD_TYPE_DID_KEY, key, {"did": did, "key": key})
storage = self._session.inject(BaseStorage)
await storage.add_record(record)
[docs] async def find_did_for_key(self, key: str) -> str:
"""Find the DID previously associated with a key.
Args:
key: The verkey to look up
"""
storage = self._session.inject(BaseStorage)
record = await storage.find_record(self.RECORD_TYPE_DID_KEY, {"key": key})
return record.tags["did"]
[docs] async def remove_keys_for_did(self, did: str):
"""Remove all keys associated with a DID.
Args:
did: The DID for which to remove keys
"""
storage = self._session.inject(BaseStorage)
await storage.delete_all_records(self.RECORD_TYPE_DID_KEY, {"did": did})
async def _get_from_ledger(self, public_did: str) -> Tuple[str, Sequence[str]]:
"""Get endpoint and recipient keys for public DID from ledger."""
ledger = self._session.inject(BaseLedger, required=False)
if not ledger:
raise BaseConnectionManagerError(
f"Cannot resolve DID {public_did} without ledger instance"
)
async with ledger:
endpoint = await ledger.get_endpoint_for_did(public_did)
recipient_keys = [await ledger.get_key_for_did(public_did)]
return (endpoint, recipient_keys)
[docs] async def fetch_connection_targets(
self, connection: ConnRecord
) -> Sequence[ConnectionTarget]:
"""Get a list of connection targets from a `ConnRecord`.
Args:
connection: The connection record (with associated `DIDDoc`)
used to generate the connection target
"""
if not connection.my_did:
self._logger.debug("No local DID associated with connection")
return None
wallet = self._session.inject(BaseWallet)
my_info = await wallet.get_local_did(connection.my_did)
results = None
if (
ConnRecord.State.get(connection.state)
in (ConnRecord.State.INVITATION, ConnRecord.State.REQUEST)
and ConnRecord.Role.get(connection.their_role) is ConnRecord.Role.RESPONDER
):
invitation = await connection.retrieve_invitation(self._session)
if isinstance(invitation, ConnectionInvitation): # conn protocol invitation
if invitation.did:
# populate recipient keys, endpoint from ledger
(endpoint, recipient_keys) = await self._get_from_ledger(
invitation.did
)
routing_keys = []
else:
endpoint = invitation.endpoint
recipient_keys = invitation.recipient_keys
routing_keys = invitation.routing_keys
else: # out-of-band invitation
if invitation.service_dids:
# populate recipient keys, endpoint from ledger
(endpoint, recipient_keys) = await self._get_from_ledger(
invitation.service_dids[0]
)
routing_keys = []
else:
endpoint = invitation.service_blocks[0].service_endpoint
recipient_keys = [
did_key_to_naked(k)
for k in invitation.service_blocks[0].recipient_keys
]
routing_keys = [
did_key_to_naked(k)
for k in invitation.service_blocks[0].routing_keys
]
results = [
ConnectionTarget(
did=connection.their_did,
endpoint=endpoint,
label=invitation.label,
recipient_keys=recipient_keys,
routing_keys=routing_keys,
sender_key=my_info.verkey,
)
]
else:
if not connection.their_did:
self._logger.debug("No target DID associated with connection")
return None
did_doc, _ = await self.fetch_did_document(connection.their_did)
results = self.diddoc_connection_targets(
did_doc, my_info.verkey, connection.their_label
)
return results
[docs] def diddoc_connection_targets(
self, doc: DIDDoc, sender_verkey: str, their_label: str = None
) -> Sequence[ConnectionTarget]:
"""Get a list of connection targets from a DID Document.
Args:
doc: The DID Document to create the target from
sender_verkey: The verkey we are using
their_label: The connection label they are using
"""
if not doc:
raise BaseConnectionManagerError("No DIDDoc provided for connection target")
if not doc.did:
raise BaseConnectionManagerError("DIDDoc has no DID")
if not doc.service:
raise BaseConnectionManagerError("No services defined by DIDDoc")
targets = []
for service in doc.service.values():
if service.recip_keys:
targets.append(
ConnectionTarget(
did=doc.did,
endpoint=service.endpoint,
label=their_label,
recipient_keys=[
key.value for key in (service.recip_keys or ())
],
routing_keys=[
key.value for key in (service.routing_keys or ())
],
sender_key=sender_verkey,
)
)
return targets
[docs] async def fetch_did_document(self, did: str) -> Tuple[DIDDoc, StorageRecord]:
"""Retrieve a DID Document for a given DID.
Args:
did: The DID to search for
"""
storage = self._session.inject(BaseStorage)
record = await storage.find_record(self.RECORD_TYPE_DID_DOC, {"did": did})
return DIDDoc.from_json(record.value), record