[#1180] Frequent SbS synchronization restarting
* [#1180] Frequent SbS synchronization restarting - Processing blocks with SbS split into preparation and processing functions, which can be called repeatedly. - Refactored other parts of the synchronization mechanism - Closes #1180 - This also partly solves #1137 * Update .gitignore * Update LINCENSE documentation * [#1177] Checkpoints update * Fix Ktlint warning
This commit is contained in:
parent
f23aca38a6
commit
10a7aa7f3f
|
@ -51,6 +51,7 @@ captures/
|
||||||
.idea/workspace.xml
|
.idea/workspace.xml
|
||||||
.idea/protoeditor.xml
|
.idea/protoeditor.xml
|
||||||
.idea/appInsightsSettings.xml
|
.idea/appInsightsSettings.xml
|
||||||
|
.idea/migrations.xml
|
||||||
*.iml
|
*.iml
|
||||||
|
|
||||||
# Keystore files
|
# Keystore files
|
||||||
|
|
2
LICENSE
2
LICENSE
|
@ -1,6 +1,6 @@
|
||||||
The MIT License (MIT)
|
The MIT License (MIT)
|
||||||
|
|
||||||
Copyright (c) 2017-2021 Electric Coin Company
|
Copyright (c) 2017-2023 Electric Coin Company
|
||||||
|
|
||||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
of this software and associated documentation files (the "Software"), to deal
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
|
|
@ -6,6 +6,7 @@ import cash.z.ecc.android.sdk.annotation.OpenForTesting
|
||||||
import cash.z.ecc.android.sdk.block.processor.model.BatchSyncProgress
|
import cash.z.ecc.android.sdk.block.processor.model.BatchSyncProgress
|
||||||
import cash.z.ecc.android.sdk.block.processor.model.GetSubtreeRootsResult
|
import cash.z.ecc.android.sdk.block.processor.model.GetSubtreeRootsResult
|
||||||
import cash.z.ecc.android.sdk.block.processor.model.PutSaplingSubtreeRootsResult
|
import cash.z.ecc.android.sdk.block.processor.model.PutSaplingSubtreeRootsResult
|
||||||
|
import cash.z.ecc.android.sdk.block.processor.model.SbSPreparationResult
|
||||||
import cash.z.ecc.android.sdk.block.processor.model.SuggestScanRangesResult
|
import cash.z.ecc.android.sdk.block.processor.model.SuggestScanRangesResult
|
||||||
import cash.z.ecc.android.sdk.block.processor.model.SyncStageResult
|
import cash.z.ecc.android.sdk.block.processor.model.SyncStageResult
|
||||||
import cash.z.ecc.android.sdk.block.processor.model.SyncingResult
|
import cash.z.ecc.android.sdk.block.processor.model.SyncingResult
|
||||||
|
@ -27,8 +28,8 @@ import cash.z.ecc.android.sdk.internal.block.CompactBlockDownloader
|
||||||
import cash.z.ecc.android.sdk.internal.ext.isNullOrEmpty
|
import cash.z.ecc.android.sdk.internal.ext.isNullOrEmpty
|
||||||
import cash.z.ecc.android.sdk.internal.ext.isScanContinuityError
|
import cash.z.ecc.android.sdk.internal.ext.isScanContinuityError
|
||||||
import cash.z.ecc.android.sdk.internal.ext.length
|
import cash.z.ecc.android.sdk.internal.ext.length
|
||||||
import cash.z.ecc.android.sdk.internal.ext.retryUpTo
|
|
||||||
import cash.z.ecc.android.sdk.internal.ext.retryUpToAndContinue
|
import cash.z.ecc.android.sdk.internal.ext.retryUpToAndContinue
|
||||||
|
import cash.z.ecc.android.sdk.internal.ext.retryUpToAndThrow
|
||||||
import cash.z.ecc.android.sdk.internal.ext.retryWithBackoff
|
import cash.z.ecc.android.sdk.internal.ext.retryWithBackoff
|
||||||
import cash.z.ecc.android.sdk.internal.ext.toHexReversed
|
import cash.z.ecc.android.sdk.internal.ext.toHexReversed
|
||||||
import cash.z.ecc.android.sdk.internal.model.BlockBatch
|
import cash.z.ecc.android.sdk.internal.model.BlockBatch
|
||||||
|
@ -75,6 +76,7 @@ import kotlin.math.max
|
||||||
import kotlin.math.min
|
import kotlin.math.min
|
||||||
import kotlin.time.Duration
|
import kotlin.time.Duration
|
||||||
import kotlin.time.Duration.Companion.days
|
import kotlin.time.Duration.Companion.days
|
||||||
|
import kotlin.time.Duration.Companion.minutes
|
||||||
import kotlin.time.DurationUnit
|
import kotlin.time.DurationUnit
|
||||||
import kotlin.time.toDuration
|
import kotlin.time.toDuration
|
||||||
|
|
||||||
|
@ -149,6 +151,20 @@ class CompactBlockProcessor internal constructor(
|
||||||
private val _networkHeight = MutableStateFlow<BlockHeight?>(null)
|
private val _networkHeight = MutableStateFlow<BlockHeight?>(null)
|
||||||
private val processingMutex = Mutex()
|
private val processingMutex = Mutex()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The synchronization-related variable that holds the all batch count computed in the first initial synchronization
|
||||||
|
* loop. It is supposed to keep the same value across the synchronization refreshes with [runSbSSyncingPreparation]
|
||||||
|
* as happens in [SyncAlgorithm.SPEND_BEFORE_SYNC] synchronization function.
|
||||||
|
*/
|
||||||
|
private var allBatchCount: Long = 0
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Another synchronization-related variable that holds the order of a currently processing batch of blocks. It
|
||||||
|
* is supposed to preserve its value across the synchronization refreshes with [runSbSSyncingPreparation] as
|
||||||
|
* happens in [SyncAlgorithm.SPEND_BEFORE_SYNC] synchronization function.
|
||||||
|
*/
|
||||||
|
private var lastBatchOrder: Long = 0
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Flow of birthday heights. The birthday is essentially the first block that the wallet cares
|
* Flow of birthday heights. The birthday is essentially the first block that the wallet cares
|
||||||
* about. Any prior block can be ignored. This is not a fixed value because the height is
|
* about. Any prior block can be ignored. This is not a fixed value because the height is
|
||||||
|
@ -225,13 +241,28 @@ class CompactBlockProcessor internal constructor(
|
||||||
val result = processingMutex.withLockLogged("processNewBlocks") {
|
val result = processingMutex.withLockLogged("processNewBlocks") {
|
||||||
when (subTreeRootResult) {
|
when (subTreeRootResult) {
|
||||||
is GetSubtreeRootsResult.UseSbS -> {
|
is GetSubtreeRootsResult.UseSbS -> {
|
||||||
|
// Pass the commitment tree data to the database
|
||||||
|
when (
|
||||||
|
val result = putSaplingSubtreeRoots(
|
||||||
|
backend = backend,
|
||||||
|
startIndex = 0,
|
||||||
|
subTreeRootList = (subTreeRootResult as GetSubtreeRootsResult.UseSbS)
|
||||||
|
.subTreeRootList,
|
||||||
|
lastValidHeight = lowerBoundHeight
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
PutSaplingSubtreeRootsResult.Success -> {
|
||||||
|
// Lets continue with the next step
|
||||||
|
}
|
||||||
|
is PutSaplingSubtreeRootsResult.Failure -> {
|
||||||
|
BlockProcessingResult.SyncFailure(result.failedAtHeight, result.exception)
|
||||||
|
}
|
||||||
|
}
|
||||||
processNewBlocksInSbSOrder(
|
processNewBlocksInSbSOrder(
|
||||||
backend = backend,
|
backend = backend,
|
||||||
downloader = downloader,
|
downloader = downloader,
|
||||||
repository = repository,
|
repository = repository,
|
||||||
network = network,
|
network = network,
|
||||||
subTreeRootList = (subTreeRootResult as GetSubtreeRootsResult.UseSbS)
|
|
||||||
.subTreeRootList,
|
|
||||||
lastValidHeight = lowerBoundHeight,
|
lastValidHeight = lowerBoundHeight,
|
||||||
firstUnenhancedHeight = _processorInfo.value.firstUnenhancedHeight
|
firstUnenhancedHeight = _processorInfo.value.firstUnenhancedHeight
|
||||||
)
|
)
|
||||||
|
@ -251,7 +282,8 @@ class CompactBlockProcessor internal constructor(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// immediately process again after failures in order to download new blocks right away
|
|
||||||
|
// Immediately process again after failures in order to download new blocks right away
|
||||||
when (result) {
|
when (result) {
|
||||||
BlockProcessingResult.Reconnecting -> {
|
BlockProcessingResult.Reconnecting -> {
|
||||||
setState(State.Disconnected)
|
setState(State.Disconnected)
|
||||||
|
@ -264,6 +296,10 @@ class CompactBlockProcessor internal constructor(
|
||||||
}
|
}
|
||||||
delay(napTime)
|
delay(napTime)
|
||||||
}
|
}
|
||||||
|
BlockProcessingResult.RestartSynchronization -> {
|
||||||
|
Twig.info { "Planned restarting of block synchronization..." }
|
||||||
|
// No nap time set to immediately continue with refreshed block synchronization
|
||||||
|
}
|
||||||
BlockProcessingResult.NoBlocksToProcess -> {
|
BlockProcessingResult.NoBlocksToProcess -> {
|
||||||
setState(State.Synced(_processorInfo.value.overallSyncRange))
|
setState(State.Synced(_processorInfo.value.overallSyncRange))
|
||||||
val noWorkDone = _processorInfo.value.overallSyncRange?.isEmpty() ?: true
|
val noWorkDone = _processorInfo.value.overallSyncRange?.isEmpty() ?: true
|
||||||
|
@ -371,93 +407,43 @@ class CompactBlockProcessor internal constructor(
|
||||||
downloader: CompactBlockDownloader,
|
downloader: CompactBlockDownloader,
|
||||||
repository: DerivedDataRepository,
|
repository: DerivedDataRepository,
|
||||||
network: ZcashNetwork,
|
network: ZcashNetwork,
|
||||||
subTreeRootList: List<SubtreeRoot>,
|
|
||||||
lastValidHeight: BlockHeight,
|
lastValidHeight: BlockHeight,
|
||||||
firstUnenhancedHeight: BlockHeight?
|
firstUnenhancedHeight: BlockHeight?
|
||||||
): BlockProcessingResult {
|
): BlockProcessingResult {
|
||||||
Twig.info {
|
Twig.info {
|
||||||
"Beginning to process new blocks with Spend-before-Sync approach (with roots: $subTreeRootList, and lower" +
|
"Beginning to process new blocks with Spend-before-Sync approach with lower bound: $lastValidHeight)..."
|
||||||
"bound: $lastValidHeight)..."
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pass the commitment tree data to the database.
|
// This step covers these operations fetchLatestBlockHeight, updateChainTip, suggestScanRanges, updateRange,
|
||||||
when (
|
// and shouldVerifySuggestedScanRanges
|
||||||
val result =
|
val preparationResult = runSbSSyncingPreparation(
|
||||||
putSaplingSubtreeRoots(
|
|
||||||
backend = backend,
|
backend = backend,
|
||||||
startIndex = 0,
|
|
||||||
subTreeRootList = subTreeRootList,
|
|
||||||
lastValidHeight = lastValidHeight
|
|
||||||
)
|
|
||||||
) {
|
|
||||||
PutSaplingSubtreeRootsResult.Success -> { /* Lets continue to the next step */ }
|
|
||||||
is PutSaplingSubtreeRootsResult.Failure -> {
|
|
||||||
return BlockProcessingResult.SyncFailure(result.failedAtHeight, result.exception)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Download chain tip metadata from lightwalletd
|
|
||||||
val chainTip = fetchLatestBlockHeight(
|
|
||||||
downloader = downloader,
|
downloader = downloader,
|
||||||
network = network
|
network = network,
|
||||||
) ?: let {
|
|
||||||
Twig.warn { "Disconnection detected. Attempting to reconnect." }
|
|
||||||
return BlockProcessingResult.Reconnecting
|
|
||||||
}
|
|
||||||
|
|
||||||
// Notify the wallet of the updated chain tip
|
|
||||||
when (
|
|
||||||
val result =
|
|
||||||
updateChainTip(
|
|
||||||
backend = backend,
|
|
||||||
chainTip = chainTip,
|
|
||||||
lastValidHeight = lastValidHeight
|
lastValidHeight = lastValidHeight
|
||||||
)
|
)
|
||||||
) {
|
when (preparationResult) {
|
||||||
is UpdateChainTipResult.Success -> { /* Lets continue to the next step */ }
|
is SbSPreparationResult.ProcessFailure -> {
|
||||||
is UpdateChainTipResult.Failure -> {
|
return preparationResult.toBlockProcessingResult()
|
||||||
return BlockProcessingResult.SyncFailure(result.failedAtHeight, result.exception)
|
|
||||||
}
|
}
|
||||||
}
|
SbSPreparationResult.ConnectionFailure -> {
|
||||||
|
|
||||||
// Get the suggested scan ranges from the wallet database
|
|
||||||
var suggestedRangesResult = suggestScanRanges(
|
|
||||||
backend,
|
|
||||||
lastValidHeight
|
|
||||||
)
|
|
||||||
val updateRangeResult = when (suggestedRangesResult) {
|
|
||||||
is SuggestScanRangesResult.Success -> {
|
|
||||||
updateRange(suggestedRangesResult.ranges)
|
|
||||||
}
|
|
||||||
is SuggestScanRangesResult.Failure -> {
|
|
||||||
Twig.error {
|
|
||||||
"Process suggested scan ranges failure: " +
|
|
||||||
"${(suggestedRangesResult as SuggestScanRangesResult.Failure).exception}"
|
|
||||||
}
|
|
||||||
return BlockProcessingResult.SyncFailure(
|
|
||||||
suggestedRangesResult.failedAtHeight,
|
|
||||||
suggestedRangesResult.exception
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!updateRangeResult) {
|
|
||||||
Twig.warn { "Disconnection detected. Attempting to reconnect." }
|
|
||||||
return BlockProcessingResult.Reconnecting
|
return BlockProcessingResult.Reconnecting
|
||||||
} else if (_processorInfo.value.overallSyncRange.isNullOrEmpty()) {
|
}
|
||||||
Twig.info { "No more blocks to process." }
|
SbSPreparationResult.NoMoreBlocksToProcess -> {
|
||||||
return BlockProcessingResult.NoBlocksToProcess
|
return BlockProcessingResult.NoBlocksToProcess
|
||||||
}
|
}
|
||||||
|
is SbSPreparationResult.Success -> {
|
||||||
|
Twig.info { "Preparation phase done with result: $preparationResult" }
|
||||||
|
// Continue processing ranges
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
setState(State.Syncing)
|
var verifyRangeResult = preparationResult.verifyRangeResult
|
||||||
val allBatchCount = getBatchCount(suggestedRangesResult.ranges.map { it.range }).toFloat()
|
var suggestedRangesResult = preparationResult.suggestedRangesResult
|
||||||
var lastBatchOrder: Long = 0
|
val allBatchCountLocal = preparationResult.allBatchCount
|
||||||
|
val lastPreparationTime = System.currentTimeMillis()
|
||||||
// Parse and process ranges. If it recognizes a range with Priority.Verify, it runs the verification part.
|
|
||||||
var verifyRangeResult = shouldVerifySuggestedScanRanges(suggestedRangesResult)
|
|
||||||
|
|
||||||
Twig.info { "Check for verification of ranges resulted with: $verifyRangeResult" }
|
|
||||||
|
|
||||||
|
// Running synchronization for the [ScanRange.SuggestScanRangePriority.Verify] range
|
||||||
while (verifyRangeResult is VerifySuggestedScanRange.ShouldVerify) {
|
while (verifyRangeResult is VerifySuggestedScanRange.ShouldVerify) {
|
||||||
Twig.info { "Starting verification of range: $verifyRangeResult" }
|
Twig.info { "Starting verification of range: $verifyRangeResult" }
|
||||||
|
|
||||||
|
@ -479,9 +465,11 @@ class CompactBlockProcessor internal constructor(
|
||||||
enhanceStartHeight = firstUnenhancedHeight,
|
enhanceStartHeight = firstUnenhancedHeight,
|
||||||
lastBatchOrder = lastBatchOrder
|
lastBatchOrder = lastBatchOrder
|
||||||
).collect { rangeSyncProgress ->
|
).collect { rangeSyncProgress ->
|
||||||
setProgress(PercentDecimal(rangeSyncProgress.overallOrder / allBatchCount))
|
// We need to update lastBatchOrder for the processing of the following range. It can occasionally
|
||||||
// We need to update lastBatchOrder for the next ranges processing
|
// be over the precomputed all-batch count in case of inter-syncing failure.
|
||||||
lastBatchOrder = rangeSyncProgress.overallOrder
|
lastBatchOrder = min(rangeSyncProgress.overallOrder, allBatchCountLocal)
|
||||||
|
|
||||||
|
setProgress(PercentDecimal(lastBatchOrder / allBatchCountLocal.toFloat()))
|
||||||
|
|
||||||
when (rangeSyncProgress.resultState) {
|
when (rangeSyncProgress.resultState) {
|
||||||
SyncingResult.UpdateBirthday -> {
|
SyncingResult.UpdateBirthday -> {
|
||||||
|
@ -496,8 +484,11 @@ class CompactBlockProcessor internal constructor(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (syncingResult != SyncingResult.AllSuccess) {
|
when (syncingResult) {
|
||||||
// Remove persisted but not scanned blocks in case of any failure
|
is SyncingResult.AllSuccess -> {
|
||||||
|
// Continue with processing the rest of the ranges
|
||||||
|
} else -> {
|
||||||
|
// An error came - remove persisted but not scanned blocks
|
||||||
val lastScannedHeight = getLastScannedHeight(repository)
|
val lastScannedHeight = getLastScannedHeight(repository)
|
||||||
downloader.rewindToHeight(lastScannedHeight)
|
downloader.rewindToHeight(lastScannedHeight)
|
||||||
deleteAllBlockFiles(
|
deleteAllBlockFiles(
|
||||||
|
@ -506,6 +497,7 @@ class CompactBlockProcessor internal constructor(
|
||||||
)
|
)
|
||||||
return (syncingResult as SyncingResult.Failure).toBlockProcessingResult()
|
return (syncingResult as SyncingResult.Failure).toBlockProcessingResult()
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Re-request suggested scan ranges
|
// Re-request suggested scan ranges
|
||||||
suggestedRangesResult = suggestScanRanges(backend, lowerBoundHeight)
|
suggestedRangesResult = suggestScanRanges(backend, lowerBoundHeight)
|
||||||
|
@ -514,10 +506,7 @@ class CompactBlockProcessor internal constructor(
|
||||||
verifyRangeResult = shouldVerifySuggestedScanRanges(suggestedRangesResult)
|
verifyRangeResult = shouldVerifySuggestedScanRanges(suggestedRangesResult)
|
||||||
}
|
}
|
||||||
is SuggestScanRangesResult.Failure -> {
|
is SuggestScanRangesResult.Failure -> {
|
||||||
Twig.error {
|
Twig.error { "Process suggested scan ranges failure: ${suggestedRangesResult.exception}" }
|
||||||
"Process suggested scan ranges failure: " +
|
|
||||||
"${(suggestedRangesResult as SuggestScanRangesResult.Failure).exception}"
|
|
||||||
}
|
|
||||||
return BlockProcessingResult.SyncFailure(
|
return BlockProcessingResult.SyncFailure(
|
||||||
suggestedRangesResult.failedAtHeight,
|
suggestedRangesResult.failedAtHeight,
|
||||||
suggestedRangesResult.exception
|
suggestedRangesResult.exception
|
||||||
|
@ -527,12 +516,6 @@ class CompactBlockProcessor internal constructor(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process the rest of ranges
|
// Process the rest of ranges
|
||||||
|
|
||||||
// Get the suggested scan ranges from the wallet database
|
|
||||||
suggestedRangesResult = suggestScanRanges(
|
|
||||||
backend,
|
|
||||||
lastValidHeight
|
|
||||||
)
|
|
||||||
val scanRanges = when (suggestedRangesResult) {
|
val scanRanges = when (suggestedRangesResult) {
|
||||||
is SuggestScanRangesResult.Success -> { suggestedRangesResult.ranges }
|
is SuggestScanRangesResult.Success -> { suggestedRangesResult.ranges }
|
||||||
is SuggestScanRangesResult.Failure -> {
|
is SuggestScanRangesResult.Failure -> {
|
||||||
|
@ -558,25 +541,50 @@ class CompactBlockProcessor internal constructor(
|
||||||
withDownload = true,
|
withDownload = true,
|
||||||
enhanceStartHeight = firstUnenhancedHeight,
|
enhanceStartHeight = firstUnenhancedHeight,
|
||||||
lastBatchOrder = lastBatchOrder
|
lastBatchOrder = lastBatchOrder
|
||||||
).collect { rangeSyncProgress ->
|
).map { rangeSyncProgress ->
|
||||||
setProgress(PercentDecimal(rangeSyncProgress.overallOrder / allBatchCount))
|
// We need to update lastBatchOrder for the processing of the following range. It can occasionally
|
||||||
lastBatchOrder = rangeSyncProgress.overallOrder
|
// be over the precomputed all-batch count in case of inter-syncing failure.
|
||||||
|
lastBatchOrder = min(rangeSyncProgress.overallOrder, allBatchCountLocal)
|
||||||
|
|
||||||
|
setProgress(PercentDecimal(lastBatchOrder / allBatchCountLocal.toFloat()))
|
||||||
|
|
||||||
when (rangeSyncProgress.resultState) {
|
when (rangeSyncProgress.resultState) {
|
||||||
SyncingResult.UpdateBirthday -> {
|
SyncingResult.UpdateBirthday -> {
|
||||||
updateBirthdayHeight()
|
updateBirthdayHeight()
|
||||||
|
SyncingResult.AllSuccess
|
||||||
}
|
}
|
||||||
is SyncingResult.Failure -> {
|
is SyncingResult.Failure -> {
|
||||||
syncingResult = rangeSyncProgress.resultState
|
rangeSyncProgress.resultState
|
||||||
return@collect
|
|
||||||
} else -> {
|
} else -> {
|
||||||
|
// First, check the time and refresh the prepare phase inputs, if needed
|
||||||
|
val currentTimeMillis = System.currentTimeMillis()
|
||||||
|
if (shouldRefreshPreparation(
|
||||||
|
lastPreparationTime,
|
||||||
|
currentTimeMillis,
|
||||||
|
SBS_SYNCHRONIZATION_RESTART_TIMEOUT
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
SyncingResult.RestartSynchronization
|
||||||
|
} else {
|
||||||
// Continue with processing
|
// Continue with processing
|
||||||
|
SyncingResult.AllSuccess
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}.takeWhile {
|
||||||
|
syncingResult = it
|
||||||
|
it == SyncingResult.AllSuccess
|
||||||
|
}.collect()
|
||||||
|
|
||||||
if (syncingResult != SyncingResult.AllSuccess) {
|
when (syncingResult) {
|
||||||
// Remove persisted but not scanned blocks in case of any failure
|
is SyncingResult.AllSuccess -> {
|
||||||
|
// Continue with processing the rest of the ranges
|
||||||
|
}
|
||||||
|
is SyncingResult.RestartSynchronization -> {
|
||||||
|
// Restarting the synchronization process
|
||||||
|
return BlockProcessingResult.RestartSynchronization
|
||||||
|
} else -> {
|
||||||
|
// An error came - remove persisted but not scanned blocks
|
||||||
val lastScannedHeight = getLastScannedHeight(repository)
|
val lastScannedHeight = getLastScannedHeight(repository)
|
||||||
downloader.rewindToHeight(lastScannedHeight)
|
downloader.rewindToHeight(lastScannedHeight)
|
||||||
deleteAllBlockFiles(
|
deleteAllBlockFiles(
|
||||||
|
@ -586,10 +594,87 @@ class CompactBlockProcessor internal constructor(
|
||||||
return (syncingResult as SyncingResult.Failure).toBlockProcessingResult()
|
return (syncingResult as SyncingResult.Failure).toBlockProcessingResult()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return BlockProcessingResult.Success
|
return BlockProcessingResult.Success
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Suppress("ReturnCount")
|
||||||
|
internal suspend fun runSbSSyncingPreparation(
|
||||||
|
backend: TypesafeBackend,
|
||||||
|
downloader: CompactBlockDownloader,
|
||||||
|
network: ZcashNetwork,
|
||||||
|
lastValidHeight: BlockHeight
|
||||||
|
): SbSPreparationResult {
|
||||||
|
// Download chain tip metadata from lightwalletd
|
||||||
|
val chainTip = fetchLatestBlockHeight(
|
||||||
|
downloader = downloader,
|
||||||
|
network = network
|
||||||
|
) ?: let {
|
||||||
|
Twig.warn { "Disconnection detected. Attempting to reconnect." }
|
||||||
|
return SbSPreparationResult.ConnectionFailure
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify the underlying rust layer about the updated chain tip
|
||||||
|
when (
|
||||||
|
val result =
|
||||||
|
updateChainTip(
|
||||||
|
backend = backend,
|
||||||
|
chainTip = chainTip,
|
||||||
|
lastValidHeight = lastValidHeight
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
is UpdateChainTipResult.Success -> { /* Lets continue to the next step */ }
|
||||||
|
is UpdateChainTipResult.Failure -> {
|
||||||
|
return SbSPreparationResult.ProcessFailure(
|
||||||
|
result.failedAtHeight,
|
||||||
|
result.exception
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the suggested scan ranges from the wallet database
|
||||||
|
val suggestedRangesResult = suggestScanRanges(
|
||||||
|
backend,
|
||||||
|
lastValidHeight
|
||||||
|
)
|
||||||
|
val updateRangeResult = when (suggestedRangesResult) {
|
||||||
|
is SuggestScanRangesResult.Success -> {
|
||||||
|
updateRange(suggestedRangesResult.ranges)
|
||||||
|
}
|
||||||
|
is SuggestScanRangesResult.Failure -> {
|
||||||
|
Twig.error { "Process suggested scan ranges failure: ${suggestedRangesResult.exception}" }
|
||||||
|
return SbSPreparationResult.ProcessFailure(
|
||||||
|
suggestedRangesResult.failedAtHeight,
|
||||||
|
suggestedRangesResult.exception
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!updateRangeResult) {
|
||||||
|
Twig.warn { "Disconnection detected. Attempting to reconnect." }
|
||||||
|
return SbSPreparationResult.ConnectionFailure
|
||||||
|
} else if (_processorInfo.value.overallSyncRange.isNullOrEmpty()) {
|
||||||
|
Twig.info { "No more blocks to process." }
|
||||||
|
return SbSPreparationResult.NoMoreBlocksToProcess
|
||||||
|
}
|
||||||
|
|
||||||
|
setState(State.Syncing)
|
||||||
|
allBatchCount = max(allBatchCount, getBatchCount(suggestedRangesResult.ranges.map { it.range }))
|
||||||
|
lastBatchOrder = max(lastBatchOrder, 0)
|
||||||
|
|
||||||
|
// Parse and process ranges. If it recognizes a range with Priority.Verify, it runs the verification part.
|
||||||
|
val verifyRangeResult = shouldVerifySuggestedScanRanges(suggestedRangesResult)
|
||||||
|
|
||||||
|
Twig.info { "Check for verification of ranges resulted with: $verifyRangeResult" }
|
||||||
|
|
||||||
|
return SbSPreparationResult.Success(
|
||||||
|
suggestedRangesResult = suggestedRangesResult,
|
||||||
|
verifyRangeResult = verifyRangeResult,
|
||||||
|
allBatchCount = allBatchCount,
|
||||||
|
lastBatchOrder = lastBatchOrder
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
@Suppress("ReturnCount")
|
@Suppress("ReturnCount")
|
||||||
private suspend fun syncBlocksAndEnhanceTransactionsLinearly(
|
private suspend fun syncBlocksAndEnhanceTransactionsLinearly(
|
||||||
syncRange: ClosedRange<BlockHeight>,
|
syncRange: ClosedRange<BlockHeight>,
|
||||||
|
@ -597,7 +682,7 @@ class CompactBlockProcessor internal constructor(
|
||||||
enhanceStartHeight: BlockHeight?
|
enhanceStartHeight: BlockHeight?
|
||||||
): BlockProcessingResult {
|
): BlockProcessingResult {
|
||||||
var syncingResult: SyncingResult = SyncingResult.AllSuccess
|
var syncingResult: SyncingResult = SyncingResult.AllSuccess
|
||||||
val allBatchCount = getBatchCount(listOf(syncRange)).toFloat()
|
allBatchCount = getBatchCount(listOf(syncRange))
|
||||||
|
|
||||||
// Syncing last blocks and enhancing transactions
|
// Syncing last blocks and enhancing transactions
|
||||||
runSyncingAndEnhancingOnRange(
|
runSyncingAndEnhancingOnRange(
|
||||||
|
@ -610,7 +695,7 @@ class CompactBlockProcessor internal constructor(
|
||||||
enhanceStartHeight = enhanceStartHeight,
|
enhanceStartHeight = enhanceStartHeight,
|
||||||
lastBatchOrder = 0
|
lastBatchOrder = 0
|
||||||
).collect { rangeSyncProgress ->
|
).collect { rangeSyncProgress ->
|
||||||
setProgress(PercentDecimal(rangeSyncProgress.overallOrder / allBatchCount))
|
setProgress(PercentDecimal(rangeSyncProgress.overallOrder / allBatchCount.toFloat()))
|
||||||
|
|
||||||
when (rangeSyncProgress.resultState) {
|
when (rangeSyncProgress.resultState) {
|
||||||
SyncingResult.UpdateBirthday -> {
|
SyncingResult.UpdateBirthday -> {
|
||||||
|
@ -643,6 +728,7 @@ class CompactBlockProcessor internal constructor(
|
||||||
object NoBlocksToProcess : BlockProcessingResult()
|
object NoBlocksToProcess : BlockProcessingResult()
|
||||||
object Success : BlockProcessingResult()
|
object Success : BlockProcessingResult()
|
||||||
object Reconnecting : BlockProcessingResult()
|
object Reconnecting : BlockProcessingResult()
|
||||||
|
object RestartSynchronization : BlockProcessingResult()
|
||||||
data class SyncFailure(val failedAtHeight: BlockHeight, val error: Throwable) : BlockProcessingResult()
|
data class SyncFailure(val failedAtHeight: BlockHeight, val error: Throwable) : BlockProcessingResult()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -657,7 +743,6 @@ class CompactBlockProcessor internal constructor(
|
||||||
*
|
*
|
||||||
* @return true when the update succeeds.
|
* @return true when the update succeeds.
|
||||||
*/
|
*/
|
||||||
@OptIn(ExperimentalStdlibApi::class)
|
|
||||||
private suspend fun updateRange(ranges: List<ScanRange>?): Boolean {
|
private suspend fun updateRange(ranges: List<ScanRange>?): Boolean {
|
||||||
// This fetches the latest height each time this method is called, which can be very inefficient
|
// This fetches the latest height each time this method is called, which can be very inefficient
|
||||||
// when downloading all of the blocks from the server
|
// when downloading all of the blocks from the server
|
||||||
|
@ -804,7 +889,7 @@ class CompactBlockProcessor internal constructor(
|
||||||
if (failedUtxoFetches < 9) { // there are 3 attempts per block
|
if (failedUtxoFetches < 9) { // there are 3 attempts per block
|
||||||
@Suppress("TooGenericExceptionCaught")
|
@Suppress("TooGenericExceptionCaught")
|
||||||
try {
|
try {
|
||||||
retryUpTo(UTXO_FETCH_RETRIES) {
|
retryUpToAndThrow(UTXO_FETCH_RETRIES) {
|
||||||
val tAddresses = backend.listTransparentReceivers(account)
|
val tAddresses = backend.listTransparentReceivers(account)
|
||||||
|
|
||||||
downloader.fetchUtxos(
|
downloader.fetchUtxos(
|
||||||
|
@ -952,6 +1037,24 @@ class CompactBlockProcessor internal constructor(
|
||||||
*/
|
*/
|
||||||
internal const val REWIND_DISTANCE = 10
|
internal const val REWIND_DISTANCE = 10
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Limit millis value for restarting currently running block synchronization that runs under the
|
||||||
|
* [SyncAlgorithm.SPEND_BEFORE_SYNC] synchronization.
|
||||||
|
*/
|
||||||
|
internal val SBS_SYNCHRONIZATION_RESTART_TIMEOUT = 10.minutes.inWholeMilliseconds
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check for the next restart of the block synchronization preparation phase. This function is only SbS
|
||||||
|
* synchronization algorithm-related.
|
||||||
|
*/
|
||||||
|
internal fun shouldRefreshPreparation(
|
||||||
|
lastPreparationTime: Long,
|
||||||
|
currentTimeMillis: Long,
|
||||||
|
limitTime: Long
|
||||||
|
): Boolean {
|
||||||
|
return (currentTimeMillis - lastPreparationTime) >= limitTime
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This operation fetches and returns the latest block height (chain tip)
|
* This operation fetches and returns the latest block height (chain tip)
|
||||||
*
|
*
|
||||||
|
@ -1173,6 +1276,8 @@ class CompactBlockProcessor internal constructor(
|
||||||
/**
|
/**
|
||||||
* Requests, processes and persists all blocks from the given range.
|
* Requests, processes and persists all blocks from the given range.
|
||||||
*
|
*
|
||||||
|
* Works the same for both [SyncAlgorithm.LINEAR] and [SyncAlgorithm.SPEND_BEFORE_SYNC] algorithms.
|
||||||
|
*
|
||||||
* @param backend the Rust backend component
|
* @param backend the Rust backend component
|
||||||
* @param downloader the compact block downloader component
|
* @param downloader the compact block downloader component
|
||||||
* @param repository the derived data repository component
|
* @param repository the derived data repository component
|
||||||
|
@ -1302,10 +1407,10 @@ class CompactBlockProcessor internal constructor(
|
||||||
backend = backend,
|
backend = backend,
|
||||||
downloader = downloader
|
downloader = downloader
|
||||||
).collect { enhancingResult ->
|
).collect { enhancingResult ->
|
||||||
Twig.debug { "Enhancing result: $enhancingResult" }
|
Twig.info { "Enhancing result: $enhancingResult" }
|
||||||
resultState = when (enhancingResult) {
|
resultState = when (enhancingResult) {
|
||||||
is SyncingResult.UpdateBirthday -> {
|
is SyncingResult.UpdateBirthday -> {
|
||||||
Twig.debug { "Birthday height update reporting" }
|
Twig.info { "Birthday height update reporting" }
|
||||||
enhancingResult
|
enhancingResult
|
||||||
}
|
}
|
||||||
is SyncingResult.EnhanceFailed -> {
|
is SyncingResult.EnhanceFailed -> {
|
||||||
|
@ -1594,7 +1699,7 @@ class CompactBlockProcessor internal constructor(
|
||||||
downloader: CompactBlockDownloader
|
downloader: CompactBlockDownloader
|
||||||
): ByteArray {
|
): ByteArray {
|
||||||
var transactionDataResult: ByteArray? = null
|
var transactionDataResult: ByteArray? = null
|
||||||
retryUpTo(TRANSACTION_FETCH_RETRIES) { failedAttempts ->
|
retryUpToAndThrow(TRANSACTION_FETCH_RETRIES) { failedAttempts ->
|
||||||
if (failedAttempts == 0) {
|
if (failedAttempts == 0) {
|
||||||
Twig.debug { "Starting to fetch transaction (id:$id, block:$minedHeight)" }
|
Twig.debug { "Starting to fetch transaction (id:$id, block:$minedHeight)" }
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
package cash.z.ecc.android.sdk.block.processor.model
|
||||||
|
|
||||||
|
import cash.z.ecc.android.sdk.block.processor.CompactBlockProcessor
|
||||||
|
import cash.z.ecc.android.sdk.model.BlockHeight
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal class for sharing pre-synchronization steps result.
|
||||||
|
*/
|
||||||
|
internal sealed class SbSPreparationResult {
|
||||||
|
object ConnectionFailure : SbSPreparationResult()
|
||||||
|
data class ProcessFailure(
|
||||||
|
val failedAtHeight: BlockHeight,
|
||||||
|
val exception: Throwable
|
||||||
|
) : SbSPreparationResult() {
|
||||||
|
fun toBlockProcessingResult(): CompactBlockProcessor.BlockProcessingResult =
|
||||||
|
CompactBlockProcessor.BlockProcessingResult.SyncFailure(
|
||||||
|
this.failedAtHeight,
|
||||||
|
this.exception
|
||||||
|
)
|
||||||
|
}
|
||||||
|
data class Success(
|
||||||
|
val suggestedRangesResult: SuggestScanRangesResult,
|
||||||
|
val verifyRangeResult: VerifySuggestedScanRange,
|
||||||
|
val allBatchCount: Long,
|
||||||
|
val lastBatchOrder: Long
|
||||||
|
) : SbSPreparationResult()
|
||||||
|
object NoMoreBlocksToProcess : SbSPreparationResult()
|
||||||
|
}
|
|
@ -9,11 +9,12 @@ import cash.z.ecc.android.sdk.model.BlockHeight
|
||||||
* Internal class for the overall synchronization process result reporting.
|
* Internal class for the overall synchronization process result reporting.
|
||||||
*/
|
*/
|
||||||
internal sealed class SyncingResult {
|
internal sealed class SyncingResult {
|
||||||
|
override fun toString(): String = this::class.java.simpleName
|
||||||
|
|
||||||
object AllSuccess : SyncingResult()
|
object AllSuccess : SyncingResult()
|
||||||
|
object RestartSynchronization : SyncingResult()
|
||||||
data class DownloadSuccess(val downloadedBlocks: List<JniBlockMeta>?) : SyncingResult() {
|
data class DownloadSuccess(val downloadedBlocks: List<JniBlockMeta>?) : SyncingResult() {
|
||||||
override fun toString(): String {
|
override fun toString() = "${this::class.java.simpleName} with ${downloadedBlocks?.size ?: "none"} blocks"
|
||||||
return "DownloadSuccess with ${downloadedBlocks?.size ?: "none"} blocks"
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
interface Failure {
|
interface Failure {
|
||||||
val failedAtHeight: BlockHeight
|
val failedAtHeight: BlockHeight
|
||||||
|
|
|
@ -2,7 +2,7 @@ package cash.z.ecc.android.sdk.internal.block
|
||||||
|
|
||||||
import cash.z.ecc.android.sdk.exception.LightWalletException
|
import cash.z.ecc.android.sdk.exception.LightWalletException
|
||||||
import cash.z.ecc.android.sdk.internal.Twig
|
import cash.z.ecc.android.sdk.internal.Twig
|
||||||
import cash.z.ecc.android.sdk.internal.ext.retryUpTo
|
import cash.z.ecc.android.sdk.internal.ext.retryUpToAndThrow
|
||||||
import cash.z.ecc.android.sdk.internal.model.JniBlockMeta
|
import cash.z.ecc.android.sdk.internal.model.JniBlockMeta
|
||||||
import cash.z.ecc.android.sdk.internal.model.ext.from
|
import cash.z.ecc.android.sdk.internal.model.ext.from
|
||||||
import cash.z.ecc.android.sdk.internal.repository.CompactBlockRepository
|
import cash.z.ecc.android.sdk.internal.repository.CompactBlockRepository
|
||||||
|
@ -111,7 +111,7 @@ open class CompactBlockDownloader private constructor(val compactBlockRepository
|
||||||
compactBlockRepository.getLatestHeight()
|
compactBlockRepository.getLatestHeight()
|
||||||
|
|
||||||
suspend fun getServerInfo(): LightWalletEndpointInfoUnsafe? = withContext(IO) {
|
suspend fun getServerInfo(): LightWalletEndpointInfoUnsafe? = withContext(IO) {
|
||||||
retryUpTo(GET_SERVER_INFO_RETRIES) {
|
retryUpToAndThrow(GET_SERVER_INFO_RETRIES) {
|
||||||
when (val response = lightWalletClient.getServerInfo()) {
|
when (val response = lightWalletClient.getServerInfo()) {
|
||||||
is Response.Success -> return@withContext response.result
|
is Response.Success -> return@withContext response.result
|
||||||
else -> {
|
else -> {
|
||||||
|
|
|
@ -17,7 +17,7 @@ import kotlin.random.Random
|
||||||
* @param block the code to execute, which will be wrapped in a try/catch and retried whenever an
|
* @param block the code to execute, which will be wrapped in a try/catch and retried whenever an
|
||||||
* exception is thrown up to [retries] attempts.
|
* exception is thrown up to [retries] attempts.
|
||||||
*/
|
*/
|
||||||
suspend inline fun retryUpTo(
|
suspend inline fun retryUpToAndThrow(
|
||||||
retries: Int,
|
retries: Int,
|
||||||
exceptionWrapper: (Throwable) -> Throwable = { it },
|
exceptionWrapper: (Throwable) -> Throwable = { it },
|
||||||
initialDelayMillis: Long = 500L,
|
initialDelayMillis: Long = 500L,
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
package cash.z.ecc.android.sdk.block.processor
|
||||||
|
|
||||||
|
import kotlin.test.Test
|
||||||
|
import kotlin.test.assertFalse
|
||||||
|
import kotlin.test.assertTrue
|
||||||
|
|
||||||
|
class CompactBlockProcessorTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun should_refresh_preparation_test() {
|
||||||
|
assertTrue {
|
||||||
|
CompactBlockProcessor.shouldRefreshPreparation(
|
||||||
|
lastPreparationTime = CompactBlockProcessor.SBS_SYNCHRONIZATION_RESTART_TIMEOUT,
|
||||||
|
currentTimeMillis = CompactBlockProcessor.SBS_SYNCHRONIZATION_RESTART_TIMEOUT * 2,
|
||||||
|
limitTime = CompactBlockProcessor.SBS_SYNCHRONIZATION_RESTART_TIMEOUT
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun should_not_refresh_preparation_test() {
|
||||||
|
assertFalse {
|
||||||
|
CompactBlockProcessor.shouldRefreshPreparation(
|
||||||
|
lastPreparationTime = CompactBlockProcessor.SBS_SYNCHRONIZATION_RESTART_TIMEOUT,
|
||||||
|
currentTimeMillis = CompactBlockProcessor.SBS_SYNCHRONIZATION_RESTART_TIMEOUT,
|
||||||
|
limitTime = CompactBlockProcessor.SBS_SYNCHRONIZATION_RESTART_TIMEOUT
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue