Source code for aries_cloudagent.wallet.util

"""Wallet utility functions."""

import re

import base58
import base64

import nacl.utils
import nacl.bindings

from ..core.profile import Profile


[docs]def random_seed() -> bytes: """Generate a random seed value. Returns: A new random seed """ return nacl.utils.random(nacl.bindings.crypto_box_SEEDBYTES)
[docs]def pad(val: str) -> str: """Pad base64 values if need be: JWT calls to omit trailing padding.""" padlen = 4 - len(val) % 4 return val if padlen > 2 else (val + "=" * padlen)
[docs]def unpad(val: str) -> str: """Remove padding from base64 values if need be.""" return val.rstrip("=")
[docs]def b64_to_bytes(val: str, urlsafe=False) -> bytes: """Convert a base 64 string to bytes.""" if urlsafe: return base64.urlsafe_b64decode(pad(val)) return base64.b64decode(pad(val))
[docs]def b64_to_str(val: str, urlsafe=False, encoding=None) -> str: """Convert a base 64 string to string on input encoding (default utf-8).""" return b64_to_bytes(val, urlsafe).decode(encoding or "utf-8")
[docs]def bytes_to_b64(val: bytes, urlsafe=False, pad=True, encoding: str = "ascii") -> str: """Convert a byte string to base 64.""" b64 = ( base64.urlsafe_b64encode(val).decode(encoding) if urlsafe else base64.b64encode(val).decode(encoding) ) return b64 if pad else unpad(b64)
[docs]def str_to_b64(val: str, urlsafe=False, encoding=None, pad=True) -> str: """Convert a string to base64 string on input encoding (default utf-8).""" return bytes_to_b64(val.encode(encoding or "utf-8"), urlsafe, pad)
[docs]def set_urlsafe_b64(val: str, urlsafe: bool = True) -> str: """Set URL safety in base64 encoding.""" if urlsafe: return val.replace("+", "-").replace("/", "_") return val.replace("-", "+").replace("_", "/")
[docs]def b58_to_bytes(val: str) -> bytes: """Convert a base 58 string to bytes.""" return base58.b58decode(val)
[docs]def bytes_to_b58(val: bytes) -> str: """Convert a byte string to base 58.""" return base58.b58encode(val).decode("ascii")
[docs]def full_verkey(did: str, abbr_verkey: str) -> str: """Given a DID and abbreviated verkey, return the full verkey.""" return ( bytes_to_b58(b58_to_bytes(did.split(":")[-1]) + b58_to_bytes(abbr_verkey[1:])) if abbr_verkey.startswith("~") else abbr_verkey )
[docs]def default_did_from_verkey(verkey: str) -> str: """Given a verkey, return the default indy did. By default the did is the first 16 bytes of the verkey. """ did = bytes_to_b58(b58_to_bytes(verkey)[:16]) return did
[docs]def abbr_verkey(full_verkey: str, did: str = None) -> str: """Given a full verkey and DID, return the abbreviated verkey.""" did_len = len(b58_to_bytes(did.split(":")[-1])) if did else 16 return f"~{bytes_to_b58(b58_to_bytes(full_verkey)[did_len:])}"
DID_EVENT_PREFIX = "acapy::ENDORSE_DID::" DID_ATTRIB_EVENT_PREFIX = "acapy::ENDORSE_DID_ATTRIB::" EVENT_LISTENER_PATTERN = re.compile(f"^{DID_EVENT_PREFIX}(.*)?$") ATTRIB_EVENT_LISTENER_PATTERN = re.compile(f"^{DID_ATTRIB_EVENT_PREFIX}(.*)?$")
[docs]async def notify_endorse_did_event(profile: Profile, did: str, meta_data: dict): """Send notification for a DID post-process event.""" await profile.notify( DID_EVENT_PREFIX + did, meta_data, )
[docs]async def notify_endorse_did_attrib_event(profile: Profile, did: str, meta_data: dict): """Send notification for a DID ATTRIB post-process event.""" await profile.notify( DID_ATTRIB_EVENT_PREFIX + did, meta_data, )