# Source code for aries_cloudagent.anoncreds.registry

"""AnonCreds Registry."""

import logging
from typing import List, Optional, Sequence

from ..core.profile import Profile
from .base import (
    AnonCredsRegistrationError,
    AnonCredsResolutionError,
    BaseAnonCredsHandler,
    BaseAnonCredsRegistrar,
    BaseAnonCredsResolver,
)
from .models.anoncreds_cred_def import (
    CredDef,
    CredDefResult,
    GetCredDefResult,
)
from .models.anoncreds_revocation import (
    GetRevListResult,
    GetRevRegDefResult,
    RevList,
    RevListResult,
    RevRegDef,
    RevRegDefResult,
)
from .models.anoncreds_schema import AnonCredsSchema, GetSchemaResult, SchemaResult

# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)


class AnonCredsRegistry:
    """Route AnonCreds lookups and registrations to the handler that supports them.

    Handlers are kept in two lists: ``resolvers`` (used by the ``get_*``
    methods) and ``registrars`` (used by the ``register_*`` and ``update_*``
    methods). For every call, the single handler whose ``supports()``
    coroutine accepts the relevant identifier is selected and the call is
    forwarded to it unchanged.
    """

    def __init__(self, registries: Optional[List[BaseAnonCredsHandler]] = None):
        """Create the AnonCreds registry.

        Args:
            registries: Optional handlers to add through :meth:`register`.
        """
        self.resolvers: List[BaseAnonCredsResolver] = []
        self.registrars: List[BaseAnonCredsRegistrar] = []
        for registry in registries or []:
            self.register(registry)

    def register(self, registry: BaseAnonCredsHandler):
        """Register a new registry.

        The two checks are independent on purpose: a handler that is both a
        resolver and a registrar is added to both lists.
        """
        if isinstance(registry, BaseAnonCredsResolver):
            self.resolvers.append(registry)
        if isinstance(registry, BaseAnonCredsRegistrar):
            self.registrars.append(registry)

    async def _resolver_for_identifier(self, identifier: str) -> BaseAnonCredsResolver:
        """Return the single registered resolver that supports ``identifier``.

        Raises:
            AnonCredsResolutionError: If no resolver, or more than one
                resolver, reports support for the identifier.
        """
        # ``supports`` is awaited for each resolver in registration order.
        resolvers = [
            resolver
            for resolver in self.resolvers
            if await resolver.supports(identifier)
        ]
        if not resolvers:
            raise AnonCredsResolutionError(
                f"No resolver available for identifier {identifier}"
            )
        if len(resolvers) > 1:
            raise AnonCredsResolutionError(
                f"More than one resolver found for identifier {identifier}"
            )
        return resolvers[0]

    async def _registrar_for_identifier(
        self, identifier: str
    ) -> BaseAnonCredsRegistrar:
        """Return the single registered registrar that supports ``identifier``.

        Raises:
            AnonCredsRegistrationError: If no registrar, or more than one
                registrar, reports support for the identifier.
        """
        # ``supports`` is awaited for each registrar in registration order.
        registrars = [
            registrar
            for registrar in self.registrars
            if await registrar.supports(identifier)
        ]
        if not registrars:
            raise AnonCredsRegistrationError(
                f"No registrar available for identifier {identifier}"
            )
        if len(registrars) > 1:
            raise AnonCredsRegistrationError(
                f"More than one registrar found for identifier {identifier}"
            )
        return registrars[0]

    async def get_schema(self, profile: Profile, schema_id: str) -> GetSchemaResult:
        """Get a schema from the registry.

        The resolver is selected by ``schema_id``.
        """
        resolver = await self._resolver_for_identifier(schema_id)
        return await resolver.get_schema(profile, schema_id)

    async def register_schema(
        self,
        profile: Profile,
        schema: AnonCredsSchema,
        options: Optional[dict] = None,
    ) -> SchemaResult:
        """Register a schema on the registry.

        The registrar is selected by ``schema.issuer_id``.
        """
        registrar = await self._registrar_for_identifier(schema.issuer_id)
        return await registrar.register_schema(profile, schema, options)

    async def get_credential_definition(
        self, profile: Profile, credential_definition_id: str
    ) -> GetCredDefResult:
        """Get a credential definition from the registry.

        The resolver is selected by ``credential_definition_id``.
        """
        resolver = await self._resolver_for_identifier(credential_definition_id)
        return await resolver.get_credential_definition(
            profile,
            credential_definition_id,
        )

    async def register_credential_definition(
        self,
        profile: Profile,
        schema: GetSchemaResult,
        credential_definition: CredDef,
        options: Optional[dict] = None,
    ) -> CredDefResult:
        """Register a credential definition on the registry.

        The registrar is selected by ``credential_definition.issuer_id``.
        """
        registrar = await self._registrar_for_identifier(
            credential_definition.issuer_id
        )
        return await registrar.register_credential_definition(
            profile,
            schema,
            credential_definition,
            options,
        )

    async def get_revocation_registry_definition(
        self, profile: Profile, revocation_registry_id: str
    ) -> GetRevRegDefResult:
        """Get a revocation registry definition from the registry.

        The resolver is selected by ``revocation_registry_id``.
        """
        resolver = await self._resolver_for_identifier(revocation_registry_id)
        return await resolver.get_revocation_registry_definition(
            profile, revocation_registry_id
        )

    async def register_revocation_registry_definition(
        self,
        profile: Profile,
        revocation_registry_definition: RevRegDef,
        options: Optional[dict] = None,
    ) -> RevRegDefResult:
        """Register a revocation registry definition on the registry.

        The registrar is selected by
        ``revocation_registry_definition.issuer_id``.
        """
        registrar = await self._registrar_for_identifier(
            revocation_registry_definition.issuer_id
        )
        return await registrar.register_revocation_registry_definition(
            profile, revocation_registry_definition, options
        )

    async def get_revocation_list(
        self, profile: Profile, rev_reg_def_id: str, timestamp: int
    ) -> GetRevListResult:
        """Get a revocation list from the registry.

        The resolver is selected by ``rev_reg_def_id``; ``timestamp`` is
        passed through to the resolver unchanged.
        """
        resolver = await self._resolver_for_identifier(rev_reg_def_id)
        return await resolver.get_revocation_list(profile, rev_reg_def_id, timestamp)

    async def register_revocation_list(
        self,
        profile: Profile,
        rev_reg_def: RevRegDef,
        rev_list: RevList,
        options: Optional[dict] = None,
    ) -> RevListResult:
        """Register a revocation list on the registry.

        The registrar is selected by ``rev_list.issuer_id``.
        """
        registrar = await self._registrar_for_identifier(rev_list.issuer_id)
        return await registrar.register_revocation_list(
            profile, rev_reg_def, rev_list, options
        )

    async def update_revocation_list(
        self,
        profile: Profile,
        rev_reg_def: RevRegDef,
        prev_list: RevList,
        curr_list: RevList,
        revoked: Sequence[int],
        options: Optional[dict] = None,
    ) -> RevListResult:
        """Update a revocation list on the registry.

        The registrar is selected by ``prev_list.issuer_id`` (the previous
        list, not ``curr_list``).
        """
        registrar = await self._registrar_for_identifier(prev_list.issuer_id)
        return await registrar.update_revocation_list(
            profile, rev_reg_def, prev_list, curr_list, revoked, options
        )