"""
A message decorator for attachments.
An attach decorator embeds content or specifies appended content.
"""
import json
import uuid
from typing import Any, Mapping, Sequence, Union
from marshmallow import fields, pre_load
from ...wallet.base import BaseWallet
from ...wallet.util import (
b58_to_bytes,
b64_to_bytes,
b64_to_str,
bytes_to_b58,
bytes_to_b64,
set_urlsafe_b64,
str_to_b64,
unpad,
)
from ..models.base import BaseModel, BaseModelError, BaseModelSchema
from ..valid import (
BASE64,
BASE64URL_NO_PAD,
INDY_ISO8601_DATETIME,
JWS_HEADER_KID,
SHA256,
UUIDFour,
)
# Character placed right after "did:key:" in qualified keys (see did_key and
# raw_key); the name indicates the multibase base58btc prefix -- TODO confirm.
MULTIBASE_B58_BTC = "z"
# Byte string prepended to the raw key bytes before base58 encoding in did_key
# and dropped again in raw_key; presumably the multicodec code for an Ed25519
# public key -- TODO confirm against the multicodec table.
MULTICODEC_ED25519_PUB = b"\xed"
class AttachDecoratorData1JWS(BaseModel):
    """
    One detached JSON Web Signature, for inclusion in attach decorator data.

    Holds the header, the optional protected header and the signature of a
    single signer.
    """

    def __init__(
        self,
        *,
        header: AttachDecoratorDataJWSHeader,
        protected: str = None,
        signature: str,
    ):
        """
        Initialize flattened single-JWS to include in attach decorator data.

        Args:
            header: JWS header
            protected: protected JWS header, if any
            signature: signature value
        """
        self.signature = signature
        self.protected = protected
        self.header = header

    def __eq__(self, other: Any):
        """Return whether other has the same type and the same three members."""
        if type(other) is not type(self):
            return False
        # Compare member by member, stopping at the first mismatch.
        for member in ("header", "protected", "signature"):
            if not getattr(self, member) == getattr(other, member):
                return False
        return True
class AttachDecoratorData1JWSSchema(BaseModelSchema):
    """Schema for a single JWS within attach decorator data."""

    # Header and signature are mandatory; the protected header is optional.
    header = fields.Nested(
        AttachDecoratorDataJWSHeaderSchema,
        required=True,
    )
    protected = fields.Str(
        required=False,
        description="protected JWS header",
        **BASE64URL_NO_PAD,
    )
    signature = fields.Str(
        required=True,
        description="signature",
        **BASE64URL_NO_PAD,
    )
class AttachDecoratorDataJWS(BaseModel):
    """
    Detached JSON Web Signature carried in attach decorator data.

    Either the flattened members (header, protected, signature) describe one
    signature, or the "signatures" member lists several.
    """

    def __init__(
        self,
        *,
        header: AttachDecoratorDataJWSHeader = None,
        protected: str = None,
        signature: str = None,
        signatures: Sequence[AttachDecoratorData1JWS] = None,
    ):
        """
        Initialize JWS to include in attach decorator multi-sig data.

        Args:
            header: JWS header (flattened format)
            protected: protected JWS header (flattened format)
            signature: signature value (flattened format)
            signatures: individual signatures (multi-signature format)
        """
        # Flattened (single-signature) members
        self.header, self.protected, self.signature = header, protected, signature
        # Multi-signature member
        self.signatures = signatures
class AttachDecoratorDataJWSSchema(BaseModelSchema):
    """Schema for detached JSON Web Signature for inclusion in attach decorator data."""

    @pre_load
    def validate_single_xor_multi_sig(self, data: Mapping, **kwargs):
        """
        Ensure model is for either 1 or many signatures, not mishmash of both.

        Args:
            data: incoming mapping to validate before loading

        Returns:
            The incoming data, unchanged

        Raises:
            BaseModelError: if "signatures" appears alongside any flattened
                member, or if "signatures" is absent and "header" or
                "signature" is missing

        """
        if "signatures" in data:
            # Multi-signature form: no flattened member may ride along.
            for flattened_key in ("header", "protected", "signature"):
                if flattened_key in data:
                    raise BaseModelError(
                        "AttachDecoratorDataJWSSchema: "
                        "JWS must be flattened or general JSON serialization format"
                    )
        else:
            # Flattened form: header and signature are both mandatory.
            for mandatory_key in ("header", "signature"):
                if mandatory_key not in data:
                    raise BaseModelError(
                        "AttachDecoratorDataJWSSchema: "
                        "Flattened JSON serialization format must include header "
                        "and signature"
                    )
        return data

    header = fields.Nested(
        AttachDecoratorDataJWSHeaderSchema,
        required=False,  # packed in signatures if multi-sig
    )
    protected = fields.Str(
        required=False,  # packed in signatures if multi-sig
        description="protected JWS header",
        **BASE64URL_NO_PAD,
    )
    signature = fields.Str(
        required=False,  # packed in signatures if multi-sig
        description="signature",
        **BASE64URL_NO_PAD,
    )
    signatures = fields.List(
        fields.Nested(AttachDecoratorData1JWSSchema),
        description="List of signatures",
        required=False,  # only present if multi-sig
    )
def did_key(verkey: str) -> str:
    """
    Return verkey as a DID key, qualifying it first if it is not one already.

    Args:
        verkey: base58 verification key, or an already qualified DID key

    Returns:
        The input unchanged if it already carries the DID key prefix;
        otherwise the prefix followed by the base58 encoding of the multicodec
        byte plus the decoded key bytes

    """
    prefix = f"did:key:{MULTIBASE_B58_BTC}"
    if not verkey.startswith(prefix):
        tagged_key = MULTICODEC_ED25519_PUB + b58_to_bytes(verkey)
        return prefix + bytes_to_b58(tagged_key)
    return verkey
def raw_key(verkey: str) -> str:
    """
    Strip qualified key to raw key if need be.

    Args:
        verkey: base58 verification key, or a qualified DID key

    Returns:
        The input unchanged if it does not carry the DID key prefix; otherwise
        the base58 encoding of the key bytes with the prefix and the leading
        multicodec byte(s) removed

    """
    prefix = f"did:key:{MULTIBASE_B58_BTC}"
    if not verkey.startswith(prefix):
        return verkey
    # Derive both offsets from the constants rather than hard-coding 9 and 1,
    # so this stays the exact inverse of did_key if either constant changes.
    tagged_key = b58_to_bytes(verkey[len(prefix):])
    return bytes_to_b58(tagged_key[len(MULTICODEC_ED25519_PUB):])
class AttachDecoratorData(BaseModel):
    """
    Attach decorator data.

    Carries the content of an attachment as one of base64, JSON or links,
    optionally with a sha-256 hash and a detached JSON Web Signature. Only
    the members actually supplied are set as instance attributes; the
    accessor properties return None for the others.
    """

    def __init__(
        self,
        *,
        jws_: AttachDecoratorDataJWS = None,
        sha256_: str = None,
        links_: Union[list, str] = None,
        base64_: str = None,
        json_: str = None,
    ):
        """
        Initialize decorator data.

        Specify content for one of:

            - `base64_`
            - `json_`
            - `links_`.

        Args:
            jws_: detached JSON Web Signature over base64 or linked attachment content
            sha256_: optional sha-256 hash for content
            links_: list or single URL of hyperlinks
            base64_: base64 encoded content for inclusion
            json_: json-dumped content for inclusion

        Raises:
            AssertionError: if `jws_` is given together with `json_`, or if
                neither `base64_` nor `json_` is given and `links_` is not a
                string or list

        """
        if jws_:
            self.jws_ = jws_
            assert not json_  # a JWS may only accompany base64 or links

        # First content kind present wins: base64, then json, then links.
        if base64_:
            self.base64_ = base64_
        elif json_:
            self.json_ = json_
        else:
            assert isinstance(links_, (str, list))
            # Normalize a single URL to a one-element list
            self.links_ = [links_] if isinstance(links_, str) else list(links_)
        if sha256_:
            self.sha256_ = sha256_

    @property
    def base64(self):
        """Accessor for base64 decorator data, or None."""
        return getattr(self, "base64_", None)

    @property
    def jws(self):
        """Accessor for JWS, or None."""
        return getattr(self, "jws_", None)

    @property
    def signatures(self) -> int:
        """Accessor for number of signatures."""
        if self.jws:
            # Flattened JWS counts as one; otherwise count the list entries
            return 1 if self.jws.signature else len(self.jws.signatures)
        return 0

    @property
    def signed(self) -> bytes:
        """Accessor for signed content (payload), None for unsigned."""
        # NOTE(review): the payload is converted to url-safe form but decoded
        # without urlsafe=True, unlike the decoding in verify() -- confirm
        # that b64_to_bytes handles this as intended.
        return (
            b64_to_bytes(unpad(set_urlsafe_b64(self.base64, urlsafe=True)))
            if self.signatures
            else None
        )

    @property
    def json(self):
        """Accessor for json decorator data, or None."""
        return getattr(self, "json_", None)

    @property
    def links(self):
        """Accessor for links decorator data, or None."""
        return getattr(self, "links_", None)

    @property
    def sha256(self):
        """Accessor for sha256 decorator data, or None."""
        return getattr(self, "sha256_", None)

    async def sign(
        self, verkeys: Union[str, Sequence[str]], wallet: BaseWallet,
    ):
        """
        Sign base64 data value of attachment.

        A single verkey (string or one-element sequence) produces a flattened
        JWS; any other sequence produces a JWS with a "signatures" list, one
        entry per verkey. The result is stored on this instance as `jws_`.

        Args:
            verkeys: verkey(s) of the signing party (in raw or DID key format)
            wallet: The wallet to use for the signature

        Raises:
            AssertionError: if there is no base64 content to sign

        """

        def build_protected(verkey: str):
            """Build protected header."""
            return str_to_b64(
                json.dumps(
                    {
                        "alg": "EdDSA",
                        "kid": did_key(verkey),
                        "jwk": {
                            "kty": "OKP",
                            "crv": "Ed25519",
                            "x": bytes_to_b64(
                                b58_to_bytes(raw_key(verkey)), urlsafe=True, pad=False
                            ),
                            "kid": did_key(verkey),
                        },
                    }
                ),
                urlsafe=True,
                pad=False,
            )

        assert self.base64

        # Signing input uses the url-safe, unpadded form of the payload
        b64_payload = unpad(set_urlsafe_b64(self.base64, True))

        if isinstance(verkeys, str) or (
            isinstance(verkeys, Sequence) and len(verkeys) == 1
        ):
            # Single signer: flattened JWS
            kid = did_key(verkeys if isinstance(verkeys, str) else verkeys[0])
            verkey = raw_key(verkeys if isinstance(verkeys, str) else verkeys[0])
            b64_protected = build_protected(verkey)
            b64_sig = bytes_to_b64(
                await wallet.sign_message(
                    message=(b64_protected + "." + b64_payload).encode("ascii"),
                    from_verkey=verkey,
                ),
                urlsafe=True,
                pad=False,
            )
            self.jws_ = AttachDecoratorDataJWS.deserialize(
                {
                    "header": AttachDecoratorDataJWSHeader(kid).serialize(),
                    "protected": b64_protected,  # always present by construction
                    "signature": b64_sig,
                }
            )
        else:
            # Several signers: one entry per verkey under "signatures"
            jws = {"signatures": []}
            for verkey in verkeys:
                b64_protected = build_protected(verkey)
                b64_sig = bytes_to_b64(
                    await wallet.sign_message(
                        message=(b64_protected + "." + b64_payload).encode("ascii"),
                        from_verkey=raw_key(verkey),
                    ),
                    urlsafe=True,
                    pad=False,
                )
                jws["signatures"].append(
                    {
                        "protected": b64_protected,  # always present by construction
                        "header": {"kid": did_key(verkey)},
                        "signature": b64_sig,
                    }
                )
            self.jws_ = AttachDecoratorDataJWS.deserialize(jws)

    async def verify(self, wallet: BaseWallet) -> bool:
        """
        Verify the signature(s).

        Every signature must check out against the key embedded in its own
        protected header. A JWS holding no signature at all does not verify.

        Args:
            wallet: Wallet to use to verify signature

        Returns:
            True if verification succeeds else False

        Raises:
            AssertionError: if there is no JWS, or a protected header lacks
                a "jwk" of key type "OKP"

        """
        assert self.jws

        b64_payload = unpad(set_urlsafe_b64(self.base64, True))

        # Choose the form by what the JWS holds, not by the signature count:
        # a "signatures" list with a single entry keeps its protected header
        # and signature inside that entry, not on the JWS itself.
        sigs = [self.jws] if self.jws.signature else self.jws.signatures
        if not sigs:
            # Nothing to check must not count as a successful verification
            return False

        for sig in sigs:
            b64_protected = sig.protected
            b64_sig = sig.signature
            protected = json.loads(b64_to_str(b64_protected, urlsafe=True))
            assert "jwk" in protected and protected["jwk"].get("kty") == "OKP"

            sign_input = (b64_protected + "." + b64_payload).encode("ascii")
            b_sig = b64_to_bytes(b64_sig, urlsafe=True)
            # Verification key comes from the jwk in the protected header
            verkey = bytes_to_b58(b64_to_bytes(protected["jwk"]["x"], urlsafe=True))
            if not await wallet.verify_message(sign_input, b_sig, verkey):
                return False
        return True

    def __eq__(self, other):
        """
        Compare equality with another.

        Compares JWS, sha-256 hash, base64 content and JSON content directly,
        and links as unordered sets; a missing member is treated as None (or
        as no links).
        """
        # json_ is included so differing JSON content does not compare equal
        for attr in ["jws_", "sha256_", "base64_", "json_"]:
            if getattr(self, attr, None) != getattr(other, attr, None):
                return False
        if set(getattr(self, "links_", [])) != set(getattr(other, "links_", [])):
            return False
        return True
class AttachDecoratorDataSchema(BaseModelSchema):
    """Attach decorator data schema."""

    @pre_load
    def validate_data_spec(self, data: Mapping, **kwargs):
        """
        Ensure model chooses exactly one of base64, json, or links.

        Args:
            data: incoming mapping to validate before loading

        Returns:
            The incoming data, unchanged

        Raises:
            BaseModelError: if none, or more than one, of the keys "base64",
                "json" and "links" is present

        """
        if len(set(data.keys()) & {"base64", "json", "links"}) != 1:
            raise BaseModelError(
                "AttachDecoratorSchema: choose exactly one of base64, json, or links"
            )
        return data

    base64_ = fields.Str(
        description="Base64-encoded data", required=False, data_key="base64", **BASE64
    )
    jws_ = fields.Nested(
        AttachDecoratorDataJWSSchema,
        # JWS stands for JSON Web Signature (was mislabelled "Java")
        description="Detached JSON Web Signature",
        required=False,
        data_key="jws",
    )
    json_ = fields.Str(
        description="JSON-serialized data",
        required=False,
        example='{"sample": "content"}',
        data_key="json",
    )
    links_ = fields.List(
        fields.Str(example="https://link.to/data"),
        description="List of hypertext links to data",
        required=False,
        data_key="links",
    )
    sha256_ = fields.Str(
        description="SHA256 hash (binhex encoded) of content",
        required=False,
        data_key="sha256",
        **SHA256,
    )
class AttachDecorator(BaseModel):
    """Class representing attach decorator."""

    def __init__(
        self,
        *,
        ident: str = None,
        description: str = None,
        filename: str = None,
        mime_type: str = None,
        lastmod_time: str = None,
        byte_count: int = None,
        data: AttachDecoratorData,
        **kwargs,
    ):
        """
        Initialize an AttachDecorator instance.

        The attachment decorator allows for embedding or appending
        content to a message.

        Args:
            ident ("@id" in serialization): identifier for the appendage
            mime_type ("mime-type" in serialization): MIME type for attachment
            filename: file name
            lastmod_time: last modification time, "%Y-%m-%d %H:%M:%SZ"
            description: content description
            byte_count: byte count of the attachment data
            data: payload, as per `AttachDecoratorData`
            kwargs: passed through to the base model initializer

        """
        super().__init__(**kwargs)
        self.ident = ident
        self.description = description
        self.filename = filename
        self.mime_type = mime_type
        self.lastmod_time = lastmod_time
        self.byte_count = byte_count
        self.data = data

    @property
    def indy_dict(self):
        """
        Return indy data structure encoded in attachment.

        Returns: dict with indy object in data attachment

        Raises:
            AssertionError: if the data carries no base64 content

        """
        assert hasattr(self.data, "base64_")
        return json.loads(b64_to_bytes(self.data.base64))

    @classmethod
    def from_indy_dict(
        cls,
        indy_dict: dict,
        *,
        ident: str = None,
        description: str = None,
        filename: str = None,
        lastmod_time: str = None,
        byte_count: int = None,
    ):
        """
        Create `AttachDecorator` instance from indy object (dict).

        Given indy object (dict), JSON dump, base64-encode, and embed
        it as data; mark `application/json` MIME type.

        Args:
            indy_dict: indy (dict) data structure
            ident: optional attachment identifier (default random UUID4)
            description: optional attachment description
            filename: optional attachment filename
            lastmod_time: optional attachment last modification time
            byte_count: optional attachment byte count

        Returns:
            New instance of the class this method is called on

        """
        # Build through cls so that a subclass gets an instance of itself
        return cls(
            ident=ident or str(uuid.uuid4()),
            description=description,
            filename=filename,
            mime_type="application/json",
            lastmod_time=lastmod_time,
            byte_count=byte_count,
            data=AttachDecoratorData(
                base64_=bytes_to_b64(json.dumps(indy_dict).encode())
            ),
        )
class AttachDecoratorSchema(BaseModelSchema):
    """Schema for serializing and deserializing the attach decorator."""

    # Only `data` is mandatory; every descriptive field is optional.
    ident = fields.Str(
        data_key="@id",
        required=False,
        allow_none=False,
        description="Attachment identifier",
        example=UUIDFour.EXAMPLE,
    )
    mime_type = fields.Str(
        data_key="mime-type",
        required=False,
        description="MIME type",
        example="image/png",
    )
    filename = fields.Str(
        required=False,
        description="File name",
        example="IMG1092348.png",
    )
    byte_count = fields.Integer(
        required=False,
        description="Byte count of data included by reference",
        example=1234,
    )
    lastmod_time = fields.Str(
        required=False,
        description="Hint regarding last modification datetime, in ISO-8601 format",
        **INDY_ISO8601_DATETIME,
    )
    description = fields.Str(
        required=False,
        description="Human-readable description of content",
        example="view from doorway, facing east, with lights off",
    )
    data = fields.Nested(AttachDecoratorDataSchema, required=True)