Source code for aries_cloudagent.resolver.default.universal

"""HTTP Universal DID Resolver."""

import logging
import re
from typing import Iterable, Optional, Pattern, Sequence, Union, Text

import aiohttp

from ...config.injection_context import InjectionContext
from ...core.profile import Profile
from ..base import BaseDIDResolver, DIDNotFound, ResolverError, ResolverType

LOGGER = logging.getLogger(__name__)

def _compile_supported_did_regex(patterns: Iterable[Union[str, Pattern]]):
    """Create regex from list of regex."""
    return re.compile(
        + "|".join(
                pattern.pattern if isinstance(pattern, Pattern) else pattern
                for pattern in patterns
        + ")"

[docs]class UniversalResolver(BaseDIDResolver): """Universal DID Resolver with HTTP bindings.""" def __init__( self, *, endpoint: Optional[str] = None, supported_did_regex: Optional[Pattern] = None, bearer_token: Optional[str] = None, ): """Initialize UniversalResolver.""" super().__init__(ResolverType.NON_NATIVE) self._endpoint = endpoint self._supported_did_regex = supported_did_regex self.__default_headers = ( {"Authorization": f"Bearer {bearer_token}"} if bearer_token else {} )
[docs] async def setup(self, context: InjectionContext): """Perform setup, populate supported method list, configuration.""" # configure endpoint endpoint = context.settings.get_str("resolver.universal") if endpoint == "DEFAULT" or not endpoint: endpoint = DEFAULT_ENDPOINT self._endpoint = endpoint # configure authorization token = context.settings.get_str("resolver.universal.token") self.__default_headers = {"Authorization": f"Bearer {token}"} if token else {} # configure supported methods supported = context.settings.get("resolver.universal.supported") if supported is None: supported_did_regex = await self._get_supported_did_regex() else: supported_did_regex = _compile_supported_did_regex(supported) self._supported_did_regex = supported_did_regex
@property def supported_did_regex(self) -> Pattern: """Return supported methods regex.""" if not self._supported_did_regex: raise ResolverError("Resolver has not been set up") return self._supported_did_regex async def _resolve( self, _profile: Profile, did: str, service_accept: Optional[Sequence[Text]] = None, ) -> dict: """Resolve DID through remote universal resolver.""" async with aiohttp.ClientSession(headers=self.__default_headers) as session: async with session.get(f"{self._endpoint}/identifiers/{did}") as resp: if resp.status == 200: doc = await resp.json() did_doc = doc["didDocument"]"Retrieved doc: %s", did_doc) return did_doc if resp.status == 404: raise DIDNotFound(f"{did} not found by {self.__class__.__name__}") text = await resp.text() raise ResolverError( f"Unexpected status from universal resolver ({resp.status}): {text}" ) async def _fetch_resolver_props(self) -> dict: """Retrieve universal resolver properties.""" async with aiohttp.ClientSession(headers=self.__default_headers) as session: async with session.get(f"{self._endpoint}/properties/") as resp: if 200 <= resp.status < 400: return await resp.json() raise ResolverError( "Failed to retrieve resolver properties: " + await resp.text() ) async def _get_supported_did_regex(self) -> Pattern: props = await self._fetch_resolver_props() return _compile_supported_did_regex( driver["http"]["pattern"] for driver in props.values() )