# Source code for aries_cloudagent.ledger.base

"""Ledger base class."""

import re

from abc import ABC, abstractmethod, ABCMeta
from enum import Enum
from hashlib import sha256
from typing import Sequence, Tuple, Union

from ..indy.issuer import IndyIssuer
from ..utils import sentinel
from ..wallet.did_info import DIDInfo

from .endpoint_type import EndpointType


class BaseLedger(ABC, metaclass=ABCMeta):
    """
    Base class for ledger.

    Defines the interface every ledger backend must implement. Only the async
    context-manager hooks, the ``backend`` accessor, ``did_to_nym`` and
    ``taa_digest`` carry an implementation here; everything else is abstract.
    """

    # Name of the backend; read back through the ``backend`` property.
    BACKEND_NAME: str = None

    async def __aenter__(self) -> "BaseLedger":
        """
        Context manager entry.

        Returns:
            The current instance

        """
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Context manager exit."""

    @property
    def backend(self) -> str:
        """Accessor for the ledger backend name."""
        return self.__class__.BACKEND_NAME

    @property
    @abstractmethod
    def read_only(self) -> bool:
        """Accessor for the ledger read-only flag."""

    @abstractmethod
    async def get_key_for_did(self, did: str) -> str:
        """
        Fetch the verkey for a ledger DID.

        Args:
            did: The DID to look up on the ledger or in the cache

        """

    @abstractmethod
    async def get_endpoint_for_did(
        self, did: str, endpoint_type: EndpointType = EndpointType.ENDPOINT
    ) -> str:
        """
        Fetch the endpoint for a ledger DID.

        Args:
            did: The DID to look up on the ledger or in the cache
            endpoint_type: The type of the endpoint (default 'endpoint')

        """

    @abstractmethod
    async def get_all_endpoints_for_did(self, did: str) -> dict:
        """
        Fetch all endpoints for a ledger DID.

        Args:
            did: The DID to look up on the ledger or in the cache

        """

    @abstractmethod
    async def update_endpoint_for_did(
        self,
        did: str,
        endpoint: str,
        endpoint_type: EndpointType = EndpointType.ENDPOINT,
        write_ledger: bool = True,
        endorser_did: str = None,
    ) -> bool:
        """
        Check and update the endpoint on the ledger.

        Args:
            did: The ledger DID
            endpoint: The endpoint address
            endpoint_type: The type of the endpoint (default 'endpoint')
            write_ledger: Defaults to True. NOTE(review): presumably whether
                the transaction is written to the ledger directly - confirm
                against the concrete backends
            endorser_did: Defaults to None. NOTE(review): presumably the DID
                of an endorser for the transaction - confirm against the
                concrete backends

        """

    @abstractmethod
    async def register_nym(
        self,
        did: str,
        verkey: str,
        alias: str = None,
        role: str = None,
        write_ledger: bool = True,
        endorser_did: str = None,
    ) -> Tuple[bool, dict]:
        """
        Register a nym on the ledger.

        Args:
            did: DID to register on the ledger.
            verkey: The verification key of the keypair.
            alias: Human-friendly alias to assign to the DID.
            role: For permissioned ledgers, what role should the new DID have.
            write_ledger: Defaults to True. NOTE(review): presumably whether
                the transaction is written to the ledger directly - confirm
                against the concrete backends
            endorser_did: Defaults to None. NOTE(review): presumably the DID
                of an endorser for the transaction - confirm against the
                concrete backends

        """

    @abstractmethod
    async def get_nym_role(self, did: str):
        """
        Return the role registered to input public DID on the ledger.

        Args:
            did: DID to look up on the ledger.

        """

    @abstractmethod
    def nym_to_did(self, nym: str) -> str:
        """Format a nym with the ledger's DID prefix."""

    @abstractmethod
    async def rotate_public_did_keypair(self, next_seed: str = None) -> None:
        """
        Rotate keypair for public DID: create new key, submit to ledger, update wallet.

        Args:
            next_seed: seed for incoming ed25519 keypair (default random)

        """

    def did_to_nym(self, did: str) -> str:
        """
        Remove the ledger's DID prefix to produce a nym.

        Args:
            did: A DID, with or without a leading ``did:<word>:`` prefix

        Returns:
            The input with one leading ``did:<word>:`` prefix removed, or
            None when the input is empty or None

        """
        if not did:
            # Falsy input has always produced None here; keep that explicit
            # rather than relying on falling off the end of the function.
            return None
        return re.sub(r"^did:\w+:", "", did)

    @abstractmethod
    async def get_txn_author_agreement(self, reload: bool = False):
        """Get the current transaction author agreement, fetching it if necessary."""

    @abstractmethod
    async def fetch_txn_author_agreement(self):
        """Fetch the current AML and TAA from the ledger."""

    @abstractmethod
    async def accept_txn_author_agreement(
        self, taa_record: dict, mechanism: str, accept_time: int = None
    ):
        """Save a new record recording the acceptance of the TAA."""

    @abstractmethod
    async def get_latest_txn_author_acceptance(self):
        """Look up the latest TAA acceptance."""

    def taa_digest(self, version: str, text: str) -> str:
        """
        Generate the digest of a TAA record.

        Args:
            version: The TAA version string
            text: The TAA text

        Returns:
            Hex-encoded SHA-256 digest of ``version + text`` encoded as UTF-8

        Raises:
            ValueError: If either ``version`` or ``text`` is empty or None

        """
        if not version or not text:
            raise ValueError("Bad input for TAA digest")
        taa_plaintext = version + text
        return sha256(taa_plaintext.encode("utf-8")).hexdigest()

    @abstractmethod
    async def txn_endorse(
        self,
        request_json: str,
        endorse_did: DIDInfo = None,
    ) -> str:
        """Endorse (sign) the provided transaction."""

    @abstractmethod
    async def txn_submit(
        self,
        request_json: str,
        sign: bool,
        taa_accept: bool,
        sign_did: DIDInfo = sentinel,
    ) -> str:
        """Write the provided (signed and possibly endorsed) transaction to the ledger."""

    @abstractmethod
    async def create_and_send_schema(
        self,
        issuer: IndyIssuer,
        schema_name: str,
        schema_version: str,
        attribute_names: Sequence[str],
        write_ledger: bool = True,
        endorser_did: str = None,
    ) -> Tuple[str, dict]:
        """
        Send schema to ledger.

        Args:
            issuer: The issuer instance to use for schema creation
            schema_name: The schema name
            schema_version: The schema version
            attribute_names: A list of schema attributes
            write_ledger: Defaults to True. NOTE(review): presumably whether
                the transaction is written to the ledger directly - confirm
                against the concrete backends
            endorser_did: Defaults to None. NOTE(review): presumably the DID
                of an endorser for the transaction - confirm against the
                concrete backends

        """

    @abstractmethod
    async def get_revoc_reg_def(self, revoc_reg_id: str) -> dict:
        """Look up a revocation registry definition by ID."""

    @abstractmethod
    async def send_revoc_reg_def(
        self,
        revoc_reg_def: dict,
        issuer_did: str = None,
        write_ledger: bool = True,
        endorser_did: str = None,
    ):
        """Publish a revocation registry definition to the ledger."""

    @abstractmethod
    async def send_revoc_reg_entry(
        self,
        revoc_reg_id: str,
        revoc_def_type: str,
        revoc_reg_entry: dict,
        issuer_did: str = None,
        write_ledger: bool = True,
        endorser_did: str = None,
    ):
        """Publish a revocation registry entry to the ledger."""

    @abstractmethod
    async def create_and_send_credential_definition(
        self,
        issuer: IndyIssuer,
        schema_id: str,
        signature_type: str = None,
        tag: str = None,
        support_revocation: bool = False,
        write_ledger: bool = True,
        endorser_did: str = None,
    ) -> Tuple[str, dict, bool]:
        """
        Send credential definition to ledger and store relevant key matter in wallet.

        Args:
            issuer: The issuer instance to use for credential definition creation
            schema_id: The schema id of the schema to create cred def for
            signature_type: The signature type to use on the credential definition
            tag: Optional tag to distinguish multiple credential definitions
            support_revocation: Optional flag to enable revocation for this cred def
            write_ledger: Defaults to True. NOTE(review): presumably whether
                the transaction is written to the ledger directly - confirm
                against the concrete backends
            endorser_did: Defaults to None. NOTE(review): presumably the DID
                of an endorser for the transaction - confirm against the
                concrete backends

        Returns:
            Tuple with cred def id, cred def structure, and whether it's novel

        """

    @abstractmethod
    async def get_credential_definition(self, credential_definition_id: str) -> dict:
        """
        Get a credential definition from the cache if available, otherwise the ledger.

        Args:
            credential_definition_id: The id of the credential definition to fetch

        """

    @abstractmethod
    async def get_revoc_reg_delta(
        self, revoc_reg_id: str, timestamp_from=0, timestamp_to=None
    ) -> Tuple[dict, int]:
        """Look up a revocation registry delta by ID."""

    @abstractmethod
    async def get_schema(self, schema_id: str) -> dict:
        """
        Get a schema from the cache if available, otherwise fetch from the ledger.

        Args:
            schema_id: The schema id (or stringified sequence number) to retrieve

        """

    @abstractmethod
    async def get_revoc_reg_entry(
        self, revoc_reg_id: str, timestamp: int
    ) -> Tuple[dict, int]:
        """Get revocation registry entry by revocation registry ID and timestamp."""
class Role(Enum):
    """
    Enum for indy roles.

    Each member's value is a tuple of the tokens that identify the role; the
    first element is the one handed back by ``to_indy_num_str``.
    """

    STEWARD = (2,)
    TRUSTEE = (0,)
    ENDORSER = (101,)
    NETWORK_MONITOR = (201,)
    # When read from a file, either None or an empty "" stands for USER.
    USER = (None, "")
    # indy-sdk itself uses "" to mark a role that is being reset.
    ROLE_REMOVE = ("",)

    @staticmethod
    def get(token: Union[str, int] = None) -> "Role":
        """
        Return enum instance corresponding to input token.

        Matching is attempted, member by member in declaration order, against
        the integer values, the case-insensitive member name, and the
        stringified values. ROLE_REMOVE is never returned.

        Args:
            token: token identifying role to indy-sdk:
                "STEWARD", "TRUSTEE", "ENDORSER", "" or None

        Returns:
            The matching role, Role.USER when token is None, or None when
            nothing matches

        """
        if token is None:
            return Role.USER

        token_is_int = isinstance(token, int)
        upper_token = str(token).upper()

        for candidate in Role:
            if candidate is Role.ROLE_REMOVE:
                # Not a sensible role to parse from any configuration.
                continue

            codes = candidate.value
            if token_is_int and token in codes:
                return candidate
            if upper_token == candidate.name:
                return candidate
            if token in [str(code) for code in codes]:
                return candidate

        return None

    def to_indy_num_str(self) -> str:
        """
        Return (typically, numeric) string value that indy-sdk associates with role.

        Integer codes are stringified; the None of USER and the "" of a role
        undergoing reset are returned untouched.
        """
        primary = self.value[0]
        if isinstance(primary, int):
            return str(primary)
        return primary

    def token(self) -> str:
        """
        Return token identifying role to indy-sdk.

        USER and ROLE_REMOVE yield their first raw value (None and ""
        respectively); every other role yields its member name.
        """
        if self is Role.USER or self is Role.ROLE_REMOVE:
            return self.value[0]
        return self.name