"""HTTP Universal DID Resolver."""
import logging
import re
from typing import Iterable, Optional, Pattern, Sequence, Text, Union
import aiohttp
from ...config.injection_context import InjectionContext
from ...core.profile import Profile
from ..base import BaseDIDResolver, DIDNotFound, ResolverError, ResolverType
LOGGER = logging.getLogger(__name__)
DEFAULT_ENDPOINT = "https://dev.uniresolver.io/1.0"
def _compile_supported_did_regex(patterns: Iterable[Union[str, Pattern]]):
"""Create regex from list of regex."""
return re.compile(
"(?:"
+ "|".join(
[
pattern.pattern if isinstance(pattern, Pattern) else pattern
for pattern in patterns
]
)
+ ")"
)
[docs]
class UniversalResolver(BaseDIDResolver):
"""Universal DID Resolver with HTTP bindings."""
def __init__(
self,
*,
endpoint: Optional[str] = None,
supported_did_regex: Optional[Pattern] = None,
bearer_token: Optional[str] = None,
):
"""Initialize UniversalResolver."""
super().__init__(ResolverType.NON_NATIVE)
self._endpoint = endpoint
self._supported_did_regex = supported_did_regex
self.__default_headers = (
{"Authorization": f"Bearer {bearer_token}"} if bearer_token else {}
)
[docs]
async def setup(self, context: InjectionContext):
"""Perform setup, populate supported method list, configuration."""
# configure endpoint
endpoint = context.settings.get_str("resolver.universal")
if endpoint == "DEFAULT" or not endpoint:
endpoint = DEFAULT_ENDPOINT
self._endpoint = endpoint
# configure authorization
token = context.settings.get_str("resolver.universal.token")
self.__default_headers = {"Authorization": f"Bearer {token}"} if token else {}
# configure supported methods
supported = context.settings.get("resolver.universal.supported")
if supported is None:
supported_did_regex = await self._get_supported_did_regex()
else:
supported_did_regex = _compile_supported_did_regex(supported)
self._supported_did_regex = supported_did_regex
@property
def supported_did_regex(self) -> Pattern:
"""Return supported methods regex."""
if not self._supported_did_regex:
raise ResolverError("Resolver has not been set up")
return self._supported_did_regex
async def _resolve(
self,
_profile: Profile,
did: str,
service_accept: Optional[Sequence[Text]] = None,
) -> dict:
"""Resolve DID through remote universal resolver."""
async with aiohttp.ClientSession(headers=self.__default_headers) as session:
async with session.get(f"{self._endpoint}/identifiers/{did}") as resp:
if resp.status == 200:
doc = await resp.json()
did_doc = doc["didDocument"]
LOGGER.info("Retrieved doc: %s", did_doc)
return did_doc
if resp.status == 404:
raise DIDNotFound(f"{did} not found by {self.__class__.__name__}")
text = await resp.text()
raise ResolverError(
f"Unexpected status from universal resolver ({resp.status}): {text}"
)
async def _fetch_resolver_props(self) -> dict:
"""Retrieve universal resolver properties."""
async with aiohttp.ClientSession(headers=self.__default_headers) as session:
try:
async with session.get(f"{self._endpoint}/properties/") as resp:
if 200 <= resp.status < 400:
return await resp.json()
raise ResolverError(
"Failed to retrieve resolver properties: " + await resp.text()
)
except aiohttp.ClientError as err:
raise ResolverError(f"Failed to fetch resolver properties: {err}")
async def _get_supported_did_regex(self):
props = await self._fetch_resolver_props()
def _get_patterns():
"""Handle both old and new properties responses."""
patterns = list(props.values())[0].get("http", {}).get("pattern")
if not patterns:
return props.keys()
else:
return [driver["http"]["pattern"] for driver in props.values()]
patterns = _get_patterns()
return _compile_supported_did_regex(patterns)