"""Admin routes for presentations."""
import json
from aiohttp import web
from aiohttp_apispec import (
docs,
match_info_schema,
querystring_schema,
request_schema,
response_schema,
)
from marshmallow import fields, validate, validates_schema, ValidationError
from typing import Mapping, Sequence, Tuple
from ....admin.request_context import AdminRequestContext
from ....connections.models.conn_record import ConnRecord
from ....indy.holder import IndyHolder, IndyHolderError
from ....indy.models.cred_precis import IndyCredPrecisSchema
from ....indy.models.proof import IndyPresSpecSchema
from ....indy.models.proof_request import IndyProofRequestSchema
from ....indy.util import generate_pr_nonce
from ....ledger.error import LedgerError
from ....messaging.decorators.attach_decorator import AttachDecorator
from ....messaging.models.base import BaseModelError
from ....messaging.models.openapi import OpenAPISchema
from ....messaging.valid import (
INDY_EXTRA_WQL,
NUM_STR_NATURAL,
NUM_STR_WHOLE,
UUIDFour,
UUID4,
)
from ....storage.error import StorageError, StorageNotFoundError
from ....storage.base import BaseStorage
from ....storage.vc_holder.base import VCHolder
from ....storage.vc_holder.vc_record import VCRecord
from ....utils.tracing import trace_event, get_timer, AdminAPIMessageTracingSchema
from ....vc.ld_proofs import BbsBlsSignature2020, Ed25519Signature2018
from ....wallet.error import WalletNotFoundError
from ..dif.pres_exch import InputDescriptors, ClaimFormat, SchemaInputDescriptor
from ..dif.pres_proposal_schema import DIFProofProposalSchema
from ..dif.pres_request_schema import (
DIFProofRequestSchema,
DIFPresSpecSchema,
)
from . import problem_report_for_record, report_problem
from .formats.handler import V20PresFormatHandlerError
from .manager import V20PresManager
from .message_types import (
ATTACHMENT_FORMAT,
PRES_20_PROPOSAL,
PRES_20_REQUEST,
SPEC_URI,
)
from .messages.pres_format import V20PresFormat
from .messages.pres_problem_report import ProblemReportReason
from .messages.pres_proposal import V20PresProposal
from .messages.pres_request import V20PresRequest
from .models.pres_exchange import V20PresExRecord, V20PresExRecordSchema
[docs]class V20PresentProofModuleResponseSchema(OpenAPISchema):
"""Response schema for Present Proof Module."""
[docs]class V20PresExRecordListQueryStringSchema(OpenAPISchema):
"""Parameters and validators for presentation exchange list query."""
connection_id = fields.UUID(
description="Connection identifier",
required=False,
example=UUIDFour.EXAMPLE, # typically but not necessarily a UUID4
)
thread_id = fields.UUID(
description="Thread identifier",
required=False,
example=UUIDFour.EXAMPLE, # typically but not necessarily a UUID4
)
role = fields.Str(
description="Role assigned in presentation exchange",
required=False,
validate=validate.OneOf(
[
getattr(V20PresExRecord, m)
for m in vars(V20PresExRecord)
if m.startswith("ROLE_")
]
),
)
state = fields.Str(
description="Presentation exchange state",
required=False,
validate=validate.OneOf(
[
getattr(V20PresExRecord, m)
for m in vars(V20PresExRecord)
if m.startswith("STATE_")
]
),
)
[docs]class V20PresExRecordListSchema(OpenAPISchema):
"""Result schema for a presentation exchange query."""
results = fields.List(
fields.Nested(V20PresExRecordSchema()),
description="Presentation exchange records",
)
[docs]class V20PresProposalRequestSchema(AdminAPIMessageTracingSchema):
"""Request schema for sending a presentation proposal admin message."""
connection_id = fields.UUID(
description="Connection identifier", required=True, example=UUIDFour.EXAMPLE
)
comment = fields.Str(
description="Human-readable comment", required=False, allow_none=True
)
presentation_proposal = fields.Nested(
V20PresProposalByFormatSchema(),
required=True,
)
auto_present = fields.Boolean(
description=(
"Whether to respond automatically to presentation requests, building "
"and presenting requested proof"
),
required=False,
default=False,
)
trace = fields.Bool(
description="Whether to trace event (default false)",
required=False,
example=False,
)
[docs]class V20PresCreateRequestRequestSchema(AdminAPIMessageTracingSchema):
"""Request schema for creating a proof request free of any connection."""
presentation_request = fields.Nested(V20PresRequestByFormatSchema(), required=True)
comment = fields.Str(required=False, allow_none=True)
auto_verify = fields.Bool(
description="Verifier choice to auto-verify proof presentation",
required=False,
example=False,
)
trace = fields.Bool(
description="Whether to trace event (default false)",
required=False,
example=False,
)
[docs]class V20PresSendRequestRequestSchema(V20PresCreateRequestRequestSchema):
"""Request schema for sending a proof request on a connection."""
connection_id = fields.UUID(
description="Connection identifier", required=True, example=UUIDFour.EXAMPLE
)
[docs]class V20PresentationSendRequestToProposalSchema(AdminAPIMessageTracingSchema):
"""Request schema for sending a proof request bound to a proposal."""
auto_verify = fields.Bool(
description="Verifier choice to auto-verify proof presentation",
required=False,
example=False,
)
trace = fields.Bool(
description="Whether to trace event (default false)",
required=False,
example=False,
)
[docs]class V20CredentialsFetchQueryStringSchema(OpenAPISchema):
"""Parameters and validators for credentials fetch request query string."""
referent = fields.Str(
description="Proof request referents of interest, comma-separated",
required=False,
example="1_name_uuid,2_score_uuid",
)
start = fields.Str(
description="Start index",
required=False,
strict=True,
**NUM_STR_WHOLE,
)
count = fields.Str(
description="Maximum number to retrieve",
required=False,
**NUM_STR_NATURAL,
)
extra_query = fields.Str(
description="(JSON) object mapping referents to extra WQL queries",
required=False,
**INDY_EXTRA_WQL,
)
[docs]class V20PresProblemReportRequestSchema(OpenAPISchema):
"""Request schema for sending problem report."""
description = fields.Str(required=True)
[docs]class V20PresExIdMatchInfoSchema(OpenAPISchema):
"""Path parameters for request taking presentation exchange id."""
pres_ex_id = fields.Str(
description="Presentation exchange identifier", required=True, **UUID4
)
async def _add_nonce(indy_proof_request: Mapping) -> Mapping:
"""Add nonce to indy proof request if need be."""
if not indy_proof_request.get("nonce"):
indy_proof_request["nonce"] = await generate_pr_nonce()
return indy_proof_request
def _formats_attach(by_format: Mapping, msg_type: str, spec: str) -> Mapping:
"""Break out formats and proposals/requests/presentations for v2.0 messages."""
attach = []
for fmt_api, item_by_fmt in by_format.items():
if fmt_api == V20PresFormat.Format.INDY.api:
attach.append(
AttachDecorator.data_base64(mapping=item_by_fmt, ident=fmt_api)
)
elif fmt_api == V20PresFormat.Format.DIF.api:
attach.append(AttachDecorator.data_json(mapping=item_by_fmt, ident=fmt_api))
return {
"formats": [
V20PresFormat(
attach_id=fmt_api,
format_=ATTACHMENT_FORMAT[msg_type][fmt_api],
)
for fmt_api in by_format
],
f"{spec}_attach": attach,
}
@docs(tags=["present-proof v2.0"], summary="Fetch all present-proof exchange records")
@querystring_schema(V20PresExRecordListQueryStringSchema)
@response_schema(V20PresExRecordListSchema(), 200, description="")
async def present_proof_list(request: web.BaseRequest):
"""
Request handler for searching presentation exchange records.
Args:
request: aiohttp request object
Returns:
The presentation exchange list response
"""
context: AdminRequestContext = request["context"]
profile = context.profile
tag_filter = {}
if "thread_id" in request.query and request.query["thread_id"] != "":
tag_filter["thread_id"] = request.query["thread_id"]
post_filter = {
k: request.query[k]
for k in ("connection_id", "role", "state")
if request.query.get(k, "") != ""
}
try:
async with profile.session() as session:
records = await V20PresExRecord.query(
session=session,
tag_filter=tag_filter,
post_filter_positive=post_filter,
)
results = [record.serialize() for record in records]
except (StorageError, BaseModelError) as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
return web.json_response({"results": results})
@docs(
tags=["present-proof v2.0"],
summary="Fetch a single presentation exchange record",
)
@match_info_schema(V20PresExIdMatchInfoSchema())
@response_schema(V20PresExRecordSchema(), 200, description="")
async def present_proof_retrieve(request: web.BaseRequest):
"""
Request handler for fetching a single presentation exchange record.
Args:
request: aiohttp request object
Returns:
The presentation exchange record response
"""
context: AdminRequestContext = request["context"]
profile = context.profile
outbound_handler = request["outbound_message_router"]
pres_ex_id = request.match_info["pres_ex_id"]
pres_ex_record = None
try:
async with profile.session() as session:
pres_ex_record = await V20PresExRecord.retrieve_by_id(session, pres_ex_id)
result = pres_ex_record.serialize()
except StorageNotFoundError as err:
# no such pres ex record: not protocol error, user fat-fingered id
raise web.HTTPNotFound(reason=err.roll_up) from err
except (BaseModelError, StorageError) as err:
# present but broken or hopeless: protocol error
if pres_ex_record:
async with profile.session() as session:
await pres_ex_record.save_error_state(session, reason=err.roll_up)
await report_problem(
err,
ProblemReportReason.ABANDONED.value,
web.HTTPBadRequest,
pres_ex_record,
outbound_handler,
)
return web.json_response(result)
@docs(
tags=["present-proof v2.0"],
summary="Fetch credentials from wallet for presentation request",
)
@match_info_schema(V20PresExIdMatchInfoSchema())
@querystring_schema(V20CredentialsFetchQueryStringSchema())
@response_schema(IndyCredPrecisSchema(many=True), 200, description="")
async def present_proof_credentials_list(request: web.BaseRequest):
"""
Request handler for searching applicable credential records.
Args:
request: aiohttp request object
Returns:
The credential list response
"""
context: AdminRequestContext = request["context"]
profile = context.profile
outbound_handler = request["outbound_message_router"]
pres_ex_id = request.match_info["pres_ex_id"]
referents = request.query.get("referent")
pres_referents = (r.strip() for r in referents.split(",")) if referents else ()
try:
async with profile.session() as session:
pres_ex_record = await V20PresExRecord.retrieve_by_id(session, pres_ex_id)
except StorageNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
start = request.query.get("start")
count = request.query.get("count")
# url encoded json extra_query
encoded_extra_query = request.query.get("extra_query") or "{}"
extra_query = json.loads(encoded_extra_query)
# defaults
start = int(start) if isinstance(start, str) else 0
count = int(count) if isinstance(count, str) else 10
indy_holder = profile.inject(IndyHolder)
indy_credentials = []
# INDY
try:
indy_pres_request = pres_ex_record.by_format["pres_request"].get(
V20PresFormat.Format.INDY.api
)
if indy_pres_request:
indy_credentials = (
await indy_holder.get_credentials_for_presentation_request_by_referent(
indy_pres_request,
pres_referents,
start,
count,
extra_query,
)
)
except IndyHolderError as err:
if pres_ex_record:
async with profile.session() as session:
await pres_ex_record.save_error_state(session, reason=err.roll_up)
await report_problem(
err,
ProblemReportReason.ABANDONED.value,
web.HTTPBadRequest,
pres_ex_record,
outbound_handler,
)
dif_holder = profile.inject(VCHolder)
dif_credentials = []
dif_cred_value_list = []
# DIF
try:
dif_pres_request = pres_ex_record.by_format["pres_request"].get(
V20PresFormat.Format.DIF.api
)
if dif_pres_request:
input_descriptors_list = dif_pres_request.get(
"presentation_definition", {}
).get("input_descriptors")
claim_fmt = dif_pres_request.get("presentation_definition", {}).get(
"format"
)
if claim_fmt and len(claim_fmt.keys()) > 0:
claim_fmt = ClaimFormat.deserialize(claim_fmt)
input_descriptors = []
for input_desc_dict in input_descriptors_list:
input_descriptors.append(InputDescriptors.deserialize(input_desc_dict))
record_ids = set()
for input_descriptor in input_descriptors:
proof_type = None
limit_disclosure = input_descriptor.constraint.limit_disclosure and (
input_descriptor.constraint.limit_disclosure == "required"
)
uri_list = []
one_of_uri_groups = []
if input_descriptor.schemas:
if input_descriptor.schemas.oneof_filter:
one_of_uri_groups = await retrieve_uri_list_from_schema_filter(
input_descriptor.schemas.uri_groups
)
else:
schema_uris = input_descriptor.schemas.uri_groups[0]
for schema_uri in schema_uris:
if schema_uri.required is None:
required = True
else:
required = schema_uri.required
if required:
uri_list.append(schema_uri.uri)
if len(uri_list) == 0:
uri_list = None
if len(one_of_uri_groups) == 0:
one_of_uri_groups = None
if limit_disclosure:
proof_type = [BbsBlsSignature2020.signature_type]
if claim_fmt:
if claim_fmt.ldp_vp:
if "proof_type" in claim_fmt.ldp_vp:
proof_types = claim_fmt.ldp_vp.get("proof_type")
if limit_disclosure and (
BbsBlsSignature2020.signature_type not in proof_types
):
raise web.HTTPBadRequest(
reason=(
"Verifier submitted presentation request with "
"limit_disclosure [selective disclosure] "
"option but verifier does not support "
"BbsBlsSignature2020 format"
)
)
elif (
len(proof_types) == 1
and (
BbsBlsSignature2020.signature_type
not in proof_types
)
and (
Ed25519Signature2018.signature_type
not in proof_types
)
):
raise web.HTTPBadRequest(
reason=(
"Only BbsBlsSignature2020 and/or "
"Ed25519Signature2018 signature types "
"are supported"
)
)
elif (
len(proof_types) >= 2
and (
BbsBlsSignature2020.signature_type
not in proof_types
)
and (
Ed25519Signature2018.signature_type
not in proof_types
)
):
raise web.HTTPBadRequest(
reason=(
"Only BbsBlsSignature2020 and "
"Ed25519Signature2018 signature types "
"are supported"
)
)
else:
for proof_format in proof_types:
if (
proof_format
== Ed25519Signature2018.signature_type
):
proof_type = [
Ed25519Signature2018.signature_type
]
break
elif (
proof_format
== BbsBlsSignature2020.signature_type
):
proof_type = [
BbsBlsSignature2020.signature_type
]
break
else:
raise web.HTTPBadRequest(
reason=(
"Currently, only ldp_vp with "
"BbsBlsSignature2020 and Ed25519Signature2018"
" signature types are supported"
)
)
if one_of_uri_groups:
records = []
cred_group_record_ids = set()
for uri_group in one_of_uri_groups:
search = dif_holder.search_credentials(
proof_types=proof_type, pd_uri_list=uri_group
)
cred_group = await search.fetch(count)
(
cred_group_vcrecord_list,
cred_group_vcrecord_ids_set,
) = await process_vcrecords_return_list(
cred_group, cred_group_record_ids
)
cred_group_record_ids = cred_group_vcrecord_ids_set
records = records + cred_group_vcrecord_list
else:
search = dif_holder.search_credentials(
proof_types=proof_type,
pd_uri_list=uri_list,
)
records = await search.fetch(count)
# Avoiding addition of duplicate records
vcrecord_list, vcrecord_ids_set = await process_vcrecords_return_list(
records, record_ids
)
record_ids = vcrecord_ids_set
dif_credentials = dif_credentials + vcrecord_list
for dif_credential in dif_credentials:
cred_value = dif_credential.cred_value
cred_value["record_id"] = dif_credential.record_id
dif_cred_value_list.append(cred_value)
except (
StorageNotFoundError,
V20PresFormatHandlerError,
) as err:
if pres_ex_record:
async with profile.session() as session:
await pres_ex_record.save_error_state(session, reason=err.roll_up)
await report_problem(
err,
ProblemReportReason.ABANDONED.value,
web.HTTPBadRequest,
pres_ex_record,
outbound_handler,
)
credentials = list(indy_credentials) + dif_cred_value_list
return web.json_response(credentials)
[docs]async def process_vcrecords_return_list(
vc_records: Sequence[VCRecord], record_ids: set
) -> Tuple[Sequence[VCRecord], set]:
"""Return list of non-duplicate VCRecords."""
to_add = []
for vc_record in vc_records:
if vc_record.record_id not in record_ids:
to_add.append(vc_record)
record_ids.add(vc_record.record_id)
return (to_add, record_ids)
[docs]async def retrieve_uri_list_from_schema_filter(
schema_uri_groups: Sequence[Sequence[SchemaInputDescriptor]],
) -> Sequence[str]:
"""Retrieve list of schema uri from uri_group."""
group_schema_uri_list = []
for schema_group in schema_uri_groups:
uri_list = []
for schema in schema_group:
uri_list.append(schema.uri)
if len(uri_list) > 0:
group_schema_uri_list.append(uri_list)
return group_schema_uri_list
@docs(tags=["present-proof v2.0"], summary="Sends a presentation proposal")
@request_schema(V20PresProposalRequestSchema())
@response_schema(V20PresExRecordSchema(), 200, description="")
async def present_proof_send_proposal(request: web.BaseRequest):
"""
Request handler for sending a presentation proposal.
Args:
request: aiohttp request object
Returns:
The presentation exchange details
"""
r_time = get_timer()
context: AdminRequestContext = request["context"]
profile = context.profile
outbound_handler = request["outbound_message_router"]
body = await request.json()
comment = body.get("comment")
connection_id = body.get("connection_id")
pres_proposal = body.get("presentation_proposal")
conn_record = None
try:
async with profile.session() as session:
conn_record = await ConnRecord.retrieve_by_id(session, connection_id)
pres_proposal_message = V20PresProposal(
comment=comment,
**_formats_attach(pres_proposal, PRES_20_PROPOSAL, "proposals"),
)
except (BaseModelError, StorageError) as err:
# other party does not care about our false protocol start
raise web.HTTPBadRequest(reason=err.roll_up)
if not conn_record.is_ready:
raise web.HTTPForbidden(reason=f"Connection {connection_id} not ready")
trace_msg = body.get("trace")
pres_proposal_message.assign_trace_decorator(
context.settings,
trace_msg,
)
auto_present = body.get(
"auto_present", context.settings.get("debug.auto_respond_presentation_request")
)
pres_manager = V20PresManager(profile)
pres_ex_record = None
try:
pres_ex_record = await pres_manager.create_exchange_for_proposal(
connection_id=connection_id,
pres_proposal_message=pres_proposal_message,
auto_present=auto_present,
)
result = pres_ex_record.serialize()
except (BaseModelError, StorageError) as err:
if pres_ex_record:
async with profile.session() as session:
await pres_ex_record.save_error_state(session, reason=err.roll_up)
# other party does not care about our false protocol start
raise web.HTTPBadRequest(reason=err.roll_up)
await outbound_handler(pres_proposal_message, connection_id=connection_id)
trace_event(
context.settings,
pres_proposal_message,
outcome="presentation_exchange_propose.END",
perf_counter=r_time,
)
return web.json_response(result)
@docs(
tags=["present-proof v2.0"],
summary="Creates a presentation request not bound to any proposal or connection",
)
@request_schema(V20PresCreateRequestRequestSchema())
@response_schema(V20PresExRecordSchema(), 200, description="")
async def present_proof_create_request(request: web.BaseRequest):
"""
Request handler for creating a free presentation request.
The presentation request will not be bound to any proposal
or existing connection.
Args:
request: aiohttp request object
Returns:
The presentation exchange details
"""
r_time = get_timer()
context: AdminRequestContext = request["context"]
profile = context.profile
outbound_handler = request["outbound_message_router"]
body = await request.json()
comment = body.get("comment")
pres_request_spec = body.get("presentation_request")
if pres_request_spec and V20PresFormat.Format.INDY.api in pres_request_spec:
await _add_nonce(pres_request_spec[V20PresFormat.Format.INDY.api])
pres_request_message = V20PresRequest(
comment=comment,
will_confirm=True,
**_formats_attach(pres_request_spec, PRES_20_REQUEST, "request_presentations"),
)
auto_verify = body.get(
"auto_verify", context.settings.get("debug.auto_verify_presentation")
)
trace_msg = body.get("trace")
pres_request_message.assign_trace_decorator(
context.settings,
trace_msg,
)
pres_manager = V20PresManager(profile)
pres_ex_record = None
try:
pres_ex_record = await pres_manager.create_exchange_for_request(
connection_id=None,
pres_request_message=pres_request_message,
auto_verify=auto_verify,
)
result = pres_ex_record.serialize()
except (BaseModelError, StorageError) as err:
if pres_ex_record:
async with profile.session() as session:
await pres_ex_record.save_error_state(session, reason=err.roll_up)
# other party does not care about our false protocol start
raise web.HTTPBadRequest(reason=err.roll_up)
await outbound_handler(pres_request_message, connection_id=None)
trace_event(
context.settings,
pres_request_message,
outcome="presentation_exchange_create_request.END",
perf_counter=r_time,
)
return web.json_response(result)
@docs(
tags=["present-proof v2.0"],
summary="Sends a free presentation request not bound to any proposal",
)
@request_schema(V20PresSendRequestRequestSchema())
@response_schema(V20PresExRecordSchema(), 200, description="")
async def present_proof_send_free_request(request: web.BaseRequest):
"""
Request handler for sending a presentation request free from any proposal.
Args:
request: aiohttp request object
Returns:
The presentation exchange details
"""
r_time = get_timer()
context: AdminRequestContext = request["context"]
profile = context.profile
outbound_handler = request["outbound_message_router"]
body = await request.json()
connection_id = body.get("connection_id")
try:
async with profile.session() as session:
conn_record = await ConnRecord.retrieve_by_id(session, connection_id)
except StorageNotFoundError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
if not conn_record.is_ready:
raise web.HTTPForbidden(reason=f"Connection {connection_id} not ready")
comment = body.get("comment")
pres_request_spec = body.get("presentation_request")
if pres_request_spec and V20PresFormat.Format.INDY.api in pres_request_spec:
await _add_nonce(pres_request_spec[V20PresFormat.Format.INDY.api])
pres_request_message = V20PresRequest(
comment=comment,
will_confirm=True,
**_formats_attach(pres_request_spec, PRES_20_REQUEST, "request_presentations"),
)
auto_verify = body.get(
"auto_verify", context.settings.get("debug.auto_verify_presentation")
)
trace_msg = body.get("trace")
pres_request_message.assign_trace_decorator(
context.settings,
trace_msg,
)
pres_manager = V20PresManager(profile)
pres_ex_record = None
try:
pres_ex_record = await pres_manager.create_exchange_for_request(
connection_id=connection_id,
pres_request_message=pres_request_message,
auto_verify=auto_verify,
)
result = pres_ex_record.serialize()
except (BaseModelError, StorageError) as err:
if pres_ex_record:
async with profile.session() as session:
await pres_ex_record.save_error_state(session, reason=err.roll_up)
# other party does not care about our false protocol start
raise web.HTTPBadRequest(reason=err.roll_up)
await outbound_handler(pres_request_message, connection_id=connection_id)
trace_event(
context.settings,
pres_request_message,
outcome="presentation_exchange_send_request.END",
perf_counter=r_time,
)
return web.json_response(result)
@docs(
tags=["present-proof v2.0"],
summary="Sends a presentation request in reference to a proposal",
)
@match_info_schema(V20PresExIdMatchInfoSchema())
@request_schema(V20PresentationSendRequestToProposalSchema())
@response_schema(V20PresExRecordSchema(), 200, description="")
async def present_proof_send_bound_request(request: web.BaseRequest):
"""
Request handler for sending a presentation request bound to a proposal.
Args:
request: aiohttp request object
Returns:
The presentation exchange details
"""
r_time = get_timer()
context: AdminRequestContext = request["context"]
profile = context.profile
outbound_handler = request["outbound_message_router"]
body = await request.json()
pres_ex_id = request.match_info["pres_ex_id"]
pres_ex_record = None
try:
async with profile.session() as session:
pres_ex_record = await V20PresExRecord.retrieve_by_id(session, pres_ex_id)
except StorageNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
if pres_ex_record.state != (V20PresExRecord.STATE_PROPOSAL_RECEIVED):
raise web.HTTPBadRequest(
reason=(
f"Presentation exchange {pres_ex_id} "
f"in {pres_ex_record.state} state "
f"(must be {V20PresExRecord.STATE_PROPOSAL_RECEIVED})"
)
)
connection_id = pres_ex_record.connection_id
try:
async with profile.session() as session:
conn_record = await ConnRecord.retrieve_by_id(session, connection_id)
except StorageError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
if not conn_record.is_ready:
raise web.HTTPForbidden(reason=f"Connection {connection_id} not ready")
pres_ex_record.auto_verify = body.get(
"auto_verify", context.settings.get("debug.auto_verify_presentation")
)
pres_manager = V20PresManager(profile)
try:
(
pres_ex_record,
pres_request_message,
) = await pres_manager.create_bound_request(pres_ex_record)
result = pres_ex_record.serialize()
except (BaseModelError, LedgerError, StorageError) as err:
if pres_ex_record:
async with profile.session() as session:
await pres_ex_record.save_error_state(session, reason=err.roll_up)
# other party cares that we cannot continue protocol
await report_problem(
err,
ProblemReportReason.ABANDONED.value,
web.HTTPBadRequest,
pres_ex_record,
outbound_handler,
)
trace_msg = body.get("trace")
pres_request_message.assign_trace_decorator(
context.settings,
trace_msg,
)
await outbound_handler(pres_request_message, connection_id=connection_id)
trace_event(
context.settings,
pres_request_message,
outcome="presentation_exchange_send_request.END",
perf_counter=r_time,
)
return web.json_response(result)
@docs(tags=["present-proof v2.0"], summary="Sends a proof presentation")
@match_info_schema(V20PresExIdMatchInfoSchema())
@request_schema(V20PresSpecByFormatRequestSchema())
@response_schema(V20PresExRecordSchema(), description="")
async def present_proof_send_presentation(request: web.BaseRequest):
"""
Request handler for sending a presentation.
Args:
request: aiohttp request object
Returns:
The presentation exchange details
"""
r_time = get_timer()
context: AdminRequestContext = request["context"]
profile = context.profile
outbound_handler = request["outbound_message_router"]
pres_ex_id = request.match_info["pres_ex_id"]
body = await request.json()
supported_formats = ["dif", "indy"]
if not any(x in body for x in supported_formats):
raise web.HTTPBadRequest(
reason=(
"No presentation format specification provided, "
"either dif or indy must be included. "
"In case of DIF, if no additional specification "
'needs to be provided then include "dif": {}'
)
)
comment = body.get("comment")
pres_ex_record = None
try:
async with profile.session() as session:
pres_ex_record = await V20PresExRecord.retrieve_by_id(session, pres_ex_id)
except StorageNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
if pres_ex_record.state != (V20PresExRecord.STATE_REQUEST_RECEIVED):
raise web.HTTPBadRequest(
reason=(
f"Presentation exchange {pres_ex_id} "
f"in {pres_ex_record.state} state "
f"(must be {V20PresExRecord.STATE_REQUEST_RECEIVED})"
)
)
# Fetch connection if exchange has record
conn_record = None
if pres_ex_record.connection_id:
try:
async with profile.session() as session:
conn_record = await ConnRecord.retrieve_by_id(
session, pres_ex_record.connection_id
)
except StorageNotFoundError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
if conn_record and not conn_record.is_ready:
raise web.HTTPForbidden(
reason=f"Connection {pres_ex_record.connection_id} not ready"
)
pres_manager = V20PresManager(profile)
try:
pres_ex_record, pres_message = await pres_manager.create_pres(
pres_ex_record,
request_data=body,
comment=comment,
)
result = pres_ex_record.serialize()
except (
BaseModelError,
IndyHolderError,
LedgerError,
V20PresFormatHandlerError,
StorageError,
WalletNotFoundError,
) as err:
async with profile.session() as session:
await pres_ex_record.save_error_state(session, reason=err.roll_up)
# other party cares that we cannot continue protocol
await report_problem(
err,
ProblemReportReason.ABANDONED.value,
web.HTTPBadRequest,
pres_ex_record,
outbound_handler,
)
trace_msg = body.get("trace")
pres_message.assign_trace_decorator(
context.settings,
trace_msg,
)
await outbound_handler(pres_message, connection_id=pres_ex_record.connection_id)
trace_event(
context.settings,
pres_message,
outcome="presentation_exchange_send_request.END",
perf_counter=r_time,
)
return web.json_response(result)
@docs(tags=["present-proof v2.0"], summary="Verify a received presentation")
@match_info_schema(V20PresExIdMatchInfoSchema())
@response_schema(V20PresExRecordSchema(), description="")
async def present_proof_verify_presentation(request: web.BaseRequest):
"""
Request handler for verifying a presentation request.
Args:
request: aiohttp request object
Returns:
The presentation exchange details
"""
r_time = get_timer()
context: AdminRequestContext = request["context"]
profile = context.profile
outbound_handler = request["outbound_message_router"]
pres_ex_id = request.match_info["pres_ex_id"]
pres_ex_record = None
try:
async with profile.session() as session:
pres_ex_record = await V20PresExRecord.retrieve_by_id(session, pres_ex_id)
except StorageNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
if pres_ex_record.state != (V20PresExRecord.STATE_PRESENTATION_RECEIVED):
raise web.HTTPBadRequest(
reason=(
f"Presentation exchange {pres_ex_id} "
f"in {pres_ex_record.state} state "
f"(must be {V20PresExRecord.STATE_PRESENTATION_RECEIVED})"
)
)
pres_manager = V20PresManager(profile)
try:
pres_ex_record = await pres_manager.verify_pres(pres_ex_record)
result = pres_ex_record.serialize()
except (BaseModelError, LedgerError, StorageError) as err:
if pres_ex_record:
async with profile.session() as session:
await pres_ex_record.save_error_state(session, reason=err.roll_up)
# other party cares that we cannot continue protocol
await report_problem(
err,
ProblemReportReason.ABANDONED.value,
web.HTTPBadRequest,
pres_ex_record,
outbound_handler,
)
trace_event(
context.settings,
pres_ex_record,
outcome="presentation_exchange_verify.END",
perf_counter=r_time,
)
return web.json_response(result)
@docs(
tags=["present-proof v2.0"],
summary="Send a problem report for presentation exchange",
)
@match_info_schema(V20PresExIdMatchInfoSchema())
@request_schema(V20PresProblemReportRequestSchema())
@response_schema(V20PresentProofModuleResponseSchema(), 200, description="")
async def present_proof_problem_report(request: web.BaseRequest):
"""
Request handler for sending problem report.
Args:
request: aiohttp request object
"""
context: AdminRequestContext = request["context"]
outbound_handler = request["outbound_message_router"]
pres_ex_id = request.match_info["pres_ex_id"]
body = await request.json()
description = body["description"]
try:
async with context.profile.session() as session:
pres_ex_record = await V20PresExRecord.retrieve_by_id(session, pres_ex_id)
await pres_ex_record.save_error_state(
session,
reason=f"created problem report: {description}",
)
report = problem_report_for_record(pres_ex_record, description)
except StorageNotFoundError as err: # other party does not care about meta-problems
raise web.HTTPNotFound(reason=err.roll_up) from err
except StorageError as err:
raise web.HTTPBadRequest(reason=err.roll_up)
await outbound_handler(report, connection_id=pres_ex_record.connection_id)
return web.json_response({})
@docs(
tags=["present-proof v2.0"],
summary="Remove an existing presentation exchange record",
)
@match_info_schema(V20PresExIdMatchInfoSchema())
@response_schema(V20PresentProofModuleResponseSchema(), description="")
async def present_proof_remove(request: web.BaseRequest):
"""
Request handler for removing a presentation exchange record.
Args:
request: aiohttp request object
"""
context: AdminRequestContext = request["context"]
pres_ex_id = request.match_info["pres_ex_id"]
pres_ex_record = None
try:
async with context.profile.session() as session:
try:
pres_ex_record = await V20PresExRecord.retrieve_by_id(
session, pres_ex_id
)
await pres_ex_record.delete_record(session)
except (BaseModelError, ValidationError):
storage = session.inject(BaseStorage)
storage_record = await storage.get_record(
record_type=V20PresExRecord.RECORD_TYPE, record_id=pres_ex_id
)
await storage.delete_record(storage_record)
except StorageNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
except StorageError as err:
raise web.HTTPBadRequest(reason=err.roll_up)
return web.json_response({})
[docs]async def register(app: web.Application):
"""Register routes."""
app.add_routes(
[
web.get(
"/present-proof-2.0/records",
present_proof_list,
allow_head=False,
),
web.get(
"/present-proof-2.0/records/{pres_ex_id}",
present_proof_retrieve,
allow_head=False,
),
web.get(
"/present-proof-2.0/records/{pres_ex_id}/credentials",
present_proof_credentials_list,
allow_head=False,
),
web.post(
"/present-proof-2.0/send-proposal",
present_proof_send_proposal,
),
web.post(
"/present-proof-2.0/create-request",
present_proof_create_request,
),
web.post(
"/present-proof-2.0/send-request",
present_proof_send_free_request,
),
web.post(
"/present-proof-2.0/records/{pres_ex_id}/send-request",
present_proof_send_bound_request,
),
web.post(
"/present-proof-2.0/records/{pres_ex_id}/send-presentation",
present_proof_send_presentation,
),
web.post(
"/present-proof-2.0/records/{pres_ex_id}/verify-presentation",
present_proof_verify_presentation,
),
web.post(
"/present-proof-2.0/records/{pres_ex_id}/problem-report",
present_proof_problem_report,
),
web.delete(
"/present-proof-2.0/records/{pres_ex_id}",
present_proof_remove,
),
]
)
[docs]def post_process_routes(app: web.Application):
"""Amend swagger API."""
# Add top-level tags description
if "tags" not in app._state["swagger_dict"]:
app._state["swagger_dict"]["tags"] = []
app._state["swagger_dict"]["tags"].append(
{
"name": "present-proof v2.0",
"description": "Proof presentation v2.0",
"externalDocs": {"description": "Specification", "url": SPEC_URI},
}
)