"""Class to provide some common utilities.
For Connection, DIDExchange and OutOfBand Manager.
"""
import json
import logging
from typing import List, Optional, Sequence, Text, Tuple, Union
import pydid
from base58 import b58decode
from did_peer_2 import KeySpec, generate
from did_peer_4 import encode, long_to_short
from did_peer_4.input_doc import KeySpec as KeySpec_DP4
from did_peer_4.input_doc import input_doc_from_keys_and_services
from pydid import (
BaseDIDDocument as ResolvedDocument,
)
from pydid import (
DIDCommService,
VerificationMethod,
)
from pydid.verification_method import (
Ed25519VerificationKey2018,
Ed25519VerificationKey2020,
JsonWebKey2020,
Multikey,
)
from ..cache.base import BaseCache
from ..config.base import InjectionError
from ..core.error import BaseError
from ..core.profile import Profile
from ..did.did_key import DIDKey
from ..multitenant.base import BaseMultitenantManager
from ..protocols.connections.v1_0.message_types import ARIES_PROTOCOL as CONN_PROTO
from ..protocols.connections.v1_0.messages.connection_invitation import (
ConnectionInvitation,
)
from ..protocols.coordinate_mediation.v1_0.models.mediation_record import (
MediationRecord,
)
from ..protocols.coordinate_mediation.v1_0.route_manager import RouteManager
from ..protocols.discovery.v2_0.manager import V20DiscoveryMgr
from ..protocols.out_of_band.v1_0.messages.invitation import InvitationMessage
from ..resolver.base import ResolverError
from ..resolver.did_resolver import DIDResolver
from ..storage.base import BaseStorage
from ..storage.error import StorageDuplicateError, StorageError, StorageNotFoundError
from ..storage.record import StorageRecord
from ..transport.inbound.receipt import MessageReceipt
from ..utils.multiformats import multibase, multicodec
from ..wallet.base import BaseWallet
from ..wallet.crypto import create_keypair, seed_to_did
from ..wallet.did_info import DIDInfo, KeyInfo
from ..wallet.did_method import PEER2, PEER4, SOV
from ..wallet.error import WalletNotFoundError
from ..wallet.key_type import ED25519
from ..wallet.util import b64_to_bytes, bytes_to_b58
from .models.conn_record import ConnRecord
from .models.connection_target import ConnectionTarget
from .models.diddoc import DIDDoc, PublicKey, PublicKeyType, Service
[docs]class BaseConnectionManagerError(BaseError):
"""BaseConnectionManager error."""
[docs]class BaseConnectionManager:
"""Class to provide utilities regarding connection_targets."""
RECORD_TYPE_DID_DOC = "did_doc"
RECORD_TYPE_DID_KEY = "did_key"
def __init__(self, profile: Profile):
"""Initialize a BaseConnectionManager.
Args:
session: The profile session for this presentation
"""
self._profile = profile
self._route_manager = profile.inject(RouteManager)
self._logger = logging.getLogger(__name__)
@staticmethod
def _key_info_to_multikey(key_info: KeyInfo) -> str:
"""Convert a KeyInfo to a multikey."""
return multibase.encode(
multicodec.wrap("ed25519-pub", b58decode(key_info.verkey)), "base58btc"
)
[docs] async def long_did_peer_4_to_short(self, long_dp4: str) -> DIDInfo:
"""Convert did:peer:4 long format to short format and store in wallet."""
async with self._profile.session() as session:
wallet = session.inject(BaseWallet)
long_dp4_info = await wallet.get_local_did(long_dp4)
short_did_peer_4 = long_to_short(long_dp4)
did_info = DIDInfo(
did=short_did_peer_4,
method=PEER4,
verkey=long_dp4_info.verkey,
metadata={},
key_type=ED25519,
)
async with self._profile.session() as session:
wallet = session.inject(BaseWallet)
await wallet.store_did(did_info)
return did_info.did
[docs] async def create_did_peer_4(
self,
svc_endpoints: Optional[Sequence[str]] = None,
mediation_records: Optional[List[MediationRecord]] = None,
) -> DIDInfo:
"""Create a did:peer:4 DID for a connection.
Args:
svc_endpoints: Custom endpoints for the DID Document
mediation_record: The record for mediation that contains routing_keys and
service endpoint
Returns:
The new `DIDInfo` instance
"""
routing_keys: List[str] = []
if mediation_records:
for mediation_record in mediation_records:
(
mediator_routing_keys,
endpoint,
) = await self._route_manager.routing_info(
self._profile, mediation_record
)
routing_keys = [*routing_keys, *(mediator_routing_keys or [])]
if endpoint:
svc_endpoints = [endpoint]
services = []
for index, endpoint in enumerate(svc_endpoints or []):
services.append(
{
"id": f"#didcomm-{index}",
"type": "did-communication",
"recipientKeys": ["#key-0"],
"routingKeys": routing_keys,
"serviceEndpoint": endpoint,
"priority": index,
}
)
async with self._profile.session() as session:
wallet = session.inject(BaseWallet)
key = await wallet.create_key(ED25519)
key_spec = KeySpec_DP4(multikey=self._key_info_to_multikey(key))
input_doc = input_doc_from_keys_and_services(
keys=[key_spec], services=services
)
did = encode(input_doc)
did_info = DIDInfo(
did=did, method=PEER4, verkey=key.verkey, metadata={}, key_type=ED25519
)
await wallet.store_did(did_info)
return did_info
[docs] async def create_did_peer_2(
self,
svc_endpoints: Optional[Sequence[str]] = None,
mediation_records: Optional[List[MediationRecord]] = None,
) -> DIDInfo:
"""Create a did:peer:2 DID for a connection.
Args:
svc_endpoints: Custom endpoints for the DID Document
mediation_record: The record for mediation that contains routing_keys and
service endpoint
Returns:
The new `DIDInfo` instance
"""
routing_keys: List[str] = []
if mediation_records:
for mediation_record in mediation_records:
(
mediator_routing_keys,
endpoint,
) = await self._route_manager.routing_info(
self._profile, mediation_record
)
routing_keys = [*routing_keys, *(mediator_routing_keys or [])]
if endpoint:
svc_endpoints = [endpoint]
services = []
for index, endpoint in enumerate(svc_endpoints or []):
services.append(
{
"id": f"#didcomm-{index}",
"type": "did-communication",
"priority": index,
"recipientKeys": ["#key-1"],
"routingKeys": routing_keys,
"serviceEndpoint": endpoint,
}
)
async with self._profile.session() as session:
wallet = session.inject(BaseWallet)
key = await wallet.create_key(ED25519)
did = generate(
[KeySpec.verification(self._key_info_to_multikey(key))], services
)
did_info = DIDInfo(
did=did, method=PEER2, verkey=key.verkey, metadata={}, key_type=ED25519
)
await wallet.store_did(did_info)
return did_info
[docs] async def create_did_document(
self,
did_info: DIDInfo,
svc_endpoints: Optional[Sequence[str]] = None,
mediation_records: Optional[List[MediationRecord]] = None,
) -> DIDDoc:
"""Create our DID doc for a given DID.
Args:
did_info: The DID information (DID and verkey) used in the connection
svc_endpoints: Custom endpoints for the DID Document
mediation_record: The record for mediation that contains routing_keys and
service endpoint
Returns:
The prepared `DIDDoc` instance
"""
did_doc = DIDDoc(did=did_info.did)
did_controller = did_info.did
did_key = did_info.verkey
pk = PublicKey(
did_info.did,
"1",
did_key,
PublicKeyType.ED25519_SIG_2018,
did_controller,
True,
)
did_doc.set(pk)
routing_keys: List[str] = []
if mediation_records:
for mediation_record in mediation_records:
(
mediator_routing_keys,
endpoint,
) = await self._route_manager.routing_info(
self._profile, mediation_record
)
routing_keys = [*routing_keys, *(mediator_routing_keys or [])]
if endpoint:
svc_endpoints = [endpoint]
for endpoint_index, svc_endpoint in enumerate(svc_endpoints or []):
endpoint_ident = "indy" if endpoint_index == 0 else f"indy{endpoint_index}"
service = Service(
did_info.did,
endpoint_ident,
"IndyAgent",
[pk],
routing_keys,
svc_endpoint,
)
did_doc.set(service)
return did_doc
[docs] async def store_did_document(self, value: Union[DIDDoc, dict]):
"""Store a DID document.
Args:
value: The `DIDDoc` instance to persist
"""
if isinstance(value, DIDDoc):
did = value.did
doc = value.to_json()
else:
did = value["id"]
doc = json.dumps(value)
# Special case: we used to store did:sov dids as unqualified.
# For backwards compatibility, we'll strip off the prefix.
if did.startswith("did:sov:"):
did = did[8:]
self._logger.debug("Storing DID document for %s: %s", did, doc)
try:
stored_doc, record = await self.fetch_did_document(did)
except StorageNotFoundError:
record = StorageRecord(self.RECORD_TYPE_DID_DOC, doc, {"did": did})
async with self._profile.session() as session:
storage: BaseStorage = session.inject(BaseStorage)
await storage.add_record(record)
else:
async with self._profile.session() as session:
storage: BaseStorage = session.inject(BaseStorage)
await storage.update_record(record, doc, {"did": did})
await self.remove_keys_for_did(did)
await self.record_did(did)
[docs] async def add_key_for_did(self, did: str, key: str):
"""Store a verkey for lookup against a DID.
Args:
did: The DID to associate with this key
key: The verkey to be added
"""
record = StorageRecord(self.RECORD_TYPE_DID_KEY, key, {"did": did, "key": key})
async with self._profile.session() as session:
storage: BaseStorage = session.inject(BaseStorage)
try:
await storage.find_record(self.RECORD_TYPE_DID_KEY, {"key": key})
except StorageNotFoundError:
await storage.add_record(record)
except StorageDuplicateError:
self._logger.warning(
"Key already associated with DID: %s; this is likely caused by "
"routing keys being erroneously stored in the past",
key,
)
[docs] async def find_did_for_key(self, key: str) -> str:
"""Find the DID previously associated with a key.
Args:
key: The verkey to look up
"""
async with self._profile.session() as session:
storage: BaseStorage = session.inject(BaseStorage)
record = await storage.find_record(self.RECORD_TYPE_DID_KEY, {"key": key})
return record.tags["did"]
[docs] async def remove_keys_for_did(self, did: str):
"""Remove all keys associated with a DID.
Args:
did: The DID for which to remove keys
"""
async with self._profile.session() as session:
storage: BaseStorage = session.inject(BaseStorage)
await storage.delete_all_records(self.RECORD_TYPE_DID_KEY, {"did": did})
[docs] async def resolve_didcomm_services(
self, did: str, service_accept: Optional[Sequence[Text]] = None
) -> Tuple[ResolvedDocument, List[DIDCommService]]:
"""Resolve a DIDComm services for a given DID."""
if not did.startswith("did:"):
# DID is bare indy "nym"
# prefix with did:sov: for backwards compatibility
did = f"did:sov:{did}"
resolver = self._profile.inject(DIDResolver)
try:
doc_dict: dict = await resolver.resolve(self._profile, did, service_accept)
doc: ResolvedDocument = pydid.deserialize_document(doc_dict, strict=True)
except ResolverError as error:
raise BaseConnectionManagerError(
"Failed to resolve DID services"
) from error
if not doc.service:
raise BaseConnectionManagerError(
"Cannot connect via DID that has no associated services"
)
didcomm_services = sorted(
[service for service in doc.service if isinstance(service, DIDCommService)],
key=lambda service: service.priority,
)
return doc, didcomm_services
[docs] async def verification_methods_for_service(
self, doc: ResolvedDocument, service: DIDCommService
) -> Tuple[List[VerificationMethod], List[VerificationMethod]]:
"""Dereference recipient and routing keys.
Returns verification methods for a DIDComm service to enable extracting
key material.
"""
resolver = self._profile.inject(DIDResolver)
recipient_keys: List[VerificationMethod] = [
await resolver.dereference_verification_method(
self._profile, url, document=doc
)
for url in service.recipient_keys
]
routing_keys: List[VerificationMethod] = [
await resolver.dereference_verification_method(
self._profile, url, document=doc
)
for url in service.routing_keys
]
return recipient_keys, routing_keys
[docs] async def resolve_invitation(
self, did: str, service_accept: Optional[Sequence[Text]] = None
) -> Tuple[str, List[str], List[str]]:
"""Resolve invitation with the DID Resolver.
Args:
did: Document ID to resolve
"""
doc, didcomm_services = await self.resolve_didcomm_services(did, service_accept)
if not didcomm_services:
raise BaseConnectionManagerError(
"Cannot connect via public DID that has no associated DIDComm services"
)
first_didcomm_service, *_ = didcomm_services
endpoint = str(first_didcomm_service.service_endpoint)
recipient_keys, routing_keys = await self.verification_methods_for_service(
doc, first_didcomm_service
)
return (
endpoint,
[
self._extract_key_material_in_base58_format(key)
for key in recipient_keys
],
[self._extract_key_material_in_base58_format(key) for key in routing_keys],
)
[docs] async def record_did(self, did: str):
"""Record DID for later use.
This is required to correlate sender verkeys back to a connection.
"""
doc, didcomm_services = await self.resolve_didcomm_services(did)
for service in didcomm_services:
recips, _ = await self.verification_methods_for_service(doc, service)
for recip in recips:
await self.add_key_for_did(
did, self._extract_key_material_in_base58_format(recip)
)
[docs] async def resolve_connection_targets(
self,
did: str,
sender_verkey: Optional[str] = None,
their_label: Optional[str] = None,
) -> List[ConnectionTarget]:
"""Resolve connection targets for a DID."""
self._logger.debug("Resolving connection targets for DID %s", did)
doc, didcomm_services = await self.resolve_didcomm_services(did)
self._logger.debug("Resolved DID document: %s", doc)
self._logger.debug("Resolved DIDComm services: %s", didcomm_services)
targets = []
for service in didcomm_services:
try:
recips, routing = await self.verification_methods_for_service(
doc, service
)
endpoint = str(service.service_endpoint)
targets.append(
ConnectionTarget(
did=doc.id,
endpoint=endpoint,
label=their_label,
recipient_keys=[
self._extract_key_material_in_base58_format(key)
for key in recips
],
routing_keys=[
self._extract_key_material_in_base58_format(key)
for key in routing
],
sender_key=sender_verkey,
)
)
except ResolverError:
self._logger.exception(
"Failed to resolve service details while determining "
"connection targets; skipping service"
)
continue
return targets
@staticmethod
def _extract_key_material_in_base58_format(method: VerificationMethod) -> str:
if isinstance(method, Ed25519VerificationKey2018):
return method.material
elif isinstance(method, Ed25519VerificationKey2020):
raw_data = multibase.decode(method.material)
if len(raw_data) == 32: # No multicodec prefix
return bytes_to_b58(raw_data)
else:
codec, key = multicodec.unwrap(raw_data)
if codec == multicodec.multicodec("ed25519-pub"):
return bytes_to_b58(key)
else:
raise BaseConnectionManagerError(
f"Key type {type(method).__name__} "
f"with multicodec value {codec} is not supported"
)
elif isinstance(method, JsonWebKey2020):
if method.public_key_jwk.get("kty") == "OKP":
return bytes_to_b58(b64_to_bytes(method.public_key_jwk.get("x"), True))
else:
raise BaseConnectionManagerError(
f"Key type {type(method).__name__} "
f"with kty {method.public_key_jwk.get('kty')} is not supported"
)
elif isinstance(method, Multikey):
codec, key = multicodec.unwrap(multibase.decode(method.material))
if codec != multicodec.multicodec("ed25519-pub"):
raise BaseConnectionManagerError(
"Expected ed25519 multicodec, got: %s", codec
)
return bytes_to_b58(key)
else:
raise BaseConnectionManagerError(
f"Key type {type(method).__name__} is not supported"
)
async def _fetch_connection_targets_for_invitation(
self,
connection: ConnRecord,
invitation: Union[ConnectionInvitation, InvitationMessage],
sender_verkey: str,
) -> Sequence[ConnectionTarget]:
"""Get a list of connection targets for an invitation.
This will extract target info for either a connection or OOB invitation.
Args:
connection: ConnRecord the invitation is associated with.
invitation: Connection or OOB invitation retrieved from conn record.
Returns:
A list of `ConnectionTarget` objects
"""
if isinstance(invitation, ConnectionInvitation):
# conn protocol invitation
if invitation.did:
did = invitation.did
(
endpoint,
recipient_keys,
routing_keys,
) = await self.resolve_invitation(did)
else:
endpoint = invitation.endpoint
recipient_keys = invitation.recipient_keys
routing_keys = invitation.routing_keys
else:
# out-of-band invitation
oob_service_item = invitation.services[0]
if isinstance(oob_service_item, str):
(
endpoint,
recipient_keys,
routing_keys,
) = await self.resolve_invitation(oob_service_item)
else:
endpoint = oob_service_item.service_endpoint
recipient_keys = [
DIDKey.from_did(k).public_key_b58
for k in oob_service_item.recipient_keys
]
routing_keys = [
DIDKey.from_did(k).public_key_b58
for k in oob_service_item.routing_keys
]
return [
ConnectionTarget(
did=connection.their_did,
endpoint=endpoint,
label=invitation.label if invitation else None,
recipient_keys=recipient_keys,
routing_keys=routing_keys,
sender_key=sender_verkey,
)
]
async def _fetch_targets_for_connection_in_progress(
self, connection: ConnRecord, sender_verkey: str
) -> Sequence[ConnectionTarget]:
"""Get a list of connection targets from an incomplete `ConnRecord`.
This covers retrieving targets for connections that are still in the
process of bootstrapping. This includes connections that are in states
invitation-received or request-received.
Args:
connection: The connection record (with associated `DIDDoc`)
used to generate the connection target
Returns:
A list of `ConnectionTarget` objects
"""
if (
connection.invitation_msg_id
or connection.invitation_key
or not connection.their_did
): # invitation received or sending request to invitation
async with self._profile.session() as session:
invitation = await connection.retrieve_invitation(session)
targets = await self._fetch_connection_targets_for_invitation(
connection,
invitation,
sender_verkey,
)
else: # sending implicit request
# request is implicit; did isn't set if we've received an
# invitation, only the invitation key
(
endpoint,
recipient_keys,
routing_keys,
) = await self.resolve_invitation(connection.their_did)
targets = [
ConnectionTarget(
did=connection.their_did,
endpoint=endpoint,
label=None,
recipient_keys=recipient_keys,
routing_keys=routing_keys,
sender_key=sender_verkey,
)
]
return targets
[docs] async def fetch_connection_targets(
self, connection: ConnRecord
) -> Sequence[ConnectionTarget]:
"""Get a list of connection targets from a `ConnRecord`.
Args:
connection: The connection record (with associated `DIDDoc`)
used to generate the connection target
"""
if not connection.my_did:
self._logger.debug("No local DID associated with connection")
return []
async with self._profile.session() as session:
wallet = session.inject(BaseWallet)
my_info = await wallet.get_local_did(connection.my_did)
if (
ConnRecord.State.get(connection.state)
in (ConnRecord.State.INVITATION, ConnRecord.State.REQUEST)
and ConnRecord.Role.get(connection.their_role) is ConnRecord.Role.RESPONDER
): # invitation received or sending request
return await self._fetch_targets_for_connection_in_progress(
connection, my_info.verkey
)
if not connection.their_did:
self._logger.debug("No target DID associated with connection")
return []
return await self.resolve_connection_targets(
connection.their_did, my_info.verkey, connection.their_label
)
[docs] async def get_connection_targets(
self,
*,
connection_id: Optional[str] = None,
connection: Optional[ConnRecord] = None,
):
"""Create a connection target from a `ConnRecord`.
Args:
connection_id: The connection ID to search for
connection: The connection record itself, if already available
"""
if connection_id is None and connection is None:
raise ValueError("Must supply either connection_id or connection")
if not connection_id:
assert connection
connection_id = connection.connection_id
cache = self._profile.inject_or(BaseCache)
cache_key = f"connection_target::{connection_id}"
if cache:
async with cache.acquire(cache_key) as entry:
if entry.result:
self._logger.debug("Connection targets retrieved from cache")
targets = [
ConnectionTarget.deserialize(row) for row in entry.result
]
else:
if not connection:
async with self._profile.session() as session:
connection = await ConnRecord.retrieve_by_id(
session, connection_id
)
targets = await self.fetch_connection_targets(connection)
if connection.state == ConnRecord.State.COMPLETED.rfc160:
# Only set cache if connection has reached completed state
# Otherwise, a replica that participated early in exchange
# may have bad data set in cache.
self._logger.debug("Caching connection targets")
await entry.set_result(
[row.serialize() for row in targets], 3600
)
else:
self._logger.debug(
"Not caching connection targets for connection in "
f"state ({connection.state})"
)
else:
if not connection:
async with self._profile.session() as session:
connection = await ConnRecord.retrieve_by_id(session, connection_id)
targets = await self.fetch_connection_targets(connection)
return targets
[docs] def diddoc_connection_targets(
self,
doc: Optional[Union[DIDDoc, dict]],
sender_verkey: str,
their_label: Optional[str] = None,
) -> Sequence[ConnectionTarget]:
"""Get a list of connection targets from a DID Document.
Args:
doc: The DID Document to create the target from
sender_verkey: The verkey we are using
their_label: The connection label they are using
"""
if isinstance(doc, dict):
doc = DIDDoc.deserialize(doc)
if not doc:
raise BaseConnectionManagerError("No DIDDoc provided for connection target")
if not doc.did:
raise BaseConnectionManagerError("DIDDoc has no DID")
if not doc.service:
raise BaseConnectionManagerError("No services defined by DIDDoc")
targets = []
for service in doc.service.values():
if service.recip_keys:
targets.append(
ConnectionTarget(
did=doc.did,
endpoint=service.endpoint,
label=their_label,
recipient_keys=[
key.value for key in (service.recip_keys or ())
],
routing_keys=[
key.value for key in (service.routing_keys or ())
],
sender_key=sender_verkey,
)
)
return targets
[docs] async def fetch_did_document(self, did: str) -> Tuple[dict, StorageRecord]:
"""Retrieve a DID Document for a given DID.
Args:
did: The DID to search for
"""
async with self._profile.session() as session:
storage = session.inject(BaseStorage)
record = await storage.find_record(self.RECORD_TYPE_DID_DOC, {"did": did})
return json.loads(record.value), record
[docs] async def find_connection(
self,
their_did: str,
my_did: Optional[str] = None,
my_verkey: Optional[str] = None,
auto_complete=False,
) -> Optional[ConnRecord]:
"""Look up existing connection information for a sender verkey.
Args:
their_did: Their DID
my_did: My DID
my_verkey: My verkey
auto_complete: Should this connection automatically be promoted to active
Returns:
The located `ConnRecord`, if any
"""
connection = None
if their_did:
try:
async with self._profile.session() as session:
connection = await ConnRecord.retrieve_by_did(
session, their_did, my_did
)
except StorageNotFoundError:
pass
if (
connection
and ConnRecord.State.get(connection.state) is ConnRecord.State.RESPONSE
and auto_complete
):
connection.state = ConnRecord.State.COMPLETED.rfc160
async with self._profile.session() as session:
await connection.save(session, reason="Connection promoted to active")
if session.settings.get("auto_disclose_features"):
discovery_mgr = V20DiscoveryMgr(self._profile)
await discovery_mgr.proactive_disclose_features(
connection_id=connection.connection_id
)
if not connection and my_verkey:
try:
async with self._profile.session() as session:
connection = await ConnRecord.retrieve_by_invitation_key(
session,
my_verkey,
their_role=ConnRecord.Role.REQUESTER.rfc160,
)
except StorageError:
pass
return connection
[docs] async def find_inbound_connection(
self, receipt: MessageReceipt
) -> Optional[ConnRecord]:
"""Deserialize an incoming message and further populate the request context.
Args:
receipt: The message receipt
Returns:
The `ConnRecord` associated with the expanded message, if any
"""
cache_key = None
connection = None
resolved = False
if receipt.sender_verkey and receipt.recipient_verkey:
cache_key = (
f"connection_by_verkey::{receipt.sender_verkey}"
f"::{receipt.recipient_verkey}"
)
cache = self._profile.inject_or(BaseCache)
if cache:
async with cache.acquire(cache_key) as entry:
if entry.result:
cached = entry.result
receipt.sender_did = cached["sender_did"]
receipt.recipient_did_public = cached["recipient_did_public"]
receipt.recipient_did = cached["recipient_did"]
async with self._profile.session() as session:
connection = await ConnRecord.retrieve_by_id(
session, cached["id"]
)
else:
connection = await self.resolve_inbound_connection(receipt)
if connection:
cache_val = {
"id": connection.connection_id,
"sender_did": receipt.sender_did,
"recipient_did": receipt.recipient_did,
"recipient_did_public": receipt.recipient_did_public,
}
await entry.set_result(cache_val, 3600)
resolved = True
if not connection and not resolved:
connection = await self.resolve_inbound_connection(receipt)
return connection
[docs] async def resolve_inbound_connection(
self, receipt: MessageReceipt
) -> Optional[ConnRecord]:
"""Populate the receipt DID information and find the related `ConnRecord`.
Args:
receipt: The message receipt
Returns:
The `ConnRecord` associated with the expanded message, if any
"""
if receipt.sender_verkey:
try:
receipt.sender_did = await self.find_did_for_key(receipt.sender_verkey)
except StorageNotFoundError:
self._logger.warning(
"No corresponding DID found for sender verkey: %s",
receipt.sender_verkey,
)
if receipt.recipient_verkey:
try:
async with self._profile.session() as session:
wallet = session.inject(BaseWallet)
my_info = await wallet.get_local_did_for_verkey(
receipt.recipient_verkey
)
receipt.recipient_did = my_info.did
if "posted" in my_info.metadata and my_info.metadata["posted"] is True:
receipt.recipient_did_public = True
except InjectionError:
self._logger.warning(
"Cannot resolve recipient verkey, no wallet defined by "
"context: %s",
receipt.recipient_verkey,
)
except WalletNotFoundError:
self._logger.warning(
"No corresponding DID found for recipient verkey: %s",
receipt.recipient_verkey,
)
return await self.find_connection(
receipt.sender_did, receipt.recipient_did, receipt.recipient_verkey, True
)
[docs] async def get_endpoints(self, conn_id: str) -> Tuple[Optional[str], Optional[str]]:
"""Get connection endpoints.
Args:
conn_id: connection identifier
Returns:
Their endpoint for this connection
"""
async with self._profile.session() as session:
connection = await ConnRecord.retrieve_by_id(session, conn_id)
wallet = session.inject(BaseWallet)
my_did_info = await wallet.get_local_did(connection.my_did)
my_endpoint = my_did_info.metadata.get(
"endpoint",
self._profile.settings.get("default_endpoint"),
)
conn_targets = await self.get_connection_targets(
connection_id=connection.connection_id,
connection=connection,
)
return (my_endpoint, conn_targets[0].endpoint)
[docs] async def create_static_connection(
self,
my_did: Optional[str] = None,
my_seed: Optional[str] = None,
their_did: Optional[str] = None,
their_seed: Optional[str] = None,
their_verkey: Optional[str] = None,
their_endpoint: Optional[str] = None,
their_label: Optional[str] = None,
alias: Optional[str] = None,
mediation_id: Optional[str] = None,
) -> Tuple[DIDInfo, DIDInfo, ConnRecord]:
"""Register a new static connection (for use by the test suite).
Args:
my_did: override the DID used in the connection
my_seed: provide a seed used to generate our DID and keys
their_did: provide the DID used by the other party
their_seed: provide a seed used to generate their DID and keys
their_verkey: provide the verkey used by the other party
their_endpoint: their URL endpoint for routing messages
alias: an alias for this connection record
Returns:
Tuple: my DIDInfo, their DIDInfo, new `ConnRecord` instance
"""
async with self._profile.session() as session:
wallet = session.inject(BaseWallet)
# seed and DID optional
my_info = await wallet.create_local_did(SOV, ED25519, my_seed, my_did)
# must provide their DID and verkey if the seed is not known
if (not their_did or not their_verkey) and not their_seed:
raise BaseConnectionManagerError(
"Either a verkey or seed must be provided for the other party"
)
if not their_did:
their_did = seed_to_did(their_seed)
if not their_verkey:
their_verkey_bin, _ = create_keypair(ED25519, their_seed.encode())
their_verkey = bytes_to_b58(their_verkey_bin)
their_info = DIDInfo(their_did, their_verkey, {}, method=SOV, key_type=ED25519)
# Create connection record
connection = ConnRecord(
invitation_mode=ConnRecord.INVITATION_MODE_STATIC,
my_did=my_info.did,
their_did=their_info.did,
their_label=their_label,
state=ConnRecord.State.COMPLETED.rfc160,
alias=alias,
connection_protocol=CONN_PROTO,
)
async with self._profile.session() as session:
await connection.save(session, reason="Created new static connection")
if session.settings.get("auto_disclose_features"):
discovery_mgr = V20DiscoveryMgr(self._profile)
await discovery_mgr.proactive_disclose_features(
connection_id=connection.connection_id
)
# Routing
mediation_record = await self._route_manager.mediation_record_if_id(
self._profile, mediation_id, or_default=True
)
multitenant_mgr = self._profile.inject_or(BaseMultitenantManager)
wallet_id = self._profile.settings.get("wallet.id")
base_mediation_record = None
if multitenant_mgr and wallet_id:
base_mediation_record = await multitenant_mgr.get_default_mediator()
await self._route_manager.route_static(
self._profile, connection, mediation_record
)
# Synthesize their DID doc
did_doc = await self.create_did_document(
their_info,
[their_endpoint or ""],
mediation_records=list(
filter(None, [base_mediation_record, mediation_record])
),
)
await self.store_did_document(did_doc)
return my_info, their_info, connection