"""Indy implementation of BaseWallet interface."""
import json
import logging
from typing import Sequence
import indy.anoncreds
import indy.did
import indy.crypto
import indy.wallet
from indy.error import IndyError, ErrorCode
from ..indy.error import IndyErrorHandler
from .base import BaseWallet, KeyInfo, DIDInfo
from .crypto import validate_seed
from .error import WalletError, WalletDuplicateError, WalletNotFoundError
from .plugin import load_postgres_plugin
from .util import bytes_to_b64
[docs]class IndyWallet(BaseWallet):
"""Indy wallet implementation."""
DEFAULT_FRESHNESS = 0
DEFAULT_KEY = ""
DEFAULT_KEY_DERIVIATION = "ARGON2I_MOD"
DEFAULT_NAME = "default"
DEFAULT_STORAGE_TYPE = None
WALLET_TYPE = "indy"
KEY_DERIVATION_RAW = "RAW"
KEY_DERIVATION_ARGON2I_INT = "ARGON2I_INT"
KEY_DERIVATION_ARGON2I_MOD = "ARGON2I_MOD"
def __init__(self, config: dict = None):
"""
Initialize a `IndyWallet` instance.
Args:
config: {name, key, seed, did, auto-create, auto-remove,
storage_type, storage_config, storage_creds}
"""
self.logger = logging.getLogger(__name__)
if not config:
config = {}
super(IndyWallet, self).__init__(config)
self._auto_create = config.get("auto_create", True)
self._auto_remove = config.get("auto_remove", False)
self._created = False
self._freshness_time = config.get("freshness_time", False)
self._handle = None
self._key = config.get("key") or self.DEFAULT_KEY
self._key_derivation_method = (
config.get("key_derivation_method") or self.DEFAULT_KEY_DERIVIATION
)
self._name = config.get("name") or self.DEFAULT_NAME
self._storage_type = config.get("storage_type") or self.DEFAULT_STORAGE_TYPE
self._storage_config = config.get("storage_config", None)
self._storage_creds = config.get("storage_creds", None)
self._master_secret_id = None
if self._storage_type == "postgres_storage":
load_postgres_plugin(self._storage_config, self._storage_creds)
@property
def type(self) -> str:
"""Accessor for the wallet type."""
return IndyWallet.WALLET_TYPE
@property
def handle(self):
"""
Get internal wallet reference.
Returns:
A handle to the wallet
"""
return self._handle
@property
def created(self) -> bool:
"""Check whether the wallet was created on the last open call."""
return self._created
@property
def opened(self) -> bool:
"""
Check whether wallet is currently open.
Returns:
True if open, else False
"""
return bool(self._handle)
@property
def name(self) -> str:
"""
Accessor for the wallet name.
Returns:
The wallet name
"""
return self._name
@property
def master_secret_id(self) -> str:
"""
Accessor for the master secret id.
Returns:
The master secret id
"""
return self._master_secret_id
@property
def _wallet_config(self) -> dict:
"""
Accessor for the wallet config.
Returns:
The wallet config
"""
ret = {
"id": self._name,
"freshness_time": self._freshness_time,
"storage_type": self._storage_type,
# storage_config
}
if self._storage_config is not None:
ret["storage_config"] = json.loads(self._storage_config)
return ret
@property
def _wallet_access(self) -> dict:
"""
Accessor for the wallet access.
Returns:
The wallet access
"""
ret = {
"key": self._key,
"key_derivation_method": self._key_derivation_method,
# storage_credentials
}
if self._storage_creds is not None:
ret["storage_credentials"] = json.loads(self._storage_creds)
return ret
[docs] async def create(self, replace: bool = False):
"""
Create a new wallet.
Args:
replace: Removes the old wallet if True
Raises:
WalletError: If there was a problem removing the wallet
WalletError: IF there was a libindy error
"""
if replace:
try:
await self.remove()
except WalletNotFoundError:
pass
try:
await indy.wallet.create_wallet(
config=json.dumps(self._wallet_config),
credentials=json.dumps(self._wallet_access),
)
except IndyError as x_indy:
raise IndyErrorHandler.wrap_error(
x_indy,
"Wallet was not removed by SDK, {} may still be open".format(self.name)
if x_indy.error_code == ErrorCode.WalletAlreadyExistsError
else None,
WalletError,
) from x_indy
[docs] async def remove(self):
"""
Remove an existing wallet.
Raises:
WalletNotFoundError: If the wallet could not be found
WalletError: If there was an libindy error
"""
try:
await indy.wallet.delete_wallet(
config=json.dumps(self._wallet_config),
credentials=json.dumps(self._wallet_access),
)
except IndyError as x_indy:
if x_indy.error_code == ErrorCode.WalletNotFoundError:
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} not found".format(self.name), WalletNotFoundError
) from x_indy
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet error", WalletError
) from x_indy
[docs] async def open(self):
"""
Open wallet, removing and/or creating it if so configured.
Raises:
WalletError: If wallet not found after creation
WalletNotFoundError: If the wallet is not found
WalletError: If the wallet is already open
WalletError: If there is a libindy error
"""
if self.opened:
return
self._created = False
while True:
try:
self._handle = await indy.wallet.open_wallet(
config=json.dumps(self._wallet_config),
credentials=json.dumps(self._wallet_access),
)
break
except IndyError as x_indy:
if x_indy.error_code == ErrorCode.WalletNotFoundError:
if self._created:
raise IndyErrorHandler.wrap_error(
x_indy,
"Wallet {} not found after creation".format(self.name),
WalletError,
) from x_indy
if self._auto_create:
await self.create(self._auto_remove)
self._created = True
else:
raise WalletNotFoundError(
"Wallet {} not found".format(self.name)
)
elif x_indy.error_code == ErrorCode.WalletAlreadyOpenedError:
raise WalletError("Wallet {} is already open".format(self.name))
else:
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} error".format(self.name), WalletError
) from x_indy
self.logger.info("Creating master secret...")
try:
self._master_secret_id = await indy.anoncreds.prover_create_master_secret(
self.handle, self.name
)
except IndyError as x_indy:
if x_indy.error_code == ErrorCode.AnoncredsMasterSecretDuplicateNameError:
self.logger.info("Master secret already exists")
self._master_secret_id = self.name
else:
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} error".format(self.name), WalletError
) from x_indy
[docs] async def close(self):
"""Close previously-opened wallet, removing it if so configured."""
if self._handle:
await indy.wallet.close_wallet(self._handle)
if self._auto_remove:
await self.remove()
self._handle = None
[docs] async def create_signing_key(
self, seed: str = None, metadata: dict = None
) -> KeyInfo:
"""
Create a new public/private signing keypair.
Args:
seed: Seed for key
metadata: Optional metadata to store with the keypair
Returns:
A `KeyInfo` representing the new record
Raises:
WalletDuplicateError: If the resulting verkey already exists in the wallet
WalletError: If there is a libindy error
"""
args = {}
if seed:
args["seed"] = bytes_to_b64(validate_seed(seed))
try:
verkey = await indy.crypto.create_key(self.handle, json.dumps(args))
except IndyError as x_indy:
if x_indy.error_code == ErrorCode.WalletItemAlreadyExists:
raise WalletDuplicateError("Verification key already present in wallet")
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} error".format(self.name), WalletError
) from x_indy
# must save metadata to allow identity check
# otherwise get_key_metadata just returns WalletItemNotFound
if metadata is None:
metadata = {}
await indy.crypto.set_key_metadata(self.handle, verkey, json.dumps(metadata))
return KeyInfo(verkey, metadata)
[docs] async def get_signing_key(self, verkey: str) -> KeyInfo:
"""
Fetch info for a signing keypair.
Args:
verkey: The verification key of the keypair
Returns:
A `KeyInfo` representing the keypair
Raises:
WalletNotFoundError: If no keypair is associated with the verification key
WalletError: If there is a libindy error
"""
try:
metadata = await indy.crypto.get_key_metadata(self.handle, verkey)
except IndyError as x_indy:
if x_indy.error_code == ErrorCode.WalletItemNotFound:
raise WalletNotFoundError("Unknown key: {}".format(verkey))
else:
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} error".format(self.name), WalletError
) from x_indy
return KeyInfo(verkey, json.loads(metadata) if metadata else {})
[docs] async def create_local_did(
self, seed: str = None, did: str = None, metadata: dict = None
) -> DIDInfo:
"""
Create and store a new local DID.
Args:
seed: Optional seed to use for did
did: The DID to use
metadata: Metadata to store with DID
Returns:
A `DIDInfo` instance representing the created DID
Raises:
WalletDuplicateError: If the DID already exists in the wallet
WalletError: If there is a libindy error
"""
cfg = {}
if seed:
cfg["seed"] = bytes_to_b64(validate_seed(seed))
if did:
cfg["did"] = did
did_json = json.dumps(cfg)
# crypto_type, cid - optional parameters skipped
try:
did, verkey = await indy.did.create_and_store_my_did(self.handle, did_json)
except IndyError as x_indy:
if x_indy.error_code == ErrorCode.DidAlreadyExistsError:
raise WalletDuplicateError("DID already present in wallet")
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} error".format(self.name), WalletError
) from x_indy
if metadata:
await self.replace_local_did_metadata(did, metadata)
else:
metadata = {}
return DIDInfo(did, verkey, metadata)
[docs] async def get_local_dids(self) -> Sequence[DIDInfo]:
"""
Get list of defined local DIDs.
Returns:
A list of locally stored DIDs as `DIDInfo` instances
"""
info_json = await indy.did.list_my_dids_with_meta(self.handle)
info = json.loads(info_json)
ret = []
for did in info:
ret.append(
DIDInfo(
did=did["did"],
verkey=did["verkey"],
metadata=json.loads(did["metadata"]) if did["metadata"] else {},
)
)
return ret
[docs] async def get_local_did(self, did: str) -> DIDInfo:
"""
Find info for a local DID.
Args:
did: The DID to get info for
Returns:
A `DIDInfo` instance representing the found DID
Raises:
WalletNotFoundError: If the DID is not found
WalletError: If there is a libindy error
"""
try:
info_json = await indy.did.get_my_did_with_meta(self.handle, did)
except IndyError as x_indy:
if x_indy.error_code == ErrorCode.WalletItemNotFound:
raise WalletNotFoundError("Unknown DID: {}".format(did))
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} error".format(self.name), WalletError
) from x_indy
info = json.loads(info_json)
return DIDInfo(
did=info["did"],
verkey=info["verkey"],
metadata=json.loads(info["metadata"]) if info["metadata"] else {},
)
[docs] async def get_local_did_for_verkey(self, verkey: str) -> DIDInfo:
"""
Resolve a local DID from a verkey.
Args:
verkey: The verkey to get the local DID for
Returns:
A `DIDInfo` instance representing the found DID
Raises:
WalletNotFoundError: If the verkey is not found
"""
dids = await self.get_local_dids()
for info in dids:
if info.verkey == verkey:
return info
raise WalletNotFoundError("No DID defined for verkey: {}".format(verkey))
[docs] async def sign_message(self, message: bytes, from_verkey: str) -> bytes:
"""
Sign a message using the private key associated with a given verkey.
Args:
message: Message bytes to sign
from_verkey: The verkey to use to sign
Returns:
A signature
Raises:
WalletError: If the message is not provided
WalletError: If the verkey is not provided
WalletError: If a libindy error occurs
"""
if not message:
raise WalletError("Message not provided")
if not from_verkey:
raise WalletError("Verkey not provided")
try:
result = await indy.crypto.crypto_sign(self.handle, from_verkey, message)
except IndyError:
raise WalletError("Exception when signing message")
return result
[docs] async def verify_message(
self, message: bytes, signature: bytes, from_verkey: str
) -> bool:
"""
Verify a signature against the public key of the signer.
Args:
message: Message to verify
signature: Signature to verify
from_verkey: Verkey to use in verification
Returns:
True if verified, else False
Raises:
WalletError: If the verkey is not provided
WalletError: If the signature is not provided
WalletError: If the message is not provided
WalletError: If a libindy error occurs
"""
if not from_verkey:
raise WalletError("Verkey not provided")
if not signature:
raise WalletError("Signature not provided")
if not message:
raise WalletError("Message not provided")
try:
result = await indy.crypto.crypto_verify(from_verkey, message, signature)
except IndyError as x_indy:
if x_indy.error_code == ErrorCode.CommonInvalidStructure:
result = False
else:
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} error".format(self.name), WalletError
) from x_indy
return result
[docs] async def pack_message(
self, message: str, to_verkeys: Sequence[str], from_verkey: str = None
) -> bytes:
"""
Pack a message for one or more recipients.
Args:
message: The message to pack
to_verkeys: List of verkeys to pack for
from_verkey: Sender verkey to pack from
Returns:
The resulting packed message bytes
Raises:
WalletError: If no message is provided
WalletError: If a libindy error occurs
"""
if message is None:
raise WalletError("Message not provided")
try:
result = await indy.crypto.pack_message(
self.handle, message, to_verkeys, from_verkey
)
except IndyError as x_indy:
raise IndyErrorHandler.wrap_error(
x_indy, "Exception when packing message", WalletError
) from x_indy
return result
[docs] async def unpack_message(self, enc_message: bytes) -> (str, str, str):
"""
Unpack a message.
Args:
enc_message: The packed message bytes
Returns:
A tuple: (message, from_verkey, to_verkey)
Raises:
WalletError: If the message is not provided
WalletError: If a libindy error occurs
"""
if not enc_message:
raise WalletError("Message not provided")
try:
unpacked_json = await indy.crypto.unpack_message(self.handle, enc_message)
except IndyError:
raise WalletError("Exception when unpacking message")
unpacked = json.loads(unpacked_json)
message = unpacked["message"]
to_verkey = unpacked.get("recipient_verkey", None)
from_verkey = unpacked.get("sender_verkey", None)
return message, from_verkey, to_verkey
[docs] async def get_credential_definition_tag_policy(self, credential_definition_id: str):
"""Return the tag policy for a given credential definition ID."""
try:
policy_json = await indy.anoncreds.prover_get_credential_attr_tag_policy(
self.handle, credential_definition_id
)
except IndyError as x_indy:
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} error".format(self.name), WalletError
) from x_indy
return json.loads(policy_json) if policy_json else None
[docs] async def set_credential_definition_tag_policy(
self,
credential_definition_id: str,
taggables: Sequence[str] = None,
retroactive: bool = True,
):
"""
Set the tag policy for a given credential definition ID.
Args:
credential_definition_id: The ID of the credential definition
taggables: A sequence of string values representing attribute names;
empty array for none, None for all
retroactive: Whether to apply the policy to previously-stored credentials
"""
self.logger.info(
"%s tagging policy: %s",
"Clear" if taggables is None else "Set",
credential_definition_id,
)
try:
await indy.anoncreds.prover_set_credential_attr_tag_policy(
self.handle,
credential_definition_id,
json.dumps(taggables),
retroactive,
)
except IndyError as x_indy:
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} error".format(self.name), WalletError
) from x_indy
[docs] @classmethod
async def generate_wallet_key(self, seed: str = None) -> str:
"""Generate a raw Indy wallet key."""
return await indy.wallet.generate_wallet_key(seed)