Source code for aries_cloudagent.ledger.multiple_ledger.indy_vdr_manager
"""Multiple IndyVdrLedger Manager."""
import asyncio
import concurrent.futures
import logging
import json
from collections import OrderedDict
from typing import Optional, Tuple, Mapping
from ...cache.base import BaseCache
from ...core.profile import Profile
from ...ledger.error import LedgerError
from ...wallet.crypto import did_is_self_certified
from ..indy_vdr import IndyVdrLedger
from ..merkel_validation.domain_txn_handler import (
prepare_for_state_read,
get_proof_nodes,
)
from ..merkel_validation.trie import SubTrie
from .base_manager import BaseMultipleLedgerManager, MultipleLedgerManagerError
LOGGER = logging.getLogger(__name__)
[docs]class MultiIndyVDRLedgerManager(BaseMultipleLedgerManager):
"""Multiple Indy VDR Ledger Manager."""
def __init__(
self,
profile: Profile,
production_ledgers: OrderedDict = OrderedDict(),
non_production_ledgers: OrderedDict = OrderedDict(),
write_ledger_info: Tuple[str, IndyVdrLedger] = None,
cache_ttl: int = None,
):
"""Initialize MultiIndyLedgerManager.
Args:
profile: The base profile for this manager
production_ledgers: production IndyVDRLedger mapping
non_production_ledgers: non_production IndyVDRLedger mapping
cache_ttl: Time in sec to persist did_ledger_id_resolver cache keys
"""
self.profile = profile
self.production_ledgers = production_ledgers
self.non_production_ledgers = non_production_ledgers
self.write_ledger_info = write_ledger_info
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
self.cache_ttl = cache_ttl
[docs] async def get_write_ledger(self) -> Optional[Tuple[str, IndyVdrLedger]]:
"""Return the write IndyVdrLedger instance."""
return self.write_ledger_info
[docs] async def get_prod_ledgers(self) -> Mapping:
"""Return production ledgers mapping."""
return self.production_ledgers
[docs] async def get_nonprod_ledgers(self) -> Mapping:
"""Return non_production ledgers mapping."""
return self.non_production_ledgers
async def _get_ledger_by_did(
self,
ledger_id: str,
did: str,
) -> Optional[Tuple[str, IndyVdrLedger, bool]]:
"""Build and submit GET_NYM request and process response.
Successful response return tuple with ledger_id, IndyVdrLedger instance
and is_self_certified bool flag. Unsuccessful response return None.
Args:
ledger_id: provided ledger_id to retrieve IndyVdrLedger instance
from production_ledgers or non_production_ledgers
did: provided DID
Return:
(str, IndyVdrLedger, bool) or None
"""
try:
indy_vdr_ledger = None
if ledger_id in self.production_ledgers:
indy_vdr_ledger = self.production_ledgers.get(ledger_id)
else:
indy_vdr_ledger = self.non_production_ledgers.get(ledger_id)
async with indy_vdr_ledger:
request = await indy_vdr_ledger.build_and_return_get_nym_request(
None, did
)
response_json = await asyncio.wait_for(
indy_vdr_ledger.submit_get_nym_request(request), 10
)
if isinstance(response_json, dict):
response = response_json
else:
response = json.loads(response_json)
if "result" in response.keys():
data = response.get("result", {}).get("data")
else:
data = response.get("data")
if not data:
LOGGER.warning(f"Did {did} not posted to ledger {ledger_id}")
return None
if isinstance(data, str):
data = json.loads(data)
if not await SubTrie.verify_spv_proof(
expected_value=prepare_for_state_read(response),
proof_nodes=get_proof_nodes(response),
):
LOGGER.warning(
f"State Proof validation failed for Did {did} "
f"and ledger {ledger_id}"
)
return None
if did_is_self_certified(did, data.get("verkey")):
return (ledger_id, indy_vdr_ledger, True)
return (ledger_id, indy_vdr_ledger, False)
except asyncio.TimeoutError:
LOGGER.exception(
f"get-nym request timedout for Did {did} and "
f"ledger {ledger_id}, reply not received within 10 sec"
)
return None
except LedgerError as err:
LOGGER.error(
"Exception when building and submitting get-nym request, "
f"for Did {did} and ledger {ledger_id}, {err}"
)
return None
[docs] async def lookup_did_in_configured_ledgers(
self, did: str, cache_did: bool = True
) -> Tuple[str, IndyVdrLedger]:
"""Lookup given DID in configured ledgers in parallel."""
self.cache = self.profile.inject_or(BaseCache)
cache_key = f"did_ledger_id_resolver::{did}"
if bool(cache_did and self.cache and await self.cache.get(cache_key)):
cached_ledger_id = await self.cache.get(cache_key)
if cached_ledger_id in self.production_ledgers:
return (cached_ledger_id, self.production_ledgers.get(cached_ledger_id))
elif cached_ledger_id in self.non_production_ledgers:
return (
cached_ledger_id,
self.non_production_ledgers.get(cached_ledger_id),
)
else:
raise MultipleLedgerManagerError(
f"cached ledger_id {cached_ledger_id} not found in either "
"production_ledgers or non_production_ledgers"
)
applicable_prod_ledgers = {"self_certified": {}, "non_self_certified": {}}
applicable_non_prod_ledgers = {"self_certified": {}, "non_self_certified": {}}
ledger_ids = list(self.production_ledgers.keys()) + list(
self.non_production_ledgers.keys()
)
coro_futures = {
self.executor.submit(self._get_ledger_by_did, ledger_id, did): ledger_id
for ledger_id in ledger_ids
}
for coro_future in concurrent.futures.as_completed(coro_futures):
result = await coro_future.result()
if result:
applicable_ledger_id = result[0]
applicable_ledger_inst = result[1]
is_self_certified = result[2]
if applicable_ledger_id in self.production_ledgers:
insert_key = list(self.production_ledgers).index(
applicable_ledger_id
)
if is_self_certified:
applicable_prod_ledgers["self_certified"][insert_key] = (
applicable_ledger_id,
applicable_ledger_inst,
)
else:
applicable_prod_ledgers["non_self_certified"][insert_key] = (
applicable_ledger_id,
applicable_ledger_inst,
)
else:
insert_key = list(self.non_production_ledgers).index(
applicable_ledger_id
)
if is_self_certified:
applicable_non_prod_ledgers["self_certified"][insert_key] = (
applicable_ledger_id,
applicable_ledger_inst,
)
else:
applicable_non_prod_ledgers["non_self_certified"][
insert_key
] = (applicable_ledger_id, applicable_ledger_inst)
applicable_prod_ledgers["self_certified"] = OrderedDict(
sorted(applicable_prod_ledgers.get("self_certified").items())
)
applicable_prod_ledgers["non_self_certified"] = OrderedDict(
sorted(applicable_prod_ledgers.get("non_self_certified").items())
)
applicable_non_prod_ledgers["self_certified"] = OrderedDict(
sorted(applicable_non_prod_ledgers.get("self_certified").items())
)
applicable_non_prod_ledgers["non_self_certified"] = OrderedDict(
sorted(applicable_non_prod_ledgers.get("non_self_certified").items())
)
if len(applicable_prod_ledgers.get("self_certified")) > 0:
successful_ledger_inst = list(
applicable_prod_ledgers.get("self_certified").values()
)[0]
if cache_did and self.cache:
await self.cache.set(
cache_key, successful_ledger_inst[0], self.cache_ttl
)
return successful_ledger_inst
elif len(applicable_non_prod_ledgers.get("self_certified")) > 0:
successful_ledger_inst = list(
applicable_non_prod_ledgers.get("self_certified").values()
)[0]
if cache_did and self.cache:
await self.cache.set(
cache_key, successful_ledger_inst[0], self.cache_ttl
)
return successful_ledger_inst
elif len(applicable_prod_ledgers.get("non_self_certified")) > 0:
successful_ledger_inst = list(
applicable_prod_ledgers.get("non_self_certified").values()
)[0]
if cache_did and self.cache:
await self.cache.set(
cache_key, successful_ledger_inst[0], self.cache_ttl
)
return successful_ledger_inst
elif len(applicable_non_prod_ledgers.get("non_self_certified")) > 0:
successful_ledger_inst = list(
applicable_non_prod_ledgers.get("non_self_certified").values()
)[0]
if cache_did and self.cache:
await self.cache.set(
cache_key, successful_ledger_inst[0], self.cache_ttl
)
return successful_ledger_inst
else:
raise MultipleLedgerManagerError(
f"DID {did} not found in any of the ledgers total: "
f"(production: {len(self.production_ledgers)}, "
f"non_production: {len(self.non_production_ledgers)})"
)