Improved error handling.

Added callback for reorg detection and also improved the logic for handling critical processor errors.
This commit is contained in:
Kevin Gorham 2020-02-21 18:14:34 -05:00
parent 6bcc00b098
commit ac7f3475e9
No known key found for this signature in database
GPG Key ID: CCA55602DF49FC38
4 changed files with 117 additions and 43 deletions

View File

@ -121,6 +121,12 @@ class SdkSynchronizer internal constructor(
*/
override var onSubmissionErrorHandler: ((Throwable?) -> Boolean)? = null
/**
* A callback to invoke whenever a chain error is encountered. These occur whenever the
* processor detects a missing or non-chain-sequential block (i.e. a reorg).
*/
override var onChainErrorHandler: ((Int, Int) -> Any)? = null
//
// Public API
@ -182,7 +188,8 @@ class SdkSynchronizer internal constructor(
private fun CoroutineScope.onReady() = launch(CoroutineExceptionHandler(::onCriticalError)) {
twig("Synchronizer (${this@SdkSynchronizer}) Ready. Starting processor!")
processor.onErrorListener = ::onProcessorError
processor.onProcessorErrorListener = ::onProcessorError
processor.onChainErrorListener = ::onChainError
processor.state.onEach {
when (it) {
is Scanned -> {
@ -238,6 +245,18 @@ class SdkSynchronizer internal constructor(
} == true
}
private fun onChainError(errorHeight: Int, rewindHeight: Int) {
twig("Chain error detected at height: $errorHeight. Rewinding to: $rewindHeight")
if (onChainErrorHandler == null) {
twig(
"WARNING: a chain error occurred but no callback is registered to be notified of " +
"chain errors. To respond to these errors (perhaps to update the UI or alert the" +
" user) set synchronizer.onChainErrorHandler to a non-null value"
)
}
onChainErrorHandler?.invoke(errorHeight, rewindHeight)
}
private suspend fun onScanComplete(scannedRange: IntRange) {
// TODO: optimize to skip logic here if there are no new transactions with a block height
// within the given range

View File

@ -187,6 +187,13 @@ interface Synchronizer {
*/
var onSubmissionErrorHandler: ((Throwable?) -> Boolean)?
/**
* A callback to invoke whenever a chain error is encountered. These occur whenever the
* processor detects a missing or non-chain-sequential block (i.e. a reorg).
*/
var onChainErrorHandler: ((Int, Int) -> Any)?
enum class Status {
/**
* Indicates that [stop] has been called on this Synchronizer and it will no longer be used.
@ -232,4 +239,5 @@ interface Synchronizer {
val isNotValid get() = this !is Valid
}
}

View File

@ -45,7 +45,8 @@ class CompactBlockProcessor(
private val rustBackend: RustBackendWelding,
minimumHeight: Int = SAPLING_ACTIVATION_HEIGHT
) {
var onErrorListener: ((Throwable) -> Boolean)? = null
var onProcessorErrorListener: ((Throwable) -> Boolean)? = null
var onChainErrorListener: ((Int, Int) -> Any)? = null
private val consecutiveChainErrors = AtomicInteger(0)
private val lowerBoundHeight: Int = max(SAPLING_ACTIVATION_HEIGHT, minimumHeight - MAX_REORG_SIZE)
@ -74,7 +75,7 @@ class CompactBlockProcessor(
// using do/while makes it easier to execute exactly one loop which helps with testing this processor quickly
// (because you can start and then immediately set isStopped=true to always get precisely one loop)
do {
retryWithBackoff(::onConnectionError, maxDelayMillis = MAX_BACKOFF_INTERVAL) {
retryWithBackoff(::onProcessorError, maxDelayMillis = MAX_BACKOFF_INTERVAL) {
val result = processNewBlocks()
// immediately process again after failures in order to download new blocks right away
if (result < 0) {
@ -96,10 +97,19 @@ class CompactBlockProcessor(
stop()
}
/**
* Sets the state to [Stopped], which causes the processor loop to exit.
*/
suspend fun stop() {
setState(Stopped)
runCatching {
setState(Stopped)
downloader.stop()
}
}
/**
* Stop processing and throw an error.
*/
private suspend fun fail(error: Throwable) {
stop()
twig("${error.message}")
@ -115,7 +125,23 @@ class CompactBlockProcessor(
private suspend fun processNewBlocks(): Int = withContext(IO) {
verifySetup()
twig("beginning to process new blocks (with lower bound: $lowerBoundHeight)...")
// Get the latest info (but don't transmit it on the channel) and then use that to update the scan/download ranges
updateRanges()
if (currentInfo.lastDownloadRange.isEmpty() && currentInfo.lastScanRange.isEmpty()) {
twig("Nothing to process: no new blocks to download or scan, right now.")
setState(Scanned(currentInfo.lastScanRange))
-1
} else {
downloadNewBlocks(currentInfo.lastDownloadRange)
validateAndScanNewBlocks(currentInfo.lastScanRange)
}
}
/**
* Gets the latest range info and then uses that initialInfo to update (and transmit)
* the scan/download ranges that require processing.
*/
private suspend fun updateRanges() = withContext(IO) {
ProcessorInfo(
networkBlockHeight = downloader.getLatestBlockHeight(),
lastScannedHeight = getLastScannedHeight(),
@ -126,42 +152,50 @@ class CompactBlockProcessor(
lastScannedHeight = initialInfo.lastScannedHeight,
lastDownloadedHeight = initialInfo.lastDownloadedHeight,
lastScanRange = (initialInfo.lastScannedHeight + 1)..initialInfo.networkBlockHeight,
lastDownloadRange = (max(initialInfo.lastDownloadedHeight, initialInfo.lastScannedHeight) + 1)..initialInfo.networkBlockHeight
lastDownloadRange = (max(
initialInfo.lastDownloadedHeight,
initialInfo.lastScannedHeight
) + 1)..initialInfo.networkBlockHeight
)
}
if (currentInfo.lastDownloadRange.isEmpty() && currentInfo.lastScanRange.isEmpty()) {
twig("Nothing to process: no new blocks to download or scan, right now.")
setState(Scanned(currentInfo.lastScanRange))
-1
} else {
setState(Downloading)
downloadNewBlocks(currentInfo.lastDownloadRange)
setState(Validating)
var error = validateNewBlocks(currentInfo.lastScanRange)
if (error < 0) {
// in theory, a scan should not fail after validation succeeds but maybe consider
// changing the rust layer to return the failed block height whenever scan does fail
// rather than a boolean
setState(Scanning)
val success = scanNewBlocks(currentInfo.lastScanRange)
if (!success) throw CompactBlockProcessorException.FailedScan
else {
setState(Scanned(currentInfo.lastScanRange))
}
-1
} else {
error
}
}
}
/**
* Given a range, validate and then scan all blocks. Validation is ensuring that the blocks are
* in ascending order, with no gaps and are also chain-sequential. This means every block's
* prevHash value matches the preceding block in the chain.
*
* @return error code or -1 when there is no error.
*/
private suspend fun validateAndScanNewBlocks(lastScanRange: IntRange): Int = withContext(IO) {
setState(Validating)
var error = validateNewBlocks(lastScanRange)
if (error < 0) {
// in theory, a scan should not fail after validation succeeds but maybe consider
// changing the rust layer to return the failed block height whenever scan does fail
// rather than a boolean
setState(Scanning)
val success = scanNewBlocks(lastScanRange)
if (!success) throw CompactBlockProcessorException.FailedScan()
else {
setState(Scanned(lastScanRange))
}
-1
} else {
error
}
}
/**
* Confirm that the wallet data is properly setup for use.
*/
private fun verifySetup() {
if (!repository.isInitialized()) throw CompactBlockProcessorException.Uninitialized
}
/**
* Download all blocks in the given range.
*/
@VisibleForTesting //allow mocks to verify how this is called, rather than the downloader, which is more complex
internal suspend fun downloadNewBlocks(range: IntRange) = withContext<Unit>(IO) {
if (range.isEmpty()) {
@ -178,7 +212,7 @@ class CompactBlockProcessor(
var progress: Int
twig("found $missingBlockCount missing blocks, downloading in $batches batches of ${DOWNLOAD_BATCH_SIZE}...")
for (i in 1..batches) {
retryUpTo(RETRIES) {
retryUpTo(RETRIES, { CompactBlockProcessorException.FailedDownload(it) }) {
val end = min((range.first + (i * DOWNLOAD_BATCH_SIZE)) - 1, range.last) // subtract 1 on the first value because the range is inclusive
var count = 0
twig("downloaded $downloadedBlockHeight..$end (batch $i of $batches) [${downloadedBlockHeight..end}]") {
@ -196,6 +230,11 @@ class CompactBlockProcessor(
_progress.send(100)
}
/**
* Validate all blocks in the given range, ensuring that the blocks are in ascending order, with
* no gaps and are also chain-sequential. This means every block's prevHash value matches the
* preceding block in the chain.
*/
private fun validateNewBlocks(range: IntRange?): Int {
if (range?.isEmpty() != false) {
twig("no blocks to validate: $range")
@ -208,6 +247,10 @@ class CompactBlockProcessor(
return result
}
/**
* Scan all blocks in the given range, decrypting anything that matches our wallet and storing
* the data.
*/
private suspend fun scanNewBlocks(range: IntRange?): Boolean = withContext(IO) {
if (range?.isEmpty() != false) {
twig("no blocks to scan for range $range")
@ -216,13 +259,15 @@ class CompactBlockProcessor(
Twig.sprout("scanning")
twig("scanning blocks for range $range in batches")
var result = false
retryUpTo(3, 500L) {failedAttempts ->
// Attempt to scan a few times to work around any concurrent modification errors, then
// rethrow as an official processorError which is handled by [start.retryWithBackoff]
retryUpTo(3, { CompactBlockProcessorException.FailedScan(it) }) { failedAttempts ->
if (failedAttempts > 0) twig("retrying the scan after $failedAttempts failure(s)...")
do {
var scannedNewBlocks = false
result = rustBackend.scanBlocks(SCAN_BATCH_SIZE)
val lastScannedHeight = getLastScannedHeight()
twig("batch scan complete. Last scanned height: $lastScannedHeight target height: ${range.last}")
twig("batch scanned: $lastScannedHeight/${range.last}")
if (currentInfo.lastScannedHeight != lastScannedHeight) {
scannedNewBlocks = true
updateProgress(lastScannedHeight = lastScannedHeight)
@ -264,13 +309,13 @@ class CompactBlockProcessor(
private suspend fun handleChainError(errorHeight: Int) = withContext(IO) {
val lowerBound = determineLowerBound(errorHeight)
twig("handling chain error at $errorHeight by rewinding to block $lowerBound")
onChainErrorListener?.invoke(errorHeight, lowerBound)
rustBackend.rewindToHeight(lowerBound)
downloader.rewindToHeight(lowerBound)
}
private fun onConnectionError(throwable: Throwable): Boolean {
_state.offer(Disconnected)
return onErrorListener?.invoke(throwable) ?: true
private fun onProcessorError(throwable: Throwable): Boolean {
return onProcessorErrorListener?.invoke(throwable) ?: true
}
private fun determineLowerBound(errorHeight: Int): Int {
@ -302,9 +347,9 @@ class CompactBlockProcessor(
twigTask("checking balance info") {
try {
val balanceTotal = rustBackend.getBalance(accountIndex)
twig("found total balance of: $balanceTotal")
twig("found total balance: $balanceTotal")
val balanceAvailable = rustBackend.getVerifiedBalance(accountIndex)
twig("found available balance of: $balanceAvailable")
twig("found available balance: $balanceAvailable")
WalletBalance(balanceTotal, balanceAvailable)
} catch (t: Throwable) {
twig("failed to get balance due to $t")

View File

@ -39,8 +39,10 @@ sealed class CompactBlockProcessorException(message: String, cause: Throwable? =
" than just the database filename because Rust does not access the app Context." +
" So pass in context.getDatabasePath(dbFileName).absolutePath instead of just dbFileName alone.", null)
class FailedReorgRepair(message: String) : CompactBlockProcessorException(message)
object FailedScan : CompactBlockProcessorException("Error while scanning blocks. This most " +
"likely means a block is missing or a reorg was mishandled. See Rust logs for details.")
class FailedDownload(cause: Throwable? = null) : CompactBlockProcessorException("Error while downloading blocks. This most " +
"likely means the server is down or slow to respond. See logs for details.", cause)
class FailedScan(cause: Throwable? = null) : CompactBlockProcessorException("Error while scanning blocks. This most " +
"likely means a block was missed or a reorg was mishandled. See logs for details.", cause)
object Uninitialized : CompactBlockProcessorException("Cannot process blocks because the wallet has not been" +
" initialized. Verify that the seed phrase was properly created or imported. If so, then this problem" +
" can be fixed by re-importing the wallet.")