"""Indy implementation of BaseWallet interface."""
import json
from typing import Sequence
import indy.anoncreds
import indy.did
import indy.crypto
import indy.wallet
from indy.error import IndyError, ErrorCode
from ..indy.sdk.error import IndyErrorHandler
from ..indy.sdk.wallet_setup import IndyOpenWallet
from ..ledger.base import BaseLedger
from ..ledger.endpoint_type import EndpointType
from ..ledger.error import LedgerConfigError
from .base import BaseWallet, KeyInfo, DIDInfo
from .crypto import validate_seed
from .error import WalletError, WalletDuplicateError, WalletNotFoundError
from .util import bytes_to_b64
[docs]class IndySdkWallet(BaseWallet):
"""Indy identity wallet implementation."""
def __init__(self, opened: IndyOpenWallet):
"""Create a new IndySdkWallet instance."""
self.opened = opened
[docs] async def create_signing_key(
self, seed: str = None, metadata: dict = None
) -> KeyInfo:
"""
Create a new public/private signing keypair.
Args:
seed: Seed for key
metadata: Optional metadata to store with the keypair
Returns:
A `KeyInfo` representing the new record
Raises:
WalletDuplicateError: If the resulting verkey already exists in the wallet
WalletError: If there is a libindy error
"""
args = {}
if seed:
args["seed"] = bytes_to_b64(validate_seed(seed))
try:
verkey = await indy.crypto.create_key(self.opened.handle, json.dumps(args))
except IndyError as x_indy:
if x_indy.error_code == ErrorCode.WalletItemAlreadyExists:
raise WalletDuplicateError("Verification key already present in wallet")
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} error".format(self.opened.name), WalletError
) from x_indy
# must save metadata to allow identity check
# otherwise get_key_metadata just returns WalletItemNotFound
if metadata is None:
metadata = {}
await indy.crypto.set_key_metadata(
self.opened.handle, verkey, json.dumps(metadata)
)
return KeyInfo(verkey, metadata)
[docs] async def get_signing_key(self, verkey: str) -> KeyInfo:
"""
Fetch info for a signing keypair.
Args:
verkey: The verification key of the keypair
Returns:
A `KeyInfo` representing the keypair
Raises:
WalletNotFoundError: If no keypair is associated with the verification key
WalletError: If there is a libindy error
"""
try:
metadata = await indy.crypto.get_key_metadata(self.opened.handle, verkey)
except IndyError as x_indy:
if x_indy.error_code == ErrorCode.WalletItemNotFound:
raise WalletNotFoundError("Unknown key: {}".format(verkey))
else:
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} error".format(self.opened.name), WalletError
) from x_indy
return KeyInfo(verkey, json.loads(metadata) if metadata else {})
[docs] async def rotate_did_keypair_start(self, did: str, next_seed: str = None) -> str:
"""
Begin key rotation for DID that wallet owns: generate new keypair.
Args:
did: signing DID
next_seed: incoming replacement seed (default random)
Returns:
The new verification key
"""
try:
verkey = await indy.did.replace_keys_start(
self.opened.handle,
did,
json.dumps(
{"seed": bytes_to_b64(validate_seed(next_seed))}
if next_seed
else {}
),
)
except IndyError as x_indy:
if x_indy.error_code == ErrorCode.WalletItemNotFound:
raise WalletNotFoundError("Wallet owns no such DID: {}".format(did))
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} error".format(self.opened.name), WalletError
) from x_indy
return verkey
[docs] async def rotate_did_keypair_apply(self, did: str) -> DIDInfo:
"""
Apply temporary keypair as main for DID that wallet owns.
Args:
did: signing DID
Returns:
DIDInfo with new verification key and metadata for DID
"""
try:
await indy.did.replace_keys_apply(self.opened.handle, did)
except IndyError as x_indy:
if x_indy.error_code == ErrorCode.WalletItemNotFound:
raise WalletNotFoundError("Wallet owns no such DID: {}".format(did))
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} error".format(self.opened.name), WalletError
) from x_indy
[docs] async def create_local_did(
self, seed: str = None, did: str = None, metadata: dict = None
) -> DIDInfo:
"""
Create and store a new local DID.
Args:
seed: Optional seed to use for DID
did: The DID to use
metadata: Metadata to store with DID
Returns:
A `DIDInfo` instance representing the created DID
Raises:
WalletDuplicateError: If the DID already exists in the wallet
WalletError: If there is a libindy error
"""
cfg = {}
if seed:
cfg["seed"] = bytes_to_b64(validate_seed(seed))
if did:
cfg["did"] = did
did_json = json.dumps(cfg)
# crypto_type, cid - optional parameters skipped
try:
did, verkey = await indy.did.create_and_store_my_did(
self.opened.handle, did_json
)
except IndyError as x_indy:
if x_indy.error_code == ErrorCode.DidAlreadyExistsError:
raise WalletDuplicateError("DID already present in wallet")
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} error".format(self.opened.name), WalletError
) from x_indy
if metadata:
await self.replace_local_did_metadata(did, metadata)
else:
metadata = {}
return DIDInfo(did, verkey, metadata)
[docs] async def get_local_dids(self) -> Sequence[DIDInfo]:
"""
Get list of defined local DIDs.
Returns:
A list of locally stored DIDs as `DIDInfo` instances
"""
info_json = await indy.did.list_my_dids_with_meta(self.opened.handle)
info = json.loads(info_json)
ret = []
for did in info:
ret.append(
DIDInfo(
did=did["did"],
verkey=did["verkey"],
metadata=json.loads(did["metadata"]) if did["metadata"] else {},
)
)
return ret
[docs] async def get_local_did(self, did: str) -> DIDInfo:
"""
Find info for a local DID.
Args:
did: The DID for which to get info
Returns:
A `DIDInfo` instance representing the found DID
Raises:
WalletNotFoundError: If the DID is not found
WalletError: If there is a libindy error
"""
try:
info_json = await indy.did.get_my_did_with_meta(self.opened.handle, did)
except IndyError as x_indy:
if x_indy.error_code == ErrorCode.WalletItemNotFound:
raise WalletNotFoundError("Unknown DID: {}".format(did))
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} error".format(self.opened.name), WalletError
) from x_indy
info = json.loads(info_json)
return DIDInfo(
did=info["did"],
verkey=info["verkey"],
metadata=json.loads(info["metadata"]) if info["metadata"] else {},
)
[docs] async def get_local_did_for_verkey(self, verkey: str) -> DIDInfo:
"""
Resolve a local DID from a verkey.
Args:
verkey: The verkey for which to get the local DID
Returns:
A `DIDInfo` instance representing the found DID
Raises:
WalletNotFoundError: If the verkey is not found
"""
dids = await self.get_local_dids()
for info in dids:
if info.verkey == verkey:
return info
raise WalletNotFoundError("No DID defined for verkey: {}".format(verkey))
[docs] async def set_did_endpoint(
self,
did: str,
endpoint: str,
ledger: BaseLedger,
endpoint_type: EndpointType = None,
):
"""
Update the endpoint for a DID in the wallet, send to ledger if public or posted.
Args:
did: DID for which to set endpoint
endpoint: the endpoint to set, None to clear
ledger: the ledger to which to send endpoint update if
DID is public or posted
endpoint_type: the type of the endpoint/service. Only endpoint_type
'endpoint' affects local wallet
"""
did_info = await self.get_local_did(did)
metadata = {**did_info.metadata}
if not endpoint_type:
endpoint_type = EndpointType.ENDPOINT
if endpoint_type == EndpointType.ENDPOINT:
metadata[endpoint_type.indy] = endpoint
wallet_public_didinfo = await self.get_public_did()
if (
wallet_public_didinfo and wallet_public_didinfo.did == did
) or did_info.metadata.get("posted"):
# if DID on ledger, set endpoint there first
if not ledger:
raise LedgerConfigError(
f"No ledger available but DID {did} is public: missing wallet-type?"
)
if not ledger.read_only:
async with ledger:
await ledger.update_endpoint_for_did(did, endpoint, endpoint_type)
await self.replace_local_did_metadata(did, metadata)
[docs] async def sign_message(self, message: bytes, from_verkey: str) -> bytes:
"""
Sign a message using the private key associated with a given verkey.
Args:
message: Message bytes to sign
from_verkey: The verkey to use to sign
Returns:
A signature
Raises:
WalletError: If the message is not provided
WalletError: If the verkey is not provided
WalletError: If a libindy error occurs
"""
if not message:
raise WalletError("Message not provided")
if not from_verkey:
raise WalletError("Verkey not provided")
try:
result = await indy.crypto.crypto_sign(
self.opened.handle, from_verkey, message
)
except IndyError:
raise WalletError("Exception when signing message")
return result
[docs] async def verify_message(
self, message: bytes, signature: bytes, from_verkey: str
) -> bool:
"""
Verify a signature against the public key of the signer.
Args:
message: Message to verify
signature: Signature to verify
from_verkey: Verkey to use in verification
Returns:
True if verified, else False
Raises:
WalletError: If the verkey is not provided
WalletError: If the signature is not provided
WalletError: If the message is not provided
WalletError: If a libindy error occurs
"""
if not from_verkey:
raise WalletError("Verkey not provided")
if not signature:
raise WalletError("Signature not provided")
if not message:
raise WalletError("Message not provided")
try:
result = await indy.crypto.crypto_verify(from_verkey, message, signature)
except IndyError as x_indy:
if x_indy.error_code == ErrorCode.CommonInvalidStructure:
result = False
else:
raise IndyErrorHandler.wrap_error(
x_indy, "Wallet {} error".format(self.opened.name), WalletError
) from x_indy
return result
[docs] async def pack_message(
self, message: str, to_verkeys: Sequence[str], from_verkey: str = None
) -> bytes:
"""
Pack a message for one or more recipients.
Args:
message: The message to pack
to_verkeys: List of verkeys for which to pack
from_verkey: Sender verkey from which to pack
Returns:
The resulting packed message bytes
Raises:
WalletError: If no message is provided
WalletError: If a libindy error occurs
"""
if message is None:
raise WalletError("Message not provided")
try:
result = await indy.crypto.pack_message(
self.opened.handle, message, to_verkeys, from_verkey
)
except IndyError as x_indy:
raise IndyErrorHandler.wrap_error(
x_indy, "Exception when packing message", WalletError
) from x_indy
return result
[docs] async def unpack_message(self, enc_message: bytes) -> (str, str, str):
"""
Unpack a message.
Args:
enc_message: The packed message bytes
Returns:
A tuple: (message, from_verkey, to_verkey)
Raises:
WalletError: If the message is not provided
WalletError: If a libindy error occurs
"""
if not enc_message:
raise WalletError("Message not provided")
try:
unpacked_json = await indy.crypto.unpack_message(
self.opened.handle, enc_message
)
except IndyError:
raise WalletError("Exception when unpacking message")
unpacked = json.loads(unpacked_json)
message = unpacked["message"]
to_verkey = unpacked.get("recipient_verkey", None)
from_verkey = unpacked.get("sender_verkey", None)
return message, from_verkey, to_verkey
[docs] @classmethod
async def generate_wallet_key(self, seed: str = None) -> str:
"""Generate a raw Indy wallet key."""
return await indy.wallet.generate_wallet_key(seed)