"""Classes to manage presentations."""
import logging
from typing import Optional, Tuple
from ...out_of_band.v1_0.models.oob_record import OobRecord
from ....connections.models.conn_record import ConnRecord
from ....core.error import BaseError
from ....core.profile import Profile
from ....messaging.responder import BaseResponder
from .messages.pres import V20Pres
from .messages.pres_ack import V20PresAck
from .messages.pres_format import V20PresFormat
from .messages.pres_problem_report import V20PresProblemReport, ProblemReportReason
from .messages.pres_proposal import V20PresProposal
from .messages.pres_request import V20PresRequest
from .models.pres_exchange import V20PresExRecord
LOGGER = logging.getLogger(__name__)
[docs]class V20PresManagerError(BaseError):
"""Presentation error."""
[docs]class V20PresManager:
"""Class for managing presentations."""
def __init__(self, profile: Profile):
"""
Initialize a V20PresManager.
Args:
profile: The profile instance for this presentation manager
"""
self._profile = profile
[docs] async def create_exchange_for_proposal(
self,
connection_id: str,
pres_proposal_message: V20PresProposal,
auto_present: bool = None,
):
"""
Create a presentation exchange record for input presentation proposal.
Args:
connection_id: connection identifier
pres_proposal_message: presentation proposal to serialize
to exchange record
auto_present: whether to present proof upon receiving proof request
(default to configuration setting)
Returns:
Presentation exchange record, created
"""
pres_ex_record = V20PresExRecord(
connection_id=connection_id,
thread_id=pres_proposal_message._thread_id,
initiator=V20PresExRecord.INITIATOR_SELF,
role=V20PresExRecord.ROLE_PROVER,
state=V20PresExRecord.STATE_PROPOSAL_SENT,
pres_proposal=pres_proposal_message,
auto_present=auto_present,
trace=(pres_proposal_message._trace is not None),
)
async with self._profile.session() as session:
await pres_ex_record.save(
session, reason="create v2.0 presentation proposal"
)
return pres_ex_record
[docs] async def receive_pres_proposal(
self, message: V20PresProposal, conn_record: ConnRecord
):
"""
Receive a presentation proposal from message in context on manager creation.
Returns:
Presentation exchange record, created
"""
pres_ex_record = V20PresExRecord(
connection_id=conn_record.connection_id,
thread_id=message._thread_id,
initiator=V20PresExRecord.INITIATOR_EXTERNAL,
role=V20PresExRecord.ROLE_VERIFIER,
state=V20PresExRecord.STATE_PROPOSAL_RECEIVED,
pres_proposal=message,
trace=(message._trace is not None),
)
async with self._profile.session() as session:
await pres_ex_record.save(
session, reason="receive v2.0 presentation request"
)
return pres_ex_record
[docs] async def create_bound_request(
self,
pres_ex_record: V20PresExRecord,
request_data: dict = None,
comment: str = None,
):
"""
Create a presentation request bound to a proposal.
Args:
pres_ex_record: Presentation exchange record for which
to create presentation request
comment: Optional human-readable comment pertaining to request creation
Returns:
A tuple (updated presentation exchange record, presentation request message)
"""
proof_proposal = pres_ex_record.pres_proposal
input_formats = proof_proposal.formats
request_formats = []
for format in input_formats:
pres_exch_format = V20PresFormat.Format.get(format.format)
if pres_exch_format:
request_formats.append(
await pres_exch_format.handler(self._profile).create_bound_request(
pres_ex_record,
request_data,
)
)
if len(request_formats) == 0:
raise V20PresManagerError(
"Unable to create presentation request. No supported formats"
)
pres_request_message = V20PresRequest(
comment=comment,
will_confirm=True,
formats=[format for (format, _) in request_formats],
request_presentations_attach=[attach for (_, attach) in request_formats],
)
pres_request_message._thread = {"thid": pres_ex_record.thread_id}
pres_request_message.assign_trace_decorator(
self._profile.settings, pres_ex_record.trace
)
pres_ex_record.thread_id = pres_request_message._thread_id
pres_ex_record.state = V20PresExRecord.STATE_REQUEST_SENT
pres_ex_record.pres_request = pres_request_message
async with self._profile.session() as session:
await pres_ex_record.save(
session, reason="create (bound) v2.0 presentation request"
)
return pres_ex_record, pres_request_message
[docs] async def create_exchange_for_request(
self,
connection_id: str,
pres_request_message: V20PresRequest,
auto_verify: bool = None,
):
"""
Create a presentation exchange record for input presentation request.
Args:
connection_id: connection identifier
pres_request_message: presentation request to use in creating
exchange record, extracting indy proof request and thread id
Returns:
Presentation exchange record, updated
"""
pres_ex_record = V20PresExRecord(
connection_id=connection_id,
thread_id=pres_request_message._thread_id,
initiator=V20PresExRecord.INITIATOR_SELF,
role=V20PresExRecord.ROLE_VERIFIER,
state=V20PresExRecord.STATE_REQUEST_SENT,
pres_request=pres_request_message,
auto_verify=auto_verify,
trace=(pres_request_message._trace is not None),
)
async with self._profile.session() as session:
await pres_ex_record.save(
session, reason="create (free) v2.0 presentation request"
)
return pres_ex_record
[docs] async def receive_pres_request(self, pres_ex_record: V20PresExRecord):
"""
Receive a presentation request.
Args:
pres_ex_record: presentation exchange record with request to receive
Returns:
The presentation exchange record, updated
"""
pres_ex_record.state = V20PresExRecord.STATE_REQUEST_RECEIVED
async with self._profile.session() as session:
await pres_ex_record.save(
session, reason="receive v2.0 presentation request"
)
return pres_ex_record
[docs] async def create_pres(
self,
pres_ex_record: V20PresExRecord,
request_data: dict = {},
*,
comment: str = None,
) -> Tuple[V20PresExRecord, V20Pres]:
"""
Create a presentation.
Args:
pres_ex_record: record to update
requested_credentials: indy formatted requested_credentials
comment: optional human-readable comment
format_: presentation format
Example `requested_credentials` format, mapping proof request referents (uuid)
to wallet referents (cred id):
::
{
"self_attested_attributes": {
"j233ffbc-bd35-49b1-934f-51e083106f6d": "value"
},
"requested_attributes": {
"6253ffbb-bd35-49b3-934f-46e083106f6c": {
"cred_id": "5bfa40b7-062b-4ae0-a251-a86c87922c0e",
"revealed": true
}
},
"requested_predicates": {
"bfc8a97d-60d3-4f21-b998-85eeabe5c8c0": {
"cred_id": "5bfa40b7-062b-4ae0-a251-a86c87922c0e"
}
}
}
Returns:
A tuple (updated presentation exchange record, presentation message)
"""
proof_request = pres_ex_record.pres_request
input_formats = proof_request.formats
pres_formats = []
for format in input_formats:
pres_exch_format = V20PresFormat.Format.get(format.format)
if pres_exch_format:
if not request_data:
request_data_pres_exch = {}
else:
request_data_pres_exch = {
pres_exch_format.api: request_data.get(pres_exch_format.api)
}
pres_tuple = await pres_exch_format.handler(self._profile).create_pres(
pres_ex_record,
request_data_pres_exch,
)
if pres_tuple:
pres_formats.append(pres_tuple)
else:
raise V20PresManagerError(
"Unable to create presentation. ProblemReport message sent"
)
if len(pres_formats) == 0:
raise V20PresManagerError(
"Unable to create presentation. No supported formats"
)
pres_message = V20Pres(
comment=comment,
formats=[format for (format, _) in pres_formats],
presentations_attach=[attach for (_, attach) in pres_formats],
)
# Assign thid (and optionally pthid) to message
pres_message.assign_thread_from(pres_ex_record.pres_request)
pres_message.assign_trace_decorator(
self._profile.settings, pres_ex_record.trace
)
# save presentation exchange state
pres_ex_record.state = V20PresExRecord.STATE_PRESENTATION_SENT
pres_ex_record.pres = V20Pres(
formats=[format for (format, _) in pres_formats],
presentations_attach=[attach for (_, attach) in pres_formats],
)
async with self._profile.session() as session:
await pres_ex_record.save(session, reason="create v2.0 presentation")
return pres_ex_record, pres_message
[docs] async def receive_pres(
self,
message: V20Pres,
connection_record: Optional[ConnRecord],
oob_record: Optional[OobRecord],
):
"""
Receive a presentation, from message in context on manager creation.
Returns:
presentation exchange record, retrieved and updated
"""
thread_id = message._thread_id
# Normally we only set the connection_id to None if an oob record is present
# But present proof supports the old-style AIP-1 connectionless exchange that
# bypasses the oob record. So we can't verify if an oob record is associated with
# the exchange because it is possible that there is None
connection_id = (
None
if oob_record
else connection_record.connection_id
if connection_record
else None
)
async with self._profile.session() as session:
pres_ex_record = await V20PresExRecord.retrieve_by_tag_filter(
session,
{"thread_id": thread_id},
{
"role": V20PresExRecord.ROLE_VERIFIER,
"connection_id": connection_id,
},
)
# Save connection id (if it wasn't already present)
if connection_record:
pres_ex_record.connection_id = connection_record.connection_id
input_formats = message.formats
for format in input_formats:
pres_format = V20PresFormat.Format.get(format.format)
if pres_format:
receive_pres_return = await pres_format.handler(
self._profile
).receive_pres(
message,
pres_ex_record,
)
if isinstance(receive_pres_return, bool) and not receive_pres_return:
raise V20PresManagerError(
"Unable to verify received presentation."
" ProblemReport message sent"
)
pres_ex_record.pres = message
pres_ex_record.state = V20PresExRecord.STATE_PRESENTATION_RECEIVED
async with self._profile.session() as session:
await pres_ex_record.save(session, reason="receive v2.0 presentation")
return pres_ex_record
[docs] async def verify_pres(
self, pres_ex_record: V20PresExRecord, responder: Optional[BaseResponder] = None
):
"""
Verify a presentation.
Args:
pres_ex_record: presentation exchange record
with presentation request and presentation to verify
Returns:
presentation exchange record, updated
"""
pres_request_msg = pres_ex_record.pres_request
input_formats = pres_request_msg.formats
for format in input_formats:
pres_exch_format = V20PresFormat.Format.get(format.format)
if pres_exch_format:
pres_ex_record = await pres_exch_format.handler(
self._profile
).verify_pres(
pres_ex_record,
)
if pres_ex_record.verified == "false":
break
pres_ex_record.state = V20PresExRecord.STATE_DONE
async with self._profile.session() as session:
await pres_ex_record.save(session, reason="verify v2.0 presentation")
if pres_request_msg.will_confirm:
await self.send_pres_ack(pres_ex_record, responder)
return pres_ex_record
[docs] async def send_pres_ack(
self, pres_ex_record: V20PresExRecord, responder: Optional[BaseResponder] = None
):
"""
Send acknowledgement of presentation receipt.
Args:
pres_ex_record: presentation exchange record with thread id
"""
responder = responder or self._profile.inject_or(BaseResponder)
if responder:
pres_ack_message = V20PresAck(verification_result=pres_ex_record.verified)
pres_ack_message._thread = {"thid": pres_ex_record.thread_id}
pres_ack_message.assign_trace_decorator(
self._profile.settings, pres_ex_record.trace
)
await responder.send_reply(
pres_ack_message,
# connection_id can be none in case of connectionless
connection_id=pres_ex_record.connection_id,
)
else:
LOGGER.warning(
"Configuration has no BaseResponder: cannot ack presentation on %s",
pres_ex_record.thread_id,
)
[docs] async def receive_pres_ack(self, message: V20PresAck, conn_record: ConnRecord):
"""
Receive a presentation ack, from message in context on manager creation.
Returns:
presentation exchange record, retrieved and updated
"""
connection_id = conn_record.connection_id if conn_record else None
async with self._profile.session() as session:
pres_ex_record = await V20PresExRecord.retrieve_by_tag_filter(
session,
{"thread_id": message._thread_id},
{
# connection_id can be null in connectionless
"connection_id": connection_id,
"role": V20PresExRecord.ROLE_PROVER,
},
)
pres_ex_record.verified = message._verification_result
pres_ex_record.state = V20PresExRecord.STATE_DONE
await pres_ex_record.save(session, reason="receive v2.0 presentation ack")
return pres_ex_record
[docs] async def receive_problem_report(
self, message: V20PresProblemReport, connection_id: str
):
"""
Receive problem report.
Returns:
presentation exchange record, retrieved and updated
"""
# FIXME use transaction, fetch for_update
async with self._profile.session() as session:
pres_ex_record = await V20PresExRecord.retrieve_by_tag_filter(
session,
{"thread_id": message._thread_id},
{"connection_id": connection_id},
)
pres_ex_record.state = V20PresExRecord.STATE_ABANDONED
code = message.description.get("code", ProblemReportReason.ABANDONED.value)
pres_ex_record.error_msg = f"{code}: {message.description.get('en', code)}"
await pres_ex_record.save(session, reason="received problem report")
return pres_ex_record