Source code for aries_cloudagent.wallet.indy

"""Indy implementation of BaseWallet interface."""

import json
import logging

from typing import List, Sequence, Tuple, Union

import indy.anoncreds
import indy.did
import indy.crypto
import indy.wallet

from indy.error import IndyError, ErrorCode

from ..did.did_key import DIDKey
from ..indy.sdk.error import IndyErrorHandler
from ..indy.sdk.wallet_setup import IndyOpenWallet
from ..ledger.base import BaseLedger
from ..ledger.endpoint_type import EndpointType
from ..ledger.error import LedgerConfigError
from ..storage.indy import IndySdkStorage
from ..storage.error import StorageDuplicateError, StorageNotFoundError
from ..storage.record import StorageRecord

from .base import BaseWallet
from .crypto import (
    create_keypair,
    sign_message,
    validate_seed,
    verify_signed_message,
)
from .did_info import DIDInfo, KeyInfo
from .did_method import SOV, KEY, DIDMethod
from .error import WalletError, WalletDuplicateError, WalletNotFoundError
from .key_pair import KeyPairStorageManager
from .key_type import BLS12381G2, ED25519, KeyType, KeyTypes
from .util import b58_to_bytes, bytes_to_b58, bytes_to_b64


LOGGER = logging.getLogger(__name__)

RECORD_TYPE_CONFIG = "config"
RECORD_NAME_PUBLIC_DID = "default_public_did"


[docs]class IndySdkWallet(BaseWallet): """Indy identity wallet implementation.""" def __init__(self, opened: IndyOpenWallet): """Create a new IndySdkWallet instance.""" self.opened: IndyOpenWallet = opened def __did_info_from_indy_info(self, info): metadata = json.loads(info["metadata"]) if info["metadata"] else {} did: str = info["did"] verkey = info["verkey"] method = KEY if did.startswith("did:key") else SOV key_type = ED25519 if method == KEY: did = DIDKey.from_public_key_b58(info["verkey"], key_type).did return DIDInfo( did=did, verkey=verkey, metadata=metadata, method=method, key_type=key_type ) def __did_info_from_key_pair_info(self, info: dict): metadata = info["metadata"] verkey = info["verkey"] # TODO: inject context to support did method registry method = SOV if metadata.get("method", "key") == SOV.method_name else KEY # TODO: inject context to support keytype registry key_types = KeyTypes() key_type = key_types.from_key_type(info["key_type"]) if method == KEY: did = DIDKey.from_public_key_b58(info["verkey"], key_type).did return DIDInfo( did=did, verkey=verkey, metadata=metadata, method=method, key_type=key_type ) async def __create_indy_signing_key( self, key_type: KeyType, metadata: dict, seed: str = None ) -> str: if key_type != ED25519: raise WalletError(f"Unsupported key type: {key_type.key_type}") args = {} if seed: args["seed"] = bytes_to_b64(validate_seed(seed)) try: verkey = await indy.crypto.create_key(self.opened.handle, json.dumps(args)) except IndyError as x_indy: if x_indy.error_code == ErrorCode.WalletItemAlreadyExists: raise WalletDuplicateError("Verification key already present in wallet") raise IndyErrorHandler.wrap_error( x_indy, "Wallet {} error".format(self.opened.name), WalletError ) from x_indy await indy.crypto.set_key_metadata( self.opened.handle, verkey, json.dumps(metadata) ) return verkey async def __create_keypair_signing_key( self, key_type: KeyType, metadata: dict, seed: str = None ) -> str: if key_type != BLS12381G2: raise WalletError(f"Unsupported key type: {key_type.key_type}") public_key, secret_key = create_keypair(key_type, validate_seed(seed)) verkey = bytes_to_b58(public_key) key_pair_mgr = KeyPairStorageManager(IndySdkStorage(self.opened)) # Check if key already exists try: key_info = await self.__get_keypair_signing_key(verkey) if key_info: raise WalletDuplicateError("Verification key already present in wallet") except WalletNotFoundError: # If we can't find the key, it means it doesn't exist already # this is good pass await key_pair_mgr.store_key_pair( public_key=public_key, secret_key=secret_key, key_type=key_type, metadata=metadata, ) return verkey
[docs] async def create_signing_key( self, key_type: KeyType, seed: str = None, metadata: dict = None ) -> KeyInfo: """ Create a new public/private signing keypair. Args: seed: Seed for key metadata: Optional metadata to store with the keypair Returns: A `KeyInfo` representing the new record Raises: WalletDuplicateError: If the resulting verkey already exists in the wallet WalletError: If there is a libindy error """ # must save metadata to allow identity check # otherwise get_key_metadata just returns WalletItemNotFound if metadata is None: metadata = {} # All ed25519 keys are handled by indy if key_type == ED25519: verkey = await self.__create_indy_signing_key(key_type, metadata, seed) # All other (only bls12381g2 atm) are handled outside of indy else: verkey = await self.__create_keypair_signing_key(key_type, metadata, seed) return KeyInfo(verkey=verkey, metadata=metadata, key_type=key_type)
async def __get_indy_signing_key(self, verkey: str) -> KeyInfo: try: metadata = await indy.crypto.get_key_metadata(self.opened.handle, verkey) return KeyInfo( verkey=verkey, metadata=json.loads(metadata) if metadata else {}, key_type=ED25519, ) except IndyError as x_indy: if x_indy.error_code == ErrorCode.WalletItemNotFound: raise WalletNotFoundError(f"Unknown key: {verkey}") # # If we resolve a key that is not 32 bytes we get CommonInvalidStructure # elif x_indy.error_code == ErrorCode.CommonInvalidStructure: # raise WalletNotFoundError(f"Unknown key: {verkey}") else: raise IndyErrorHandler.wrap_error( x_indy, "Wallet {} error".format(self.opened.name), WalletError ) from x_indy async def __get_keypair_signing_key(self, verkey: str) -> KeyInfo: try: key_pair_mgr = KeyPairStorageManager(IndySdkStorage(self.opened)) key_pair = await key_pair_mgr.get_key_pair(verkey) # TODO: inject context to support more keytypes key_types = KeyTypes() return KeyInfo( verkey=verkey, metadata=key_pair["metadata"], key_type=key_types.from_key_type(key_pair["key_type"]) or BLS12381G2, ) except StorageNotFoundError: raise WalletNotFoundError(f"Unknown key: {verkey}") except StorageDuplicateError: raise WalletDuplicateError(f"Multiple keys exist for verkey: {verkey}")
[docs] async def get_signing_key(self, verkey: str) -> KeyInfo: """ Fetch info for a signing keypair. Args: verkey: The verification key of the keypair Returns: A `KeyInfo` representing the keypair Raises: WalletNotFoundError: If no keypair is associated with the verification key WalletError: If there is a libindy error """ if not verkey: raise WalletError("Missing required input parameter: verkey") # Only try to load indy signing key if the verkey is 32 bytes # this may change if indy is going to support verkeys of different byte length if len(b58_to_bytes(verkey)) == 32: try: return await self.__get_indy_signing_key(verkey) except WalletNotFoundError: return await self.__get_keypair_signing_key(verkey) else: return await self.__get_keypair_signing_key(verkey)
[docs] async def replace_signing_key_metadata(self, verkey: str, metadata: dict): """ Replace the metadata associated with a signing keypair. Args: verkey: The verification key of the keypair metadata: The new metadata to store Raises: WalletNotFoundError: if no keypair is associated with the verification key """ metadata = metadata or {} # throw exception if key is undefined key_info = await self.get_signing_key(verkey) # All ed25519 keys are handled by indy if key_info.key_type == ED25519: await indy.crypto.set_key_metadata( self.opened.handle, verkey, json.dumps(metadata) ) # All other (only bls12381g2 atm) are handled outside of indy else: key_pair_mgr = KeyPairStorageManager(IndySdkStorage(self.opened)) await key_pair_mgr.update_key_pair_metadata( verkey=key_info.verkey, metadata=metadata )
[docs] async def rotate_did_keypair_start(self, did: str, next_seed: str = None) -> str: """ Begin key rotation for DID that wallet owns: generate new keypair. Args: did: signing DID next_seed: incoming replacement seed (default random) Returns: The new verification key """ # Check if DID can rotate keys # TODO: inject context for did method registry support method_name = did.split(":")[1] if did.startswith("did:") else SOV.method_name did_method = SOV if method_name == SOV.method_name else KEY if not did_method.supports_rotation: raise WalletError( f"DID method '{did_method.method_name}' does not support key rotation." ) try: verkey = await indy.did.replace_keys_start( self.opened.handle, did, json.dumps( {"seed": bytes_to_b64(validate_seed(next_seed))} if next_seed else {} ), ) except IndyError as x_indy: if x_indy.error_code == ErrorCode.WalletItemNotFound: raise WalletNotFoundError("Wallet owns no such DID: {}".format(did)) raise IndyErrorHandler.wrap_error( x_indy, "Wallet {} error".format(self.opened.name), WalletError ) from x_indy return verkey
[docs] async def rotate_did_keypair_apply(self, did: str) -> DIDInfo: """ Apply temporary keypair as main for DID that wallet owns. Args: did: signing DID Returns: DIDInfo with new verification key and metadata for DID """ try: await indy.did.replace_keys_apply(self.opened.handle, did) except IndyError as x_indy: if x_indy.error_code == ErrorCode.WalletItemNotFound: raise WalletNotFoundError("Wallet owns no such DID: {}".format(did)) raise IndyErrorHandler.wrap_error( x_indy, "Wallet {} error".format(self.opened.name), WalletError ) from x_indy
async def __create_indy_local_did( self, method: DIDMethod, key_type: KeyType, metadata: dict = None, seed: str = None, *, did: str = None, ) -> DIDInfo: if method not in [SOV, KEY]: raise WalletError( f"Unsupported DID method for indy storage: {method.method_name}" ) if key_type != ED25519: raise WalletError( f"Unsupported key type for indy storage: {key_type.key_type}" ) cfg = {} if seed: cfg["seed"] = bytes_to_b64(validate_seed(seed)) if did: cfg["did"] = did # Create fully qualified did. This helps with determining the # did method when retrieving if method != SOV: cfg["method_name"] = method.method_name did_json = json.dumps(cfg) # crypto_type, cid - optional parameters skipped try: did, verkey = await indy.did.create_and_store_my_did( self.opened.handle, did_json ) except IndyError as x_indy: if x_indy.error_code == ErrorCode.DidAlreadyExistsError: raise WalletDuplicateError("DID already present in wallet") raise IndyErrorHandler.wrap_error( x_indy, "Wallet {} error".format(self.opened.name), WalletError ) from x_indy # did key uses different format if method == KEY: did = DIDKey.from_public_key_b58(verkey, key_type).did await self.replace_local_did_metadata(did, metadata or {}) return DIDInfo( did=did, verkey=verkey, metadata=metadata or {}, method=method, key_type=key_type, ) async def __create_keypair_local_did( self, method: DIDMethod, key_type: KeyType, metadata: dict = None, seed: str = None, ) -> DIDInfo: if method != KEY: raise WalletError( f"Unsupported DID method for keypair storage: {method.method_name}" ) if key_type != BLS12381G2: raise WalletError( f"Unsupported key type for keypair storage: {key_type.key_type}" ) public_key, secret_key = create_keypair(key_type, validate_seed(seed)) key_pair_mgr = KeyPairStorageManager(IndySdkStorage(self.opened)) # should change if other did methods are supported did_key = DIDKey.from_public_key(public_key, key_type) if not metadata: metadata = {} metadata["method"] = method.method_name await key_pair_mgr.store_key_pair( public_key=public_key, secret_key=secret_key, key_type=key_type, metadata=metadata, tags={"method": method.method_name}, ) return DIDInfo( did=did_key.did, verkey=did_key.public_key_b58, metadata=metadata, method=method, key_type=key_type, )
[docs] async def create_local_did( self, method: DIDMethod, key_type: KeyType, seed: str = None, did: str = None, metadata: dict = None, ) -> DIDInfo: """ Create and store a new local DID. Args: method: The method to use for the DID key_type: The key type to use for the DID seed: Optional seed to use for DID did: The DID to use metadata: Metadata to store with DID Returns: A `DIDInfo` instance representing the created DID Raises: WalletDuplicateError: If the DID already exists in the wallet WalletError: If there is a libindy error """ # validate key_type if not method.supports_key_type(key_type): raise WalletError( f"Invalid key type {key_type.key_type}" f" for DID method {method.method_name}" ) if method == KEY and did: raise WalletError("Not allowed to set DID for DID method 'key'") # All ed25519 keys are handled by indy if key_type == ED25519: return await self.__create_indy_local_did( method, key_type, metadata, seed, did=did ) # All other (only bls12381g2 atm) are handled outside of indy else: return await self.__create_keypair_local_did( method, key_type, metadata, seed )
[docs] async def get_local_dids(self) -> Sequence[DIDInfo]: """ Get list of defined local DIDs. Returns: A list of locally stored DIDs as `DIDInfo` instances """ # retrieve indy dids info_json = await indy.did.list_my_dids_with_meta(self.opened.handle) info = json.loads(info_json) ret = [] for did in info: ret.append(self.__did_info_from_indy_info(did)) # retrieve key pairs with method set to key # this needs to change if more did methods are added key_pair_mgr = KeyPairStorageManager(IndySdkStorage(self.opened)) key_pairs = await key_pair_mgr.find_key_pairs( tag_query={"method": KEY.method_name} ) for key_pair in key_pairs: ret.append(self.__did_info_from_key_pair_info(key_pair)) return ret
async def __get_indy_local_did( self, method: DIDMethod, key_type: KeyType, did: str ) -> DIDInfo: if method not in [SOV, KEY]: raise WalletError( f"Unsupported DID method for indy storage: {method.method_name}" ) if key_type != ED25519: raise WalletError( f"Unsupported DID type for indy storage: {key_type.key_type}" ) # key type is always ed25519, method not always key if method == KEY and key_type == ED25519: did_key = DIDKey.from_did(did) # Ed25519 did:keys are masked indy dids so transform to indy # did with did:key prefix. did = "did:key:" + bytes_to_b58(did_key.public_key[:16]) try: info_json = await indy.did.get_my_did_with_meta(self.opened.handle, did) except IndyError as x_indy: if x_indy.error_code == ErrorCode.WalletItemNotFound: raise WalletNotFoundError("Unknown DID: {}".format(did)) raise IndyErrorHandler.wrap_error( x_indy, "Wallet {} error".format(self.opened.name), WalletError ) from x_indy info = json.loads(info_json) return self.__did_info_from_indy_info(info) async def __get_keypair_local_did( self, method: DIDMethod, key_type: KeyType, did: str ): if method != KEY: raise WalletError( f"Unsupported DID method for keypair storage: {method.method_name}" ) if key_type != BLS12381G2: raise WalletError( f"Unsupported DID type for keypair storage: {key_type.key_type}" ) # method is always did:key did_key = DIDKey.from_did(did) key_pair_mgr = KeyPairStorageManager(IndySdkStorage(self.opened)) key_pair = await key_pair_mgr.get_key_pair(verkey=did_key.public_key_b58) return self.__did_info_from_key_pair_info(key_pair)
[docs] async def get_local_did(self, did: str) -> DIDInfo: """ Find info for a local DID. Args: did: The DID for which to get info Returns: A `DIDInfo` instance representing the found DID Raises: WalletNotFoundError: If the DID is not found WalletError: If there is a libindy error """ # TODO: inject context for did method registry support method_name = did.split(":")[1] if did.startswith("did:") else SOV.method_name method = SOV if method_name == SOV.method_name else KEY key_type = ED25519 # If did key, the key type can differ if method == KEY: did_key = DIDKey.from_did(did) key_type = did_key.key_type if key_type == ED25519: return await self.__get_indy_local_did(method, key_type, did) else: return await self.__get_keypair_local_did(method, key_type, did)
[docs] async def get_local_did_for_verkey(self, verkey: str) -> DIDInfo: """ Resolve a local DID from a verkey. Args: verkey: The verkey for which to get the local DID Returns: A `DIDInfo` instance representing the found DID Raises: WalletNotFoundError: If the verkey is not found """ dids = await self.get_local_dids() for info in dids: if info.verkey == verkey: return info raise WalletNotFoundError("No DID defined for verkey: {}".format(verkey))
[docs] async def replace_local_did_metadata(self, did: str, metadata: dict): """ Replace metadata for a local DID. Args: did: The DID for which to replace metadata metadata: The new metadata """ if not metadata: metadata = {} did_info = await self.get_local_did(did) # throw exception if undefined # ed25519 keys are handled by indy if did_info.key_type == ED25519: try: await indy.did.set_did_metadata( self.opened.handle, did, json.dumps(metadata) ) except IndyError as x_indy: raise IndyErrorHandler.wrap_error( x_indy, "Wallet {} error".format(self.opened.name), WalletError ) from x_indy # all other keys are handled by key pair else: key_pair_mgr = KeyPairStorageManager(IndySdkStorage(self.opened)) await key_pair_mgr.update_key_pair_metadata( verkey=did_info.verkey, metadata=metadata )
[docs] async def get_public_did(self) -> DIDInfo: """ Retrieve the public DID. Returns: The currently public `DIDInfo`, if any """ public_did = None public_info = None public_item = None storage = IndySdkStorage(self.opened) try: public_item = await storage.get_record( RECORD_TYPE_CONFIG, RECORD_NAME_PUBLIC_DID ) except StorageNotFoundError: # populate public DID record # this should only happen once, for an upgraded wallet # the 'public' metadata flag is no longer used dids = await self.get_local_dids() for info in dids: if info.metadata.get("public"): public_did = info.did public_info = info break try: # even if public is not set, store a record # to avoid repeated queries await storage.add_record( StorageRecord( type=RECORD_TYPE_CONFIG, id=RECORD_NAME_PUBLIC_DID, value=json.dumps({"did": public_did}), ) ) except StorageDuplicateError: # another process stored the record first public_item = await storage.get_record( RECORD_TYPE_CONFIG, RECORD_NAME_PUBLIC_DID ) if public_item: public_did = json.loads(public_item.value)["did"] if public_did: try: public_info = await self.get_local_did(public_did) except WalletNotFoundError: pass return public_info
[docs] async def set_public_did(self, did: Union[str, DIDInfo]) -> DIDInfo: """ Assign the public DID. Returns: The updated `DIDInfo` """ if isinstance(did, str): # will raise an exception if not found info = await self.get_local_did(did) else: info = did public = await self.get_public_did() if not public or public.did != info.did: if not info.metadata.get("posted"): metadata = {**info.metadata, "posted": True} await self.replace_local_did_metadata(info.did, metadata) info = info._replace(metadata=metadata) storage = IndySdkStorage(self.opened) await storage.update_record( StorageRecord( type=RECORD_TYPE_CONFIG, id=RECORD_NAME_PUBLIC_DID, value="{}", ), value=json.dumps({"did": info.did}), tags=None, ) public = info return public
[docs] async def set_did_endpoint( self, did: str, endpoint: str, ledger: BaseLedger, endpoint_type: EndpointType = None, write_ledger: bool = True, endorser_did: str = None, routing_keys: List[str] = None, ): """ Update the endpoint for a DID in the wallet, send to ledger if public or posted. Args: did: DID for which to set endpoint endpoint: the endpoint to set, None to clear ledger: the ledger to which to send endpoint update if DID is public or posted endpoint_type: the type of the endpoint/service. Only endpoint_type 'endpoint' affects local wallet """ did_info = await self.get_local_did(did) if did_info.method != SOV: raise WalletError("Setting DID endpoint is only allowed for did:sov DIDs") metadata = {**did_info.metadata} if not endpoint_type: endpoint_type = EndpointType.ENDPOINT if endpoint_type == EndpointType.ENDPOINT: metadata[endpoint_type.indy] = endpoint wallet_public_didinfo = await self.get_public_did() if ( wallet_public_didinfo and wallet_public_didinfo.did == did ) or did_info.metadata.get("posted"): # if DID on ledger, set endpoint there first if not ledger: raise LedgerConfigError( f"No ledger available but DID {did} is public: missing wallet-type?" ) if not ledger.read_only: async with ledger: attrib_def = await ledger.update_endpoint_for_did( did, endpoint, endpoint_type, write_ledger=write_ledger, endorser_did=endorser_did, routing_keys=routing_keys, ) if not write_ledger: return attrib_def await self.replace_local_did_metadata(did, metadata)
[docs] async def sign_message(self, message: bytes, from_verkey: str) -> bytes: """ Sign a message using the private key associated with a given verkey. Args: message: Message bytes to sign from_verkey: The verkey to use to sign Returns: A signature Raises: WalletError: If the message is not provided WalletError: If the verkey is not provided WalletError: If a libindy error occurs """ if not message: raise WalletError("Message not provided") if not from_verkey: raise WalletError("Verkey not provided") try: key_info = await self.get_signing_key(from_verkey) except WalletNotFoundError: key_info = await self.get_local_did_for_verkey(from_verkey) # ed25519 keys are handled by indy if key_info.key_type == ED25519: try: result = await indy.crypto.crypto_sign( self.opened.handle, from_verkey, message ) except IndyError: raise WalletError("Exception when signing message") # other keys are handled outside of indy else: key_pair_mgr = KeyPairStorageManager(IndySdkStorage(self.opened)) key_pair = await key_pair_mgr.get_key_pair(verkey=key_info.verkey) result = sign_message( message=message, secret=b58_to_bytes(key_pair["secret_key"]), key_type=key_info.key_type, ) return result
[docs] async def verify_message( self, message: Union[List[bytes], bytes], signature: bytes, from_verkey: str, key_type: KeyType, ) -> bool: """ Verify a signature against the public key of the signer. Args: message: Message to verify signature: Signature to verify from_verkey: Verkey to use in verification Returns: True if verified, else False Raises: WalletError: If the verkey is not provided WalletError: If the signature is not provided WalletError: If the message is not provided WalletError: If a libindy error occurs """ if not from_verkey: raise WalletError("Verkey not provided") if not signature: raise WalletError("Signature not provided") if not message: raise WalletError("Message not provided") # ed25519 keys are handled by indy if key_type == ED25519: try: result = await indy.crypto.crypto_verify( from_verkey, message, signature ) except IndyError as x_indy: if x_indy.error_code == ErrorCode.CommonInvalidStructure: result = False else: raise IndyErrorHandler.wrap_error( x_indy, "Wallet {} error".format(self.opened.name), WalletError ) from x_indy return result # all other keys (only bls12381g2 atm) are handled outside of indy else: return verify_signed_message( message=message, signature=signature, verkey=b58_to_bytes(from_verkey), key_type=key_type, )
[docs] async def pack_message( self, message: str, to_verkeys: Sequence[str], from_verkey: str = None ) -> bytes: """ Pack a message for one or more recipients. Args: message: The message to pack to_verkeys: List of verkeys for which to pack from_verkey: Sender verkey from which to pack Returns: The resulting packed message bytes Raises: WalletError: If no message is provided WalletError: If a libindy error occurs """ if message is None: raise WalletError("Message not provided") try: result = await indy.crypto.pack_message( self.opened.handle, message, to_verkeys, from_verkey ) except IndyError as x_indy: raise IndyErrorHandler.wrap_error( x_indy, "Exception when packing message", WalletError ) from x_indy return result
[docs] async def unpack_message(self, enc_message: bytes) -> Tuple[str, str, str]: """ Unpack a message. Args: enc_message: The packed message bytes Returns: A tuple: (message, from_verkey, to_verkey) Raises: WalletError: If the message is not provided WalletError: If a libindy error occurs """ if not enc_message: raise WalletError("Message not provided") try: unpacked_json = await indy.crypto.unpack_message( self.opened.handle, enc_message ) except IndyError: raise WalletError("Exception when unpacking message") unpacked = json.loads(unpacked_json) message = unpacked["message"] to_verkey = unpacked.get("recipient_verkey", None) from_verkey = unpacked.get("sender_verkey", None) return message, from_verkey, to_verkey
[docs] @classmethod async def generate_wallet_key(self, seed: str = None) -> str: """Generate a raw Indy wallet key.""" return await indy.wallet.generate_wallet_key(seed)