Source code for aries_cloudagent.vc.ld_proofs.suites.linked_data_signature

"""Linked Data Signature class."""

from abc import abstractmethod, ABCMeta
from datetime import datetime, timezone
from hashlib import sha256
from pytz import utc
from typing import Union

from ..constants import SECURITY_CONTEXT_URL
from ..document_loader import DocumentLoaderMethod
from ..error import LinkedDataProofException
from ..purposes import _ProofPurpose as ProofPurpose
from ..validation_result import ProofResult

from .linked_data_proof import LinkedDataProof


[docs]class LinkedDataSignature(LinkedDataProof, metaclass=ABCMeta): """Linked Data Signature class.""" def __init__( self, *, signature_type: str, proof: dict = None, verification_method: str = None, date: Union[datetime, None] = None, ): """Create new LinkedDataSignature instance. Must be subclassed, not initialized directly. Args: signature_type (str): Signature type to use for the proof proof (dict, optional): A JSON-LD document with options to use for the `proof` node (e.g. any other custom fields can be provided here using a context different from security-v2). verification_method (str, optional): A key id URL to the paired public key. date (datetime, optional): Signing date to use. Defaults to now """ super().__init__(signature_type=signature_type, proof=proof) self.verification_method = verification_method self.date = date
[docs] @abstractmethod async def sign(self, *, verify_data: bytes, proof: dict) -> dict: """Sign the data and add it to the proof. Args: verify_data (bytes): The data to sign. proof (dict): The proof to add the signature to Returns: dict: The proof object with the added signature """ pass
[docs] @abstractmethod async def verify_signature( self, *, verify_data: bytes, verification_method: dict, document: dict, proof: dict, document_loader: DocumentLoaderMethod, ) -> bool: """Verify the data against the proof. Args: verify_data (bytes): The data to check verification_method (dict): The verification method to use. document (dict): The document the verify data is derived for as extra context proof (dict): The proof to check document_loader (DocumentLoader): Document loader used for resolving Returns: bool: Whether the signature is valid for the data """
[docs] async def create_proof( self, *, document: dict, purpose: ProofPurpose, document_loader: DocumentLoaderMethod, ) -> dict: """Create proof for document, return proof.""" proof = self.proof.copy() if self.proof else {} # TODO: validate if verification_method is set? proof["type"] = self.signature_type proof["verificationMethod"] = self.verification_method # Set created if not already set if not proof.get("created"): # Use class date, or now date = self.date or datetime.now(timezone.utc) if not date.tzinfo: date = utc.localize(date) proof["created"] = date.isoformat() # Allow purpose to update the proof; the `proof` is in the # SECURITY_CONTEXT_URL `@context` -- therefore the `purpose` must # ensure any added fields are also represented in that same `@context` proof = purpose.update(proof) # Create data to sign verify_data = self._create_verify_data( proof=proof, document=document, document_loader=document_loader ) # Sign data proof = await self.sign(verify_data=verify_data, proof=proof) return proof
[docs] async def verify_proof( self, *, proof: dict, document: dict, purpose: ProofPurpose, document_loader: DocumentLoaderMethod, ) -> ProofResult: """Verify proof against document and proof purpose.""" try: # Create data to verify verify_data = self._create_verify_data( proof=proof, document=document, document_loader=document_loader ) # Fetch verification method verification_method = self._get_verification_method( proof=proof, document_loader=document_loader ) # Verify signature on data verified = await self.verify_signature( verify_data=verify_data, verification_method=verification_method, document=document, proof=proof, document_loader=document_loader, ) if not verified: raise LinkedDataProofException( f"Invalid signature on document {document}" ) # Ensure proof was performed for a valid purpose purpose_result = purpose.validate( proof=proof, document=document, suite=self, verification_method=verification_method, document_loader=document_loader, ) if not purpose_result.valid: return ProofResult( verified=False, purpose_result=purpose_result, error=purpose_result.error, ) return ProofResult(verified=True, purpose_result=purpose_result) except Exception as err: return ProofResult(verified=False, error=err)
def _create_verify_data( self, *, proof: dict, document: dict, document_loader: DocumentLoaderMethod ) -> bytes: """Create signing or verification data.""" c14n_proof_options = self._canonize_proof( proof=proof, document=document, document_loader=document_loader ) c14n_doc = self._canonize(input=document, document_loader=document_loader) # TODO: detect any dropped properties using expand/contract step return ( sha256(c14n_proof_options.encode("utf-8")).digest() + sha256(c14n_doc.encode("utf-8")).digest() ) def _canonize_proof( self, *, proof: dict, document: dict, document_loader: DocumentLoaderMethod ): """Canonize proof dictionary. Removes jws, signature, etc...""" # Use default security context url if document has no context proof = { **proof, "@context": document.get("@context") or SECURITY_CONTEXT_URL, } proof.pop("jws", None) proof.pop("signatureValue", None) proof.pop("proofValue", None) return self._canonize(input=proof, document_loader=document_loader)