Improve error handling.

This commit is contained in:
Kevin Gorham 2020-06-09 22:00:41 -04:00
parent 40c2be9ce1
commit c585ed93ff
No known key found for this signature in database
GPG Key ID: CCA55602DF49FC38
2 changed files with 118 additions and 31 deletions

View File

@ -1,10 +1,13 @@
package cash.z.wallet.sdk.block package cash.z.wallet.sdk.block
import androidx.annotation.VisibleForTesting import androidx.annotation.VisibleForTesting
import cash.z.wallet.sdk.BuildConfig
import cash.z.wallet.sdk.annotation.OpenForTesting import cash.z.wallet.sdk.annotation.OpenForTesting
import cash.z.wallet.sdk.block.CompactBlockProcessor.State.* import cash.z.wallet.sdk.block.CompactBlockProcessor.State.*
import cash.z.wallet.sdk.entity.ConfirmedTransaction import cash.z.wallet.sdk.entity.ConfirmedTransaction
import cash.z.wallet.sdk.exception.CompactBlockProcessorException import cash.z.wallet.sdk.exception.CompactBlockProcessorException
import cash.z.wallet.sdk.exception.CompactBlockProcessorException.EnhanceTransactionError.EnhanceTxDecryptError
import cash.z.wallet.sdk.exception.CompactBlockProcessorException.EnhanceTransactionError.EnhanceTxDownloadError
import cash.z.wallet.sdk.exception.RustLayerException import cash.z.wallet.sdk.exception.RustLayerException
import cash.z.wallet.sdk.ext.* import cash.z.wallet.sdk.ext.*
import cash.z.wallet.sdk.ext.ZcashSdk.DOWNLOAD_BATCH_SIZE import cash.z.wallet.sdk.ext.ZcashSdk.DOWNLOAD_BATCH_SIZE
@ -17,7 +20,7 @@ import cash.z.wallet.sdk.ext.ZcashSdk.SAPLING_ACTIVATION_HEIGHT
import cash.z.wallet.sdk.ext.ZcashSdk.SCAN_BATCH_SIZE import cash.z.wallet.sdk.ext.ZcashSdk.SCAN_BATCH_SIZE
import cash.z.wallet.sdk.jni.RustBackend import cash.z.wallet.sdk.jni.RustBackend
import cash.z.wallet.sdk.jni.RustBackendWelding import cash.z.wallet.sdk.jni.RustBackendWelding
import cash.z.wallet.sdk.service.LightWalletGrpcService import cash.z.wallet.sdk.transaction.PagedTransactionRepository
import cash.z.wallet.sdk.transaction.TransactionRepository import cash.z.wallet.sdk.transaction.TransactionRepository
import io.grpc.StatusRuntimeException import io.grpc.StatusRuntimeException
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
@ -25,8 +28,6 @@ import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.ConflatedBroadcastChannel import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
@ -56,7 +57,10 @@ class CompactBlockProcessor(
minimumHeight: Int = SAPLING_ACTIVATION_HEIGHT minimumHeight: Int = SAPLING_ACTIVATION_HEIGHT
) { ) {
/** /**
* Callback for any critical errors that occur while processing compact blocks. * Callback for any non-trivial errors that occur while processing compact blocks.
*
* @return true when processing should continue. Return false when the error is unrecoverable
* and all processing should halt and stop retrying.
*/ */
var onProcessorErrorListener: ((Throwable) -> Boolean)? = null var onProcessorErrorListener: ((Throwable) -> Boolean)? = null
@ -64,11 +68,10 @@ class CompactBlockProcessor(
* Callbaqck for reorgs. This callback is invoked when validation fails with the height at which * Callbaqck for reorgs. This callback is invoked when validation fails with the height at which
* an error was found and the lower bound to which the data will rewind, at most. * an error was found and the lower bound to which the data will rewind, at most.
*/ */
var onChainErrorListener: ((Int, Int) -> Any)? = null var onChainErrorListener: ((errorHeight: Int, rewindHeight: Int) -> Any)? = null
private val consecutiveChainErrors = AtomicInteger(0) private val consecutiveChainErrors = AtomicInteger(0)
private val lowerBoundHeight: Int = max(SAPLING_ACTIVATION_HEIGHT, minimumHeight - MAX_REORG_SIZE) private val lowerBoundHeight: Int = max(SAPLING_ACTIVATION_HEIGHT, minimumHeight - MAX_REORG_SIZE)
private val reconnectError = -20
private val _state: ConflatedBroadcastChannel<State> = ConflatedBroadcastChannel(Initialized) private val _state: ConflatedBroadcastChannel<State> = ConflatedBroadcastChannel(Initialized)
private val _progress = ConflatedBroadcastChannel(0) private val _progress = ConflatedBroadcastChannel(0)
@ -79,7 +82,7 @@ class CompactBlockProcessor(
* sequentially, due to the way sqlite works so it is okay for this not to be threadsafe or * sequentially, due to the way sqlite works so it is okay for this not to be threadsafe or
* coroutine safe because processing cannot be concurrent. * coroutine safe because processing cannot be concurrent.
*/ */
private var currentInfo = ProcessorInfo() internal var currentInfo = ProcessorInfo()
/** /**
* The flow of state values so that a wallet can monitor the state of this class without needing * The flow of state values so that a wallet can monitor the state of this class without needing
@ -168,16 +171,15 @@ class CompactBlockProcessor(
twig("Disconnection detected! Attempting to reconnect!") twig("Disconnection detected! Attempting to reconnect!")
setState(Disconnected) setState(Disconnected)
downloader.lightwalletService.reconnect() downloader.lightwalletService.reconnect()
reconnectError ERROR_CODE_RECONNECT
} else if (currentInfo.lastDownloadRange.isEmpty() && currentInfo.lastScanRange.isEmpty()) { } else if (currentInfo.lastDownloadRange.isEmpty() && currentInfo.lastScanRange.isEmpty()) {
twig("Nothing to process: no new blocks to download or scan, right now.") twig("Nothing to process: no new blocks to download or scan, right now.")
setState(Scanned(currentInfo.lastScanRange)) setState(Scanned(currentInfo.lastScanRange))
-1 ERROR_CODE_NONE
} else { } else {
downloadNewBlocks(currentInfo.lastDownloadRange) downloadNewBlocks(currentInfo.lastDownloadRange)
validateAndScanNewBlocks(currentInfo.lastScanRange).also { val error = validateAndScanNewBlocks(currentInfo.lastScanRange)
enhanceTransactionDetails(currentInfo.lastScanRange) if (error != ERROR_CODE_NONE) error else enhanceTransactionDetails(currentInfo.lastScanRange)
}
} }
} }
@ -222,12 +224,12 @@ class CompactBlockProcessor(
* *
* @param lastScanRange the range to be validated and scanned. * @param lastScanRange the range to be validated and scanned.
* *
* @return error code or -1 when there is no error. * @return error code or [ERROR_CODE_NONE] when there is no error.
*/ */
private suspend fun validateAndScanNewBlocks(lastScanRange: IntRange): Int = withContext(IO) { private suspend fun validateAndScanNewBlocks(lastScanRange: IntRange): Int = withContext(IO) {
setState(Validating) setState(Validating)
var error = validateNewBlocks(lastScanRange) var error = validateNewBlocks(lastScanRange)
if (error < 0) { if (error == ERROR_CODE_NONE) {
// in theory, a scan should not fail after validation succeeds but maybe consider // in theory, a scan should not fail after validation succeeds but maybe consider
// changing the rust layer to return the failed block height whenever scan does fail // changing the rust layer to return the failed block height whenever scan does fail
// rather than a boolean // rather than a boolean
@ -237,7 +239,7 @@ class CompactBlockProcessor(
else { else {
setState(Scanned(lastScanRange)) setState(Scanned(lastScanRange))
} }
-1 ERROR_CODE_NONE
} else { } else {
error error
} }
@ -255,23 +257,34 @@ class CompactBlockProcessor(
else enhance(newTransaction) else enhance(newTransaction)
} }
twig("Done enhancing transaction details") twig("Done enhancing transaction details")
1 ERROR_CODE_NONE
} catch (t: Throwable) { } catch (t: Throwable) {
twig("Failed to enhance due to $t") twig("Failed to enhance due to $t")
t.printStackTrace() t.printStackTrace()
-1 ERROR_CODE_FAILED_ENHANCE
} finally { } finally {
Twig.clip("enhancing") Twig.clip("enhancing")
} }
} }
// TODO: we still need a way to identify those transactions that failed to be enhanced
private suspend fun enhance(transaction: ConfirmedTransaction) = withContext(Dispatchers.IO) { private suspend fun enhance(transaction: ConfirmedTransaction) = withContext(Dispatchers.IO) {
twig("START: enhancing transaction (id:${transaction.id} block:${transaction.minedHeight})") var downloaded = false
downloader.fetchTransaction(transaction.rawTransactionId)?.let { tx -> try {
twig("decrypting and storing transaction (id:${transaction.id} block:${transaction.minedHeight})") twig("START: enhancing transaction (id:${transaction.id} block:${transaction.minedHeight})")
rustBackend.decryptAndStoreTransaction(tx.data.toByteArray()) downloader.fetchTransaction(transaction.rawTransactionId)?.let { tx ->
} ?: twig("no transaction found. Nothing to enhance. This probably shouldn't happen.") downloaded = true
twig("DONE: enhancing transaction (id:${transaction.id} block:${transaction.minedHeight})") twig("decrypting and storing transaction (id:${transaction.id} block:${transaction.minedHeight})")
rustBackend.decryptAndStoreTransaction(tx.data.toByteArray())
} ?: twig("no transaction found. Nothing to enhance. This probably shouldn't happen.")
twig("DONE: enhancing transaction (id:${transaction.id} block:${transaction.minedHeight})")
} catch (t: Throwable) {
twig("Warning: failure on transaction: error: $t\ttransaction: $transaction")
onProcessorError(
if (downloaded) EnhanceTxDecryptError(transaction.minedHeight, t)
else EnhanceTxDownloadError(transaction.minedHeight, t)
)
}
} }
/** /**
@ -327,13 +340,13 @@ class CompactBlockProcessor(
* *
* @param range the range of blocks to validate. * @param range the range of blocks to validate.
* *
* @return -1 when there is no problem. Otherwise, return the lowest height where an error was * @return [ERROR_CODE_NONE] when there is no problem. Otherwise, return the lowest height where an error was
* found. In other words, validation starts at the back of the chain and works toward the tip. * found. In other words, validation starts at the back of the chain and works toward the tip.
*/ */
private fun validateNewBlocks(range: IntRange?): Int { private fun validateNewBlocks(range: IntRange?): Int {
if (range?.isEmpty() != false) { if (range?.isEmpty() != false) {
twig("no blocks to validate: $range") twig("no blocks to validate: $range")
return -1 return ERROR_CODE_NONE
} }
Twig.sprout("validating") Twig.sprout("validating")
twig("validating blocks in range $range in db: ${(rustBackend as RustBackend).pathCacheDb}") twig("validating blocks in range $range in db: ${(rustBackend as RustBackend).pathCacheDb}")
@ -348,7 +361,7 @@ class CompactBlockProcessor(
* *
* @param range the range of blocks to scan. * @param range the range of blocks to scan.
* *
* @return -1 when there is no problem. Otherwise, return the lowest height where an error was * @return [ERROR_CODE_NONE] when there is no problem. Otherwise, return the lowest height where an error was
* found. In other words, scanning starts at the back of the chain and works toward the tip. * found. In other words, scanning starts at the back of the chain and works toward the tip.
*/ */
private suspend fun scanNewBlocks(range: IntRange?): Boolean = withContext(IO) { private suspend fun scanNewBlocks(range: IntRange?): Boolean = withContext(IO) {
@ -414,13 +427,52 @@ class CompactBlockProcessor(
} }
private suspend fun handleChainError(errorHeight: Int) = withContext(IO) { private suspend fun handleChainError(errorHeight: Int) = withContext(IO) {
val lowerBound = determineLowerBound(errorHeight) // TODO consider an error object containing hash information
twig("handling chain error at $errorHeight by rewinding to block $lowerBound") printValidationErrorInfo(errorHeight)
onChainErrorListener?.invoke(errorHeight, lowerBound) determineLowerBound(errorHeight).let { lowerBound ->
rustBackend.rewindToHeight(lowerBound) twig("handling chain error at $errorHeight by rewinding to block $lowerBound")
downloader.rewindToHeight(lowerBound) onChainErrorListener?.invoke(errorHeight, lowerBound)
rustBackend.rewindToHeight(lowerBound)
downloader.rewindToHeight(lowerBound)
}
} }
/** insightful function for debugging these critical errors */
private suspend fun printValidationErrorInfo(errorHeight: Int, count: Int = 11) {
// Note: blocks are public information so it's okay to print them but, still, let's not unless we're debugging something
if (!BuildConfig.DEBUG) return
var errorInfo = fetchValidationErrorInfo(errorHeight)
twig("validation failed at block ${errorInfo.errorHeight} which had hash ${errorInfo.actualPrevHash} but the expected hash was ${errorInfo.expectedPrevHash}")
errorInfo = fetchValidationErrorInfo(errorHeight + 1)
twig("The next block block: ${errorInfo.errorHeight} which had hash ${errorInfo.actualPrevHash} but the expected hash was ${errorInfo.expectedPrevHash}")
twig("=================== BLOCKS [$errorHeight..${errorHeight + count - 1}]: START ========")
repeat(count) { i ->
val height = errorHeight + i
val block = downloader.compactBlockStore.findCompactBlock(height)
// sometimes the initial block was inserted via checkpoint and will not appear in the cache. We can get the hash another way but prevHash is correctly null.
val hash = block?.hash?.toByteArray() ?: (repository as PagedTransactionRepository).findBlockHash(height)
twig("block: $height\thash=${hash?.toHexReversed()} \tprevHash=${block?.prevHash?.toByteArray()?.toHexReversed()}")
}
twig("=================== BLOCKS [$errorHeight..${errorHeight + count - 1}]: END ========")
}
private suspend fun fetchValidationErrorInfo(errorHeight: Int): ValidationErrorInfo {
val hash = (repository as PagedTransactionRepository).findBlockHash(errorHeight + 1)?.toHexReversed()
val prevHash = repository.findBlockHash(errorHeight)?.toHexReversed()
val compactBlock = downloader.compactBlockStore.findCompactBlock(errorHeight + 1)
val expectedPrevHash = compactBlock?.prevHash?.toByteArray()?.toHexReversed()
return ValidationErrorInfo(errorHeight, hash, expectedPrevHash, prevHash)
}
/**
* Called for every noteworthy error.
*
* @return true when processing should continue. Return false when the error is unrecoverable
* and all processing should halt and stop retrying.
*/
private fun onProcessorError(throwable: Throwable): Boolean { private fun onProcessorError(throwable: Throwable): Boolean {
return onProcessorErrorListener?.invoke(throwable) ?: true return onProcessorErrorListener?.invoke(throwable) ?: true
} }
@ -654,6 +706,19 @@ class CompactBlockProcessor(
} }
} }
} }
}
data class ValidationErrorInfo(
val errorHeight: Int,
val hash: String?,
val expectedPrevHash: String?,
val actualPrevHash: String?
)
companion object {
const val ERROR_CODE_NONE = -1
const val ERROR_CODE_RECONNECT = 20
const val ERROR_CODE_FAILED_ENHANCE = 40
} }
} }

View File

@ -1,5 +1,6 @@
package cash.z.wallet.sdk.exception package cash.z.wallet.sdk.exception
import cash.z.wallet.sdk.ext.ConsensusBranchId
import java.lang.RuntimeException import java.lang.RuntimeException
@ -35,6 +36,10 @@ sealed class SynchronizerException(message: String, cause: Throwable? = null) :
object FalseStart: SynchronizerException("This synchronizer was already started. Multiple calls to start are not" + object FalseStart: SynchronizerException("This synchronizer was already started. Multiple calls to start are not" +
"allowed and once a synchronizer has stopped it cannot be restarted." "allowed and once a synchronizer has stopped it cannot be restarted."
) )
object NotYetStarted: SynchronizerException("The synchronizer has not yet started. Verify that" +
" start has been called prior to this operation and that the coroutineScope is not" +
" being accessed before it is initialized."
)
} }
/** /**
@ -57,6 +62,10 @@ sealed class CompactBlockProcessorException(message: String, cause: Throwable? =
object Uninitialized : CompactBlockProcessorException("Cannot process blocks because the wallet has not been" + object Uninitialized : CompactBlockProcessorException("Cannot process blocks because the wallet has not been" +
" initialized. Verify that the seed phrase was properly created or imported. If so, then this problem" + " initialized. Verify that the seed phrase was properly created or imported. If so, then this problem" +
" can be fixed by re-importing the wallet.") " can be fixed by re-importing the wallet.")
open class EnhanceTransactionError(message: String, val height: Int, cause: Throwable) : CompactBlockProcessorException(message, cause) {
class EnhanceTxDownloadError(height: Int, cause: Throwable) : EnhanceTransactionError("Error while attempting to download a transaction to enhance", height, cause)
class EnhanceTxDecryptError(height: Int, cause: Throwable) : EnhanceTransactionError("Error while attempting to decrypt and store a transaction to enhance", height, cause)
}
} }
/** /**
@ -104,6 +113,14 @@ sealed class LightwalletException(message: String, cause: Throwable? = null) : S
" with an insecure connection! Plaintext connections are only allowed when the" + " with an insecure connection! Plaintext connections are only allowed when the" +
" resource value for 'R.bool.lightwalletd_allow_very_insecure_connections' is true" + " resource value for 'R.bool.lightwalletd_allow_very_insecure_connections' is true" +
" because this choice should be explicit.") " because this choice should be explicit.")
class ConsensusBranchException(sdkBranch: String, lwdBranch: String) :
LightwalletException(
"Error: the lightwalletd server is using a consensus branch" +
" (branch: $lwdBranch) that does not match the transactions being created" +
" (branch: $sdkBranch). This probably means the SDK and Server are on two" +
" different chains, most likely because of a recent network upgrade (NU). Either" +
" update the SDK to match lightwalletd or use a lightwalletd that matches the SDK."
)
} }
/** /**
@ -121,4 +138,9 @@ sealed class TransactionEncoderException(message: String, cause: Throwable? = nu
class TransactionNotEncodedException(transactionId: Long) : TransactionEncoderException("The transaction returned by the wallet," + class TransactionNotEncodedException(transactionId: Long) : TransactionEncoderException("The transaction returned by the wallet," +
" with id $transactionId, does not have any raw data. This is a scenario where the wallet should have thrown" + " with id $transactionId, does not have any raw data. This is a scenario where the wallet should have thrown" +
" an exception but failed to do so.") " an exception but failed to do so.")
class IncompleteScanException(lastScannedHeight: Int) : TransactionEncoderException("Cannot" +
" create spending transaction because scanning is incomplete. We must scan up to the" +
" latest height to know which consensus rules to apply. However, the last scanned" +
" height was $lastScannedHeight.")
} }