"""Classes to manage credentials."""
import asyncio
import json
import logging
from typing import Mapping, Tuple
from ....cache.base import BaseCache
from ....core.error import BaseError
from ....core.profile import Profile
from ....indy.holder import IndyHolder, IndyHolderError
from ....indy.issuer import IndyIssuer, IndyIssuerRevocationRegistryFullError
from ....ledger.base import BaseLedger
from ....messaging.credential_definitions.util import (
CRED_DEF_TAGS,
CRED_DEF_SENT_RECORD_TYPE,
)
from ....messaging.responder import BaseResponder
from ....revocation.indy import IndyRevocation
from ....revocation.models.revocation_registry import RevocationRegistry
from ....revocation.models.issuer_rev_reg_record import IssuerRevRegRecord
from ....storage.base import BaseStorage
from ....storage.error import StorageError, StorageNotFoundError
from .messages.credential_ack import CredentialAck
from .messages.credential_issue import CredentialIssue
from .messages.credential_offer import CredentialOffer
from .messages.credential_problem_report import (
CredentialProblemReport,
ProblemReportReason,
)
from .messages.credential_proposal import CredentialProposal
from .messages.credential_request import CredentialRequest
from .messages.inner.credential_preview import CredentialPreview
from .models.credential_exchange import V10CredentialExchange
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
class CredentialManagerError(BaseError):
    """Error raised by the credential manager."""
class CredentialManager:
    """Manage issue-credential v1.0 exchanges, in both issuer and holder roles."""

    def __init__(self, profile: Profile):
        """
        Initialize a CredentialManager.

        Args:
            profile: The profile instance for this credential manager

        """
        self._profile = profile

    @property
    def profile(self) -> Profile:
        """
        Accessor for the current profile instance.

        Returns:
            The profile instance for this credential manager

        """
        return self._profile

    async def _match_sent_cred_def_id(self, tag_query: Mapping[str, str]) -> str:
        """
        Return most recent matching id of cred def that agent sent to ledger.

        Args:
            tag_query: tags that the stored "cred def sent" records must match

        Returns:
            The `cred_def_id` tag of the matching record with the highest
            `epoch` tag

        Raises:
            CredentialManagerError: if no stored record matches the query

        """
        async with self._profile.session() as session:
            storage = session.inject(BaseStorage)
            found = await storage.find_all_records(
                type_filter=CRED_DEF_SENT_RECORD_TYPE, tag_query=tag_query
            )
        if not found:
            raise CredentialManagerError(
                f"Issuer has no operable cred def for proposal spec {tag_query}"
            )
        # epoch tags are compared numerically, not as strings
        latest = max(found, key=lambda rec: int(rec.tags["epoch"]))
        return latest.tags["cred_def_id"]

    async def prepare_send(
        self,
        connection_id: str,
        credential_proposal: CredentialProposal,
        auto_remove: bool = None,
        comment: str = None,
    ) -> Tuple[V10CredentialExchange, CredentialOffer]:
        """
        Set up a new credential exchange for an automated send.

        Args:
            connection_id: Connection to create offer for
            credential_proposal: The credential proposal with preview
            auto_remove: Flag to automatically remove the record on completion;
                when None, it is derived from the `preserve_exchange_records`
                setting
            comment: Optional human-readable comment to set in the offer message

        Returns:
            A tuple of the new credential exchange record and credential offer message

        """
        if auto_remove is None:
            auto_remove = not self._profile.settings.get("preserve_exchange_records")
        credential_exchange = V10CredentialExchange(
            connection_id=connection_id,
            initiator=V10CredentialExchange.INITIATOR_SELF,
            role=V10CredentialExchange.ROLE_ISSUER,
            credential_proposal_dict=credential_proposal,
            auto_issue=True,
            auto_remove=auto_remove,
            trace=(credential_proposal._trace is not None),
        )
        # create_offer already returns the (record, offer message) pair
        return await self.create_offer(
            cred_ex_record=credential_exchange,
            counter_proposal=None,
            comment=comment,
        )

    async def create_proposal(
        self,
        connection_id: str,
        *,
        auto_offer: bool = None,
        auto_remove: bool = None,
        comment: str = None,
        credential_preview: CredentialPreview = None,
        schema_id: str = None,
        schema_issuer_did: str = None,
        schema_name: str = None,
        schema_version: str = None,
        cred_def_id: str = None,
        issuer_did: str = None,
        trace: bool = False,
    ) -> V10CredentialExchange:
        """
        Create a credential proposal.

        Args:
            connection_id: Connection to create proposal for
            auto_offer: Should this proposal request automatically be handled to
                offer a credential
            auto_remove: Should the record be automatically removed on completion;
                when None, it is derived from the `preserve_exchange_records`
                setting
            comment: Optional human-readable comment to include in proposal
            credential_preview: The credential preview to use to create
                the credential proposal
            schema_id: Schema id for credential proposal
            schema_issuer_did: Schema issuer DID for credential proposal
            schema_name: Schema name for credential proposal
            schema_version: Schema version for credential proposal
            cred_def_id: Credential definition id for credential proposal
            issuer_did: Issuer DID for credential proposal
            trace: Value passed to the proposal's trace decorator assignment
                and stored on the exchange record

        Returns:
            Resulting credential exchange record including credential proposal

        """
        credential_proposal_message = CredentialProposal(
            comment=comment,
            credential_proposal=credential_preview,
            schema_id=schema_id,
            schema_issuer_did=schema_issuer_did,
            schema_name=schema_name,
            schema_version=schema_version,
            cred_def_id=cred_def_id,
            issuer_did=issuer_did,
        )
        credential_proposal_message.assign_trace_decorator(
            self._profile.settings, trace
        )

        if auto_remove is None:
            auto_remove = not self._profile.settings.get("preserve_exchange_records")
        cred_ex_record = V10CredentialExchange(
            connection_id=connection_id,
            thread_id=credential_proposal_message._thread_id,
            initiator=V10CredentialExchange.INITIATOR_SELF,
            role=V10CredentialExchange.ROLE_HOLDER,
            state=V10CredentialExchange.STATE_PROPOSAL_SENT,
            credential_proposal_dict=credential_proposal_message,
            auto_offer=auto_offer,
            auto_remove=auto_remove,
            trace=trace,
        )
        async with self._profile.session() as session:
            await cred_ex_record.save(session, reason="create credential proposal")

        return cred_ex_record

    async def receive_proposal(
        self, message: CredentialProposal, connection_id: str
    ) -> V10CredentialExchange:
        """
        Receive a credential proposal.

        Args:
            message: The credential proposal message received
            connection_id: Identifier of the connection it arrived on

        Returns:
            The resulting credential exchange record, created

        """
        settings = self._profile.settings

        # at this point, cred def and schema still open to potential negotiation
        cred_ex_record = V10CredentialExchange(
            connection_id=connection_id,
            thread_id=message._thread_id,
            initiator=V10CredentialExchange.INITIATOR_EXTERNAL,
            role=V10CredentialExchange.ROLE_ISSUER,
            state=V10CredentialExchange.STATE_PROPOSAL_RECEIVED,
            credential_proposal_dict=message,
            auto_offer=settings.get("debug.auto_respond_credential_proposal"),
            auto_issue=settings.get("debug.auto_respond_credential_request"),
            auto_remove=not settings.get("preserve_exchange_records"),
            trace=(message._trace is not None),
        )
        async with self._profile.session() as session:
            await cred_ex_record.save(session, reason="receive credential proposal")

        return cred_ex_record

    async def create_offer(
        self,
        cred_ex_record: V10CredentialExchange,
        counter_proposal: CredentialProposal = None,
        comment: str = None,
    ) -> Tuple[V10CredentialExchange, CredentialOffer]:
        """
        Create a credential offer, update credential exchange record.

        Args:
            cred_ex_record: Credential exchange to create offer for
            counter_proposal: optional proposal to use instead of (and to
                replace) the one already on the exchange record
            comment: optional human-readable comment to set in offer message

        Returns:
            A tuple (credential exchange record, credential offer message)

        Raises:
            CredentialManagerError: if no sent cred def matches the proposal, or
                the preview attribute names differ from the schema's

        """

        async def _create(cred_def_id):
            """Have the issuer build an indy offer and return it decoded."""
            issuer = self._profile.inject(IndyIssuer)
            offer_json = await issuer.create_credential_offer(cred_def_id)
            return json.loads(offer_json)

        credential_proposal_message = (
            counter_proposal
            if counter_proposal
            else cred_ex_record.credential_proposal_dict
        )
        credential_proposal_message.assign_trace_decorator(
            self._profile.settings, cred_ex_record.trace
        )

        # only the cred def tags actually set on the proposal narrow the search
        cred_def_id = await self._match_sent_cred_def_id(
            {
                t: getattr(credential_proposal_message, t)
                for t in CRED_DEF_TAGS
                if getattr(credential_proposal_message, t)
            }
        )

        credential_preview = credential_proposal_message.credential_proposal

        # vet attributes
        ledger = self._profile.inject(BaseLedger)
        async with ledger:
            schema_id = await ledger.credential_definition_id2schema_id(cred_def_id)
            schema = await ledger.get_schema(schema_id)
        schema_attrs = set(schema["attrNames"])
        preview_attrs = set(credential_preview.attr_dict())
        if preview_attrs != schema_attrs:
            raise CredentialManagerError(
                f"Preview attributes {preview_attrs} "
                f"mismatch corresponding schema attributes {schema_attrs}"
            )

        credential_offer = None
        cache_key = f"credential_offer::{cred_def_id}"
        cache = self._profile.inject(BaseCache, required=False)
        if cache:
            async with cache.acquire(cache_key) as entry:
                if entry.result:
                    credential_offer = entry.result
                else:
                    credential_offer = await _create(cred_def_id)
                    await entry.set_result(credential_offer, 3600)
        if not credential_offer:
            # no cache configured (or it yielded nothing): build the offer directly
            credential_offer = await _create(cred_def_id)

        credential_offer_message = CredentialOffer(
            comment=comment,
            credential_preview=credential_preview,
            offers_attach=[CredentialOffer.wrap_indy_offer(credential_offer)],
        )
        credential_offer_message._thread = {"thid": cred_ex_record.thread_id}
        credential_offer_message.assign_trace_decorator(
            self._profile.settings, cred_ex_record.trace
        )

        cred_ex_record.thread_id = credential_offer_message._thread_id
        cred_ex_record.schema_id = credential_offer["schema_id"]
        cred_ex_record.credential_definition_id = credential_offer["cred_def_id"]
        cred_ex_record.state = V10CredentialExchange.STATE_OFFER_SENT
        cred_ex_record.credential_proposal_dict = (  # any counter replaces original
            credential_proposal_message
        )
        cred_ex_record.credential_offer = credential_offer
        cred_ex_record.credential_offer_dict = credential_offer_message
        async with self._profile.session() as session:
            await cred_ex_record.save(session, reason="create credential offer")

        return (cred_ex_record, credential_offer_message)

    async def receive_offer(
        self, message: CredentialOffer, connection_id: str
    ) -> V10CredentialExchange:
        """
        Receive a credential offer.

        Args:
            message: The credential offer message received
            connection_id: Identifier of the connection it arrived on

        Returns:
            The credential exchange record, updated

        """
        credential_preview = message.credential_preview
        indy_offer = message.indy_offer(0)
        schema_id = indy_offer["schema_id"]
        cred_def_id = indy_offer["cred_def_id"]

        # rebuild a proposal from what the issuer actually offered
        credential_proposal_dict = CredentialProposal(
            comment=message.comment,
            credential_proposal=credential_preview,
            schema_id=schema_id,
            cred_def_id=cred_def_id,
        )

        async with self._profile.session() as session:
            # Get credential exchange record (holder sent proposal first)
            # or create it (issuer sent offer first)
            try:
                cred_ex_record = await (
                    V10CredentialExchange.retrieve_by_connection_and_thread(
                        session, connection_id, message._thread_id
                    )
                )
                cred_ex_record.credential_proposal_dict = credential_proposal_dict
            except StorageNotFoundError:  # issuer sent this offer free of any proposal
                cred_ex_record = V10CredentialExchange(
                    connection_id=connection_id,
                    thread_id=message._thread_id,
                    initiator=V10CredentialExchange.INITIATOR_EXTERNAL,
                    role=V10CredentialExchange.ROLE_HOLDER,
                    credential_proposal_dict=credential_proposal_dict,
                    auto_remove=not self._profile.settings.get(
                        "preserve_exchange_records"
                    ),
                    trace=(message._trace is not None),
                )

            cred_ex_record.credential_offer = indy_offer
            cred_ex_record.state = V10CredentialExchange.STATE_OFFER_RECEIVED
            cred_ex_record.schema_id = schema_id
            cred_ex_record.credential_definition_id = cred_def_id

            await cred_ex_record.save(session, reason="receive credential offer")

        return cred_ex_record

    async def create_request(
        self, cred_ex_record: V10CredentialExchange, holder_did: str
    ) -> Tuple[V10CredentialExchange, CredentialRequest]:
        """
        Create a credential request.

        Args:
            cred_ex_record: Credential exchange record
                for which to create request
            holder_did: holder DID

        Returns:
            A tuple (credential exchange record, credential request message)

        Raises:
            CredentialManagerError: if the record is not in offer-received state

        """
        if cred_ex_record.state != V10CredentialExchange.STATE_OFFER_RECEIVED:
            raise CredentialManagerError(
                f"Credential exchange {cred_ex_record.credential_exchange_id} "
                f"in {cred_ex_record.state} state "
                f"(must be {V10CredentialExchange.STATE_OFFER_RECEIVED})"
            )

        credential_definition_id = cred_ex_record.credential_definition_id
        cred_offer_ser = cred_ex_record._credential_offer.ser

        async def _create():
            """Fetch the cred def and have the holder build request + metadata."""
            ledger = self._profile.inject(BaseLedger)
            async with ledger:
                credential_definition = await ledger.get_credential_definition(
                    credential_definition_id
                )

            holder = self._profile.inject(IndyHolder)
            request_json, metadata_json = await holder.create_credential_request(
                cred_offer_ser,
                credential_definition,
                holder_did,
            )
            return {
                "request": json.loads(request_json),
                "metadata": json.loads(metadata_json),
            }

        if cred_ex_record.credential_request:
            # a request already exists on the record: reuse it, do not rebuild
            LOGGER.warning(
                "create_request called multiple times for v1.0 credential exchange: %s",
                cred_ex_record.credential_exchange_id,
            )
        else:
            nonce = cred_offer_ser["nonce"]
            cache_key = (
                f"credential_request::{credential_definition_id}::{holder_did}::{nonce}"
            )
            cred_req_result = None
            cache = self._profile.inject(BaseCache, required=False)
            if cache:
                async with cache.acquire(cache_key) as entry:
                    if entry.result:
                        cred_req_result = entry.result
                    else:
                        cred_req_result = await _create()
                        await entry.set_result(cred_req_result, 3600)
            if not cred_req_result:
                cred_req_result = await _create()

            cred_ex_record.credential_request = cred_req_result["request"]
            cred_ex_record.credential_request_metadata = cred_req_result["metadata"]

        credential_request_message = CredentialRequest(
            requests_attach=[
                CredentialRequest.wrap_indy_cred_req(
                    cred_ex_record._credential_request.ser
                )
            ]
        )
        credential_request_message._thread = {"thid": cred_ex_record.thread_id}
        credential_request_message.assign_trace_decorator(
            self._profile.settings, cred_ex_record.trace
        )

        cred_ex_record.state = V10CredentialExchange.STATE_REQUEST_SENT
        async with self._profile.session() as session:
            await cred_ex_record.save(session, reason="create credential request")

        return (cred_ex_record, credential_request_message)

    async def receive_request(
        self, message: CredentialRequest, connection_id: str
    ) -> V10CredentialExchange:
        """
        Receive a credential request.

        Args:
            message: Credential request to receive
            connection_id: Identifier of the connection it arrived on

        Returns:
            credential exchange record, retrieved and updated

        """
        # NOTE(review): assert statements are skipped under `python -O`; consider
        # raising an explicit error if this check must always run
        assert len(message.requests_attach or []) == 1
        credential_request = message.indy_cred_req(0)

        async with self._profile.session() as session:
            cred_ex_record = await (
                V10CredentialExchange.retrieve_by_connection_and_thread(
                    session, connection_id, message._thread_id
                )
            )
            cred_ex_record.credential_request = credential_request
            cred_ex_record.state = V10CredentialExchange.STATE_REQUEST_RECEIVED
            await cred_ex_record.save(session, reason="receive credential request")

        return cred_ex_record

    async def issue_credential(
        self,
        cred_ex_record: V10CredentialExchange,
        *,
        comment: str = None,
        retries: int = 5,
    ) -> Tuple[V10CredentialExchange, CredentialIssue]:
        """
        Issue a credential.

        Args:
            cred_ex_record: The credential exchange record
                for which to issue a credential
            comment: optional human-readable comment pertaining to credential issue
            retries: how many more times to sleep and re-enter this method when
                no active revocation registry is available or the registry in
                use turns out to be full

        Returns:
            Tuple: (Updated credential exchange record, credential message)

        Raises:
            CredentialManagerError: if the record is not in request-received
                state, or no active revocation registry exists once retries
                are exhausted
            IndyIssuerRevocationRegistryFullError: if the registry is full and
                retries are exhausted

        """
        if cred_ex_record.state != V10CredentialExchange.STATE_REQUEST_RECEIVED:
            raise CredentialManagerError(
                f"Credential exchange {cred_ex_record.credential_exchange_id} "
                f"in {cred_ex_record.state} state "
                f"(must be {V10CredentialExchange.STATE_REQUEST_RECEIVED})"
            )

        schema_id = cred_ex_record.schema_id
        rev_reg = None

        if cred_ex_record.credential:
            LOGGER.warning(
                "issue_credential called multiple times for "
                "credential exchange record %s - abstaining",
                cred_ex_record.credential_exchange_id,
            )
        else:
            cred_offer_ser = cred_ex_record._credential_offer.ser
            cred_req_ser = cred_ex_record._credential_request.ser

            ledger = self._profile.inject(BaseLedger)
            async with ledger:
                schema = await ledger.get_schema(schema_id)
                credential_definition = await ledger.get_credential_definition(
                    cred_ex_record.credential_definition_id
                )

            tails_path = None
            if credential_definition["value"].get("revocation"):
                revoc = IndyRevocation(self._profile)
                try:
                    active_rev_reg_rec = await revoc.get_active_issuer_rev_reg_record(
                        cred_ex_record.credential_definition_id
                    )
                    rev_reg = await active_rev_reg_rec.get_registry()
                    cred_ex_record.revoc_reg_id = active_rev_reg_rec.revoc_reg_id

                    tails_path = rev_reg.tails_local_path
                    await rev_reg.get_or_fetch_local_tails_path()

                except StorageNotFoundError:
                    # no active registry record: look for posted ones
                    async with self._profile.session() as session:
                        posted_rev_reg_recs = (
                            await IssuerRevRegRecord.query_by_cred_def_id(
                                session,
                                cred_ex_record.credential_definition_id,
                                state=IssuerRevRegRecord.STATE_POSTED,
                            )
                        )
                    if not posted_rev_reg_recs:
                        # Send next 2 rev regs, publish tails files in background
                        async with self._profile.session() as session:
                            old_rev_reg_recs = sorted(
                                await IssuerRevRegRecord.query_by_cred_def_id(
                                    session,
                                    cred_ex_record.credential_definition_id,
                                )
                            )  # prefer to reuse prior rev reg size
                        for _ in range(2):
                            pending_rev_reg_rec = await revoc.init_issuer_registry(
                                cred_ex_record.credential_definition_id,
                                max_cred_num=(
                                    old_rev_reg_recs[0].max_cred_num
                                    if old_rev_reg_recs
                                    else None
                                ),
                            )
                            # NOTE(review): scheduled without awaiting and without
                            # keeping a reference to the returned future
                            asyncio.ensure_future(
                                pending_rev_reg_rec.stage_pending_registry(
                                    self._profile,
                                    max_attempts=3,  # fail both in < 2s at worst
                                )
                            )
                    if retries > 0:
                        LOGGER.info(
                            "Waiting 2s on posted rev reg for cred def %s, retrying",
                            cred_ex_record.credential_definition_id,
                        )
                        await asyncio.sleep(2)
                        return await self.issue_credential(
                            cred_ex_record=cred_ex_record,
                            comment=comment,
                            retries=retries - 1,
                        )

                    raise CredentialManagerError(
                        f"Cred def id {cred_ex_record.credential_definition_id} "
                        "has no active revocation registry"
                    )
                del revoc

            credential_values = (
                cred_ex_record.credential_proposal_dict.credential_proposal.attr_dict(
                    decode=False
                )
            )
            issuer = self._profile.inject(IndyIssuer)
            try:
                (
                    credential_json,
                    cred_ex_record.revocation_id,
                ) = await issuer.create_credential(
                    schema,
                    cred_offer_ser,
                    cred_req_ser,
                    credential_values,
                    cred_ex_record.credential_exchange_id,
                    cred_ex_record.revoc_reg_id,
                    tails_path,
                )

                # If the rev reg is now full
                if rev_reg and rev_reg.max_creds == int(cred_ex_record.revocation_id):
                    async with self._profile.session() as session:
                        await active_rev_reg_rec.set_state(
                            session,
                            IssuerRevRegRecord.STATE_FULL,
                        )

                    # Send next 1 rev reg, publish tails file in background
                    revoc = IndyRevocation(self._profile)
                    pending_rev_reg_rec = await revoc.init_issuer_registry(
                        active_rev_reg_rec.cred_def_id,
                        max_cred_num=active_rev_reg_rec.max_cred_num,
                    )
                    asyncio.ensure_future(
                        pending_rev_reg_rec.stage_pending_registry(
                            self._profile,
                            max_attempts=16,
                        )
                    )

            except IndyIssuerRevocationRegistryFullError:
                # unlucky: duelling instance issued last cred near same time as us
                async with self._profile.session() as session:
                    await active_rev_reg_rec.set_state(
                        session,
                        IssuerRevRegRecord.STATE_FULL,
                    )

                if retries > 0:
                    # use next rev reg; at worst, lucky instance is putting one up
                    LOGGER.info(
                        "Waiting 1s and retrying: revocation registry %s is full",
                        active_rev_reg_rec.revoc_reg_id,
                    )
                    await asyncio.sleep(1)
                    return await self.issue_credential(
                        cred_ex_record=cred_ex_record,
                        comment=comment,
                        retries=retries - 1,
                    )

                raise

            cred_ex_record.credential = json.loads(credential_json)

        cred_ex_record.state = V10CredentialExchange.STATE_ISSUED
        async with self._profile.session() as session:
            # FIXME - re-fetch record to check state, apply transactional update
            await cred_ex_record.save(session, reason="issue credential")

        credential_message = CredentialIssue(
            comment=comment,
            credentials_attach=[
                CredentialIssue.wrap_indy_credential(cred_ex_record._credential.ser)
            ],
        )
        credential_message._thread = {"thid": cred_ex_record.thread_id}
        credential_message.assign_trace_decorator(
            self._profile.settings, cred_ex_record.trace
        )

        return (cred_ex_record, credential_message)

    async def receive_credential(
        self, message: CredentialIssue, connection_id: str
    ) -> V10CredentialExchange:
        """
        Receive a credential from an issuer.

        Hold in storage potentially to be processed by controller before storing.

        Args:
            message: The credential issue message received
            connection_id: Identifier of the connection it arrived on

        Returns:
            Credential exchange record, retrieved and updated

        """
        # NOTE(review): assert statements are skipped under `python -O`; consider
        # raising an explicit error if this check must always run
        assert len(message.credentials_attach or []) == 1
        raw_credential = message.indy_credential(0)

        # FIXME use transaction, fetch for_update
        async with self._profile.session() as session:
            cred_ex_record = await (
                V10CredentialExchange.retrieve_by_connection_and_thread(
                    session,
                    connection_id,
                    message._thread_id,
                )
            )

            cred_ex_record.raw_credential = raw_credential
            cred_ex_record.state = V10CredentialExchange.STATE_CREDENTIAL_RECEIVED

            await cred_ex_record.save(session, reason="receive credential")

        return cred_ex_record

    async def store_credential(
        self, cred_ex_record: V10CredentialExchange, credential_id: str = None
    ) -> V10CredentialExchange:
        """
        Store a credential in holder wallet.

        This method does not send the ack; see `send_credential_ack`.

        Args:
            cred_ex_record: credential exchange record
                with credential to store and ack
            credential_id: optional credential identifier to override default on storage

        Returns:
            Updated credential exchange record

        Raises:
            CredentialManagerError: if the record is not in
                credential-received state
            IndyHolderError: if the holder fails to store the credential

        """
        if cred_ex_record.state != (V10CredentialExchange.STATE_CREDENTIAL_RECEIVED):
            raise CredentialManagerError(
                f"Credential exchange {cred_ex_record.credential_exchange_id} "
                f"in {cred_ex_record.state} state "
                f"(must be {V10CredentialExchange.STATE_CREDENTIAL_RECEIVED})"
            )

        raw_cred_serde = cred_ex_record._raw_credential
        revoc_reg_def = None
        ledger = self._profile.inject(BaseLedger)
        async with ledger:
            credential_definition = await ledger.get_credential_definition(
                raw_cred_serde.de.cred_def_id
            )
            if raw_cred_serde.de.rev_reg_id:
                revoc_reg_def = await ledger.get_revoc_reg_def(
                    raw_cred_serde.de.rev_reg_id
                )

        holder = self._profile.inject(IndyHolder)
        if (
            cred_ex_record.credential_proposal_dict
            and cred_ex_record.credential_proposal_dict.credential_proposal
        ):
            mime_types = (
                cred_ex_record.credential_proposal_dict.credential_proposal.mime_types()
            )
        else:
            mime_types = None

        if revoc_reg_def:
            revoc_reg = RevocationRegistry.from_definition(revoc_reg_def, True)
            await revoc_reg.get_or_fetch_local_tails_path()
        try:
            credential_id = await holder.store_credential(
                credential_definition,
                raw_cred_serde.ser,
                cred_ex_record.credential_request_metadata,
                mime_types,
                credential_id=credential_id,
                rev_reg_def=revoc_reg_def,
            )
        except IndyHolderError as e:
            LOGGER.error(f"Error storing credential. {e.error_code}: {e.message}")
            raise

        # read back what was stored to pick up its revocation identifiers
        credential_json = await holder.get_credential(credential_id)
        credential = json.loads(credential_json)

        cred_ex_record.credential_id = credential_id
        cred_ex_record.credential = credential
        cred_ex_record.revoc_reg_id = credential.get("rev_reg_id", None)
        cred_ex_record.revocation_id = credential.get("cred_rev_id", None)

        async with self._profile.session() as session:
            # FIXME - re-fetch record to check state, apply transactional update
            await cred_ex_record.save(session, reason="store credential")

        return cred_ex_record

    async def send_credential_ack(
        self,
        cred_ex_record: V10CredentialExchange,
    ) -> Tuple[V10CredentialExchange, CredentialAck]:
        """
        Create, send, and return ack message for input credential exchange record.

        Delete credential exchange record if set to auto-remove.

        Args:
            cred_ex_record: credential exchange record to acknowledge

        Returns:
            Tuple: cred ex record, credential ack message for tracing.

        """
        credential_ack_message = CredentialAck()
        credential_ack_message.assign_thread_id(
            cred_ex_record.thread_id, cred_ex_record.parent_thread_id
        )
        credential_ack_message.assign_trace_decorator(
            self._profile.settings, cred_ex_record.trace
        )

        cred_ex_record.state = V10CredentialExchange.STATE_ACKED
        try:
            async with self._profile.session() as session:
                # FIXME - re-fetch record to check state, apply transactional update
                await cred_ex_record.save(session, reason="ack credential")

                if cred_ex_record.auto_remove:
                    await cred_ex_record.delete_record(session)  # all done: delete
        except StorageError as err:
            LOGGER.exception(err)  # holder still owes an ack: carry on

        responder = self._profile.inject(BaseResponder, required=False)
        if responder:
            await responder.send_reply(
                credential_ack_message,
                connection_id=cred_ex_record.connection_id,
            )
        else:
            LOGGER.warning(
                "Configuration has no BaseResponder: cannot ack credential on %s",
                cred_ex_record.thread_id,
            )

        return cred_ex_record, credential_ack_message

    async def receive_credential_ack(
        self, message: CredentialAck, connection_id: str
    ) -> V10CredentialExchange:
        """
        Receive credential ack from holder.

        Delete credential exchange record if set to auto-remove.

        Args:
            message: The credential ack message received
            connection_id: Identifier of the connection it arrived on

        Returns:
            credential exchange record, retrieved and updated

        """
        # FIXME use transaction, fetch for_update
        async with self._profile.session() as session:
            cred_ex_record = await (
                V10CredentialExchange.retrieve_by_connection_and_thread(
                    session,
                    connection_id,
                    message._thread_id,
                )
            )

            cred_ex_record.state = V10CredentialExchange.STATE_ACKED
            await cred_ex_record.save(session, reason="credential acked")

        if cred_ex_record.auto_remove:
            async with self._profile.session() as session:
                await cred_ex_record.delete_record(session)  # all done: delete

        return cred_ex_record

    async def receive_problem_report(
        self, message: CredentialProblemReport, connection_id: str
    ) -> V10CredentialExchange:
        """
        Receive problem report.

        Clears the record's state and stores "<code>: <text>" as its error
        message, where the code defaults to the issuance-abandoned reason and
        the text defaults to the code.

        Args:
            message: The problem report message received
            connection_id: Identifier of the connection it arrived on

        Returns:
            credential exchange record, retrieved and updated

        """
        # FIXME use transaction, fetch for_update
        async with self._profile.session() as session:
            cred_ex_record = await (
                V10CredentialExchange.retrieve_by_connection_and_thread(
                    session,
                    connection_id,
                    message._thread_id,
                )
            )

            cred_ex_record.state = None
            # a report without a description must still be recorded, not crash
            description = message.description or {}
            code = description.get(
                "code",
                ProblemReportReason.ISSUANCE_ABANDONED.value,
            )
            cred_ex_record.error_msg = f"{code}: {description.get('en', code)}"
            await cred_ex_record.save(session, reason="received problem report")

        return cred_ex_record