Add processorInfo for more visibility into status.

This commit is contained in:
Kevin Gorham 2020-01-14 12:52:41 -05:00
parent e56c1ff24a
commit 328e90d241
No known key found for this signature in database
GPG Key ID: CCA55602DF49FC38
3 changed files with 136 additions and 47 deletions

View File

@ -80,10 +80,16 @@ class SdkSynchronizer internal constructor(
/**
* Indicates the download progress of the Synchronizer. When progress reaches 100, that
* signals that the Synchronizer is in sync with the network. Balances should be considered
* inaccurate and outbound transactions should be prevented until this sync is complete.
* inaccurate and outbound transactions should be prevented until this sync is complete. It is
* a simplified version of [processorInfo].
*/
override val progress: Flow<Int> = processor.progress
/**
* Indicates the latest information about the blocks that have been processed by the SDK. This
* is very helpful for conveying detailed progress and status to the user.
*/
override val processorInfo: Flow<CompactBlockProcessor.ProcessorInfo> = processor.processorInfo
//
// Error Handling

View File

@ -1,6 +1,7 @@
package cash.z.wallet.sdk
import androidx.paging.PagedList
import cash.z.wallet.sdk.block.CompactBlockProcessor
import cash.z.wallet.sdk.block.CompactBlockProcessor.WalletBalance
import cash.z.wallet.sdk.entity.*
import kotlinx.coroutines.CoroutineScope
@ -53,6 +54,13 @@ interface Synchronizer {
*/
val progress: Flow<Int>
/**
* A flow of processor details, updated every time blocks are processed to include the latest
* block height, blocks downloaded and blocks scanned. Similar to the [progress] flow but with a
* lot more detail.
*/
val processorInfo: Flow<CompactBlockProcessor.ProcessorInfo>
/**
* A stream of balance values, separately reflecting both the available and total balance.
*/

View File

@ -51,9 +51,18 @@ class CompactBlockProcessor(
private val _state: ConflatedBroadcastChannel<State> = ConflatedBroadcastChannel(Initialized)
private val _progress = ConflatedBroadcastChannel(0)
private val _processorInfo = ConflatedBroadcastChannel(ProcessorInfo())
/**
* The root source of truth for the processor's progress. All processing must be done
* sequentially, due to the way sqlite works so it is okay for this not to be threadsafe or
* coroutine safe because processing cannot be concurrent.
*/
private var currentInfo = ProcessorInfo()
val state = _state.asFlow()
val progress = _progress.asFlow()
val processorInfo = _processorInfo.asFlow()
/**
* Download compact blocks, verify and scan them.
@ -103,47 +112,51 @@ class CompactBlockProcessor(
* return the block height where an error was found.
*/
private suspend fun processNewBlocks(): Int = withContext(IO) {
twig("beginning to process new blocks...")
twig("beginning to process new blocks (with lower bound: $lowerBoundHeight)...")
// define ranges
val latestBlockHeight = downloader.getLatestBlockHeight()
val lastDownloadedHeight = getLastDownloadedHeight()
val lastScannedHeight = getLastScannedHeight()
val boundedLastDownloadedHeight = max(lastDownloadedHeight, lowerBoundHeight - 1)
// Get the latest info (but don't transmit it on the channel) and then use that to update the scan/download ranges
ProcessorInfo(
networkBlockHeight = downloader.getLatestBlockHeight(),
lastScannedHeight = getLastScannedHeight(),
lastDownloadedHeight = max(getLastDownloadedHeight(), lowerBoundHeight - 1)
).let { initialInfo ->
updateProgress(
networkBlockHeight = initialInfo.networkBlockHeight,
lastScannedHeight = initialInfo.lastScannedHeight,
lastDownloadedHeight = initialInfo.lastDownloadedHeight,
lastScanRange = (initialInfo.lastScannedHeight + 1)..initialInfo.networkBlockHeight,
lastDownloadRange = (max(initialInfo.lastDownloadedHeight, initialInfo.lastScannedHeight) + 1)..initialInfo.networkBlockHeight
)
}
twig(
"latestBlockHeight: $latestBlockHeight\tlastDownloadedHeight: $lastDownloadedHeight" +
"\tlastScannedHeight: $lastScannedHeight\tlowerBoundHeight: $lowerBoundHeight"
)
// as long as the database has the sapling tree (like when it's initialized from a checkpoint) we can avoid
// downloading earlier blocks so take the larger of these two numbers
val rangeToDownload = (max(boundedLastDownloadedHeight, lastScannedHeight) + 1)..latestBlockHeight
val rangeToScan = (lastScannedHeight + 1)..latestBlockHeight
setState(Downloading)
downloadNewBlocks(rangeToDownload)
setState(Validating)
var error = validateNewBlocks(rangeToScan)
if (error < 0) {
// in theory, a scan should not fail after validation succeeds but maybe consider
// changing the rust layer to return the failed block height whenever scan does fail
// rather than a boolean
setState(Scanning)
val success = scanNewBlocks(rangeToScan)
if (!success) throw CompactBlockProcessorException.FailedScan
else {
setState(Scanned(rangeToScan))
}
if (currentInfo.lastDownloadRange.isEmpty() && currentInfo.lastScanRange.isEmpty()) {
twig("Nothing to process: no new blocks to download or scan, right now.")
setState(Scanned(currentInfo.lastScanRange))
-1
} else {
error
setState(Downloading)
downloadNewBlocks(currentInfo.lastDownloadRange)
setState(Validating)
var error = validateNewBlocks(currentInfo.lastScanRange)
if (error < 0) {
// in theory, a scan should not fail after validation succeeds but maybe consider
// changing the rust layer to return the failed block height whenever scan does fail
// rather than a boolean
setState(Scanning)
val success = scanNewBlocks(currentInfo.lastScanRange)
if (!success) throw CompactBlockProcessorException.FailedScan
else {
setState(Scanned(currentInfo.lastScanRange))
}
-1
} else {
error
}
}
}
@VisibleForTesting //allow mocks to verify how this is called, rather than the downloader, which is more complex
internal suspend fun downloadNewBlocks(range: IntRange) = withContext<Unit>(IO) {
if (range.isEmpty()) {
@ -188,16 +201,58 @@ class CompactBlockProcessor(
return result
}
private fun scanNewBlocks(range: IntRange?): Boolean {
private suspend fun scanNewBlocks(range: IntRange?): Boolean = withContext(IO) {
if (range?.isEmpty() != false) {
twig("no blocks to scan")
return true
twig("no blocks to scan for range $range")
true
} else {
Twig.sprout("scanning")
twig("scanning blocks for range $range in batches")
var result = false
retryUpTo(3, 500L) {failedAttempts ->
if (failedAttempts > 0) twig("retrying the scan after $failedAttempts failure(s)...")
do {
var scannedNewBlocks = false
result = rustBackend.scanBlocks(500)
val lastScannedHeight = getLastScannedHeight()
twig("batch scan complete. Last scanned height: $lastScannedHeight target height: ${range.last}")
if (currentInfo.lastScannedHeight != lastScannedHeight) {
scannedNewBlocks = true
updateProgress(lastScannedHeight = lastScannedHeight)
}
// if we made progress toward our scan, then keep trying
} while(result && scannedNewBlocks && lastScannedHeight < range.last)
twig("batch scan complete!")
}
Twig.clip("scanning")
result
}
Twig.sprout("scanning")
twig("scanning blocks in range $range")
val result = rustBackend.scanBlocks()
Twig.clip("scanning")
return result
}
/**
* @param lastScanRange the inclusive range to scan. This represents what we most recently
* wanted to scan. In most cases, it will be an invalid range because we'd like to scan blocks
* that we don't yet have.
* @param lastDownloadRange the inclusive range to download. This represents what we most
* recently wanted to scan. In most cases, it will be an invalid range because we'd like to scan
* blocks that we don't yet have.
*/
private suspend fun updateProgress(
networkBlockHeight: Int = currentInfo.networkBlockHeight,
lastScannedHeight: Int = currentInfo.lastScannedHeight,
lastDownloadedHeight: Int = currentInfo.lastDownloadedHeight,
lastScanRange: IntRange = currentInfo.lastScanRange,
lastDownloadRange: IntRange = currentInfo.lastDownloadRange
): Unit = withContext(IO) {
currentInfo = currentInfo.copy(
networkBlockHeight = networkBlockHeight,
lastScannedHeight = lastScannedHeight,
lastDownloadedHeight = lastDownloadedHeight,
lastScanRange = lastScanRange,
lastDownloadRange = lastDownloadRange
)
twig("Sending updated currentInfo: $currentInfo")
_processorInfo.send(currentInfo)
}
private suspend fun handleChainError(errorHeight: Int) = withContext(IO) {
@ -269,14 +324,34 @@ class CompactBlockProcessor(
* Data structure to hold the total and available balance of the wallet. This is what is
* received on the balance channel.
*
* @param total the total balance, ignoring funds that cannot be used.
* @param available the amount of funds that are available for use. Typical reasons that funds
* @param totalZatoshi the total balance, ignoring funds that cannot be used.
* @param availableZatoshi the amount of funds that are available for use. Typical reasons that funds
* may be unavailable include fairly new transactions that do not have enough confirmations or
* notes that are tied up because we are awaiting change from a transaction. When a note has
* been spent, its change cannot be used until there are enough confirmations.
*/
data class WalletBalance(
val total: Long = -1,
val available: Long = -1
val totalZatoshi: Long = -1,
val availableZatoshi: Long = -1
)
/**
* @param lastDownloadRange inclusive range to download. Meaning, if the range is 10..10,
* then we will download exactly block 10. If the range is 11..10, then we want to download
* block 11 but can't.
* @param lastScanRange inclusive range to scan.
*/
data class ProcessorInfo(
val networkBlockHeight: Int = -1,
val lastScannedHeight: Int = -1,
val lastDownloadedHeight: Int = -1,
val lastDownloadRange: IntRange = 0..-1, // empty range
val lastScanRange: IntRange = 0..-1 // empty range
) {
val hasData get() = networkBlockHeight != -1
|| lastScannedHeight != -1
|| lastDownloadedHeight != -1
|| lastDownloadRange != 0..-1
|| lastScanRange != 0..-1
}
}