"""Kanon storage implementation of VC holder interface."""
import json
from typing import Mapping, Optional, Sequence
from ...core.profile import Profile
from ..kanon_storage import KanonStorage, KanonStorageSearch, KanonStorageSearchSession
from ..record import StorageRecord
from .base import VCHolder, VCRecordSearch
from .vc_record import VCRecord
from .xform import VC_CRED_RECORD_TYPE
[docs]
class KanonVCHolder(VCHolder):
"""Kanon VC record storage class."""
def __init__(self, profile: Profile):
"""Initialize the Indy-SDK VC holder instance."""
self._profile = profile
[docs]
def build_type_or_schema_query(self, uri_list: Sequence[str]) -> dict:
"""Build and return indy-specific type_or_schema_query."""
type_or_schema_query = {}
for uri in uri_list:
q = {"$or": [{"type": uri}, {"schema": uri}]}
if type_or_schema_query:
if "$and" not in type_or_schema_query:
type_or_schema_query = {"$and": [type_or_schema_query]}
type_or_schema_query["$and"].append(q)
else:
type_or_schema_query = q
return type_or_schema_query
[docs]
async def store_credential(self, cred: VCRecord):
"""Add a new VC record to the store.
Args:
cred: The VCRecord instance to store
Raises:
StorageDuplicateError: If the record_id is not unique
"""
record = vc_to_storage_record(cred)
async with self._profile.session() as session:
await KanonStorage(session).add_record(record)
[docs]
async def retrieve_credential_by_id(self, record_id: str) -> VCRecord:
"""Fetch a VC record by its record ID.
Raises:
StorageNotFoundError: If the record is not found
"""
async with self._profile.session() as session:
record = await KanonStorage(session).get_record(
VC_CRED_RECORD_TYPE, record_id
)
return storage_to_vc_record(record)
[docs]
async def retrieve_credential_by_given_id(self, given_id: str) -> VCRecord:
"""Fetch a VC record by its given ID ('id' property).
Raises:
StorageNotFoundError: If the record is not found
"""
async with self._profile.session() as session:
record = await KanonStorage(session).find_record(
VC_CRED_RECORD_TYPE, {"given_id": given_id}
)
return storage_to_vc_record(record)
[docs]
async def delete_credential(self, cred: VCRecord):
"""Remove a previously-stored VC record.
Raises:
StorageNotFoundError: If the record is not found
"""
async with self._profile.session() as session:
await KanonStorage(session).delete_record(vc_to_storage_record(cred))
[docs]
def search_credentials(
self,
contexts: Sequence[str] = None,
types: Sequence[str] = None,
schema_ids: Sequence[str] = None,
issuer_id: Optional[str] = None,
subject_ids: Optional[str] = None,
proof_types: Sequence[str] = None,
given_id: Optional[str] = None,
tag_query: Optional[Mapping] = None,
pd_uri_list: Sequence[str] = None,
) -> "VCRecordSearch":
"""Start a new VC record search.
Args:
contexts: An inclusive list of JSON-LD contexts to match
types: An inclusive list of JSON-LD types to match
schema_ids: An inclusive list of credential schema identifiers
issuer_id: The ID of the credential issuer
subject_ids: The IDs of credential subjects all of which to match
proof_types: The signature suite types used for the proof objects.
given_id: The given id of the credential
tag_query: A tag filter clause
pd_uri_list: A list of presentation definition URIs to match
"""
def _match_any(query: list, k, vals):
if vals is None:
return
elif len(vals) > 1:
query.append({"$or": [{k: v for v in vals}]})
else:
query.append({k: vals[0]})
def _make_custom_query(query):
result = {}
for k, v in query.items():
if isinstance(v, (list, set)) and k != "$exist":
result[k] = [_make_custom_query(cl) for cl in v]
elif k.startswith("$"):
result[k] = v
else:
result[f"cstm:{k}"] = v
return result
query = []
_match_any(query, "context", contexts)
_match_any(query, "type", types)
_match_any(query, "schema", schema_ids)
_match_any(query, "subject", subject_ids)
_match_any(query, "proof_type", proof_types)
if issuer_id:
query.append({"issuer_id": issuer_id})
if given_id:
query.append({"given_id": given_id})
if tag_query:
query.append(_make_custom_query(tag_query))
if pd_uri_list:
query.append(self.build_type_or_schema_query(pd_uri_list))
query = {"$and": query} if query else None
search = KanonStorageSearch(self._profile).search_records(
VC_CRED_RECORD_TYPE, query
)
return KanonVCRecordSearch(search)
[docs]
class KanonVCRecordSearch(VCRecordSearch):
"""Kanon storage search for VC records."""
def __init__(self, search: KanonStorageSearchSession):
"""Initialize the Kanon VC record search."""
self._search = search
[docs]
async def close(self):
"""Dispose of the search query."""
await self._search.close()
[docs]
async def fetch(self, max_count: Optional[int] = None) -> Sequence[VCRecord]:
"""Fetch the next list of VC records from the store.
Args:
max_count: Max number of records to return. If not provided,
defaults to the backend's preferred page size
Returns:
A list of `VCRecord` instances
"""
rows = await self._search.fetch(max_count)
records = [storage_to_vc_record(r) for r in rows]
return records
[docs]
def storage_to_vc_record(record: StorageRecord) -> VCRecord:
"""Convert an Kanon stored record into a VC record."""
def _make_set(val) -> set:
if isinstance(val, str):
return {val}
else:
return set(val)
cred_tags = {}
contexts = set()
types = set()
schema_ids = set()
subject_ids = set()
proof_types = set()
issuer_id = None
given_id = None
for tagname, tagval in (record.tags or {}).items():
if tagname == "context":
contexts = _make_set(tagval)
elif tagname == "type":
types = _make_set(tagval)
elif tagname == "schema":
schema_ids = _make_set(tagval)
elif tagname == "subject":
subject_ids = _make_set(tagval)
elif tagname == "proof_type":
proof_types = _make_set(tagval)
elif tagname == "issuer_id":
issuer_id = tagval
elif tagname == "given_id":
given_id = tagval
elif tagname.startswith("cstm:"):
cred_tags[tagname[5:]] = tagval
return VCRecord(
contexts=contexts,
expanded_types=types,
schema_ids=schema_ids,
issuer_id=issuer_id,
subject_ids=subject_ids,
proof_types=proof_types,
cred_value=json.loads(record.value),
given_id=given_id,
cred_tags=cred_tags,
record_id=record.id,
)
[docs]
def vc_to_storage_record(cred: VCRecord) -> StorageRecord:
"""Convert a VC record into an Kanon stored record."""
tags = {}
tags["context"] = set(cred.contexts)
tags["type"] = set(cred.expanded_types)
tags["schema"] = set(cred.schema_ids)
tags["subject"] = set(cred.subject_ids)
tags["proof_type"] = set(cred.proof_types)
if cred.issuer_id:
tags["issuer_id"] = cred.issuer_id
if cred.given_id:
tags["given_id"] = cred.given_id
for tagname, tagval in (cred.cred_tags or {}).items():
tags[f"cstm:{tagname}"] = tagval
return StorageRecord(
VC_CRED_RECORD_TYPE,
json.dumps(cred.cred_value),
tags,
cred.record_id,
)