"""Indy implementation of BaseWallet interface."""
import json
import logging
from typing import Optional, Sequence, Tuple

import indy.anoncreds
import indy.crypto
import indy.did
import indy.pairwise
import indy.wallet
from indy.error import IndyError, ErrorCode

from .base import BaseWallet, KeyInfo, DIDInfo, PairwiseInfo
from .crypto import validate_seed
from .error import WalletError, WalletDuplicateError, WalletNotFoundError
from .util import bytes_to_b64
[docs]class IndyWallet(BaseWallet):
"""Indy wallet implementation."""
# Not referenced by the code visible in this file; __init__ defaults
# "freshness_time" to False rather than to this value.
DEFAULT_FRESHNESS = 0
# Fallbacks applied by __init__ when the "key", "name" or "storage_type"
# config entry is missing or falsy.
DEFAULT_KEY = ""
DEFAULT_NAME = "default"
DEFAULT_STORAGE_TYPE = None
# Identifier for this wallet implementation (not read in the visible code;
# presumably consumed by BaseWallet or callers - TODO confirm).
WALLET_TYPE = "indy"
def __init__(self, config: dict = None):
    """
    Record the settings for an `IndyWallet`; nothing is created or opened here.

    Args:
        config: Optional settings mapping. Keys read by this constructor:
            name, key, auto_create, auto_remove, freshness_time,
            storage_type, storage_config, storage_creds. A missing or
            empty mapping is replaced by {} before being passed on to the
            base class.

    """
    self.logger = logging.getLogger(__name__)
    config = config or {}
    super(IndyWallet, self).__init__(config)

    # Identity and access; falsy values fall back to the class defaults.
    self._name = config.get("name") or self.DEFAULT_NAME
    self._key = config.get("key") or self.DEFAULT_KEY

    # Storage backend selection; the config/creds values are decoded with
    # json.loads by the _wallet_config/_wallet_access properties.
    self._storage_type = config.get("storage_type") or self.DEFAULT_STORAGE_TYPE
    self._storage_config = config.get("storage_config", None)
    self._storage_creds = config.get("storage_creds", None)

    # Lifecycle behaviour.
    self._auto_create = config.get("auto_create", True)
    self._auto_remove = config.get("auto_remove", False)
    self._freshness_time = config.get("freshness_time", False)

    # Runtime state, filled in by open().
    self._created = False
    self._handle = None
    self._master_secret_id = None
@property
def handle(self):
    """
    Expose the internal wallet reference.

    Returns:
        The stored wallet handle, or None when no handle is currently held

    """
    return self._handle
@property
def created(self) -> bool:
    """Report whether the most recent open() call had to create the wallet."""
    return self._created
@property
def opened(self) -> bool:
    """
    Tell whether a wallet handle is currently held.

    Returns:
        True when the stored handle is truthy, False otherwise

    """
    return bool(self._handle)
@property
def name(self) -> str:
    """
    Return the configured wallet name.

    Returns:
        The name chosen at construction time

    """
    return self._name
@property
def master_secret_id(self) -> str:
    """
    Return the master secret id recorded by open().

    Returns:
        The master secret id, or None if open() has not set one yet

    """
    return self._master_secret_id
@property
def _wallet_config(self) -> dict:
    """
    Build the wallet config mapping passed (as JSON) to the indy wallet calls.

    Returns:
        A dict with "id", "freshness_time" and "storage_type", plus
        "storage_config" (decoded from its JSON string) when one was set

    """
    wallet_cfg = dict(
        id=self._name,
        freshness_time=self._freshness_time,
        storage_type=self._storage_type,
    )
    raw_storage_config = self._storage_config
    if raw_storage_config is None:
        return wallet_cfg
    # the storage config is held as a JSON string and decoded on demand
    wallet_cfg["storage_config"] = json.loads(raw_storage_config)
    return wallet_cfg
@property
def _wallet_access(self) -> dict:
    """
    Build the credentials mapping passed (as JSON) to the indy wallet calls.

    Returns:
        A dict with "key", plus "storage_credentials" (decoded from its
        JSON string) when storage credentials were set

    """
    access = {"key": self._key}
    raw_creds = self._storage_creds
    if raw_creds is None:
        return access
    # credentials are held as a JSON string and decoded on demand
    access["storage_credentials"] = json.loads(raw_creds)
    return access
async def create(self, replace: bool = False):
    """
    Create a new wallet.

    Args:
        replace: Remove any existing wallet with this configuration first

    Raises:
        WalletError: If removing the old wallet fails for a reason other
            than the wallet not existing
        WalletError: If libindy reports that the wallet already exists
        WalletError: If there was any other libindy error

    """
    if replace:
        try:
            await self.remove()
        except WalletNotFoundError:
            # nothing to replace; carry on and create the wallet
            pass
    try:
        await indy.wallet.create_wallet(
            config=json.dumps(self._wallet_config),
            credentials=json.dumps(self._wallet_access),
        )
    except IndyError as x_indy:
        # chain explicitly so the libindy error stays attached as the cause
        if x_indy.error_code == ErrorCode.WalletAlreadyExistsError:
            raise WalletError(
                "Wallet was not removed by SDK, may still be open: {}".format(
                    self.name
                )
            ) from x_indy
        raise WalletError(str(x_indy)) from x_indy
async def remove(self):
    """
    Remove an existing wallet.

    Raises:
        WalletNotFoundError: If the wallet could not be found
        WalletError: If there was any other libindy error

    """
    try:
        await indy.wallet.delete_wallet(
            config=json.dumps(self._wallet_config),
            credentials=json.dumps(self._wallet_access),
        )
    except IndyError as x_indy:
        # chain explicitly so the libindy error stays attached as the cause
        if x_indy.error_code == ErrorCode.WalletNotFoundError:
            raise WalletNotFoundError(
                "Wallet not found: {}".format(self.name)
            ) from x_indy
        raise WalletError(str(x_indy)) from x_indy
async def open(self):
    """
    Open wallet, creating it first if so configured.

    Does nothing when a handle is already held. After a successful open, a
    master secret named after the wallet is created, or reused when libindy
    reports that it already exists.

    Raises:
        WalletError: If the wallet is still not found right after creation
        WalletNotFoundError: If the wallet is not found and auto_create is off
        WalletError: If libindy reports the wallet as already open
        WalletError: If opening fails with any other libindy error
        IndyError: If creating the master secret fails for a reason other
            than a duplicate name (propagated unchanged)

    """
    if self.opened:
        return

    self._created = False
    while True:
        try:
            self._handle = await indy.wallet.open_wallet(
                config=json.dumps(self._wallet_config),
                credentials=json.dumps(self._wallet_access),
            )
            break
        except IndyError as x_indy:
            code = x_indy.error_code
            if code == ErrorCode.WalletAlreadyOpenedError:
                raise WalletError(
                    "Wallet is already open: {}".format(self.name)
                ) from x_indy
            if code != ErrorCode.WalletNotFoundError:
                raise WalletError(str(x_indy)) from x_indy
            if self._created:
                # we already created it on the previous pass; do not loop forever
                raise WalletError(
                    "Wallet not found after creation: {}".format(self.name)
                ) from x_indy
            if not self._auto_create:
                raise WalletNotFoundError(
                    "Wallet not found: {}".format(self.name)
                ) from x_indy
            # create() is asked to replace an existing wallet when auto_remove is set
            await self.create(self._auto_remove)
            self._created = True

    self.logger.info("Creating master secret...")
    try:
        self._master_secret_id = await indy.anoncreds.prover_create_master_secret(
            self.handle, self.name
        )
    except IndyError as error:
        if error.error_code == ErrorCode.AnoncredsMasterSecretDuplicateNameError:
            # the secret was requested under the wallet name, so that is its id
            self.logger.info("Master secret already exists")
            self._master_secret_id = self.name
        else:
            raise
async def close(self):
    """
    Close previously-opened wallet, removing it if so configured.

    Does nothing when no handle is held. The handle is cleared as soon as
    libindy has closed the wallet, so a failure while removing it cannot
    leave this object reporting a closed wallet as open.

    Raises:
        WalletNotFoundError: If auto_remove is set and the wallet is not found
        WalletError: If auto_remove is set and removal fails in libindy

    """
    if not self._handle:
        return
    await indy.wallet.close_wallet(self._handle)
    # The wallet is closed now; forget the handle before remove() can raise.
    self._handle = None
    if self._auto_remove:
        await self.remove()
async def create_signing_key(
    self, seed: str = None, metadata: dict = None
) -> KeyInfo:
    """
    Create a new public/private signing keypair.

    Args:
        seed: Optional seed for the key; validated and base64-encoded
            before being handed to libindy
        metadata: Optional metadata to store with the keypair

    Returns:
        A `KeyInfo` representing the new record

    Raises:
        WalletDuplicateError: If the resulting verkey already exists in the wallet
        WalletError: If key creation fails with any other libindy error

    """
    args = {}
    if seed:
        args["seed"] = bytes_to_b64(validate_seed(seed))
    try:
        verkey = await indy.crypto.create_key(self.handle, json.dumps(args))
    except IndyError as x_indy:
        # chain explicitly so the libindy error stays attached as the cause
        if x_indy.error_code == ErrorCode.WalletItemAlreadyExists:
            raise WalletDuplicateError(
                "Verification key already present in wallet"
            ) from x_indy
        raise WalletError(str(x_indy)) from x_indy

    # must save metadata to allow identity check
    # otherwise get_key_metadata just returns WalletItemNotFound
    if metadata is None:
        metadata = {}
    await indy.crypto.set_key_metadata(self.handle, verkey, json.dumps(metadata))
    return KeyInfo(verkey, metadata)
async def get_signing_key(self, verkey: str) -> KeyInfo:
    """
    Fetch info for a signing keypair.

    Args:
        verkey: The verification key of the keypair

    Returns:
        A `KeyInfo` with the verkey and its stored metadata ({} when the
        stored metadata is empty)

    Raises:
        WalletNotFoundError: If no keypair is associated with the verification key
        WalletError: If there is any other libindy error

    """
    try:
        metadata = await indy.crypto.get_key_metadata(self.handle, verkey)
    except IndyError as x_indy:
        # chain explicitly so the libindy error stays attached as the cause
        if x_indy.error_code == ErrorCode.WalletItemNotFound:
            raise WalletNotFoundError("Unknown key: {}".format(verkey)) from x_indy
        raise WalletError(str(x_indy)) from x_indy
    return KeyInfo(verkey, json.loads(metadata) if metadata else {})
async def create_local_did(
    self, seed: str = None, did: str = None, metadata: dict = None
) -> DIDInfo:
    """
    Create and store a new local DID.

    Args:
        seed: Optional seed to use for did; validated and base64-encoded
            before being handed to libindy
        did: Optional DID to request
        metadata: Optional metadata to store with DID

    Returns:
        A `DIDInfo` instance representing the created DID; its metadata is
        {} when none was supplied, matching what the getters return

    Raises:
        WalletDuplicateError: If the DID already exists in the wallet
        WalletError: If DID creation fails with any other libindy error

    """
    cfg = {}
    if seed:
        cfg["seed"] = bytes_to_b64(validate_seed(seed))
    if did:
        cfg["did"] = did
    did_json = json.dumps(cfg)
    # crypto_type, cid - optional parameters skipped
    try:
        did, verkey = await indy.did.create_and_store_my_did(self.handle, did_json)
    except IndyError as x_indy:
        # chain explicitly so the libindy error stays attached as the cause
        if x_indy.error_code == ErrorCode.DidAlreadyExistsError:
            raise WalletDuplicateError("DID already present in wallet") from x_indy
        raise WalletError(str(x_indy)) from x_indy

    if metadata:
        await self.replace_local_did_metadata(did, metadata)
    else:
        # nothing is stored for empty metadata; report it as {} rather than None
        metadata = {}
    return DIDInfo(did, verkey, metadata)
async def get_local_dids(self) -> Sequence[DIDInfo]:
    """
    List every DID stored locally in this wallet.

    Returns:
        A list of `DIDInfo` instances, one per stored DID; empty stored
        metadata is reported as {}

    """
    records = json.loads(await indy.did.list_my_dids_with_meta(self.handle))
    return [
        DIDInfo(
            did=record["did"],
            verkey=record["verkey"],
            # metadata is stored as a JSON string
            metadata=json.loads(record["metadata"]) if record["metadata"] else {},
        )
        for record in records
    ]
async def get_local_did(self, did: str) -> DIDInfo:
    """
    Find info for a local DID.

    Args:
        did: The DID to get info for

    Returns:
        A `DIDInfo` instance representing the found DID; empty stored
        metadata is reported as {}

    Raises:
        WalletNotFoundError: If the DID is not found
        WalletError: If there is any other libindy error

    """
    try:
        info_json = await indy.did.get_my_did_with_meta(self.handle, did)
    except IndyError as x_indy:
        # chain explicitly so the libindy error stays attached as the cause
        if x_indy.error_code == ErrorCode.WalletItemNotFound:
            raise WalletNotFoundError("Unknown DID: {}".format(did)) from x_indy
        raise WalletError(str(x_indy)) from x_indy
    info = json.loads(info_json)
    return DIDInfo(
        did=info["did"],
        verkey=info["verkey"],
        metadata=json.loads(info["metadata"]) if info["metadata"] else {},
    )
async def get_local_did_for_verkey(self, verkey: str) -> DIDInfo:
    """
    Look up the local DID that owns a given verkey.

    Scans the result of get_local_dids() and returns the first match.

    Args:
        verkey: The verkey to search for

    Returns:
        The `DIDInfo` whose verkey equals the one given

    Raises:
        WalletNotFoundError: If no local DID carries that verkey

    """
    candidates = await self.get_local_dids()
    match = next(
        (candidate for candidate in candidates if candidate.verkey == verkey), None
    )
    if match is None:
        raise WalletNotFoundError("No DID defined for verkey: {}".format(verkey))
    return match
async def create_pairwise(
    self,
    their_did: str,
    their_verkey: str,
    my_did: str = None,
    metadata: dict = None,
) -> PairwiseInfo:
    """
    Create a new pairwise DID for a secure connection.

    Args:
        their_did: The other party's DID
        their_verkey: The other party's verkey
        my_did: My DID; when omitted a new local DID is created for this
            relationship
        metadata: Metadata to store with this relationship

    Returns:
        A `PairwiseInfo` object representing the pairwise connection

    Raises:
        WalletDuplicateError: If a pairwise record for their DID already exists
        WalletNotFoundError: If `my_did` is given but not found in the wallet
        WalletError: If there is any other libindy error

    """
    # store their DID info in wallet
    ident_json = json.dumps({"did": their_did, "verkey": their_verkey})
    try:
        await indy.did.store_their_did(self.handle, ident_json)
    except IndyError as x_indy:
        # their DID already stored is fine: the real test is creating pairwise
        if x_indy.error_code != ErrorCode.WalletItemAlreadyExists:
            raise WalletError(str(x_indy)) from x_indy

    # use the given local DID, or create a new one for this pairwise connection
    if my_did:
        my_info = await self.get_local_did(my_did)
    else:
        my_info = await self.create_local_did(
            None, None, {"pairwise_for": their_did}
        )

    # create the pairwise record
    combined_meta = {
        # info that should be returned in pairwise_info struct but isn't
        "their_verkey": their_verkey,
        "my_verkey": my_info.verkey,
        "custom": metadata or {},
    }
    meta_json = json.dumps(combined_meta)
    try:
        await indy.pairwise.create_pairwise(
            self.handle, their_did, my_info.did, meta_json
        )
    except IndyError as x_indy:
        # chain explicitly so the libindy error stays attached as the cause
        if x_indy.error_code == ErrorCode.WalletItemAlreadyExists:
            raise WalletDuplicateError(
                "Pairwise DID already present in wallet: {}".format(their_did)
            ) from x_indy
        raise WalletError(str(x_indy)) from x_indy

    return PairwiseInfo(
        their_did=their_did,
        their_verkey=their_verkey,
        my_did=my_info.did,
        my_verkey=my_info.verkey,
        metadata=combined_meta["custom"],
    )
def _make_pairwise_info(
    self, result: dict, their_did: str = None
) -> Optional[PairwiseInfo]:
    """
    Convert Indy pairwise info into PairwiseInfo record.

    Only records written by create_pairwise are recognised: their metadata
    is a JSON object carrying a "custom" entry.

    Args:
        result: Decoded pairwise record; "metadata" is expected to be a
            JSON string (or empty/absent), "my_did" and "their_did" are
            read when present
        their_did: Fallback for the other party's DID when the record does
            not include "their_did"

    Returns:
        A new `PairwiseInfo` instance, or None when the record's metadata is
        missing, empty, not a JSON object, or lacks the "custom" entry

    """
    raw_meta = result.get("metadata")
    meta = json.loads(raw_meta) if raw_meta else None
    # Require a JSON object: a bare `"custom" in meta` check would also match
    # substrings or list items when the metadata decodes to another JSON type.
    if not isinstance(meta, dict) or "custom" not in meta:
        # not one of our pairwise records
        return None
    return PairwiseInfo(
        their_did=result.get("their_did", their_did),
        their_verkey=meta.get("their_verkey"),
        my_did=result.get("my_did"),
        my_verkey=meta.get("my_verkey"),
        metadata=meta["custom"],
    )
async def get_pairwise_list(self) -> Sequence[PairwiseInfo]:
    """
    List the pairwise relationships recorded in this wallet.

    Records that _make_pairwise_info does not recognise are skipped.

    Returns:
        A list of `PairwiseInfo` instances, one per recognised record

    """
    encoded_pairs = json.loads(await indy.pairwise.list_pairwise(self.handle))
    # each list entry is itself a JSON-encoded record
    converted = (
        self._make_pairwise_info(json.loads(encoded)) for encoded in encoded_pairs
    )
    return [info for info in converted if info]
async def get_pairwise_for_did(self, their_did: str) -> PairwiseInfo:
    """
    Find info for a pairwise DID.

    Args:
        their_did: The DID to get a pairwise relationship for

    Returns:
        A `PairwiseInfo` instance representing the relationship

    Raises:
        WalletNotFoundError: If no pairwise record exists for the target, or
            the record found is not one this wallet recognises
        WalletError: If there is any other libindy error

    """
    try:
        pair_json = await indy.pairwise.get_pairwise(self.handle, their_did)
    except IndyError as x_indy:
        # chain explicitly so the libindy error stays attached as the cause
        if x_indy.error_code == ErrorCode.WalletItemNotFound:
            raise WalletNotFoundError(
                "No pairwise DID defined for target: {}".format(their_did)
            ) from x_indy
        raise WalletError(str(x_indy)) from x_indy
    pair = json.loads(pair_json)
    info = self._make_pairwise_info(pair, their_did)
    if not info:
        # a record exists but it was not written by create_pairwise
        raise WalletNotFoundError(
            "No pairwise DID defined for target: {}".format(their_did)
        )
    return info
async def get_pairwise_for_verkey(self, their_verkey: str) -> PairwiseInfo:
    """
    Look up the pairwise relationship for the other party's verkey.

    Scans the result of get_pairwise_list() and returns the first match.

    Args:
        their_verkey: The other party's verkey to search for

    Returns:
        The `PairwiseInfo` whose `their_verkey` equals the one given

    Raises:
        WalletNotFoundError: If no pairwise relationship carries that verkey

    """
    relationships = await self.get_pairwise_list()
    match = next(
        (pair for pair in relationships if pair.their_verkey == their_verkey),
        None,
    )
    if match is None:
        raise WalletNotFoundError(
            "No pairwise DID defined for verkey: {}".format(their_verkey)
        )
    return match
async def sign_message(self, message: bytes, from_verkey: str) -> bytes:
    """
    Sign a message using the private key associated with a given verkey.

    Args:
        message: Message bytes to sign
        from_verkey: The verkey to use to sign

    Returns:
        A signature

    Raises:
        WalletError: If the message is not provided (None or empty)
        WalletError: If the verkey is not provided
        WalletError: If a libindy error occurs

    """
    if not message:
        raise WalletError("Message not provided")
    if not from_verkey:
        raise WalletError("Verkey not provided")
    try:
        return await indy.crypto.crypto_sign(self.handle, from_verkey, message)
    except IndyError as x_indy:
        # the message is generic, so keep the libindy error as the cause
        raise WalletError("Exception when signing message") from x_indy
async def verify_message(
    self, message: bytes, signature: bytes, from_verkey: str
) -> bool:
    """
    Verify a signature against the public key of the signer.

    Args:
        message: Message to verify
        signature: Signature to verify
        from_verkey: Verkey to use in verification

    Returns:
        True if verified, else False; a libindy CommonInvalidStructure
        error is reported as False rather than raised

    Raises:
        WalletError: If the verkey is not provided
        WalletError: If the signature is not provided
        WalletError: If the message is not provided
        WalletError: If any other libindy error occurs

    """
    if not from_verkey:
        raise WalletError("Verkey not provided")
    if not signature:
        raise WalletError("Signature not provided")
    if not message:
        raise WalletError("Message not provided")
    try:
        return await indy.crypto.crypto_verify(from_verkey, message, signature)
    except IndyError as x_indy:
        if x_indy.error_code == ErrorCode.CommonInvalidStructure:
            return False
        raise WalletError(str(x_indy)) from x_indy
async def encrypt_message(
    self, message: bytes, to_verkey: str, from_verkey: str = None
) -> bytes:
    """
    Apply auth_crypt or anon_crypt to a message.

    Args:
        message: The binary message content
        to_verkey: The verkey of the recipient
        from_verkey: The verkey of the sender. If provided then auth_crypt is used,
            otherwise anon_crypt is used.

    Returns:
        The encrypted message content

    Raises:
        WalletError: If a libindy error occurs

    """
    if from_verkey:
        try:
            return await indy.crypto.auth_crypt(
                self.handle, from_verkey, to_verkey, message
            )
        except IndyError as x_indy:
            # the message is generic, so keep the libindy error as the cause
            raise WalletError("Exception when encrypting auth message") from x_indy
    try:
        return await indy.crypto.anon_crypt(to_verkey, message)
    except IndyError as x_indy:
        raise WalletError(
            "Exception when encrypting anonymous message"
        ) from x_indy
async def decrypt_message(
    self, enc_message: bytes, to_verkey: str, use_auth: bool
) -> Tuple[bytes, Optional[str]]:
    """
    Decrypt a message assembled by auth_crypt or anon_crypt.

    Args:
        enc_message: The encrypted message content
        to_verkey: The verkey of the recipient
        use_auth: True to use auth_decrypt, False to use anon_decrypt

    Returns:
        A tuple of the decrypted message content and sender verkey
        (None for anon_crypt)

    Raises:
        WalletError: If a libindy error occurs

    """
    if use_auth:
        try:
            sender_verkey, result = await indy.crypto.auth_decrypt(
                self.handle, to_verkey, enc_message
            )
        except IndyError as x_indy:
            # the message is generic, so keep the libindy error as the cause
            raise WalletError("Exception when decrypting auth message") from x_indy
        return result, sender_verkey

    try:
        result = await indy.crypto.anon_decrypt(self.handle, to_verkey, enc_message)
    except IndyError as x_indy:
        raise WalletError(
            "Exception when decrypting anonymous message"
        ) from x_indy
    # anon_crypt carries no sender identity
    return result, None
async def pack_message(
    self, message: str, to_verkeys: Sequence[str], from_verkey: str = None
) -> bytes:
    """
    Pack a message for one or more recipients.

    Args:
        message: The message to pack
        to_verkeys: List of verkeys to pack for
        from_verkey: Sender verkey to pack from

    Returns:
        The resulting packed message bytes

    Raises:
        WalletError: If the message is None (an empty message is accepted)
        WalletError: If a libindy error occurs

    """
    if message is None:
        raise WalletError("Message not provided")
    try:
        return await indy.crypto.pack_message(
            self.handle, message, to_verkeys, from_verkey
        )
    except IndyError as x_indy:
        # the message is generic, so keep the libindy error as the cause
        raise WalletError("Exception when packing message") from x_indy
async def unpack_message(
    self, enc_message: bytes
) -> Tuple[str, Optional[str], Optional[str]]:
    """
    Unpack a message.

    Args:
        enc_message: The packed message bytes

    Returns:
        A tuple: (message, from_verkey, to_verkey); either verkey is None
        when the unpacked result does not include it

    Raises:
        WalletError: If the message is not provided (None or empty)
        WalletError: If a libindy error occurs

    """
    if not enc_message:
        raise WalletError("Message not provided")
    try:
        unpacked_json = await indy.crypto.unpack_message(self.handle, enc_message)
    except IndyError as x_indy:
        # the message is generic, so keep the libindy error as the cause
        raise WalletError("Exception when unpacking message") from x_indy
    unpacked = json.loads(unpacked_json)
    message = unpacked["message"]
    to_verkey = unpacked.get("recipient_verkey", None)
    from_verkey = unpacked.get("sender_verkey", None)
    return message, from_verkey, to_verkey