# Source code for acapy_agent.wallet.keys.manager

"""Multikey encoding helpers and the wallet-backed MultikeyManager."""

import logging

from pydid import VerificationMethod

from ...core.profile import ProfileSession
from ...resolver.did_resolver import DIDResolver
from ...utils.multiformats import multibase
from ...wallet.error import WalletError, WalletNotFoundError
from ..base import BaseWallet
from ..key_type import BLS12381G2, ED25519, P256, X25519, KeyType
from ..util import b58_to_bytes, bytes_to_b58

LOGGER = logging.getLogger(__name__)

# Algorithm used by MultikeyManager.create when the caller does not name one.
DEFAULT_ALG = "ed25519"

# Multikey encoding parameters, keyed by algorithm name.
#   key_type        - KeyType handed to the wallet and returned by
#                     key_type_from_multikey.
#   multikey_prefix - leading characters that identify the algorithm in an
#                     encoded multikey; a str, or a tuple of alternatives.
#   prefix_hex      - hex of the bytes prepended to the raw public key before
#                     it is multibase (base58btc) encoded.
#   prefix_length   - number of leading bytes removed from a decoded multikey
#                     to recover the raw public key.
#
# multikey_to_verkey and MultikeyManager.from_kid look entries up by
# KeyType.key_type, so each entry's key_type must identify that same entry.
# NOTE(review): assumes X25519.key_type == "x25519" - confirm in ..key_type.
ALG_MAPPINGS = {
    "ed25519": {
        "key_type": ED25519,
        "multikey_prefix": "z6Mk",
        "prefix_hex": "ed01",
        "prefix_length": 2,
    },
    "x25519": {
        # Previously mapped to ED25519, which made create(alg="x25519") request
        # an Ed25519 key from the wallet and label it with the x25519 prefix.
        "key_type": X25519,
        "multikey_prefix": "z6LS",
        "prefix_hex": "ec01",
        "prefix_length": 2,
    },
    "p256": {
        "key_type": P256,
        "multikey_prefix": "zDn",
        "prefix_hex": "8024",
        "prefix_length": 2,
    },
    "bls12381g2": {
        "key_type": BLS12381G2,
        "multikey_prefix": ("zUC7", "zUC6"),
        "prefix_hex": "eb01",
        "prefix_length": 2,
    },
}


def multikey_to_verkey(multikey: str):
    """Convert a multikey into its base58-encoded verkey.

    The algorithm is inferred from the multikey prefix. The matching
    ``prefix_length`` entry in ``ALG_MAPPINGS`` gives the number of leading
    bytes dropped from the multibase-decoded value before the remainder is
    base58 encoded.
    """
    algorithm = key_type_from_multikey(multikey).key_type
    header_size = ALG_MAPPINGS[algorithm]["prefix_length"]
    decoded = bytearray(multibase.decode(multikey))
    raw_key = bytes(decoded[header_size:])
    return bytes_to_b58(raw_key)
def verkey_to_multikey(verkey: str, alg: str):
    """Convert a base58-encoded verkey into a base58btc multikey.

    ``alg`` must be a key of ``ALG_MAPPINGS``; its ``prefix_hex`` bytes are
    prepended to the decoded verkey before multibase encoding.
    """
    codec_hex = ALG_MAPPINGS[alg]["prefix_hex"]
    key_hex = b58_to_bytes(verkey).hex()
    prefixed = bytes.fromhex(f"{codec_hex}{key_hex}")
    return multibase.encode(prefixed, "base58btc")
def key_type_from_multikey(multikey: str) -> KeyType:
    """Return the KeyType whose registered prefix the multikey starts with.

    Entries of ``ALG_MAPPINGS`` are checked in definition order and the first
    match wins.

    Raises:
        MultikeyManagerError: if no registered prefix matches.
    """
    for entry in ALG_MAPPINGS.values():
        known = entry["multikey_prefix"]
        # str.startswith accepts a tuple of candidates but not a list.
        candidates = tuple(known) if isinstance(known, (list, tuple)) else known
        if multikey.startswith(candidates):
            return entry["key_type"]

    raise MultikeyManagerError(f"Unsupported key algorithm for multikey {multikey}.")
def multikey_from_verification_method(verification_method: VerificationMethod) -> str:
    """Return the multikey carried by, or derived from, a verification method.

    ``Multikey`` and ``Ed25519VerificationKey2020`` methods supply the multikey
    directly as ``public_key_multibase``. ``Ed25519VerificationKey2018`` and
    ``Bls12381G2Key2020`` methods supply a base58 key that is re-encoded.

    Raises:
        MultikeyManagerError: if the verification method type is none of the
            above.
    """
    vm_type = verification_method.type

    if vm_type in ("Multikey", "Ed25519VerificationKey2020"):
        return verification_method.public_key_multibase

    if vm_type == "Ed25519VerificationKey2018":
        return verkey_to_multikey(verification_method.public_key_base58, alg="ed25519")

    if vm_type == "Bls12381G2Key2020":
        return verkey_to_multikey(
            verification_method.public_key_base58, alg="bls12381g2"
        )

    # TODO address JsonWebKey based verification methods
    raise MultikeyManagerError("Unknown verification method type.")
class MultikeyManagerError(Exception):
    """Error raised when a multikey operation cannot be completed."""
class MultikeyManager:
    """Manage wallet key pairs addressed by multikey or by key id (kid)."""

    def __init__(self, session: ProfileSession):
        """Store the session and inject its BaseWallet."""
        self.session: ProfileSession = session
        self.wallet: BaseWallet = session.inject(BaseWallet)

    async def resolve_and_bind_kid(self, kid: str):
        """Fetch key if exists, otherwise resolve and bind it.

        This function is idempotent.

        Returns:
            The result of ``from_kid`` when the kid is already bound,
            otherwise the result of ``update`` after binding the kid to the
            multikey resolved from it.
        """
        if await self.kid_exists(kid):
            LOGGER.info("kid %s already bound in storage, will not resolve.", kid)
            return await self.from_kid(kid)

        multikey = await self.resolve_multikey_from_verification_method_id(kid)
        # Adjacent literals rather than a backslash continuation inside one
        # string, which would embed the source indentation in the message.
        LOGGER.info(
            "kid %s binding not found in storage, "
            "binding to resolved multikey %s.",
            kid,
            multikey,
        )
        return await self.update(multikey, kid)

    async def resolve_multikey_from_verification_method_id(self, kid: str):
        """Dereference ``kid`` as a DID URL and return the multikey it names."""
        resolver = self.session.inject(DIDResolver)
        verification_method = await resolver.dereference(
            profile=self.session.profile, did_url=kid
        )
        return multikey_from_verification_method(verification_method)

    def key_type_from_multikey(self, multikey: str) -> KeyType:
        """Derive key_type class from multikey prefix.

        Raises:
            MultikeyManagerError: if no registered prefix matches.
        """
        # Inside a method body this name resolves to the module-level function,
        # so the prefix matching logic lives in one place only.
        return key_type_from_multikey(multikey)

    async def kid_exists(self, kid: str):
        """Return True when the wallet returns a truthy key for ``kid``.

        ``WalletNotFoundError`` and ``AttributeError`` raised by the lookup are
        reported as False.
        """
        try:
            return bool(await self.wallet.get_key_by_kid(kid=kid))
        except (WalletNotFoundError, AttributeError):
            return False

    async def multikey_exists(self, multikey: str):
        """Return True when the wallet returns a truthy signing key for ``multikey``.

        ``WalletNotFoundError`` and ``AttributeError`` raised by the lookup are
        reported as False.
        """
        try:
            key_info = await self.wallet.get_signing_key(
                verkey=multikey_to_verkey(multikey)
            )
            return bool(key_info)
        except (WalletNotFoundError, AttributeError):
            return False

    async def from_kid(self, kid: str):
        """Fetch a single key by its kid.

        Returns:
            A dict with ``kid`` and ``multikey``, or None when the wallet
            raises ``WalletError`` (the error is logged).
        """
        try:
            key_info = await self.wallet.get_key_by_kid(kid=kid)
            return {
                "kid": key_info.kid,
                "multikey": verkey_to_multikey(
                    key_info.verkey, alg=key_info.key_type.key_type
                ),
            }
        except WalletError as err:
            LOGGER.error(err)
            return None

    async def from_multikey(self, multikey: str):
        """Fetch a single key by its multikey.

        Returns:
            A dict with ``kid`` and ``multikey``. Wallet errors are not caught
            here.
        """
        key_info = await self.wallet.get_signing_key(verkey=multikey_to_verkey(multikey))
        return {
            "kid": key_info.kid,
            "multikey": verkey_to_multikey(
                key_info.verkey, alg=key_info.key_type.key_type
            ),
        }

    async def create(self, seed: str = None, kid: str = None, alg: str = DEFAULT_ALG):
        """Create a new key pair.

        Args:
            seed: optional seed passed through to the wallet.
            kid: optional key id to assign to the new key.
            alg: algorithm name; must be a key of ``ALG_MAPPINGS``.

        Returns:
            A dict with the new key's ``kid`` and ``multikey``.

        Raises:
            MultikeyManagerError: if ``alg`` is unknown, or ``kid`` is given
                and already exists in the wallet.
        """
        if alg not in ALG_MAPPINGS:
            raise MultikeyManagerError(
                f"Unknown key algorithm, use one of {list(ALG_MAPPINGS.keys())}."
            )

        if kid and await self.kid_exists(kid=kid):
            raise MultikeyManagerError(f"kid '{kid}' already exists in wallet.")

        key_type = ALG_MAPPINGS[alg]["key_type"]
        key_info = await self.wallet.create_key(key_type=key_type, seed=seed, kid=kid)
        return {
            "kid": key_info.kid,
            "multikey": verkey_to_multikey(key_info.verkey, alg=alg),
        }

    async def update(self, multikey: str, kid: str, unbind=False):
        """Bind or unbind a kid with a key pair.

        Args:
            multikey: multikey identifying the key pair.
            kid: key id to bind or unbind.
            unbind: when true, remove the binding instead of adding it.

        Returns:
            A dict echoing the ``kid`` and ``multikey`` passed in.

        Raises:
            MultikeyManagerError: if the wallet rejects the operation.
        """
        if unbind:
            await self.unbind_key_id(multikey, kid)
        else:
            await self.bind_key_id(multikey, kid)
        return {"kid": kid, "multikey": multikey}

    async def bind_key_id(self, multikey: str, kid: str):
        """Bind a new key id to a key pair.

        Raises:
            MultikeyManagerError: wrapping any ``WalletError`` from the wallet.
        """
        try:
            return await self.wallet.assign_kid_to_key(multikey_to_verkey(multikey), kid)
        except WalletError as err:
            LOGGER.error(err)
            # Chain explicitly so the wallet error is recorded as the cause.
            raise MultikeyManagerError(err) from err

    async def unbind_key_id(self, multikey: str, kid: str):
        """Unbind a key id from a key pair.

        Raises:
            MultikeyManagerError: wrapping any ``WalletError`` from the wallet.
        """
        try:
            return await self.wallet.unassign_kid_from_key(
                multikey_to_verkey(multikey), kid
            )
        except WalletError as err:
            LOGGER.error(err)
            # Chain explicitly so the wallet error is recorded as the cause.
            raise MultikeyManagerError(err) from err