Source code for aries_cloudagent.protocols.endorse_transaction.v1_0.manager

"""Class to manage transactions."""

import json
import logging
import uuid

from asyncio import shield

from ....connections.models.conn_record import ConnRecord
from ....core.error import BaseError
from ....core.profile import Profile
from ....indy.issuer import IndyIssuerError
from ....ledger.base import BaseLedger
from ....ledger.error import LedgerError
from ....messaging.credential_definitions.util import notify_cred_def_event
from ....messaging.schemas.util import notify_schema_event
from ....revocation.util import (
    notify_revocation_reg_endorsed_event,
    notify_revocation_entry_endorsed_event,
)
from ....storage.error import StorageError, StorageNotFoundError
from ....transport.inbound.receipt import MessageReceipt
from ....wallet.base import BaseWallet
from ....wallet.util import (
    notify_endorse_did_event,
    notify_endorse_did_attrib_event,
)

from .messages.cancel_transaction import CancelTransaction
from .messages.endorsed_transaction_response import EndorsedTransactionResponse
from .messages.refused_transaction_response import RefusedTransactionResponse
from .messages.transaction_acknowledgement import TransactionAcknowledgement
from .messages.transaction_job_to_send import TransactionJobToSend
from .messages.transaction_request import TransactionRequest
from .messages.transaction_resend import TransactionResend
from .models.transaction_record import TransactionRecord
from .transaction_jobs import TransactionJob


[docs]class TransactionManagerError(BaseError): """Transaction error."""
[docs]class TransactionManager: """Class for managing transactions.""" def __init__(self, profile: Profile): """ Initialize a TransactionManager. Args: session: The Profile Session for this transaction manager """ self._profile = profile self._logger = logging.getLogger(__name__) @property def profile(self) -> Profile: """ Accessor for the current Profile. Returns: The Profile for this transaction manager """ return self._profile
[docs] async def create_record( self, messages_attach: str, connection_id: str, meta_data: dict = None ): """ Create a new Transaction Record. Args: messages_attach: messages to attach, JSON-dumped connection_id: The connection_id of the ConnRecord between author and endorser Returns: The transaction Record """ messages_attach_dict = { "@id": str(uuid.uuid4()), "mime-type": "application/json", "data": {"json": messages_attach}, } transaction = TransactionRecord() formats = { "attach_id": messages_attach_dict["@id"], "format": TransactionRecord.FORMAT_VERSION, } transaction.formats.clear() transaction.formats.append(formats) transaction.messages_attach.clear() transaction.messages_attach.append(messages_attach_dict) if meta_data: transaction.meta_data = meta_data transaction.state = TransactionRecord.STATE_TRANSACTION_CREATED transaction.connection_id = connection_id async with self._profile.session() as session: await transaction.save(session, reason="Created a Transaction Record") return transaction
# todo - implementing changes for writing final transaction to the ledger # (For Sign Transaction Protocol)
[docs] async def create_request( self, transaction: TransactionRecord, signature: str = None, signed_request: dict = None, expires_time: str = None, endorser_write_txn: bool = None, author_goal_code: str = None, signer_goal_code: str = None, ): """ Create a new Transaction Request. Args: transaction: The transaction from which the request is created. expires_time: The time till which the endorser should endorse the transaction. Returns: The transaction Record and transaction request """ if transaction.state != TransactionRecord.STATE_TRANSACTION_CREATED: raise TransactionManagerError( f"Cannot create a request for transaction record" f" in state: {transaction.state}" ) transaction._type = TransactionRecord.SIGNATURE_REQUEST signature_request = { "context": TransactionRecord.SIGNATURE_CONTEXT, "method": TransactionRecord.ADD_SIGNATURE, "signature_type": TransactionRecord.SIGNATURE_TYPE, "signer_goal_code": signer_goal_code if signer_goal_code else TransactionRecord.ENDORSE_TRANSACTION, "author_goal_code": author_goal_code if author_goal_code else TransactionRecord.WRITE_TRANSACTION, } transaction.signature_request.clear() transaction.signature_request.append(signature_request) transaction.state = TransactionRecord.STATE_REQUEST_SENT timing = {"expires_time": expires_time} transaction.timing = timing transaction.endorser_write_txn = endorser_write_txn async with self._profile.session() as session: await transaction.save(session, reason="Created an endorsement request") transaction_request = TransactionRequest( transaction_id=transaction._id, signature_request=transaction.signature_request[0], timing=transaction.timing, messages_attach=transaction.messages_attach[0], endorser_write_txn=endorser_write_txn, ) return transaction, transaction_request
[docs] async def receive_request(self, request: TransactionRequest, connection_id: str): """ Receive a Transaction request. Args: request: A Transaction Request connection_id: The connection id related to this transaction record """ transaction = TransactionRecord() transaction._type = TransactionRecord.SIGNATURE_REQUEST transaction.signature_request.clear() transaction.signature_request.append(request.signature_request) transaction.timing = request.timing format = { "attach_id": request.messages_attach["@id"], "format": TransactionRecord.FORMAT_VERSION, } transaction.formats.clear() transaction.formats.append(format) transaction.messages_attach.clear() transaction.messages_attach.append(request.messages_attach) transaction.thread_id = request.transaction_id transaction.connection_id = connection_id transaction.state = TransactionRecord.STATE_REQUEST_RECEIVED transaction.endorser_write_txn = request.endorser_write_txn async with self._profile.session() as session: await transaction.save(session, reason="Received an endorsement request") return transaction
# todo - implementing changes for writing final transaction to the ledger # (For Sign Transaction Protocol)
[docs] async def create_endorse_response( self, transaction: TransactionRecord, state: str, use_endorser_did: str = None, ): """ Create a response to endorse a transaction. Args: transaction: The transaction record which would be endorsed. state: The state of the transaction record Returns: The updated transaction and an endorsed response """ if transaction.state not in ( TransactionRecord.STATE_REQUEST_RECEIVED, TransactionRecord.STATE_TRANSACTION_RESENT_RECEIEVED, ): raise TransactionManagerError( f"Cannot endorse transaction for transaction record" f" in state: {transaction.state}" ) transaction._type = TransactionRecord.SIGNATURE_RESPONSE transaction_json = transaction.messages_attach[0]["data"]["json"] ledger_response = {} async with self._profile.session() as session: wallet: BaseWallet = session.inject_or(BaseWallet) if not wallet: raise StorageError("No wallet available") endorser_did_info = None override_did = ( use_endorser_did if use_endorser_did else session.context.settings.get_value( "endorser.endorser_endorse_with_did" ) ) if override_did: endorser_did_info = await wallet.get_local_did(override_did) else: endorser_did_info = await wallet.get_public_did() if not endorser_did_info: raise StorageError( "Transaction cannot be endorsed as there is no Public DID in wallet " "or Endorser DID specified" ) endorser_did = endorser_did_info.did endorser_verkey = endorser_did_info.verkey ledger = self._profile.context.inject_or(BaseLedger) if not ledger: reason = "No ledger available" if not self._profile.context.settings.get_value("wallet.type"): reason += ": missing wallet-type?" raise LedgerError(reason=reason) async with ledger: # check our goal code! txn_goal_code = ( transaction.signature_request[0]["signer_goal_code"] if transaction.signature_request and "signer_goal_code" in transaction.signature_request[0] else TransactionRecord.ENDORSE_TRANSACTION ) if txn_goal_code == TransactionRecord.ENDORSE_TRANSACTION: endorsed_msg = await shield( ledger.txn_endorse(transaction_json, endorse_did=endorser_did_info) ) elif txn_goal_code == TransactionRecord.WRITE_DID_TRANSACTION: # get DID info from transaction.meta_data meta_data = json.loads(transaction_json) (success, txn) = await shield( ledger.register_nym( meta_data["did"], meta_data["verkey"], meta_data["alias"], meta_data["role"], ) ) # we don't have an endorsed transaction so just return did meta-data ledger_response = { "result": { "txn": {"type": "1", "data": {"dest": meta_data["did"]}} }, "meta_data": meta_data, } endorsed_msg = json.dumps(ledger_response) else: raise TransactionManagerError( f"Invalid goal code for transaction record:" f" {txn_goal_code}" ) # need to return the endorsed msg or else the ledger will reject the # eventual transaction write transaction.messages_attach[0]["data"]["json"] = endorsed_msg signature_response = { "message_id": transaction.messages_attach[0]["@id"], "context": TransactionRecord.SIGNATURE_CONTEXT, "method": TransactionRecord.ADD_SIGNATURE, "signer_goal_code": txn_goal_code, "signature_type": TransactionRecord.SIGNATURE_TYPE, "signature": {endorser_did: endorsed_msg or endorser_verkey}, } transaction.signature_response.clear() transaction.signature_response.append(signature_response) transaction.state = state async with self._profile.session() as session: await transaction.save(session, reason="Created an endorsed response") if ( transaction.endorser_write_txn and txn_goal_code == TransactionRecord.ENDORSE_TRANSACTION ): # running as the endorser, we've been asked to write the transaction ledger_response = await self.complete_transaction(transaction, True) endorsed_transaction_response = EndorsedTransactionResponse( transaction_id=transaction.thread_id, thread_id=transaction._id, signature_response=signature_response, state=TransactionRecord.STATE_TRANSACTION_ACKED, endorser_did=endorser_did, ledger_response=ledger_response, ) return transaction, endorsed_transaction_response endorsed_transaction_response = EndorsedTransactionResponse( transaction_id=transaction.thread_id, thread_id=transaction._id, signature_response=signature_response, state=state, endorser_did=endorser_did, ledger_response=ledger_response, ) return transaction, endorsed_transaction_response
[docs] async def receive_endorse_response(self, response: EndorsedTransactionResponse): """ Update the transaction record with the endorsed response. Args: response: The Endorsed Transaction Response """ async with self._profile.session() as session: transaction = await TransactionRecord.retrieve_by_id( session, response.transaction_id ) transaction._type = TransactionRecord.SIGNATURE_RESPONSE transaction.state = response.state transaction.signature_response.clear() transaction.signature_response.append(response.signature_response) transaction.thread_id = response.thread_id # the returned signature is actually the endorsed ledger transaction endorser_did = response.endorser_did transaction.messages_attach[0]["data"]["json"] = response.signature_response[ "signature" ][endorser_did] async with self._profile.session() as session: await transaction.save(session, reason="Received an endorsed response") # this scenario is where the author has asked the endorser to write the ledger if transaction.endorser_write_txn: connection_id = transaction.connection_id async with self._profile.session() as session: connection_record = await ConnRecord.retrieve_by_id( session, connection_id ) await self.endorsed_txn_post_processing( transaction, response.ledger_response, connection_record ) return transaction
[docs] async def complete_transaction( self, transaction: TransactionRecord, endorser: bool = False ): """ Complete a transaction. This is the final state where the received ledger transaction is written to the ledger. Args: transaction: The transaction record which would be completed Returns: The updated transaction """ ledger_transaction = transaction.messages_attach[0]["data"]["json"] # check if we (author) have requested the endorser to write the transaction if (endorser and transaction.endorser_write_txn) or ( (not endorser) and (not transaction.endorser_write_txn) ): ledger = self._profile.inject(BaseLedger) if not ledger: reason = "No ledger available" if not self._profile.context.settings.get_value("wallet.type"): reason += ": missing wallet-type?" raise TransactionManagerError(reason) async with ledger: try: ledger_response_json = await shield( ledger.txn_submit( ledger_transaction, sign=False, taa_accept=False ) ) except (IndyIssuerError, LedgerError) as err: raise TransactionManagerError(err.roll_up) from err ledger_response = json.loads(ledger_response_json) else: ledger_response = ledger_transaction transaction.state = TransactionRecord.STATE_TRANSACTION_ACKED async with self._profile.session() as session: await transaction.save(session, reason="Completed transaction") # this scenario is where the endorser is writing the transaction # (called from self.create_endorse_response()) if endorser and transaction.endorser_write_txn: return ledger_response connection_id = transaction.connection_id async with self._profile.session() as session: connection_record = await ConnRecord.retrieve_by_id(session, connection_id) jobs = await connection_record.metadata_get(session, "transaction_jobs") if not jobs: raise TransactionManagerError( "The transaction related jobs are not set up in " "connection metadata for this connection record" ) if "transaction_my_job" not in jobs.keys(): raise TransactionManagerError( 'The "transaction_my_job" is not set in "transaction_jobs"' " in connection metadata for this connection record" ) if jobs["transaction_my_job"] == TransactionJob.TRANSACTION_AUTHOR.name: # the author write the endorsed transaction to the ledger await self.endorsed_txn_post_processing( transaction, ledger_response, connection_record ) transaction_acknowledgement_message = TransactionAcknowledgement( thread_id=transaction._id ) elif jobs["transaction_my_job"] == TransactionJob.TRANSACTION_ENDORSER.name: transaction_acknowledgement_message = TransactionAcknowledgement( thread_id=transaction._id, ledger_response=ledger_response ) return transaction, transaction_acknowledgement_message
[docs] async def receive_transaction_acknowledgement( self, response: TransactionAcknowledgement, connection_id: str ): """ Update the transaction record after receiving the transaction acknowledgement. Args: response: The transaction acknowledgement connection_id: The connection_id related to this Transaction Record """ async with self._profile.session() as session: transaction = await TransactionRecord.retrieve_by_connection_and_thread( session, connection_id, response.thread_id ) if transaction.state != TransactionRecord.STATE_TRANSACTION_ENDORSED: raise TransactionManagerError( "Only an endorsed transaction can be written to the ledger." ) transaction.state = TransactionRecord.STATE_TRANSACTION_ACKED async with self._profile.session() as session: await transaction.save(session, reason="Received a transaction ack") connection_id = transaction.connection_id try: async with self._profile.session() as session: connection_record = await ConnRecord.retrieve_by_id( session, connection_id ) jobs = await connection_record.metadata_get(session, "transaction_jobs") except StorageNotFoundError as err: raise TransactionManagerError(err.roll_up) from err if not jobs: raise TransactionManagerError( "The transaction related jobs are not set up in " "connection metadata for this connection record" ) if "transaction_my_job" not in jobs.keys(): raise TransactionManagerError( 'The "transaction_my_job" is not set in "transaction_jobs"' " in connection metadata for this connection record" ) if jobs["transaction_my_job"] == TransactionJob.TRANSACTION_AUTHOR.name: # store the related non-secrets record in our wallet await self.endorsed_txn_post_processing( transaction, response.ledger_response, connection_record ) return transaction
[docs] async def create_refuse_response( self, transaction: TransactionRecord, state: str, refuser_did: str ): """ Create a response to refuse a transaction. Args: transaction: The transaction record which would be refused state: The state of the transaction record Returns: The updated transaction and the refused response """ if transaction.state not in ( TransactionRecord.STATE_REQUEST_RECEIVED, TransactionRecord.STATE_TRANSACTION_RESENT_RECEIEVED, ): raise TransactionManagerError( f"Cannot refuse transaction for transaction record" f" in state: {transaction.state}" ) transaction._type = TransactionRecord.SIGNATURE_RESPONSE signature_response = { "message_id": transaction.messages_attach[0]["@id"], "context": TransactionRecord.SIGNATURE_CONTEXT, "method": TransactionRecord.ADD_SIGNATURE, "signer_goal_code": TransactionRecord.REFUSE_TRANSACTION, } transaction.signature_response.clear() transaction.signature_response.append(signature_response) transaction.state = state async with self._profile.session() as session: await transaction.save(session, reason="Created a refused response") refused_transaction_response = RefusedTransactionResponse( transaction_id=transaction.thread_id, thread_id=transaction._id, signature_response=signature_response, state=state, endorser_did=refuser_did, ) return transaction, refused_transaction_response
[docs] async def receive_refuse_response(self, response: RefusedTransactionResponse): """ Update the transaction record with a refused response. Args: response: The refused transaction response """ async with self._profile.session() as session: transaction = await TransactionRecord.retrieve_by_id( session, response.transaction_id ) transaction._type = TransactionRecord.SIGNATURE_RESPONSE transaction.state = response.state transaction.signature_response.clear() transaction.signature_response.append(response.signature_response) transaction.thread_id = response.thread_id async with self._profile.session() as session: await transaction.save(session, reason="Received a refused response") return transaction
[docs] async def cancel_transaction(self, transaction: TransactionRecord, state: str): """ Cancel a Transaction Request. Args: transaction: The transaction record which would be cancelled state: The state of the transaction record Returns: The updated transaction and the cancelled transaction response """ if transaction.state not in ( TransactionRecord.STATE_REQUEST_SENT, TransactionRecord.STATE_TRANSACTION_RESENT, ): raise TransactionManagerError( f"Cannot cancel transaction as transaction is" f" in state: {transaction.state}" ) transaction.state = state async with self._profile.session() as session: await transaction.save(session, reason="Cancelled the transaction") cancelled_transaction_response = CancelTransaction( state=state, thread_id=transaction._id ) return transaction, cancelled_transaction_response
[docs] async def receive_cancel_transaction( self, response: CancelTransaction, connection_id: str ): """ Update the transaction record to cancel a transaction request. Args: response: The cancel transaction response connection_id: The connection_id related to this Transaction Record """ async with self._profile.session() as session: transaction = await TransactionRecord.retrieve_by_connection_and_thread( session, connection_id, response.thread_id ) transaction.state = response.state async with self._profile.session() as session: await transaction.save(session, reason="Received a cancel request") return transaction
[docs] async def transaction_resend(self, transaction: TransactionRecord, state: str): """ Resend a transaction request. Args: transaction: The transaction record which needs to be resend state: the state of the transaction record Returns: The updated transaction and the resend response """ if transaction.state not in ( TransactionRecord.STATE_TRANSACTION_REFUSED, TransactionRecord.STATE_TRANSACTION_CANCELLED, ): raise TransactionManagerError( f"Cannot resend transaction as transaction is" f" in state: {transaction.state}" ) transaction.state = state async with self._profile.session() as session: await transaction.save(session, reason="Resends the transaction request") resend_transaction_response = TransactionResend( state=TransactionRecord.STATE_TRANSACTION_RESENT_RECEIEVED, thread_id=transaction._id, ) return transaction, resend_transaction_response
[docs] async def receive_transaction_resend( self, response: TransactionResend, connection_id: str ): """ Update the transaction with a resend request. Args: response: The Resend transaction response connection_id: The connection_id related to this Transaction Record """ async with self._profile.session() as session: transaction = await TransactionRecord.retrieve_by_connection_and_thread( session, connection_id, response.thread_id ) transaction.state = response.state async with self._profile.session() as session: await transaction.save(session, reason="Receives a transaction request") return transaction
[docs] async def set_transaction_my_job(self, record: ConnRecord, transaction_my_job: str): """ Set transaction_my_job. Args: record: The connection record in which to set transaction jobs transaction_my_job: My transaction job Returns: The transaction job that is send to other agent """ async with self._profile.session() as session: value = await record.metadata_get(session, "transaction_jobs") if value: value["transaction_my_job"] = transaction_my_job else: value = {"transaction_my_job": transaction_my_job} await record.metadata_set(session, key="transaction_jobs", value=value) tx_job_to_send = TransactionJobToSend(job=transaction_my_job) return tx_job_to_send
[docs] async def set_transaction_their_job( self, tx_job_received: TransactionJobToSend, receipt: MessageReceipt ): """ Set transaction_their_job. Args: tx_job_received: The transaction job that is received from the other agent receipt: The Message Receipt Object """ try: async with self._profile.session() as session: connection = await ConnRecord.retrieve_by_did( session, receipt.sender_did, receipt.recipient_did ) value = await connection.metadata_get(session, "transaction_jobs") if value: value["transaction_their_job"] = tx_job_received.job else: value = {"transaction_their_job": tx_job_received.job} await connection.metadata_set( session, key="transaction_jobs", value=value ) except StorageNotFoundError as err: raise TransactionManagerError(err.roll_up) from err
[docs] async def endorsed_txn_post_processing( self, transaction: TransactionRecord, ledger_response: dict = None, connection_record: ConnRecord = None, ): """ Store record in wallet, and kick off any required post-processing. Args: transaction: The transaction from which the schema/cred_def would be stored in wallet. """ if isinstance(ledger_response, str): ledger_response = json.loads(ledger_response) ledger = self._profile.inject(BaseLedger) if not ledger: reason = "No ledger available" if not self._profile.context.settings.get_value("wallet.type"): reason += ": missing wallet-type?" raise TransactionManagerError(reason) # setup meta_data to pass to future events, if necessary meta_data = transaction.meta_data meta_data["endorser"] = { "connection_id": transaction.connection_id, } # write the wallet non-secrets record if ledger_response["result"]["txn"]["type"] == "101": # schema transaction schema_id = ledger_response["result"]["txnMetadata"]["txnId"] public_did = ledger_response["result"]["txn"]["metadata"]["from"] meta_data["context"]["schema_id"] = schema_id meta_data["context"]["public_did"] = public_did # Notify schema ledger write event await notify_schema_event(self._profile, schema_id, meta_data) elif ledger_response["result"]["txn"]["type"] == "102": # cred def transaction async with ledger: try: schema_seq_no = str(ledger_response["result"]["txn"]["data"]["ref"]) schema_response = await shield(ledger.get_schema(schema_seq_no)) except (IndyIssuerError, LedgerError) as err: raise TransactionManagerError(err.roll_up) from err schema_id = schema_response["id"] cred_def_id = ledger_response["result"]["txnMetadata"]["txnId"] issuer_did = ledger_response["result"]["txn"]["metadata"]["from"] meta_data["context"]["schema_id"] = schema_id meta_data["context"]["cred_def_id"] = cred_def_id meta_data["context"]["issuer_did"] = issuer_did # Notify event await notify_cred_def_event(self._profile, cred_def_id, meta_data) elif ledger_response["result"]["txn"]["type"] == "113": # revocation registry transaction rev_reg_id = ledger_response["result"]["txnMetadata"]["txnId"] meta_data["context"]["rev_reg_id"] = rev_reg_id await notify_revocation_reg_endorsed_event( self._profile, rev_reg_id, meta_data ) elif ledger_response["result"]["txn"]["type"] == "114": # revocation entry transaction rev_reg_id = ledger_response["result"]["txn"]["data"]["revocRegDefId"] meta_data["context"]["rev_reg_id"] = rev_reg_id await notify_revocation_entry_endorsed_event( self._profile, rev_reg_id, meta_data ) elif ledger_response["result"]["txn"]["type"] == "1": # write DID to ledger did = ledger_response["result"]["txn"]["data"]["dest"] await notify_endorse_did_event(self._profile, did, meta_data) elif ledger_response["result"]["txn"]["type"] == "100": # write DID ATTRIB to ledger did = ledger_response["result"]["txn"]["data"]["dest"] await notify_endorse_did_attrib_event(self._profile, did, meta_data) else: # TODO unknown ledger transaction type, just ignore for now ... pass