[#1159] Update sync progress reporting

* [#1168] Checkpoints update

* [#1159] Updated sync progress reporting

* Increase test_robo_demo_app timeout

* Migrate to ClosedEndRange

It’s better to transform suggested ranges from OpenEndRange to ClosedRange as soon as possible to avoid its handling in the rest of the logic.

* Improve all batch count calculating

* Subsequent SbS sync algorithm renaming
This commit is contained in:
Honza Rychnovský 2023-08-15 13:07:38 +02:00 committed by Honza
parent 91f5cbe24d
commit fd17e7ef0e
4 changed files with 124 additions and 68 deletions

View File

@ -395,7 +395,7 @@ jobs:
with: with:
name: Demo app release binaries name: Demo app release binaries
- name: Robo test - name: Robo test
timeout-minutes: 15 timeout-minutes: 20
env: env:
# Path depends on `release_build` job, plus path of `Download a single artifact` step # Path depends on `release_build` job, plus path of `Download a single artifact` step
BINARIES_ZIP_PATH: binaries.zip BINARIES_ZIP_PATH: binaries.zip

View File

@ -22,7 +22,6 @@ import cash.z.ecc.android.sdk.internal.ext.length
import cash.z.ecc.android.sdk.internal.ext.retryUpTo import cash.z.ecc.android.sdk.internal.ext.retryUpTo
import cash.z.ecc.android.sdk.internal.ext.retryUpToAndContinue import cash.z.ecc.android.sdk.internal.ext.retryUpToAndContinue
import cash.z.ecc.android.sdk.internal.ext.retryWithBackoff import cash.z.ecc.android.sdk.internal.ext.retryWithBackoff
import cash.z.ecc.android.sdk.internal.ext.toClosedRange
import cash.z.ecc.android.sdk.internal.ext.toHexReversed import cash.z.ecc.android.sdk.internal.ext.toHexReversed
import cash.z.ecc.android.sdk.internal.model.BlockBatch import cash.z.ecc.android.sdk.internal.model.BlockBatch
import cash.z.ecc.android.sdk.internal.model.DbTransactionOverview import cash.z.ecc.android.sdk.internal.model.DbTransactionOverview
@ -217,13 +216,13 @@ class CompactBlockProcessor internal constructor(
) { ) {
val result = processingMutex.withLockLogged("processNewBlocks") { val result = processingMutex.withLockLogged("processNewBlocks") {
when (subTreeRootResult) { when (subTreeRootResult) {
is GetSubtreeRootsResult.UseNonLinear -> { is GetSubtreeRootsResult.UseSbS -> {
processNewBlocksInSbSOrder( processNewBlocksInSbSOrder(
backend = backend, backend = backend,
downloader = downloader, downloader = downloader,
repository = repository, repository = repository,
network = network, network = network,
subTreeRootList = (subTreeRootResult as GetSubtreeRootsResult.UseNonLinear) subTreeRootList = (subTreeRootResult as GetSubtreeRootsResult.UseSbS)
.subTreeRootList, .subTreeRootList,
lastValidHeight = lowerBoundHeight, lastValidHeight = lowerBoundHeight,
firstUnenhancedHeight = _processorInfo.value.firstUnenhancedHeight firstUnenhancedHeight = _processorInfo.value.firstUnenhancedHeight
@ -323,7 +322,7 @@ class CompactBlockProcessor internal constructor(
} }
private suspend fun processNewBlocksInLinearOrder(): BlockProcessingResult { private suspend fun processNewBlocksInLinearOrder(): BlockProcessingResult {
Twig.debug { "Beginning to process new blocks with Linear approach (with lower bound: $lowerBoundHeight)..." } Twig.info { "Beginning to process new blocks with Linear approach (with lower bound: $lowerBoundHeight)..." }
return if (!updateRange(null)) { return if (!updateRange(null)) {
Twig.warn { "Disconnection detected. Attempting to reconnect." } Twig.warn { "Disconnection detected. Attempting to reconnect." }
@ -347,7 +346,7 @@ class CompactBlockProcessor internal constructor(
_processorInfo.value.overallSyncRange!! _processorInfo.value.overallSyncRange!!
} }
syncBlocksAndEnhanceTransactions( syncBlocksAndEnhanceTransactionsLinearly(
syncRange = syncRange, syncRange = syncRange,
withDownload = true, withDownload = true,
enhanceStartHeight = _processorInfo.value.firstUnenhancedHeight enhanceStartHeight = _processorInfo.value.firstUnenhancedHeight
@ -361,7 +360,8 @@ class CompactBlockProcessor internal constructor(
} }
internal sealed class GetSubtreeRootsResult { internal sealed class GetSubtreeRootsResult {
data class UseNonLinear(val subTreeRootList: List<SubtreeRoot>) : GetSubtreeRootsResult() // SbS: Spend-before-Sync
data class UseSbS(val subTreeRootList: List<SubtreeRoot>) : GetSubtreeRootsResult()
object UseLinear : GetSubtreeRootsResult() object UseLinear : GetSubtreeRootsResult()
object FailureConnection : GetSubtreeRootsResult() object FailureConnection : GetSubtreeRootsResult()
data class OtherFailure(val exception: Throwable) : GetSubtreeRootsResult() data class OtherFailure(val exception: Throwable) : GetSubtreeRootsResult()
@ -398,7 +398,7 @@ class CompactBlockProcessor internal constructor(
lastValidHeight: BlockHeight, lastValidHeight: BlockHeight,
firstUnenhancedHeight: BlockHeight? firstUnenhancedHeight: BlockHeight?
): BlockProcessingResult { ): BlockProcessingResult {
Twig.debug { Twig.info {
"Beginning to process new blocks with Spend-before-Sync approach (with roots: $subTreeRootList, and lower" + "Beginning to process new blocks with Spend-before-Sync approach (with roots: $subTreeRootList, and lower" +
"bound: $lastValidHeight)..." "bound: $lastValidHeight)..."
} }
@ -473,6 +473,8 @@ class CompactBlockProcessor internal constructor(
} }
setState(State.Syncing) setState(State.Syncing)
val allBatchCount = getBatchCount(suggestedRangesResult.ranges.map { it.range }).toFloat()
var lastBatchOrder: Long = 0
// Parse and process ranges. If it recognizes a range with Priority.Verify, it runs the verification part. // Parse and process ranges. If it recognizes a range with Priority.Verify, it runs the verification part.
var verifyRangeResult = shouldVerifySuggestedScanRanges(suggestedRangesResult) var verifyRangeResult = shouldVerifySuggestedScanRanges(suggestedRangesResult)
@ -490,23 +492,26 @@ class CompactBlockProcessor internal constructor(
) )
var syncingResult: SyncingResult = SyncingResult.AllSuccess var syncingResult: SyncingResult = SyncingResult.AllSuccess
runSyncingAndEnhancing( runSyncingAndEnhancingOnRange(
backend = backend, backend = backend,
downloader = downloader, downloader = downloader,
repository = repository, repository = repository,
network = network, network = network,
syncRange = verifyRangeResult.scanRange.range.toClosedRange(), syncRange = verifyRangeResult.scanRange.range,
withDownload = true, withDownload = true,
enhanceStartHeight = firstUnenhancedHeight enhanceStartHeight = firstUnenhancedHeight,
).collect { syncProgress -> lastBatchOrder = lastBatchOrder
setProgress(syncProgress.percentage) ).collect { rangeSyncProgress ->
setProgress(PercentDecimal(rangeSyncProgress.overallOrder / allBatchCount))
// We need to update lastBatchOrder for the next ranges processing
lastBatchOrder = rangeSyncProgress.overallOrder
when (syncProgress.resultState) { when (rangeSyncProgress.resultState) {
SyncingResult.UpdateBirthday -> { SyncingResult.UpdateBirthday -> {
updateBirthdayHeight() updateBirthdayHeight()
} }
is SyncingResult.Failure -> { is SyncingResult.Failure -> {
syncingResult = syncProgress.resultState syncingResult = rangeSyncProgress.resultState
return@collect return@collect
} else -> { } else -> {
// Continue with processing // Continue with processing
@ -567,23 +572,25 @@ class CompactBlockProcessor internal constructor(
// TODO [#1145]: Sync Historic range in reverse order // TODO [#1145]: Sync Historic range in reverse order
// TODO [#1145]: https://github.com/zcash/zcash-android-wallet-sdk/issues/1145 // TODO [#1145]: https://github.com/zcash/zcash-android-wallet-sdk/issues/1145
var syncingResult: SyncingResult = SyncingResult.AllSuccess var syncingResult: SyncingResult = SyncingResult.AllSuccess
runSyncingAndEnhancing( runSyncingAndEnhancingOnRange(
backend = backend, backend = backend,
downloader = downloader, downloader = downloader,
repository = repository, repository = repository,
network = network, network = network,
syncRange = scanRange.range.toClosedRange(), syncRange = scanRange.range,
withDownload = true, withDownload = true,
enhanceStartHeight = firstUnenhancedHeight enhanceStartHeight = firstUnenhancedHeight,
).collect { syncProgress -> lastBatchOrder = lastBatchOrder
setProgress(syncProgress.percentage) ).collect { rangeSyncProgress ->
setProgress(PercentDecimal(rangeSyncProgress.overallOrder / allBatchCount))
lastBatchOrder = rangeSyncProgress.overallOrder
when (syncProgress.resultState) { when (rangeSyncProgress.resultState) {
SyncingResult.UpdateBirthday -> { SyncingResult.UpdateBirthday -> {
updateBirthdayHeight() updateBirthdayHeight()
} }
is SyncingResult.Failure -> { is SyncingResult.Failure -> {
syncingResult = syncProgress.resultState syncingResult = rangeSyncProgress.resultState
return@collect return@collect
} else -> { } else -> {
// Continue with processing // Continue with processing
@ -607,30 +614,33 @@ class CompactBlockProcessor internal constructor(
} }
@Suppress("ReturnCount") @Suppress("ReturnCount")
private suspend fun syncBlocksAndEnhanceTransactions( private suspend fun syncBlocksAndEnhanceTransactionsLinearly(
syncRange: ClosedRange<BlockHeight>, syncRange: ClosedRange<BlockHeight>,
withDownload: Boolean, withDownload: Boolean,
enhanceStartHeight: BlockHeight? enhanceStartHeight: BlockHeight?
): BlockProcessingResult { ): BlockProcessingResult {
// Syncing last blocks and enhancing transactions
var syncingResult: SyncingResult = SyncingResult.AllSuccess var syncingResult: SyncingResult = SyncingResult.AllSuccess
runSyncingAndEnhancing( val allBatchCount = getBatchCount(listOf(syncRange)).toFloat()
// Syncing last blocks and enhancing transactions
runSyncingAndEnhancingOnRange(
backend = backend, backend = backend,
downloader = downloader, downloader = downloader,
repository = repository, repository = repository,
network = network, network = network,
syncRange = syncRange, syncRange = syncRange,
withDownload = withDownload, withDownload = withDownload,
enhanceStartHeight = enhanceStartHeight enhanceStartHeight = enhanceStartHeight,
).collect { syncProgress -> lastBatchOrder = 0
setProgress(syncProgress.percentage) ).collect { rangeSyncProgress ->
setProgress(PercentDecimal(rangeSyncProgress.overallOrder / allBatchCount))
when (syncProgress.resultState) { when (rangeSyncProgress.resultState) {
SyncingResult.UpdateBirthday -> { SyncingResult.UpdateBirthday -> {
updateBirthdayHeight() updateBirthdayHeight()
} }
is SyncingResult.Failure -> { is SyncingResult.Failure -> {
syncingResult = syncProgress.resultState syncingResult = rangeSyncProgress.resultState
return@collect return@collect
} else -> { } else -> {
// Continue with processing // Continue with processing
@ -708,13 +718,13 @@ class CompactBlockProcessor internal constructor(
val syncRange = if (ranges == null) { val syncRange = if (ranges == null) {
lastSyncedHeight + 1..networkBlockHeight lastSyncedHeight + 1..networkBlockHeight
} else if (ranges.isNotEmpty()) { } else if (ranges.isNotEmpty()) {
var resultRange = ranges[0].range.start..ranges[0].range.endExclusive var resultRange = ranges[0].range.start..ranges[0].range.endInclusive
ranges.forEach { nextRange -> ranges.forEach { nextRange ->
if (nextRange.range.start < resultRange.start) { if (nextRange.range.start < resultRange.start) {
resultRange = nextRange.range.start..resultRange.endInclusive resultRange = nextRange.range.start..resultRange.endInclusive
} }
if (nextRange.range.endExclusive > resultRange.endInclusive) { if (nextRange.range.endInclusive > resultRange.endInclusive) {
resultRange = resultRange.start..nextRange.range.endExclusive resultRange = resultRange.start..nextRange.range.endInclusive
} }
} }
resultRange resultRange
@ -1064,7 +1074,7 @@ class CompactBlockProcessor internal constructor(
result = if (it.isEmpty()) { result = if (it.isEmpty()) {
GetSubtreeRootsResult.UseLinear GetSubtreeRootsResult.UseLinear
} else { } else {
GetSubtreeRootsResult.UseNonLinear(it) GetSubtreeRootsResult.UseSbS(it)
} }
} }
} }
@ -1135,7 +1145,7 @@ class CompactBlockProcessor internal constructor(
} }
/** /**
* Get the suggested scan ranges from the wallet database. * Get the suggested scan ranges from the wallet database via the rust layer.
* *
* @param backend Typesafe Rust backend * @param backend Typesafe Rust backend
* @param lastValidHeight The height to which rewind in case of any trouble * @param lastValidHeight The height to which rewind in case of any trouble
@ -1238,12 +1248,14 @@ class CompactBlockProcessor internal constructor(
* processed existing blocks * processed existing blocks
* @param enhanceStartHeight the height in which the enhancing should start, or null in case of no previous * @param enhanceStartHeight the height in which the enhancing should start, or null in case of no previous
* transaction enhancing done yet * transaction enhancing done yet
* @param lastBatchOrder is the order of the last processed batch. It comes from a previous range processing
* and is necessary for calculating cross ranges batch order of currently processing batches.
* @return Flow of BatchSyncProgress sync and enhancement results * @return Flow of [BatchSyncProgress] sync and enhancement results
*/ */
@VisibleForTesting @VisibleForTesting
@Suppress("LongParameterList", "LongMethod") @Suppress("LongParameterList", "LongMethod")
internal suspend fun runSyncingAndEnhancing( internal suspend fun runSyncingAndEnhancingOnRange(
backend: TypesafeBackend, backend: TypesafeBackend,
downloader: CompactBlockDownloader, downloader: CompactBlockDownloader,
repository: DerivedDataRepository, repository: DerivedDataRepository,
@ -1251,20 +1263,19 @@ class CompactBlockProcessor internal constructor(
syncRange: ClosedRange<BlockHeight>, syncRange: ClosedRange<BlockHeight>,
withDownload: Boolean, withDownload: Boolean,
enhanceStartHeight: BlockHeight?, enhanceStartHeight: BlockHeight?,
lastBatchOrder: Long
): Flow<BatchSyncProgress> = flow { ): Flow<BatchSyncProgress> = flow {
if (syncRange.isEmpty()) { if (syncRange.isEmpty()) {
Twig.debug { "No blocks to sync" } Twig.debug { "No blocks to sync" }
emit( emit(
BatchSyncProgress( BatchSyncProgress(
percentage = PercentDecimal.ONE_HUNDRED_PERCENT,
lastSyncedHeight = getLastScannedHeight(repository),
resultState = SyncingResult.AllSuccess resultState = SyncingResult.AllSuccess
) )
) )
} else { } else {
Twig.info { "Syncing blocks in range $syncRange" } Twig.info { "Syncing blocks in range $syncRange" }
val batches = getBatchedBlockList(syncRange, network) val batches = getBatchedBlockList(lastBatchOrder, syncRange, network)
// Check for the last enhanced height and eventually set is as the beginning of the next enhancing range // Check for the last enhanced height and eventually set is as the beginning of the next enhancing range
var enhancingRange = if (enhanceStartHeight != null) { var enhancingRange = if (enhanceStartHeight != null) {
@ -1333,8 +1344,8 @@ class CompactBlockProcessor internal constructor(
emit( emit(
BatchSyncProgress( BatchSyncProgress(
percentage = PercentDecimal(continuousResult.batch.order / batches.size.toFloat()), inRangeOrder = continuousResult.batch.inRangeOrder,
lastSyncedHeight = getLastScannedHeight(repository), overallOrder = continuousResult.batch.crossRangesOrder,
resultState = resultState resultState = resultState
) )
) )
@ -1342,11 +1353,11 @@ class CompactBlockProcessor internal constructor(
// Increment and compare the range for triggering the enhancing // Increment and compare the range for triggering the enhancing
enhancingRange = enhancingRange.start..continuousResult.batch.range.endInclusive enhancingRange = enhancingRange.start..continuousResult.batch.range.endInclusive
// Enhance is run in case of the range is on or over its limit, or in case of any failure // Enhancing is run in case of the range is on or over its limit, or in case of any failure
// state comes from the previous stages, or if the end of the sync range is reached // state comes from the previous stages, or if the end of the sync range is reached
if (enhancingRange.length() >= ENHANCE_BATCH_SIZE || if (enhancingRange.length() >= ENHANCE_BATCH_SIZE ||
resultState != SyncingResult.AllSuccess || resultState != SyncingResult.AllSuccess ||
continuousResult.batch.order == batches.size.toLong() continuousResult.batch.inRangeOrder == batches.size.toLong()
) { ) {
// Copy the range for use and reset for the next iteration // Copy the range for use and reset for the next iteration
val currentEnhancingRange = enhancingRange val currentEnhancingRange = enhancingRange
@ -1374,16 +1385,16 @@ class CompactBlockProcessor internal constructor(
} }
emit( emit(
BatchSyncProgress( BatchSyncProgress(
percentage = PercentDecimal(continuousResult.batch.order / batches.size.toFloat()), inRangeOrder = continuousResult.batch.inRangeOrder,
lastSyncedHeight = getLastScannedHeight(repository), overallOrder = continuousResult.batch.crossRangesOrder,
resultState = resultState resultState = resultState
) )
) )
} }
} }
Twig.debug { Twig.info {
"All sync stages done for the batch: ${continuousResult.batch} with result state: " + "All sync stages done for the batch ${continuousResult.batch.inRangeOrder}/${batches.size}:" +
"$resultState" " ${continuousResult.batch} with result state: $resultState"
} }
}.takeWhile { batchProcessResult -> }.takeWhile { batchProcessResult ->
batchProcessResult.stageResult == SyncingResult.DeleteSuccess || batchProcessResult.stageResult == SyncingResult.DeleteSuccess ||
@ -1392,20 +1403,51 @@ class CompactBlockProcessor internal constructor(
} }
} }
/**
* Returns count of batches of blocks across all ranges. It works the same when triggered from the Linear
* synchronization or from the SbS synchronization.
*
* @param syncRanges List of ranges of all blocks to process
*
* @return Count of all batches for processing
*/
private fun getBatchCount(syncRanges: List<ClosedRange<BlockHeight>>): Long {
var allRangesBatchCount = 0L
var allMissingBlocksCount = 0L
syncRanges.forEach { range ->
val missingBlockCount = range.endInclusive.value - range.start.value + 1
val batchCount = (
missingBlockCount / SYNC_BATCH_SIZE +
(if (missingBlockCount.rem(SYNC_BATCH_SIZE) == 0L) 0 else 1)
)
allMissingBlocksCount += missingBlockCount
allRangesBatchCount += batchCount
}
Twig.debug {
"Found $allMissingBlocksCount missing blocks, syncing in $allRangesBatchCount batches of " +
"$SYNC_BATCH_SIZE..."
}
return allRangesBatchCount
}
/**
* Prepare list of all [BlockBatch] internal objects to be processed during a range of
* blocks processing
*
* @param lastBatchOrder The index of the last previously processed batch
* @param syncRange Current range to be processed
* @param network The network we are operating on
*
* @return List of [BlockBatch] to for synchronization
*/
private fun getBatchedBlockList( private fun getBatchedBlockList(
lastBatchOrder: Long,
syncRange: ClosedRange<BlockHeight>, syncRange: ClosedRange<BlockHeight>,
network: ZcashNetwork network: ZcashNetwork
): List<BlockBatch> { ): List<BlockBatch> {
val missingBlockCount = syncRange.endInclusive.value - syncRange.start.value + 1 val batchCount = getBatchCount(listOf(syncRange))
val batchCount = (
missingBlockCount / SYNC_BATCH_SIZE +
(if (missingBlockCount.rem(SYNC_BATCH_SIZE) == 0L) 0 else 1)
)
Twig.debug {
"Found $missingBlockCount missing blocks, syncing in $batchCount batches of $SYNC_BATCH_SIZE..."
}
var start = syncRange.start var start = syncRange.start
return buildList { return buildList {
for (index in 1..batchCount) { for (index in 1..batchCount) {
@ -1417,7 +1459,13 @@ class CompactBlockProcessor internal constructor(
) )
) // subtract 1 on the first value because the range is inclusive ) // subtract 1 on the first value because the range is inclusive
add(BlockBatch(index, start..end)) add(
BlockBatch(
inRangeOrder = index,
crossRangesOrder = lastBatchOrder + index,
range = start..end
)
)
start = end + 1 start = end + 1
} }
} }
@ -1991,9 +2039,9 @@ class CompactBlockProcessor internal constructor(
* Progress model class for sharing the whole batch sync progress out of the sync process. * Progress model class for sharing the whole batch sync progress out of the sync process.
*/ */
internal data class BatchSyncProgress( internal data class BatchSyncProgress(
val percentage: PercentDecimal, val inRangeOrder: Long = 0,
val lastSyncedHeight: BlockHeight?, val overallOrder: Long = 0,
val resultState: SyncingResult val resultState: SyncingResult = SyncingResult.AllSuccess
) )
/** /**

View File

@ -3,9 +3,11 @@ package cash.z.ecc.android.sdk.internal.model
import cash.z.ecc.android.sdk.model.BlockHeight import cash.z.ecc.android.sdk.model.BlockHeight
internal data class BlockBatch( internal data class BlockBatch(
val order: Long, val inRangeOrder: Long,
val crossRangesOrder: Long,
val range: ClosedRange<BlockHeight>, val range: ClosedRange<BlockHeight>,
var blocks: List<JniBlockMeta>? = null var blocks: List<JniBlockMeta>? = null
) { ) {
override fun toString() = "BlockBatch(order=$order, range=$range, blocks=${blocks?.size ?: "null"})" override fun toString() = "BlockBatch(crossRangesOrder=$crossRangesOrder, inRangeOrder=$inRangeOrder, " +
"range=$range, blocks=${blocks?.size ?: "null"})"
} }

View File

@ -1,12 +1,12 @@
package cash.z.ecc.android.sdk.internal.model package cash.z.ecc.android.sdk.internal.model
import cash.z.ecc.android.sdk.internal.Twig import cash.z.ecc.android.sdk.internal.Twig
import cash.z.ecc.android.sdk.internal.ext.toClosedRange
import cash.z.ecc.android.sdk.model.BlockHeight import cash.z.ecc.android.sdk.model.BlockHeight
import cash.z.ecc.android.sdk.model.ZcashNetwork import cash.z.ecc.android.sdk.model.ZcashNetwork
@OptIn(ExperimentalStdlibApi::class)
internal data class ScanRange( internal data class ScanRange(
val range: OpenEndRange<BlockHeight>, val range: ClosedRange<BlockHeight>,
val priority: Long val priority: Long
) { ) {
override fun toString() = "ScanRange(range=$range, priority=${getSuggestScanRangePriority()})" override fun toString() = "ScanRange(range=$range, priority=${getSuggestScanRangePriority()})"
@ -18,9 +18,15 @@ internal data class ScanRange(
} }
companion object { companion object {
/**
* Note that this function also transforms the suggested ranges from [OpenEndRange] to [ClosedRange] so the
* rest of the logic can safely work with a unified range type.
*/
@OptIn(ExperimentalStdlibApi::class)
fun new(jni: JniScanRange, zcashNetwork: ZcashNetwork): ScanRange { fun new(jni: JniScanRange, zcashNetwork: ZcashNetwork): ScanRange {
return ScanRange( return ScanRange(
range = BlockHeight.new(zcashNetwork, jni.startHeight)..<BlockHeight.new(zcashNetwork, jni.endHeight), range = (BlockHeight.new(zcashNetwork, jni.startHeight)..<BlockHeight.new(zcashNetwork, jni.endHeight))
.toClosedRange(),
priority = jni.priority priority = jni.priority
) )
} }