New: add robust support for tx cancellation

Also greatly improved handling of expired and failed transactions.
This commit is contained in:
Kevin Gorham 2020-08-01 02:13:39 -04:00
parent 528442fa0a
commit bb77fa2d5d
No known key found for this signature in database
GPG Key ID: CCA55602DF49FC38
11 changed files with 490 additions and 92 deletions

View File

@ -0,0 +1,114 @@
package cash.z.ecc.android.sdk.transaction
import androidx.test.ext.junit.runners.AndroidJUnit4
import androidx.test.platform.app.InstrumentationRegistry
import cash.z.ecc.android.sdk.db.entity.EncodedTransaction
import cash.z.ecc.android.sdk.db.entity.PendingTransaction
import cash.z.ecc.android.sdk.db.entity.isCancelled
import cash.z.ecc.android.sdk.ext.ScopedTest
import cash.z.ecc.android.sdk.ext.TroubleshootingTwig
import cash.z.ecc.android.sdk.ext.Twig
import cash.z.ecc.android.sdk.ext.twig
import cash.z.ecc.android.sdk.service.LightWalletService
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.stub
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Assert.*
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.mockito.Mock
import org.mockito.MockitoAnnotations
@RunWith(AndroidJUnit4::class)
class PersistentTransactionManagerTest : ScopedTest() {
@Mock lateinit var mockEncoder: TransactionEncoder
@Mock lateinit var mockService: LightWalletService
val pendingDbName = "PersistentTxMgrTest_Pending.db"
val dataDbName = "PersistentTxMgrTest_Data.db"
private val context = InstrumentationRegistry.getInstrumentation().context
private lateinit var manager: OutboundTransactionManager
@Before
fun setup() {
initMocks()
deleteDb()
manager = PersistentTransactionManager(context, mockEncoder, mockService, pendingDbName)
}
private fun deleteDb() {
context.getDatabasePath(pendingDbName).delete()
}
private fun initMocks() {
MockitoAnnotations.initMocks(this)
mockEncoder.stub {
onBlocking {
createTransaction(any(), any(), any(), any(), any())
}.thenAnswer { invocation ->
runBlocking {
delay(200)
EncodedTransaction(byteArrayOf(1,2,3), byteArrayOf(8,9), 5_000_000)
}
}
}
}
@Test
fun testCancellation_RaceCondition() = runBlocking {
val tx = manager.initSpend(1234, "taddr", "memo-good", 0)
val txFlow = manager.monitorById(tx.id)
// encode TX
testScope.launch {
twig("ENCODE: start"); manager.encode("fookey", tx); twig("ENCODE: end")
}
// then cancel it before it is done encoding
testScope.launch {
delay(100)
twig("CANCEL: start"); manager.cancel(tx.id); twig("CANCEL: end")
}
txFlow.drop(2).onEach {
twig("found tx: $it")
assertTrue("Expected the encoded tx to be cancelled but it wasn't", it.isCancelled())
twig("found it to be successfully cancelled")
testScope.cancel()
}.launchIn(testScope).join()
}
@Test
fun testCancel() = runBlocking {
var tx = manager.initSpend(1234, "a", "b", 0)
assertFalse(tx.isCancelled())
manager.cancel(tx.id)
tx = manager.findById(tx.id)!!
assertTrue("Transaction was not cancelled", tx.isCancelled())
}
@Test
fun testAbort() = runBlocking {
var tx: PendingTransaction? = manager.initSpend(1234, "a", "b", 0)
assertNotNull(tx)
manager.abort(tx!!)
tx = manager.findById(tx.id)
assertNull("Transaction was not removed from the DB", tx)
}
companion object {
@BeforeClass
fun init() {
Twig.plant(TroubleshootingTwig())
}
}
}

View File

@ -1,25 +1,25 @@
package cash.z.ecc.android.sdk
import cash.z.ecc.android.sdk.validate.AddressType
import cash.z.ecc.android.sdk.validate.AddressType.Shielded
import cash.z.ecc.android.sdk.validate.AddressType.Transparent
import cash.z.ecc.android.sdk.validate.ConsensusMatchType
import cash.z.ecc.android.sdk.Synchronizer.Status.*
import cash.z.ecc.android.sdk.block.CompactBlockDbStore
import cash.z.ecc.android.sdk.block.CompactBlockDownloader
import cash.z.ecc.android.sdk.block.CompactBlockProcessor
import cash.z.ecc.android.sdk.block.CompactBlockProcessor.State.*
import cash.z.ecc.android.sdk.block.CompactBlockProcessor.WalletBalance
import cash.z.ecc.android.sdk.block.CompactBlockStore
import cash.z.ecc.android.sdk.db.entity.*
import cash.z.ecc.android.sdk.exception.SynchronizerException
import cash.z.ecc.android.sdk.ext.*
import cash.z.ecc.android.sdk.service.LightWalletGrpcService
import cash.z.ecc.android.sdk.service.LightWalletService
import cash.z.ecc.android.sdk.transaction.*
import cash.z.ecc.android.sdk.transaction.OutboundTransactionManager
import cash.z.ecc.android.sdk.transaction.PagedTransactionRepository
import cash.z.ecc.android.sdk.transaction.PersistentTransactionManager
import cash.z.ecc.android.sdk.transaction.TransactionRepository
import cash.z.ecc.android.sdk.validate.AddressType
import cash.z.ecc.android.sdk.validate.AddressType.Shielded
import cash.z.ecc.android.sdk.validate.AddressType.Transparent
import cash.z.ecc.android.sdk.validate.ConsensusMatchType
import cash.z.wallet.sdk.rpc.Service
import io.grpc.ManagedChannel
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.*
import kotlin.coroutines.CoroutineContext
@ -357,11 +357,15 @@ class SdkSynchronizer internal constructor(
}
}
private suspend fun refreshPendingTransactions() {
private suspend fun refreshPendingTransactions() = withContext(IO) {
twig("[cleanup] beginning to refresh and clean up pending transactions")
// TODO: this would be the place to clear out any stale pending transactions. Remove filter
// logic and then delete any pending transaction with sufficient confirmations (all in one
// db transaction).
txManager.getAll().first().filter { it.isSubmitSuccess() && !it.isMined() }
val allPendingTxs = txManager.getAll().first()
val lastScannedHeight = storage.lastScannedHeight()
allPendingTxs.filter { it.isSubmitSuccess() && !it.isMined() }
.forEach { pendingTx ->
twig("checking for updates on pendingTx id: ${pendingTx.id}")
pendingTx.rawTransactionId?.let { rawId ->
@ -374,14 +378,55 @@ class SdkSynchronizer internal constructor(
}
}
}
twig("[cleanup] beginning to cleanup cancelled transactions")
var hasCleaned = false
// Experimental: cleanup cancelled transactions
allPendingTxs.filter { it.isCancelled() && it.hasRawTransactionId() }.let { cancellable ->
cancellable.forEachIndexed { index, pendingTx ->
twig("[cleanup] FOUND (${index + 1} of ${cancellable.size})" +
" CANCELLED pendingTxId: ${pendingTx.id}")
hasCleaned = hasCleaned || cleanupCancelledTx(pendingTx)
}
}
twig("[cleanup] beginning to cleanup expired transactions")
// Experimental: cleanup expired transactions
// note: don't delete the pendingTx until the related data has been scrubbed, or else you
// lose the thing that identifies the other data as invalid
// so we first mark the data for deletion, during the previous "cleanup" step, by removing
// the thing that we're trying to preserve to signal we no longer need it
// sometimes apps crash or things go wrong and we get an orphaned pendingTx that we'll poll
// forever, so maybe just get rid of all of them after a long while
allPendingTxs.filter { (it.isExpired(lastScannedHeight) && it.isMarkedForDeletion())
|| it.isLongExpired(lastScannedHeight) || it.isSafeToDiscard() }
.forEach {
val result = txManager.abort(it)
twig("[cleanup] FOUND EXPIRED pendingTX (lastScanHeight: $lastScannedHeight expiryHeight: ${it.expiryHeight}): and ${it.id} ${if (result > 0) "successfully removed" else "failed to remove"} it")
}
twig("[cleanup] deleting expired transactions from storage")
hasCleaned = hasCleaned || (storage.deleteExpired(lastScannedHeight) > 0)
if (hasCleaned) refreshBalance()
twig("[cleanup] done refreshing and cleaning up pending transactions")
}
private suspend fun cleanupCancelledTx(pendingTx: PendingTransaction): Boolean {
return if (storage.cleanupCancelledTx(pendingTx.rawTransactionId!!)) {
txManager.markForDeletion(pendingTx.id)
true
} else {
twig("[cleanup] no matching tx was cleaned so the pendingTx will not be marked for deletion")
false
}
}
//
// Send / Receive
//
override suspend fun cancelSpend(transaction: PendingTransaction) = txManager.cancel(transaction)
override suspend fun cancelSpend(pendingId: Long) = txManager.cancel(pendingId)
override suspend fun getAddress(accountId: Int): String = processor.getAddress(accountId)
@ -397,7 +442,12 @@ class SdkSynchronizer internal constructor(
txManager.initSpend(zatoshi, toAddress, memo, fromAccountIndex).let { placeHolderTx ->
emit(placeHolderTx)
txManager.encode(spendingKey, placeHolderTx).let { encodedTx ->
if (!encodedTx.isFailedEncoding() && !encodedTx.isCancelled()) {
// only submit if it wasn't cancelled. Otherwise cleanup, immediately for best UX.
if (encodedTx.isCancelled()) {
twig("[cleanup] this tx has been cancelled so we will cleanup instead of submitting")
if (cleanupCancelledTx(encodedTx)) refreshBalance()
encodedTx
} else {
txManager.submit(encodedTx)
}
}
@ -407,7 +457,8 @@ class SdkSynchronizer internal constructor(
txManager.monitorById(it.id)
}.distinctUntilChanged()
override suspend fun isValidShieldedAddr(address: String) = txManager.isValidShieldedAddress(address)
override suspend fun isValidShieldedAddr(address: String) =
txManager.isValidShieldedAddress(address)
override suspend fun isValidTransparentAddr(address: String) =
txManager.isValidTransparentAddress(address)

View File

@ -208,11 +208,11 @@ interface Synchronizer {
* Attempts to cancel a transaction that is about to be sent. Typically, cancellation is only
* an option if the transaction has not yet been submitted to the server.
*
* @param transaction the transaction to cancel.
* @param pendingId the id of the PendingTransaction to cancel.
*
* @return true when the cancellation request was successful. False when it is too late.
*/
suspend fun cancelSpend(transaction: PendingTransaction): Boolean
suspend fun cancelSpend(pendingId: Long): Boolean
/**
* Convenience function that exposes the underlying server information, like its name and

View File

@ -1,13 +1,12 @@
package cash.z.ecc.android.sdk.db
import androidx.paging.DataSource
import androidx.room.Dao
import androidx.room.Database
import androidx.room.Query
import androidx.room.RoomDatabase
import androidx.room.*
import androidx.room.Transaction
import androidx.room.migration.Migration
import androidx.sqlite.db.SupportSQLiteDatabase
import cash.z.ecc.android.sdk.db.entity.*
import cash.z.ecc.android.sdk.ext.twig
//
// Database
@ -183,7 +182,8 @@ interface TransactionDao {
@Query("""
SELECT transactions.txid AS txId,
transactions.raw AS raw
transactions.raw AS raw,
transactions.expiry_height AS expiryHeight
FROM transactions
WHERE id_tx = :id AND raw is not null
""")
@ -334,4 +334,108 @@ interface TransactionDao {
""")
suspend fun findAllTransactionsByRange(blockRangeStart: Int, blockRangeEnd: Int = blockRangeStart, limit: Int = Int.MAX_VALUE): List<ConfirmedTransaction>
// Experimental: cleanup cancelled transactions
// This should probably be a rust call but there's not a lot of bandwidth for this
// work to happen in librustzcash. So prove the concept on our side, first
// then move the logic to the right place. Especially since the data access API is
// coming soon
@Transaction
suspend fun cleanupCancelledTx(rawTransactionId: ByteArray): Boolean {
var success = false
try {
var hasInitialMatch = false
var hasFinalMatch = true
twig("[cleanup] cleanupCancelledTx starting...")
findUnminedTransactionIds(rawTransactionId).also {
twig("[cleanup] cleanupCancelledTx found ${it.size} matching transactions to cleanup")
}.forEach { transactionId ->
hasInitialMatch = true
removeInvalidTransaction(transactionId)
}
hasFinalMatch = findMatchingTransactionId(rawTransactionId) != null
success = hasInitialMatch && !hasFinalMatch
twig("[cleanup] cleanupCancelledTx Done. success? $success")
} catch (t: Throwable) {
twig("[cleanup] failed to cleanup transaction due to: $t")
}
return success
}
@Transaction
suspend fun removeInvalidTransaction(transactionId: Long): Boolean {
var success = false
try {
twig("[cleanup] removing invalid transactionId:$transactionId")
val result = unspendTransactionNotes(transactionId)
twig("[cleanup] unspent ($result) notes matching transaction $transactionId")
findSentNoteIds(transactionId)?.forEach { noteId ->
twig("[cleanup] WARNING: deleting invalid sent noteId:$noteId")
deleteSentNote(noteId)
}
twig("[cleanup] WARNING: deleting invalid transactionId $transactionId")
success = deleteTransaction(transactionId) != 0
twig("[cleanup] removeInvalidTransaction Done. success? $success")
} catch (t: Throwable) {
twig("[cleanup] failed to remove Invalid Transaction due to: $t")
}
return success
}
@Transaction
suspend fun deleteExpired(lastHeight: Int): Int {
var count = 0
findExpiredTxs(lastHeight).forEach { transactionId ->
if (removeInvalidTransaction(transactionId)) count++
}
return count
}
//
// Private-ish functions (these will move to rust, or the data access API eventually)
//
@Query("""
SELECT transactions.id_tx AS id
FROM transactions
WHERE txid = :rawTransactionId
AND block IS NULL
""")
fun findUnminedTransactionIds(rawTransactionId: ByteArray): List<Long>
@Query("""
SELECT transactions.id_tx AS id
FROM transactions
WHERE txid = :rawTransactionId
LIMIT 1
""")
suspend fun findMatchingTransactionId(rawTransactionId: ByteArray): Long?
@Query("""
SELECT sent_notes.id_note AS id
FROM sent_notes
WHERE tx = :transactionId
""")
fun findSentNoteIds(transactionId: Long): List<Int>?
@Query("DELETE FROM sent_notes WHERE id_note = :id")
fun deleteSentNote(id: Int): Int
@Query("DELETE FROM transactions WHERE id_tx = :id")
fun deleteTransaction(id: Long): Int
@Query("UPDATE received_notes SET spent = null WHERE spent = :transactionId")
fun unspendTransactionNotes(transactionId: Long): Int
@Query("""
SELECT transactions.id_tx
FROM transactions
WHERE created IS NOT NULL
AND block IS NULL
AND tx_index IS NULL
AND expiry_height < :lastheight
""")
suspend fun findExpiredTxs(lastheight: Int): List<Long>
}

View File

@ -43,7 +43,7 @@ interface PendingTransactionDao {
suspend fun update(transaction: PendingTransactionEntity)
@Delete
suspend fun delete(transaction: PendingTransactionEntity)
suspend fun delete(transaction: PendingTransactionEntity): Int
@Query("UPDATE pending_transactions SET cancelled = 1 WHERE id = :id")
suspend fun cancel(id: Long)
@ -56,6 +56,29 @@ interface PendingTransactionDao {
@Query("SELECT * FROM pending_transactions WHERE id = :id")
fun monitorById(id: Long): Flow<PendingTransactionEntity>
//
// Update helper functions
//
@Query("UPDATE pending_transactions SET rawTransactionId = null WHERE id = :id")
suspend fun removeRawTransactionId(id: Long)
@Query("UPDATE pending_transactions SET minedHeight = :minedHeight WHERE id = :id")
suspend fun updateMinedHeight(id: Long, minedHeight: Int)
@Query("UPDATE pending_transactions SET raw = :raw, rawTransactionId = :rawTransactionId, expiryHeight = :expiryHeight WHERE id = :id")
suspend fun updateEncoding(id: Long, raw: ByteArray, rawTransactionId: ByteArray, expiryHeight: Int?)
@Query("UPDATE pending_transactions SET errorMessage = :errorMessage, errorCode = :errorCode WHERE id = :id")
suspend fun updateError(id: Long, errorMessage: String?, errorCode: Int?)
@Query("UPDATE pending_transactions SET encodeAttempts = :attempts WHERE id = :id")
suspend fun updateEncodeAttempts(id: Long, attempts: Int)
@Query("UPDATE pending_transactions SET submitAttempts = :attempts WHERE id = :id")
suspend fun updateSubmitAttempts(id: Long, attempts: Int)
}

View File

@ -22,7 +22,7 @@ data class Sent(
val id: Int? = 0,
@ColumnInfo(name = "tx")
val transactionId: Int = 0,
val transactionId: Long = 0,
@ColumnInfo(name = "output_index")
val outputIndex: Int = 0,
@ -58,7 +58,7 @@ data class Sent(
override fun hashCode(): Int {
var result = id ?: 0
result = 31 * result + transactionId
result = 31 * result + transactionId.hashCode()
result = 31 * result + outputIndex
result = 31 * result + account
result = 31 * result + address.hashCode()

View File

@ -1,9 +1,11 @@
package cash.z.ecc.android.sdk.db.entity
import android.text.format.DateUtils
import androidx.room.ColumnInfo
import androidx.room.Entity
import androidx.room.ForeignKey
import androidx.room.PrimaryKey
import cash.z.ecc.android.sdk.ext.ZcashSdk
//
@ -208,7 +210,7 @@ data class ConfirmedTransaction(
}
}
data class EncodedTransaction(val txId: ByteArray, override val raw: ByteArray) :
data class EncodedTransaction(val txId: ByteArray, override val raw: ByteArray, val expiryHeight: Int?) :
SignedTransaction {
override fun equals(other: Any?): Boolean {
if (this === other) return true
@ -216,6 +218,7 @@ data class EncodedTransaction(val txId: ByteArray, override val raw: ByteArray)
if (!txId.contentEquals(other.txId)) return false
if (!raw.contentEquals(other.raw)) return false
if (expiryHeight != other.expiryHeight) return false
return true
}
@ -223,6 +226,7 @@ data class EncodedTransaction(val txId: ByteArray, override val raw: ByteArray)
override fun hashCode(): Int {
var result = txId.contentHashCode()
result = 31 * result + raw.contentHashCode()
result = 31 * result + (expiryHeight ?: 0)
return result
}
}
@ -293,6 +297,10 @@ fun PendingTransaction.isSameTxId(other: PendingTransaction): Boolean {
&& rawTransactionId!!.contentEquals(other.rawTransactionId!!)
}
fun PendingTransaction.hasRawTransactionId(): Boolean {
return rawTransactionId != null && (rawTransactionId?.isNotEmpty() == true)
}
fun PendingTransaction.isCreating(): Boolean {
return (raw?.isEmpty() != false) && submitAttempts <= 0 && !isFailedSubmit() && !isFailedEncoding()
}
@ -325,6 +333,42 @@ fun PendingTransaction.isSubmitted(): Boolean {
return submitAttempts > 0
}
fun PendingTransaction.isExpired(latestHeight: Int?): Boolean {
// TODO: test for off-by-one error here. Should we use <= or <
if (latestHeight == null || latestHeight < ZcashSdk.SAPLING_ACTIVATION_HEIGHT || expiryHeight < ZcashSdk.SAPLING_ACTIVATION_HEIGHT) return false
return expiryHeight < latestHeight
}
// if we don't have info on a pendingtx after 100 blocks then it's probably safe to stop polling!
fun PendingTransaction.isLongExpired(latestHeight: Int?): Boolean {
if (latestHeight == null || latestHeight < ZcashSdk.SAPLING_ACTIVATION_HEIGHT || expiryHeight < ZcashSdk.SAPLING_ACTIVATION_HEIGHT) return false
return (latestHeight - expiryHeight) > 100
}
fun PendingTransaction.isMarkedForDeletion(): Boolean {
return rawTransactionId == null && (errorCode ?: 0) == -9090
}
fun PendingTransaction.isSafeToDiscard(): Boolean {
// invalid dates shouldn't happen or should be temporary
if (createTime < 0) return false
val age = System.currentTimeMillis() - createTime
val smallThreshold = 30 * DateUtils.MINUTE_IN_MILLIS
val hugeThreshold = 30 * DateUtils.DAY_IN_MILLIS
return when {
// if it is mined, then it is not pending so it can be deleted fairly quickly from this db
isMined() && age > smallThreshold -> true
// if a tx fails to encode, then there's not much we can do with it
isFailedEncoding() && age > smallThreshold -> true
// don't delete failed submissions until they've been cleaned up, properly, or else we lose
// the ability to remove them in librustzcash prior to expiration
isFailedSubmit() && isMarkedForDeletion() -> true
!isMined() && age > hugeThreshold -> true
else -> false
}
}
fun PendingTransaction.isPending(currentHeight: Int = -1): Boolean {
// not mined and not expired and successfully created
return !isSubmitSuccess() && minedHeight == -1

View File

@ -84,6 +84,16 @@ open class PagedTransactionRepository(
transactions.findMinedHeight(rawTransactionId)
}
override suspend fun findMatchingTransactionId(rawTransactionId: ByteArray): Long? =
transactions.findMatchingTransactionId(rawTransactionId)
override suspend fun cleanupCancelledTx(rawTransactionId: ByteArray) = transactions.cleanupCancelledTx(rawTransactionId)
override suspend fun deleteExpired(lastScannedHeight: Int): Int {
// let expired transactions linger in the UI for a little while
return transactions.deleteExpired(lastScannedHeight - (ZcashSdk.EXPIRY_OFFSET/2))
}
/**
* Close the underlying database.

View File

@ -5,17 +5,17 @@ import androidx.room.Room
import androidx.room.RoomDatabase
import cash.z.ecc.android.sdk.db.PendingTransactionDao
import cash.z.ecc.android.sdk.db.PendingTransactionDb
import cash.z.ecc.android.sdk.db.entity.PendingTransaction
import cash.z.ecc.android.sdk.db.entity.PendingTransactionEntity
import cash.z.ecc.android.sdk.db.entity.isCancelled
import cash.z.ecc.android.sdk.db.entity.isSubmitted
import cash.z.ecc.android.sdk.db.entity.*
import cash.z.ecc.android.sdk.ext.twig
import cash.z.ecc.android.sdk.service.LightWalletService
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import java.io.PrintWriter
import java.io.StringWriter
import kotlin.math.max
/**
@ -80,26 +80,24 @@ class PersistentTransactionManager(
accountIndex = fromAccountIndex
)
try {
twig("creating tx in DB: $tx")
pendingTransactionDao {
val insertedTx = findById(create(tx))
twig("pending transaction created with id: ${insertedTx?.id}")
tx = tx.copy(id = insertedTx!!.id)
}.also {
twig("successfully created TX in DB")
safeUpdate("creating tx in DB") {
tx = findById(create(tx))!!
twig("successfully created TX in DB with id: ${tx.id}")
}
} catch (t: Throwable) {
twig("Unknown error while attempting to create pending transaction: ${t.message}" +
" caused by: ${t.cause}")
twig(
"Unknown error while attempting to create and fetch pending transaction:" +
" ${t.message} caused by: ${t.cause}"
)
}
tx
}
override suspend fun applyMinedHeight(pendingTx: PendingTransaction, minedHeight: Int) {
(pendingTx as? PendingTransactionEntity)?.let {
twig("a pending transaction has been mined!")
safeUpdate(pendingTx.copy(minedHeight = minedHeight))
twig("a pending transaction has been mined!")
safeUpdate("updating mined height for pending tx id: ${pendingTx.id} to $minedHeight") {
updateMinedHeight(pendingTx.id, minedHeight)
}
}
@ -118,56 +116,63 @@ class PersistentTransactionManager(
tx.memo,
tx.accountIndex
)
twig("successfully encoded transaction for ${tx.memo}!!")
tx = tx.copy(raw = encodedTx.raw, rawTransactionId = encodedTx.txId)
twig("successfully encoded transaction!")
safeUpdate("updating transaction encoding") {
updateEncoding(tx.id, encodedTx.raw, encodedTx.txId, encodedTx.expiryHeight)
}
} catch (t: Throwable) {
val message = "failed to encode transaction due to : ${t.message} caused by: ${t.cause}"
twig(message)
message
tx = tx.copy(errorMessage = message, errorCode = ERROR_ENCODING)
safeUpdate("updating transaction error info") {
updateError(tx.id, message, ERROR_ENCODING)
}
} finally {
tx = tx.copy(encodeAttempts = max(1, tx.encodeAttempts + 1))
safeUpdate("incrementing transaction encodeAttempts (from: ${tx.encodeAttempts})") {
updateEncodeAttempts(tx.id, max(1, tx.encodeAttempts + 1))
tx = findById(tx.id)!!
}
}
safeUpdate(tx)
tx
}
override suspend fun submit(pendingTx: PendingTransaction): PendingTransaction = withContext(Dispatchers.IO) {
// reload the tx to check for cancellation
var storedTx = pendingTransactionDao { findById(pendingTx.id) }
var tx = pendingTransactionDao { findById(pendingTx.id) }
?: throw IllegalStateException("Error while submitting transaction. No pending" +
" transaction found that matches the one being submitted. Verify that the" +
" transaction still exists among the set of pending transactions.")
var tx = storedTx
try {
// do nothing when cancelled
if (!tx.isCancelled()) {
twig("submitting transaction with memo: ${tx.memo} amount: ${tx.value}")
val response = service.submitTransaction(tx.raw)
val error = response.errorCode < 0
twig("${if (error) "FAILURE! " else "SUCCESS!"} submit transaction completed with" +
" response: ${response.errorCode}: ${response.errorMessage}")
tx = tx.copy(
errorMessage = if (error) response.errorMessage else null,
errorCode = response.errorCode,
submitAttempts = max(1, tx.submitAttempts + 1)
)
safeUpdate(tx)
} else {
twig("Warning: ignoring cancelled transaction with id ${tx.id}")
// do nothing if failed or cancelled
when {
tx.isFailedEncoding() -> twig("Warning: this transaction will not be submitted because it failed to be encoded.")
tx.isCancelled() -> twig("Warning: ignoring cancelled transaction with id ${tx.id}. We will not submit it to the network because it has been cancelled.")
else -> {
twig("submitting transaction with memo: ${tx.memo} amount: ${tx.value}")
val response = service.submitTransaction(tx.raw)
val error = response.errorCode < 0
twig("${if (error) "FAILURE! " else "SUCCESS!"} submit transaction completed with" +
" response: ${response.errorCode}: ${response.errorMessage}")
safeUpdate("updating submitted transaction (hadError: $error)") {
updateError(tx.id, if (error) response.errorMessage else null, response.errorCode)
updateSubmitAttempts(tx.id, max(1, tx.submitAttempts + 1))
}
}
}
} catch (t: Throwable) {
// a non-server error has occurred
val message =
"Unknown error while submitting transaction: ${t.message} caused by: ${t.cause}"
twig(message)
tx = tx.copy(
errorMessage = t.message,
errorCode = ERROR_SUBMITTING,
submitAttempts = max(1, tx.submitAttempts + 1)
)
safeUpdate(tx)
safeUpdate("updating submission failure") {
updateError(tx.id, t.message, ERROR_SUBMITTING)
updateSubmitAttempts(tx.id, max(1, tx.submitAttempts + 1))
}
} finally {
safeUpdate("fetching latest tx info") {
tx = findById(tx.id)!!
}
}
tx
@ -183,18 +188,44 @@ class PersistentTransactionManager(
override suspend fun isValidTransparentAddress(address: String) =
encoder.isValidTransparentAddress(address)
override suspend fun cancel(pendingTx: PendingTransaction): Boolean {
override suspend fun cancel(pendingId: Long): Boolean {
return pendingTransactionDao {
val tx = findById(pendingTx.id)
val tx = findById(pendingId)
if (tx?.isSubmitted() == true) {
twig("Attempt to cancel transaction failed because it has already been submitted!")
false
} else {
cancel(pendingTx.id)
twig("Cancelling unsubmitted transaction id: $pendingId")
cancel(pendingId)
true
}
}
}
override suspend fun findById(id: Long) = pendingTransactionDao {
findById(id)
}
override suspend fun markForDeletion(id: Long) = pendingTransactionDao {
withContext(IO) {
twig("[cleanup] marking pendingTx $id for deletion")
removeRawTransactionId(id)
updateError(id, "safe to delete", -9090)
}
}
/**
* Remove a transaction and pretend it never existed.
*
* @return the final number of transactions that were removed from the database.
*/
override suspend fun abort(existingTransaction: PendingTransaction): Int {
return pendingTransactionDao {
twig("[cleanup] Deleting pendingTxId: ${existingTransaction.id}")
delete(existingTransaction as PendingTransactionEntity)
}
}
override fun getAll() = _dao.getAll()
@ -202,35 +233,32 @@ class PersistentTransactionManager(
// Helper functions
//
/**
* Remove a transaction and pretend it never existed.
*/
suspend fun abortTransaction(existingTransaction: PendingTransaction) {
pendingTransactionDao {
delete(existingTransaction as PendingTransactionEntity)
}
}
/**
* Updating the pending transaction is often done at the end of a function but still should
* happen within a try/catch block, surrounded by logging. So this helps with that.
* happen within a try/catch block, surrounded by logging. So this helps with that while also
* ensuring that no other coroutines are concurrently interacting with the DAO.
*/
private suspend fun safeUpdate(tx: PendingTransactionEntity): PendingTransaction {
private suspend fun <R> safeUpdate(logMessage: String = "", block: suspend PendingTransactionDao.() -> R ): R? {
return try {
twig("updating tx in DB: $tx")
pendingTransactionDao { update(tx) }
twig("successfully updated TX in DB")
tx
twig(logMessage)
pendingTransactionDao { block() }
} catch (t: Throwable) {
twig("Unknown error while attempting to update pending transaction: ${t.message}" +
" caused by: ${t.cause}")
tx
val stacktrace = StringWriter().also { t.printStackTrace(PrintWriter(it)) }.toString()
twig(
"Unknown error while attempting to '$logMessage':" +
" ${t.message} caused by: ${t.cause} stacktrace: $stacktrace"
)
null
}
}
private suspend fun <T> pendingTransactionDao(block: suspend PendingTransactionDao.() -> T): T {
return daoMutex.withLock {
_dao.block()
withContext(IO) {
_dao.block()
}
}
}

View File

@ -96,7 +96,14 @@ interface OutboundTransactionManager {
*
* @return true when the transaction was able to be cancelled.
*/
suspend fun cancel(pendingTx: PendingTransaction): Boolean
suspend fun cancel(pendingId: Long): Boolean
/**
* Delete the given transaction but return 0 if it did not exist.
*
* @return the total number of transactions successfully removed from storage.
*/
suspend fun abort(it: PendingTransaction): Int
/**
* Get all pending transactions known to this wallet as a flow that is updated anytime the list
@ -105,6 +112,11 @@ interface OutboundTransactionManager {
* @return a flow of all pending transactions known to this wallet.
*/
fun getAll(): Flow<List<PendingTransaction>>
// this is mostly useful for tests we can restrict it to tests if we need to
suspend fun findById(id: Long): PendingTransaction?
suspend fun markForDeletion(id: Long)
}
/**

View File

@ -55,11 +55,23 @@ interface TransactionRepository {
*/
suspend fun findMinedHeight(rawTransactionId: ByteArray): Int?
suspend fun findMatchingTransactionId(rawTransactionId: ByteArray): Long?
/**
* Provides a way for other components to signal that the underlying data has been modified.
*/
fun invalidate()
/**
* When a transaction has been cancelled by the user, we need a bridge to clean it up from the
* dataDb. This function will safely remove everything related to that transaction in the right
* order to satisfy foreign key constraints, even if cascading isn't setup in the DB.
*
* @return true when an unmined transaction was found and then successfully removed
*/
suspend fun cleanupCancelledTx(rawTransactionId: ByteArray): Boolean
suspend fun deleteExpired(lastScannedHeight: Int): Int
//
// Transactions