"""Manager for multitenancy."""
import logging
import jwt
from typing import List, Optional, cast
from ..core.profile import (
Profile,
ProfileSession,
)
from ..messaging.responder import BaseResponder
from ..config.wallet import wallet_config
from ..config.injection_context import InjectionContext
from ..wallet.models.wallet_record import WalletRecord
from ..wallet.base import BaseWallet
from ..core.error import BaseError
from ..protocols.routing.v1_0.manager import RouteNotFoundError, RoutingManager
from ..protocols.routing.v1_0.models.route_record import RouteRecord
from ..transport.wire_format import BaseWireFormat
from ..storage.base import BaseStorage
from ..storage.error import StorageNotFoundError
from ..protocols.coordinate_mediation.v1_0.manager import (
MediationManager,
MediationRecord,
)
from .error import WalletKeyMissingError
LOGGER = logging.getLogger(__name__)
[docs]class MultitenantManagerError(BaseError):
"""Generic multitenant error."""
[docs]class MultitenantManager:
"""Class for handling multitenancy."""
def __init__(self, profile: Profile):
"""Initialize multitenant Manager.
Args:
profile: The profile for this manager
"""
self._profile = profile
if not profile:
raise MultitenantManagerError("Missing profile")
self._instances: dict[str, Profile] = {}
async def _wallet_name_exists(
self, session: ProfileSession, wallet_name: str
) -> bool:
"""
Check whether wallet with specified wallet name already exists.
Besides checking for wallet records, it will also check if the base wallet
Args:
session: The profile session to use
wallet_name: the wallet name to check for
Returns:
bool: Whether the wallet name already exists
"""
# wallet_name is same as base wallet name
if session.settings.get("wallet.name") == wallet_name:
return True
# subwallet record exists, we assume the wallet actually exists
wallet_records = await WalletRecord.query(session, {"wallet_name": wallet_name})
if len(wallet_records) > 0:
return True
return False
[docs] def get_webhook_urls(
self,
base_context: InjectionContext,
wallet_record: WalletRecord,
) -> list:
"""Get the webhook urls according to dispatch_type.
Args:
base_context: Base context to get base_webhook_urls
wallet_record: Wallet record to get dispatch_type and webhook_urls
Returns:
webhook urls according to dispatch_type
"""
wallet_id = wallet_record.wallet_id
dispatch_type = wallet_record.wallet_dispatch_type
subwallet_webhook_urls = wallet_record.wallet_webhook_urls or []
base_webhook_urls = base_context.settings.get("admin.webhook_urls", [])
if dispatch_type == "both":
webhook_urls = list(set(base_webhook_urls) | set(subwallet_webhook_urls))
if not webhook_urls:
LOGGER.warning(
"No webhook URLs in context configuration "
f"nor wallet record {wallet_id}, but wallet record "
f"configures dispatch type {dispatch_type}"
)
elif dispatch_type == "default":
webhook_urls = subwallet_webhook_urls
if not webhook_urls:
LOGGER.warning(
f"No webhook URLs in nor wallet record {wallet_id}, but "
f"wallet record configures dispatch type {dispatch_type}"
)
else:
webhook_urls = base_webhook_urls
return webhook_urls
[docs] async def get_wallet_profile(
self,
base_context: InjectionContext,
wallet_record: WalletRecord,
extra_settings: dict = {},
*,
provision=False,
) -> Profile:
"""Get profile for a wallet record.
Args:
base_context: Base context to extend from
wallet_record: Wallet record to get the context for
extra_settings: Any extra context settings
Returns:
Profile: Profile for the wallet record
"""
wallet_id = wallet_record.wallet_id
if wallet_id not in self._instances:
# Extend base context
context = base_context.copy()
# Settings we don't want to use from base wallet
reset_settings = {
"wallet.recreate": False,
"wallet.seed": None,
"wallet.rekey": None,
"wallet.name": None,
"wallet.type": None,
"mediation.open": None,
"mediation.invite": None,
"mediation.default_id": None,
"mediation.clear": None,
}
extra_settings["admin.webhook_urls"] = self.get_webhook_urls(
base_context, wallet_record
)
context.settings = (
context.settings.extend(reset_settings)
.extend(wallet_record.settings)
.extend(extra_settings)
)
# MTODO: add ledger config
profile, _ = await wallet_config(context, provision=provision)
self._instances[wallet_id] = profile
return self._instances[wallet_id]
[docs] async def create_wallet(
self,
settings: dict,
key_management_mode: str,
) -> WalletRecord:
"""Create new wallet and wallet record.
Args:
settings: The context settings for this wallet
key_management_mode: The mode to use for key management. Either "unmanaged"
to not store the wallet key, or "managed" to store the wallet key
Raises:
MultitenantManagerError: If the wallet name already exists
Returns:
WalletRecord: The newly created wallet record
"""
wallet_key = settings.get("wallet.key")
wallet_name = settings.get("wallet.name")
# base wallet context
async with self._profile.session() as session:
# Check if the wallet name already exists to avoid indy wallet errors
if wallet_name and await self._wallet_name_exists(session, wallet_name):
raise MultitenantManagerError(
f"Wallet with name {wallet_name} already exists"
)
# In unmanaged mode we don't want to store the wallet key
if key_management_mode == WalletRecord.MODE_UNMANAGED:
del settings["wallet.key"]
# create and store wallet record
wallet_record = WalletRecord(
settings=settings, key_management_mode=key_management_mode
)
await wallet_record.save(session)
# provision wallet
profile = await self.get_wallet_profile(
self._profile.context,
wallet_record,
{
"wallet.key": wallet_key,
},
provision=True,
)
# subwallet context
async with profile.session() as session:
wallet = session.inject(BaseWallet)
public_did_info = await wallet.get_public_did()
if public_did_info:
await self.add_key(
wallet_record.wallet_id, public_did_info.verkey, skip_if_exists=True
)
return wallet_record
[docs] async def update_wallet(
self,
wallet_id: str,
new_settings: dict,
) -> WalletRecord:
"""Update a existing wallet and wallet record.
Args:
wallet_id: The wallet id of the wallet record
new_settings: The context settings to be updated for this wallet
Returns:
WalletRecord: The updated wallet record
"""
# update wallet_record
async with self._profile.session() as session:
wallet_record = await WalletRecord.retrieve_by_id(session, wallet_id)
wallet_record.update_settings(new_settings)
await wallet_record.save(session)
# update profile only if loaded
if wallet_id in self._instances:
profile = self._instances[wallet_id]
profile.settings.update(wallet_record.settings)
extra_settings = {
"admin.webhook_urls": self.get_webhook_urls(
self._profile.context, wallet_record
),
}
profile.settings.update(extra_settings)
return wallet_record
[docs] async def remove_wallet(self, wallet_id: str, wallet_key: str = None):
"""Remove the wallet with specified wallet id.
Args:
wallet_id: The wallet id of the wallet record
wallet_key: The wallet key to open the wallet.
Only required for "unmanaged" wallets
Raises:
WalletKeyMissingError: If the wallet key is missing.
Only thrown for "unmanaged" wallets
"""
async with self._profile.session() as session:
wallet = cast(
WalletRecord,
await WalletRecord.retrieve_by_id(session, wallet_id),
)
wallet_key = wallet_key or wallet.wallet_key
if wallet.requires_external_key and not wallet_key:
raise WalletKeyMissingError("Missing key to open wallet")
profile = await self.get_wallet_profile(
self._profile.context,
wallet,
{"wallet.key": wallet_key},
)
del self._instances[wallet_id]
await profile.remove()
# Remove all routing records associated with wallet
storage = session.inject(BaseStorage)
await storage.delete_all_records(
RouteRecord.RECORD_TYPE, {"wallet_id": wallet.wallet_id}
)
await wallet.delete_record(session)
[docs] async def add_key(
self, wallet_id: str, recipient_key: str, *, skip_if_exists: bool = False
):
"""
Add a wallet key to map incoming messages to specific subwallets.
Args:
wallet_id: The wallet id the key corresponds to
recipient_key: The recipient key belonging to the wallet
skip_if_exists: Whether to skip the action if the key is already registered
for relaying / mediation
"""
LOGGER.info(
f"Add route record for recipient {recipient_key} to wallet {wallet_id}"
)
routing_mgr = RoutingManager(self._profile)
mediation_mgr = MediationManager(self._profile)
mediation_record = await mediation_mgr.get_default_mediator()
if skip_if_exists:
try:
async with self._profile.session() as session:
await RouteRecord.retrieve_by_recipient_key(session, recipient_key)
# If no error is thrown, it means there is already a record
return
except (StorageNotFoundError):
pass
await routing_mgr.create_route_record(
recipient_key=recipient_key, internal_wallet_id=wallet_id
)
# External mediation
if mediation_record:
keylist_updates = await mediation_mgr.add_key(recipient_key)
responder = self._profile.inject(BaseResponder)
await responder.send(
keylist_updates, connection_id=mediation_record.connection_id
)
[docs] def create_auth_token(
self, wallet_record: WalletRecord, wallet_key: str = None
) -> str:
"""Create JWT auth token for specified wallet record.
Args:
wallet_record: The wallet record to create the token for
wallet_key: The wallet key to include in the token.
Only required for "unmanaged" wallets
Raises:
WalletKeyMissingError: If the wallet key is missing.
Only thrown for "unmanaged" wallets
Returns:
str: JWT auth token
"""
jwt_payload = {"wallet_id": wallet_record.wallet_id}
jwt_secret = self._profile.settings.get("multitenant.jwt_secret")
if wallet_record.requires_external_key:
if not wallet_key:
raise WalletKeyMissingError()
jwt_payload["wallet_key"] = wallet_key
token = jwt.encode(jwt_payload, jwt_secret, algorithm="HS256").decode()
return token
[docs] async def get_profile_for_token(
self, context: InjectionContext, token: str
) -> Profile:
"""Get the profile associated with a JWT header token.
Args:
context: The context to use for profile creation
token: The token
Raises:
WalletKeyMissingError: If the wallet_key is missing for an unmanaged wallet
InvalidTokenError: If there is an exception while decoding the token
Returns:
Profile associated with the token
"""
jwt_secret = self._profile.context.settings.get("multitenant.jwt_secret")
extra_settings = {}
token_body = jwt.decode(token, jwt_secret, algorithms=["HS256"])
wallet_id = token_body.get("wallet_id")
wallet_key = token_body.get("wallet_key")
async with self._profile.session() as session:
wallet = await WalletRecord.retrieve_by_id(session, wallet_id)
if wallet.requires_external_key:
if not wallet_key:
raise WalletKeyMissingError()
extra_settings["wallet.key"] = wallet_key
profile = await self.get_wallet_profile(context, wallet, extra_settings)
return profile
async def _get_wallet_by_key(self, recipient_key: str) -> Optional[WalletRecord]:
"""Get the wallet record associated with the recipient key.
Args:
recipient_key: The recipient key
Returns:
Wallet record associated with the recipient key
"""
routing_mgr = RoutingManager(self._profile)
try:
routing_record = await routing_mgr.get_recipient(recipient_key)
async with self._profile.session() as session:
wallet = await WalletRecord.retrieve_by_id(
session, routing_record.wallet_id
)
return wallet
except (RouteNotFoundError):
pass
[docs] async def get_wallets_by_message(
self, message_body, wire_format: BaseWireFormat = None
) -> List[WalletRecord]:
"""Get the wallet records associated with the message boy.
Args:
message_body: The body of the message
wire_format: Wire format to use for recipient detection
Returns:
Wallet records associated with the message body
"""
wire_format = wire_format or self._profile.inject(BaseWireFormat)
recipient_keys = wire_format.get_recipient_keys(message_body)
wallets = []
for key in recipient_keys:
wallet = await self._get_wallet_by_key(key)
if wallet:
wallets.append(wallet)
return wallets