Source code for aries_cloudagent.askar.didcomm.v1

"""DIDComm v1 envelope handling via Askar backend."""

from collections import OrderedDict
from typing import Optional, Sequence, Tuple

from aries_askar import (
    crypto_box,
    Key,
    KeyAlg,
    Session,
)
from aries_askar.bindings import key_get_secret_bytes
from marshmallow import ValidationError

from ...utils.jwe import b64url, JweEnvelope, JweRecipient
from ...wallet.base import WalletError
from ...wallet.crypto import extract_pack_recipients
from ...wallet.util import b58_to_bytes, bytes_to_b58


[docs]def pack_message( to_verkeys: Sequence[str], from_key: Optional[Key], message: bytes ) -> bytes: """Encode a message using the DIDComm v1 'pack' algorithm.""" wrapper = JweEnvelope(with_protected_recipients=True, with_flatten_recipients=False) cek = Key.generate(KeyAlg.C20P) # avoid converting to bytes object: this way the only copy is zeroed afterward cek_b = key_get_secret_bytes(cek._handle) sender_vk = ( bytes_to_b58(from_key.get_public_bytes()).encode("utf-8") if from_key else None ) sender_xk = from_key.convert_key(KeyAlg.X25519) if from_key else None for target_vk in to_verkeys: target_xk = Key.from_public_bytes( KeyAlg.ED25519, b58_to_bytes(target_vk) ).convert_key(KeyAlg.X25519) if sender_vk: enc_sender = crypto_box.crypto_box_seal(target_xk, sender_vk) nonce = crypto_box.random_nonce() enc_cek = crypto_box.crypto_box(target_xk, sender_xk, cek_b, nonce) wrapper.add_recipient( JweRecipient( encrypted_key=enc_cek, header=OrderedDict( [ ("kid", target_vk), ("sender", b64url(enc_sender)), ("iv", b64url(nonce)), ] ), ) ) else: enc_sender = None nonce = None enc_cek = crypto_box.crypto_box_seal(target_xk, cek_b) wrapper.add_recipient( JweRecipient(encrypted_key=enc_cek, header={"kid": target_vk}) ) wrapper.set_protected( OrderedDict( [ ("enc", "xchacha20poly1305_ietf"), ("typ", "JWM/1.0"), ("alg", "Authcrypt" if from_key else "Anoncrypt"), ] ), ) enc = cek.aead_encrypt(message, aad=wrapper.protected_bytes) ciphertext, tag, nonce = enc.parts wrapper.set_payload(ciphertext, nonce, tag) return wrapper.to_json().encode("utf-8")
[docs]async def unpack_message(session: Session, enc_message: bytes) -> Tuple[str, str, str]: """Decode a message using the DIDComm v1 'unpack' algorithm.""" try: wrapper = JweEnvelope.from_json(enc_message) except ValidationError: raise WalletError("Invalid packed message") alg = wrapper.protected.get("alg") is_authcrypt = alg == "Authcrypt" if not is_authcrypt and alg != "Anoncrypt": raise WalletError("Unsupported pack algorithm: {}".format(alg)) recips = extract_pack_recipients(wrapper.recipients) payload_key, sender_vk = None, None for recip_vk in recips: recip_key_entry = await session.fetch_key(recip_vk) if recip_key_entry: payload_key, sender_vk = _extract_payload_key( recips[recip_vk], recip_key_entry.key ) break if not payload_key: raise WalletError( "No corresponding recipient key found in {}".format(tuple(recips)) ) if not sender_vk and is_authcrypt: raise WalletError("Sender public key not provided for Authcrypt message") cek = Key.from_secret_bytes(KeyAlg.C20P, payload_key) message = cek.aead_decrypt( wrapper.ciphertext, nonce=wrapper.iv, tag=wrapper.tag, aad=wrapper.protected_bytes, ) return message, recip_vk, sender_vk
def _extract_payload_key(sender_cek: dict, recip_secret: Key) -> Tuple[bytes, str]: """Extract the payload key from pack recipient details. Returns: A tuple of the CEK and sender verkey """ recip_x = recip_secret.convert_key(KeyAlg.X25519) if sender_cek["nonce"] and sender_cek["sender"]: sender_vk = crypto_box.crypto_box_seal_open( recip_x, sender_cek["sender"] ).decode("utf-8") sender_x = Key.from_public_bytes( KeyAlg.ED25519, b58_to_bytes(sender_vk) ).convert_key(KeyAlg.X25519) cek = crypto_box.crypto_box_open( recip_x, sender_x, sender_cek["key"], sender_cek["nonce"] ) else: sender_vk = None cek = crypto_box.crypto_box_seal_open(recip_x, sender_cek["key"]) return cek, sender_vk