Source code for aries_cloudagent.messaging.decorators.attach_decorator

A message decorator for attachments.

An attach decorator embeds content or specifies appended content.

import json
import uuid

from typing import Any, Mapping, Sequence, Union

from marshmallow import EXCLUDE, fields, pre_load

from ...wallet.base import BaseWallet
from ...wallet.util import (
from ..models.base import BaseModel, BaseModelError, BaseModelSchema
from ..valid import (

# Prefix byte that did_key() prepends to the decoded verkey bytes when building
# a did:key identifier; raw_key() drops that leading byte again.
MULTICODEC_ED25519_PUB = b"\xed"

class AttachDecoratorDataJWSHeader(BaseModel):
    """Unprotected JWS header carried in attach decorator data."""

    class Meta:
        """Metadata binding this model to its schema."""

        schema_class = "AttachDecoratorDataJWSHeaderSchema"

    def __init__(self, kid: str):
        """
        Store the key identifier for the JWS header.

        Args:
            kid: key identifier

        """
        self.kid = kid

    def __eq__(self, other: Any):
        """Return whether `other` has exactly this type and the same `kid`."""
        if type(self) != type(other):
            return False
        return self.kid == other.kid
class AttachDecoratorDataJWSHeaderSchema(BaseModelSchema):
    """Schema for the JWS header held in attach decorator data."""

    class Meta:
        """Schema metadata: target model class; unknown input keys are excluded."""

        unknown = EXCLUDE
        model_class = AttachDecoratorDataJWSHeader

    kid = fields.Str(
        required=True,
        description="Key identifier, in W3C did:key or DID URL format",
        **JWS_HEADER_KID,
    )
class AttachDecoratorData1JWS(BaseModel):
    """One detached JSON Web Signature for inclusion in attach decorator data."""

    class Meta:
        """Metadata binding this model to its schema."""

        schema_class = "AttachDecoratorData1JWSSchema"

    def __init__(
        self,
        *,
        header: AttachDecoratorDataJWSHeader,
        protected: str = None,
        signature: str,
    ):
        """
        Initialize a flattened single JWS to include in attach decorator data.

        Args:
            header: unprotected JWS header
            protected: protected JWS header (optional)
            signature: signature value

        """
        self.signature = signature
        self.protected = protected
        self.header = header

    def __eq__(self, other: Any):
        """Return whether `other` has exactly this type and equal members."""
        if type(self) != type(other):
            return False
        # Compare in the same order as before: header, protected, signature
        return all(
            getattr(self, member) == getattr(other, member)
            for member in ("header", "protected", "signature")
        )
class AttachDecoratorData1JWSSchema(BaseModelSchema):
    """Schema for a single JWS held in attach decorator data."""

    class Meta:
        """Schema metadata: target model class; unknown input keys are excluded."""

        unknown = EXCLUDE
        model_class = AttachDecoratorData1JWS

    header = fields.Nested(AttachDecoratorDataJWSHeaderSchema, required=True)
    protected = fields.Str(
        required=False,
        description="protected JWS header",
        **BASE64URL_NO_PAD,
    )
    signature = fields.Str(
        required=True,
        description="signature",
        **BASE64URL_NO_PAD,
    )
class AttachDecoratorDataJWS(BaseModel):
    """
    Detached JSON Web Signature for inclusion in attach decorator data.

    Holds either one signature in flattened form (`header`, `protected`,
    `signature`) or several signatures in the `signatures` member.
    """

    class Meta:
        """Metadata binding this model to its schema."""

        schema_class = "AttachDecoratorDataJWSSchema"

    def __init__(
        self,
        *,
        header: AttachDecoratorDataJWSHeader = None,
        protected: str = None,
        signature: str = None,
        signatures: Sequence[AttachDecoratorData1JWS] = None,
    ):
        """
        Initialize JWS to include in attach decorator data.

        Args:
            header: unprotected header, for the flattened form
            protected: protected header, for the flattened form
            signature: signature value, for the flattened form
            signatures: individual signatures, for the multi-signature form

        """
        # Multi-signature member
        self.signatures = signatures
        # Flattened single-signature members
        self.signature = signature
        self.protected = protected
        self.header = header
class AttachDecoratorDataJWSSchema(BaseModelSchema):
    """Schema for detached JSON Web Signature for inclusion in attach decorator data."""

    class Meta:
        """Schema metadata: target model class; unknown input keys are excluded."""

        unknown = EXCLUDE
        model_class = AttachDecoratorDataJWS

    @pre_load
    def validate_single_xor_multi_sig(self, data: Mapping, **kwargs):
        """
        Ensure input describes either one signature or many, never a mix.

        Args:
            data: raw input about to be loaded
            kwargs: additional keyword arguments passed by the hook caller (unused)

        Returns:
            The input data, unchanged

        Raises:
            BaseModelError: if `signatures` appears alongside any flattened
                member, or if the flattened form lacks `header` or `signature`

        """
        if "signatures" in data:
            # General form: no flattened member may accompany the list
            stray = [k for k in ("header", "protected", "signature") if k in data]
            if stray:
                raise BaseModelError(
                    "AttachDecoratorDataJWSSchema: "
                    "JWS must be flattened or general JSON serialization format"
                )
            return data

        # Flattened form: header and signature are both mandatory
        absent = [k for k in ("header", "signature") if k not in data]
        if absent:
            raise BaseModelError(
                "AttachDecoratorDataJWSSchema: "
                "Flattened JSON serialization format must include header and signature"
            )
        return data

    header = fields.Nested(
        AttachDecoratorDataJWSHeaderSchema,
        required=False,  # packed in signatures if multi-sig
    )
    protected = fields.Str(
        required=False,  # packed in signatures if multi-sig
        description="protected JWS header",
        **BASE64URL_NO_PAD,
    )
    signature = fields.Str(
        required=False,  # packed in signatures if multi-sig
        description="signature",
        **BASE64URL_NO_PAD,
    )
    signatures = fields.List(
        fields.Nested(AttachDecoratorData1JWSSchema),
        description="List of signatures",
        required=False,  # only present if multi-sig
    )
def did_key(verkey: str) -> str:
    """
    Qualify verkey into DID key form unless it already is one.

    Args:
        verkey: verification key, either raw or already prefixed as a DID key

    Returns:
        The input itself when it already carries the DID key prefix; otherwise
        the prefix followed by the re-encoded, multicodec-tagged key bytes

    """
    prefix = f"did:key:{MULTIBASE_B58_BTC}"
    if verkey.startswith(prefix):
        return verkey

    tagged_key = MULTICODEC_ED25519_PUB + b58_to_bytes(verkey)
    return prefix + bytes_to_b58(tagged_key)
def raw_key(verkey: str) -> str:
    """
    Strip a DID-key-qualified verkey down to the raw key, if need be.

    Args:
        verkey: verification key, either raw or prefixed as a DID key

    Returns:
        The input itself when it lacks the DID key prefix; otherwise the
        re-encoded key bytes without their leading byte

    """
    if not verkey.startswith(f"did:key:{MULTIBASE_B58_BTC}"):
        return verkey

    # Skip the first 9 characters (the matched prefix, presuming a
    # one-character MULTIBASE_B58_BTC), then drop the leading byte of the
    # decoded value, which did_key() adds as the multicodec tag.
    tagged_key = b58_to_bytes(verkey[9:])
    return bytes_to_b58(tagged_key[1:])
class AttachDecoratorData(BaseModel):
    """Attach decorator data."""

    class Meta:
        """AttachDecoratorData metadata."""

        schema_class = "AttachDecoratorDataSchema"

    def __init__(
        self,
        *,
        jws_: AttachDecoratorDataJWS = None,
        sha256_: str = None,
        links_: Union[Sequence[str], str] = None,
        base64_: str = None,
        json_: dict = None,
    ):
        """
        Initialize decorator data.

        Specify content for one of:

            - `base64_`
            - `json_`
            - `links_`.

        Only the attributes actually supplied are set on the instance; use the
        property accessors to read them with a None fallback.

        Args:
            jws_: detached JSON Web Signature over base64 or linked attachment content
            sha256_: optional sha-256 hash for content
            links_: URL or list of URLs
            base64_: base64 encoded content for inclusion
            json_: dict content for inclusion as json

        """
        if jws_:
            self.jws_ = jws_
            # A detached signature is not accepted together with inline JSON
            assert not json_

        if base64_:
            self.base64_ = base64_
        elif json_:
            self.json_ = json_
        else:
            assert isinstance(links_, (str, Sequence))
            # Normalize a lone URL into a one-element list
            self.links_ = [links_] if isinstance(links_, str) else list(links_)

        if sha256_:
            self.sha256_ = sha256_

    @property
    def base64(self):
        """Accessor for base64 decorator data, or None."""
        return getattr(self, "base64_", None)

    @property
    def jws(self):
        """Accessor for JWS, or None."""
        return getattr(self, "jws_", None)

    @property
    def signatures(self) -> int:
        """Accessor for number of signatures (0 when there is no JWS)."""
        if self.jws:
            # Flattened form counts as one; otherwise count the list entries
            return 1 if self.jws.signature else len(self.jws.signatures)
        return 0

    @property
    def signed(self) -> bytes:
        """Accessor for signed content (payload), None for unsigned."""
        return (
            b64_to_bytes(unpad(set_urlsafe_b64(self.base64, urlsafe=True)))
            if self.signatures
            else None
        )

    def header_map(self, idx: int = 0, jose: bool = True) -> Mapping:
        """
        Accessor for header info at input index, default 0 or unique for singly-signed.

        Args:
            idx: index of interest, zero-based (default 0)
            jose: True to return unprotected header attributes, False for protected only

        Returns:
            Mapping of header attributes, or None if there are no signatures

        """
        if not self.signatures:
            return None

        headers = {}
        # Flattened form keeps the members on the JWS itself; `idx` is ignored then
        sig = self.jws if self.jws.signature else self.jws.signatures[idx]
        if sig.protected:
            headers.update(json.loads(b64_to_str(sig.protected, urlsafe=True)))
        if jose:
            # Unprotected header values override protected ones on key clash
            headers.update(sig.header.serialize())
        return headers

    @property
    def json(self):
        """Accessor for json decorator data, or None."""
        return getattr(self, "json_", None)

    @property
    def links(self):
        """Accessor for links decorator data, or None."""
        return getattr(self, "links_", None)

    @property
    def sha256(self):
        """Accessor for sha256 decorator data, or None."""
        return getattr(self, "sha256_", None)

    async def sign(
        self,
        verkeys: Union[str, Sequence[str]],
        wallet: BaseWallet,
    ):
        """
        Sign base64 data value of attachment.

        A single verkey (a string, or a one-element sequence) produces a
        flattened JWS; any other sequence produces the multi-signature form.
        The result is stored on `jws_`.

        Args:
            verkeys: verkey(s) of the signing party (in raw or DID key format)
            wallet: The wallet to use for the signature

        """

        def build_protected(verkey: str):
            """Build protected header."""
            return str_to_b64(
                json.dumps(
                    {
                        "alg": "EdDSA",
                        "kid": did_key(verkey),
                        "jwk": {
                            "kty": "OKP",
                            "crv": "Ed25519",
                            "x": bytes_to_b64(
                                b58_to_bytes(raw_key(verkey)), urlsafe=True, pad=False
                            ),
                            "kid": did_key(verkey),
                        },
                    }
                ),
                urlsafe=True,
                pad=False,
            )

        assert self.base64

        b64_payload = unpad(set_urlsafe_b64(self.base64, True))

        if isinstance(verkeys, str) or (
            isinstance(verkeys, Sequence) and len(verkeys) == 1
        ):
            kid = did_key(verkeys if isinstance(verkeys, str) else verkeys[0])
            verkey = raw_key(verkeys if isinstance(verkeys, str) else verkeys[0])
            b64_protected = build_protected(verkey)
            # Signing input is "<protected>.<payload>"
            b64_sig = bytes_to_b64(
                await wallet.sign_message(
                    message=(b64_protected + "." + b64_payload).encode("ascii"),
                    from_verkey=verkey,
                ),
                urlsafe=True,
                pad=False,
            )
            self.jws_ = AttachDecoratorDataJWS.deserialize(
                {
                    "header": AttachDecoratorDataJWSHeader(kid).serialize(),
                    "protected": b64_protected,  # always present by construction
                    "signature": b64_sig,
                }
            )
        else:
            jws = {"signatures": []}
            for verkey in verkeys:
                b64_protected = build_protected(verkey)
                b64_sig = bytes_to_b64(
                    await wallet.sign_message(
                        message=(b64_protected + "." + b64_payload).encode("ascii"),
                        from_verkey=raw_key(verkey),
                    ),
                    urlsafe=True,
                    pad=False,
                )
                jws["signatures"].append(
                    {
                        "protected": b64_protected,  # always present by construction
                        "header": {"kid": did_key(verkey)},
                        "signature": b64_sig,
                    }
                )
            self.jws_ = AttachDecoratorDataJWS.deserialize(jws)

    async def verify(self, wallet: BaseWallet) -> bool:
        """
        Verify the signature(s).

        Args:
            wallet: Wallet to use to verify signature

        Returns:
            True if verification succeeds else False

        """
        assert self.jws

        b64_payload = unpad(set_urlsafe_b64(self.base64, True))

        # Pick the form by whether the flattened `signature` member is set, as
        # header_map() does. Testing `self.signatures == 1` would wrongly treat
        # a `signatures` list holding a single entry as flattened and then read
        # `protected`/`signature` from the container, where they are unset.
        sigs = [self.jws] if self.jws.signature else self.jws.signatures
        for sig in sigs:
            b64_protected = sig.protected
            b64_sig = sig.signature
            protected = json.loads(b64_to_str(b64_protected, urlsafe=True))
            assert "jwk" in protected and protected["jwk"].get("kty") == "OKP"

            sign_input = (b64_protected + "." + b64_payload).encode("ascii")
            b_sig = b64_to_bytes(b64_sig, urlsafe=True)
            # Verification key is taken from the JWK embedded in the protected header
            verkey = bytes_to_b58(b64_to_bytes(protected["jwk"]["x"], urlsafe=True))
            if not await wallet.verify_message(sign_input, b_sig, verkey):
                return False
        return True

    def __eq__(self, other):
        """
        Compare equality with another.

        Compares signature, hash, base64 and json content directly, and links
        as unordered sets; attributes absent on either side count as None (or
        as no links).
        """
        # "json_" is included so that differing inline JSON content is not
        # reported as equal.
        for attr in ["jws_", "sha256_", "base64_", "json_"]:
            if getattr(self, attr, None) != getattr(other, attr, None):
                return False
        if set(getattr(self, "links_", [])) != set(getattr(other, "links_", [])):
            return False
        return True
class AttachDecoratorDataSchema(BaseModelSchema):
    """Attach decorator data schema."""

    class Meta:
        """Attach decorator data schema metadata."""

        model_class = AttachDecoratorData
        unknown = EXCLUDE

    @pre_load
    def validate_data_spec(self, data: Mapping, **kwargs):
        """
        Ensure model chooses exactly one of base64, json, or links.

        Args:
            data: raw input about to be loaded
            kwargs: additional keyword arguments passed by the hook caller (unused)

        Returns:
            The input data, unchanged

        Raises:
            BaseModelError: if the input has none, or more than one, of the
                `base64`, `json` and `links` keys

        """
        if len(set(data.keys()) & {"base64", "json", "links"}) != 1:
            raise BaseModelError(
                "AttachDecoratorSchema: choose exactly one of base64, json, or links"
            )
        return data

    base64_ = fields.Str(
        description="Base64-encoded data", required=False, data_key="base64", **BASE64
    )
    jws_ = fields.Nested(
        AttachDecoratorDataJWSSchema,
        # JWS stands for JSON Web Signature (was misdescribed as "Java")
        description="Detached JSON Web Signature",
        required=False,
        data_key="jws",
    )
    json_ = fields.Dict(
        description="JSON-serialized data",
        required=False,
        example='{"sample": "content"}',
        data_key="json",
    )
    links_ = fields.List(
        fields.Str(example=""),
        description="List of hypertext links to data",
        required=False,
        data_key="links",
    )
    sha256_ = fields.Str(
        description="SHA256 hash (binhex encoded) of content",
        required=False,
        data_key="sha256",
        **SHA256,
    )
class AttachDecorator(BaseModel):
    """Class representing attach decorator."""

    class Meta:
        """AttachDecorator metadata."""

        schema_class = "AttachDecoratorSchema"

    def __init__(
        self,
        *,
        ident: str = None,
        description: str = None,
        filename: str = None,
        mime_type: str = None,
        lastmod_time: str = None,
        byte_count: int = None,
        data: AttachDecoratorData,
        **kwargs,
    ):
        """
        Initialize an AttachDecorator instance.

        The attachment decorator allows for embedding or appending
        content to a message.

        Args:
            ident ("@id" in serialization): identifier for the appendage
            mime_type ("mime-type" in serialization): MIME type for attachment
            filename: file name
            lastmod_time: last modification time, "%Y-%m-%d %H:%M:%SZ"
            byte_count: byte count of the attachment
            description: content description
            data: payload, as per `AttachDecoratorData`
            kwargs: additional keyword arguments forwarded to the base class

        """
        super().__init__(**kwargs)
        self.ident = ident
        self.description = description
        self.filename = filename
        self.mime_type = mime_type
        self.lastmod_time = lastmod_time
        self.byte_count = byte_count
        # Keep the payload; the `content` property reads it back
        self.data = data

    @property
    def content(self):
        """
        Return attachment content.

        Returns:
            data attachment, decoded if necessary and json-loaded, or data links

        """
        if hasattr(self.data, "base64_"):
            return json.loads(b64_to_bytes(self.data.base64))
        elif hasattr(self.data, "json_"):
            return self.data.json
        elif hasattr(self.data, "links_"):
            return self.data.links  # fetching would be async; we want a property here
        else:
            return None

    @classmethod
    def data_base64(
        cls,
        mapping: Mapping,
        *,
        ident: str = None,
        description: str = None,
        filename: str = None,
        lastmod_time: str = None,
        byte_count: int = None,
    ):
        """
        Create `AttachDecorator` instance on base64-encoded data from input mapping.

        Given mapping, JSON dump, base64-encode, and embed
        it as data; mark `application/json` MIME type.

        Args:
            mapping: (dict) data structure; e.g., indy production
            ident: optional attachment identifier (default random UUID4)
            description: optional attachment description
            filename: optional attachment filename
            lastmod_time: optional attachment last modification time
            byte_count: optional attachment byte count

        Returns:
            New `AttachDecorator` holding the mapping as base64 data

        """
        return AttachDecorator(
            ident=ident or str(uuid.uuid4()),
            description=description,
            filename=filename,
            mime_type="application/json",
            lastmod_time=lastmod_time,
            byte_count=byte_count,
            data=AttachDecoratorData(
                base64_=bytes_to_b64(json.dumps(mapping).encode())
            ),
        )

    @classmethod
    def data_json(
        cls,
        mapping: dict,
        *,
        ident: str = None,
        description: str = None,
        filename: str = None,
        lastmod_time: str = None,
        byte_count: int = None,
    ):
        """
        Create `AttachDecorator` instance on json-encoded data from input mapping.

        Given message object (dict), embed it as JSON data; mark
        `application/json` MIME type.

        Args:
            mapping: (dict) data structure; e.g., Aries message
            ident: optional attachment identifier (default random UUID4)
            description: optional attachment description
            filename: optional attachment filename
            lastmod_time: optional attachment last modification time
            byte_count: optional attachment byte count

        Returns:
            New `AttachDecorator` holding the mapping as inline JSON data

        """
        return AttachDecorator(
            ident=ident or str(uuid.uuid4()),
            description=description,
            filename=filename,
            mime_type="application/json",
            lastmod_time=lastmod_time,
            byte_count=byte_count,
            data=AttachDecoratorData(json_=mapping),
        )
class AttachDecoratorSchema(BaseModelSchema):
    """Schema used to serialize and deserialize `AttachDecorator`."""

    class Meta:
        """Schema metadata: target model class; unknown input keys are excluded."""

        unknown = EXCLUDE
        model_class = AttachDecorator

    # Serialized as "@id"
    ident = fields.Str(
        data_key="@id",
        required=False,
        allow_none=False,
        example=UUIDFour.EXAMPLE,
        description="Attachment identifier",
    )
    # Serialized as "mime-type"
    mime_type = fields.Str(
        data_key="mime-type",
        required=False,
        example="image/png",
        description="MIME type",
    )
    filename = fields.Str(
        required=False,
        example="IMG1092348.png",
        description="File name",
    )
    byte_count = fields.Int(
        required=False,
        strict=True,
        example=1234,
        description="Byte count of data included by reference",
    )
    lastmod_time = fields.Str(
        required=False,
        description="Hint regarding last modification datetime, in ISO-8601 format",
        **INDY_ISO8601_DATETIME,
    )
    description = fields.Str(
        required=False,
        example="view from doorway, facing east, with lights off",
        description="Human-readable description of content",
    )
    data = fields.Nested(
        AttachDecoratorDataSchema,
        required=True,
    )