[#1013] Sync stages parallelization

* [#1013] Sync blockchain sub-phases parallelization

- Remove the unnecessary comment after the latest changes in this code fragment

* Move solely CompactBlockProcessor-related constants

* Simplify sync range update construction

* [#1013] Sync blockchain sub-phases parallelization

* Changelog update

* Block files deletion documentation update

* Leverage buildList API

* CompactBlockRepository documentation update

* Move BlockBatch to internal models
This commit is contained in:
Honza Rychnovsky 2023-05-10 12:45:23 +02:00 committed by GitHub
parent e5c7e4b6c8
commit dfaa827fd1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 358 additions and 244 deletions

View File

@ -1,6 +1,18 @@
Change Log
==========
## Unreleased
- The SDK's `CompactBlockProcessor` switched from processing **all blocks in one run** mechanism to **batched blocks**
processing. This was necessary for the sync state's parallelization. Example of syncing of the latest
100 blocks:
- Previously: _Download 100 blocks -> Validate 100 blocks -> Scan 100 blocks -> SYNCED_
- Now: _10x (Download 10 blocks -> Validate 10 blocks -> Scan 10 blocks) -> SYNCED_
- `Synchronizer.progress` now returns `Flow<PercentDecimal>` instead of `Flow<Int>`. PercentDecimal is a type-safe
model.
Use `PercentDecimal.toPercentage()` to get a number within 0-100% scale.
- `Synchronizer.status` now provides a new `SYNCING` state, which covers all three previous `DOWNLOADING`,
`VALIDATING`, and `SCANNING` states, which were eliminated in favor of `SYNCING` state.
## 1.17.0-beta01
- Synchronizer APIs for listing sent and received transactions have been removed.
- Synchronizer APIs for listing pending transactions have been removed, along with the `PendingTransaction` object.

View File

@ -21,6 +21,7 @@ import cash.z.ecc.android.sdk.ext.ZcashSdk
import cash.z.ecc.android.sdk.ext.convertZatoshiToZecString
import cash.z.ecc.android.sdk.internal.Twig
import cash.z.ecc.android.sdk.model.Account
import cash.z.ecc.android.sdk.model.PercentDecimal
import cash.z.ecc.android.sdk.model.WalletBalance
import cash.z.ecc.android.sdk.model.Zatoshi
import cash.z.ecc.android.sdk.model.ZcashNetwork
@ -182,9 +183,9 @@ class GetBalanceFragment : BaseDemoFragment<FragmentGetBalanceBinding>() {
}
@Suppress("MagicNumber")
private fun onProgress(i: Int) {
if (i < 100) {
binding.textStatus.text = "Syncing blocks...$i%"
private fun onProgress(percent: PercentDecimal) {
if (percent.isLessThanHundredPercent()) {
binding.textStatus.text = "Syncing blocks...${percent.toPercentage()}%"
}
}

View File

@ -14,6 +14,7 @@ import cash.z.ecc.android.sdk.demoapp.BaseDemoFragment
import cash.z.ecc.android.sdk.demoapp.R
import cash.z.ecc.android.sdk.demoapp.databinding.FragmentListTransactionsBinding
import cash.z.ecc.android.sdk.internal.Twig
import cash.z.ecc.android.sdk.model.PercentDecimal
import cash.z.ecc.android.sdk.model.TransactionOverview
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.filterNotNull
@ -81,8 +82,8 @@ class ListTransactionsFragment : BaseDemoFragment<FragmentListTransactionsBindin
}
@Suppress("MagicNumber")
private fun onProgress(i: Int) {
if (i < 100) binding.textInfo.text = "Syncing blocks...$i%"
private fun onProgress(percent: PercentDecimal) {
if (percent.isLessThanHundredPercent()) binding.textInfo.text = "Syncing blocks...${percent.toPercentage()}%"
}
private fun onStatus(status: Synchronizer.Status) {

View File

@ -19,6 +19,7 @@ import cash.z.ecc.android.sdk.demoapp.util.mainActivity
import cash.z.ecc.android.sdk.internal.Twig
import cash.z.ecc.android.sdk.model.Account
import cash.z.ecc.android.sdk.model.BlockHeight
import cash.z.ecc.android.sdk.model.PercentDecimal
import cash.z.ecc.android.sdk.model.TransactionOverview
import cash.z.ecc.android.sdk.model.ZcashNetwork
import kotlinx.coroutines.Dispatchers
@ -202,8 +203,8 @@ class ListUtxosFragment : BaseDemoFragment<FragmentListUtxosBinding>() {
}
@Suppress("MagicNumber")
private fun onProgress(i: Int) {
if (i < 100) binding.textStatus.text = "Syncing blocks...$i%"
private fun onProgress(percent: PercentDecimal) {
if (percent.isLessThanHundredPercent()) binding.textStatus.text = "Syncing blocks...${percent.toPercentage()}%"
}
private fun onStatus(status: Synchronizer.Status) {

View File

@ -19,6 +19,7 @@ import cash.z.ecc.android.sdk.demoapp.util.mainActivity
import cash.z.ecc.android.sdk.ext.convertZatoshiToZecString
import cash.z.ecc.android.sdk.ext.convertZecToZatoshi
import cash.z.ecc.android.sdk.ext.toZecString
import cash.z.ecc.android.sdk.model.PercentDecimal
import cash.z.ecc.android.sdk.model.UnifiedSpendingKey
import cash.z.ecc.android.sdk.model.WalletBalance
import kotlinx.coroutines.ExperimentalCoroutinesApi
@ -125,9 +126,9 @@ class SendFragment : BaseDemoFragment<FragmentSendBinding>() {
}
@Suppress("MagicNumber")
private fun onProgress(i: Int) {
if (i < 100) {
binding.textStatus.text = "Syncing blocks...$i%"
private fun onProgress(percent: PercentDecimal) {
if (percent.isLessThanHundredPercent()) {
binding.textStatus.text = "Syncing blocks...${percent.toPercentage()}%"
binding.textBalance.visibility = View.INVISIBLE
} else {
binding.textBalance.visibility = View.VISIBLE

View File

@ -337,13 +337,7 @@ private fun Synchronizer.toWalletSnapshot() =
val orchardBalance = flows[2] as WalletBalance?
val saplingBalance = flows[3] as WalletBalance?
val transparentBalance = flows[4] as WalletBalance?
val progressPercentDecimal = (flows[5] as Int).let { value ->
if (value > PercentDecimal.MAX || value < PercentDecimal.MIN) {
PercentDecimal.ZERO_PERCENT
}
PercentDecimal(value / 100f)
}
val progressPercentDecimal = (flows[5] as PercentDecimal)
WalletSnapshot(
flows[0] as Synchronizer.Status,

View File

@ -1,19 +0,0 @@
package cash.z.ecc.android.sdk.model
import androidx.test.filters.SmallTest
import org.junit.Test
class PercentDecimalTest {
@Test(expected = IllegalArgumentException::class)
@SmallTest
fun require_greater_than_zero() {
PercentDecimal(-1.0f)
}
@Test(expected = IllegalArgumentException::class)
@SmallTest
fun require_less_than_one() {
PercentDecimal(1.5f)
}
}

View File

@ -108,9 +108,9 @@ class FileCompactBlockRepositoryTest {
assertTrue { rustBackend.metadata.isEmpty() }
val blocks = ListOfCompactBlocksFixture.newFlow()
val persistedCount = blockRepository.write(blocks)
val persistedBlocks = blockRepository.write(blocks)
assertEquals(blocks.count(), persistedCount)
assertEquals(blocks.count(), persistedBlocks.size)
assertEquals(blocks.count(), rustBackend.metadata.size)
}
@ -128,9 +128,9 @@ class FileCompactBlockRepositoryTest {
assertTrue { reduced.count() < FileCompactBlockRepository.BLOCKS_METADATA_BUFFER_SIZE }
}
val persistedCount = blockRepository.write(reducedBlocksList)
val persistedBlocks = blockRepository.write(reducedBlocksList)
assertEquals(reducedBlocksList.count(), persistedCount)
assertEquals(reducedBlocksList.count(), persistedBlocks.size)
assertEquals(reducedBlocksList.count(), rustBackend.metadata.size)
}
@ -146,10 +146,10 @@ class FileCompactBlockRepositoryTest {
val blocks = ListOfCompactBlocksFixture.newFlow()
val persistedCount = blockRepository.write(blocks)
val persistedBlocks = blockRepository.write(blocks)
assertTrue { rootBlocksDirectory.exists() }
assertEquals(blocks.count(), persistedCount)
assertEquals(blocks.count(), persistedBlocks.size)
assertEquals(blocks.count(), rootBlocksDirectory.list()!!.size)
}
@ -165,7 +165,7 @@ class FileCompactBlockRepositoryTest {
val testedBlocksRange = ListOfCompactBlocksFixture.DEFAULT_FILE_BLOCK_RANGE
val blocks = ListOfCompactBlocksFixture.newFlow(testedBlocksRange)
val persistedCount = blockRepository.write(blocks)
val persistedBlocks = blockRepository.write(blocks)
parentDirectory.also {
assertTrue(it.existsSuspend())
@ -174,10 +174,10 @@ class FileCompactBlockRepositoryTest {
blocksDirectory.also {
assertTrue(it.existsSuspend())
assertEquals(blocks.count(), persistedCount)
assertEquals(blocks.count(), persistedBlocks.size)
}
blockRepository.deleteCompactBlockFiles()
blockRepository.deleteAllCompactBlockFiles()
parentDirectory.also {
assertTrue(it.existsSuspend())

View File

@ -0,0 +1,38 @@
package cash.z.ecc.android.sdk.model
import androidx.test.filters.SmallTest
import org.junit.Test
import kotlin.test.assertTrue
class PercentDecimalTest {
@Test(expected = IllegalArgumentException::class)
@SmallTest
fun require_greater_than_zero() {
PercentDecimal(-1.0f)
}
@Test(expected = IllegalArgumentException::class)
@SmallTest
fun require_less_than_one() {
PercentDecimal(1.5f)
}
@SmallTest
@Test
fun is_less_than_hundred_percent_test() {
assertTrue(PercentDecimal(0.5f).isLessThanHundredPercent())
}
@SmallTest
@Test
fun is_more_than_zero_percent_test() {
assertTrue(PercentDecimal(0.5f).isMoreThanZeroPercent())
}
@SmallTest
@Test
fun to_percentage_test() {
assertTrue(PercentDecimal(0.5f).toPercentage() == 50)
}
}

View File

@ -37,6 +37,7 @@ import cash.z.ecc.android.sdk.internal.transaction.TransactionEncoderImpl
import cash.z.ecc.android.sdk.jni.RustBackend
import cash.z.ecc.android.sdk.model.Account
import cash.z.ecc.android.sdk.model.BlockHeight
import cash.z.ecc.android.sdk.model.PercentDecimal
import cash.z.ecc.android.sdk.model.TransactionOverview
import cash.z.ecc.android.sdk.model.TransactionRecipient
import cash.z.ecc.android.sdk.model.UnifiedFullViewingKey
@ -193,12 +194,12 @@ class SdkSynchronizer private constructor(
override val status = _status.asStateFlow()
/**
* Indicates the download progress of the Synchronizer. When progress reaches 100, that
* signals that the Synchronizer is in sync with the network. Balances should be considered
* Indicates the download progress of the Synchronizer. When progress reaches `PercentDecimal.ONE_HUNDRED_PERCENT`,
* that signals that the Synchronizer is in sync with the network. Balances should be considered
* inaccurate and outbound transactions should be prevented until this sync is complete. It is
* a simplified version of [processorInfo].
*/
override val progress: Flow<Int> = processor.progress
override val progress: Flow<PercentDecimal> = processor.progress
/**
* Indicates the latest information about the blocks that have been processed by the SDK. This
@ -479,18 +480,6 @@ class SdkSynchronizer private constructor(
val shouldRefresh = !scannedRange.isNullOrEmpty() || elapsedMillis > (ZcashSdk.POLL_INTERVAL * 5)
val reason = if (scannedRange.isNullOrEmpty()) "it's been a while" else "new blocks were scanned"
// TRICKY:
// Keep an eye on this section because there is a potential for concurrent DB
// modification. A change in transactions means a change in balance. Calculating the
// balance requires touching transactions. If both are done in separate threads, the
// database can have issues. On Android, this would manifest as a false positive for a
// "malformed database" exception when the database is not actually corrupt but rather
// locked (i.e. it's a bad error message).
// The balance refresh is done first because it is coroutine-based and will fully
// complete by the time the function returns.
// Ultimately, refreshing the transactions just invalidates views of data that
// already exists and it completes on another thread so it should come after the
// balance refresh is complete.
if (shouldRefresh) {
Twig.debug { "Triggering utxo refresh since $reason!" }
refreshUtxos()

View File

@ -7,6 +7,7 @@ import cash.z.ecc.android.sdk.internal.SaplingParamTool
import cash.z.ecc.android.sdk.internal.db.DatabaseCoordinator
import cash.z.ecc.android.sdk.model.Account
import cash.z.ecc.android.sdk.model.BlockHeight
import cash.z.ecc.android.sdk.model.PercentDecimal
import cash.z.ecc.android.sdk.model.TransactionOverview
import cash.z.ecc.android.sdk.model.TransactionRecipient
import cash.z.ecc.android.sdk.model.UnifiedSpendingKey
@ -41,10 +42,11 @@ interface Synchronizer {
/**
* A flow of progress values, typically corresponding to this Synchronizer downloading blocks.
* Typically, any non- zero value below 100 indicates that progress indicators can be shown and
* a value of 100 signals that progress is complete and any progress indicators can be hidden.
* Typically, any non-zero value below `PercentDecimal.ONE_HUNDRED_PERCENT` indicates that progress indicators can
* be shown and a value of `PercentDecimal.ONE_HUNDRED_PERCENT` signals that progress is complete and any progress
* indicators can be hidden.
*/
val progress: Flow<Int>
val progress: Flow<PercentDecimal>
/**
* A flow of processor details, updated every time blocks are processed to include the latest

View File

@ -13,11 +13,7 @@ import cash.z.ecc.android.sdk.exception.RustLayerException
import cash.z.ecc.android.sdk.ext.BatchMetrics
import cash.z.ecc.android.sdk.ext.ZcashSdk
import cash.z.ecc.android.sdk.ext.ZcashSdk.MAX_BACKOFF_INTERVAL
import cash.z.ecc.android.sdk.ext.ZcashSdk.MAX_REORG_SIZE
import cash.z.ecc.android.sdk.ext.ZcashSdk.POLL_INTERVAL
import cash.z.ecc.android.sdk.ext.ZcashSdk.RETRIES
import cash.z.ecc.android.sdk.ext.ZcashSdk.REWIND_DISTANCE
import cash.z.ecc.android.sdk.ext.ZcashSdk.SYNC_BATCH_SIZE
import cash.z.ecc.android.sdk.internal.Twig
import cash.z.ecc.android.sdk.internal.block.CompactBlockDownloader
import cash.z.ecc.android.sdk.internal.ext.retryUpTo
@ -25,7 +21,9 @@ import cash.z.ecc.android.sdk.internal.ext.retryWithBackoff
import cash.z.ecc.android.sdk.internal.ext.toHexReversed
import cash.z.ecc.android.sdk.internal.isNullOrEmpty
import cash.z.ecc.android.sdk.internal.length
import cash.z.ecc.android.sdk.internal.model.BlockBatch
import cash.z.ecc.android.sdk.internal.model.DbTransactionOverview
import cash.z.ecc.android.sdk.internal.model.JniBlockMeta
import cash.z.ecc.android.sdk.internal.model.ext.from
import cash.z.ecc.android.sdk.internal.model.ext.toBlockHeight
import cash.z.ecc.android.sdk.internal.repository.DerivedDataRepository
@ -42,6 +40,7 @@ import cash.z.ecc.android.sdk.jni.rewindToHeight
import cash.z.ecc.android.sdk.jni.validateCombinedChainOrErrorBlockHeight
import cash.z.ecc.android.sdk.model.Account
import cash.z.ecc.android.sdk.model.BlockHeight
import cash.z.ecc.android.sdk.model.PercentDecimal
import cash.z.ecc.android.sdk.model.UnifiedSpendingKey
import cash.z.ecc.android.sdk.model.WalletBalance
import cash.z.ecc.android.sdk.model.ZcashNetwork
@ -54,12 +53,16 @@ import co.electriccoin.lightwallet.client.model.Response
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.Locale
@ -135,7 +138,7 @@ class CompactBlockProcessor internal constructor(
)
private val _state: MutableStateFlow<State> = MutableStateFlow(State.Initialized)
private val _progress = MutableStateFlow(0)
private val _progress = MutableStateFlow(PercentDecimal.ZERO_PERCENT)
private val _processorInfo = MutableStateFlow(ProcessorInfo(null, null, null))
private val _networkHeight = MutableStateFlow<BlockHeight?>(null)
private val processingMutex = Mutex()
@ -191,7 +194,15 @@ class CompactBlockProcessor internal constructor(
@Suppress("LongMethod")
suspend fun start() {
verifySetup()
updateBirthdayHeight()
// Clear any undeleted left over block files from previous sync attempts
deleteAllBlockFiles(
downloader = downloader,
lastKnownHeight = getLastScannedHeight(repository)
)
Twig.debug { "setup verified. processor starting" }
// using do/while makes it easier to execute exactly one loop which helps with testing this processor quickly
@ -237,24 +248,24 @@ class CompactBlockProcessor internal constructor(
}
is BlockProcessingResult.FailedDeleteBlocks -> {
Twig.warn {
Twig.error {
"Failed to delete temporary blocks files from the device disk. It will be retried on the" +
" next time, while downloading new blocks."
}
// Do nothing. The other phases went correctly.
checkErrorResult(result.failedAtHeight)
}
is BlockProcessingResult.FailedDownloadingBlocks -> {
is BlockProcessingResult.FailedDownloadBlocks -> {
Twig.error { "Failed while downloading blocks at height: ${result.failedAtHeight}" }
checkErrorResult(result.failedAtHeight)
}
is BlockProcessingResult.FailedValidationBlocks -> {
is BlockProcessingResult.FailedValidateBlocks -> {
Twig.error { "Failed while validating blocks at height: ${result.failedAtHeight}" }
checkErrorResult(result.failedAtHeight)
}
is BlockProcessingResult.FailedScanningBlocks -> {
is BlockProcessingResult.FailedScanBlocks -> {
Twig.error { "Failed while scanning blocks at height: ${result.failedAtHeight}" }
checkErrorResult(result.failedAtHeight)
}
@ -262,6 +273,10 @@ class CompactBlockProcessor internal constructor(
is BlockProcessingResult.Success -> {
// Do nothing. We are done.
}
is BlockProcessingResult.DownloadSuccess -> {
// Do nothing. Syncing of blocks is in progress.
}
}
}
} while (_state.value !is State.Stopped)
@ -281,7 +296,7 @@ class CompactBlockProcessor internal constructor(
}
/**
* Sets the state to [Stopped], which causes the processor loop to exit.
* Sets the state to [State.Stopped], which causes the processor loop to exit.
*/
suspend fun stop() {
runCatching {
@ -352,16 +367,21 @@ class CompactBlockProcessor internal constructor(
_progress.value = syncProgress.percentage
updateProgress(lastSyncedHeight = syncProgress.lastSyncedHeight)
if (syncProgress.result != BlockProcessingResult.Success ||
syncProgress.result != BlockProcessingResult.FailedDeleteBlocks
) {
// Cancel collecting in case of any unwanted state comes
if (syncProgress.result != BlockProcessingResult.Success) {
syncResult = syncProgress.result
return@collect
}
}
if (syncResult != BlockProcessingResult.Success) {
// We should also set the synchronizer status to error here
// Remove persisted but not validated and scanned blocks in case of any failure
val lastScannedHeight = getLastScannedHeight(repository)
downloader.rewindToHeight(lastScannedHeight)
deleteAllBlockFiles(
downloader = downloader,
lastKnownHeight = lastScannedHeight
)
return syncResult
}
@ -380,11 +400,12 @@ class CompactBlockProcessor internal constructor(
sealed class BlockProcessingResult {
object NoBlocksToProcess : BlockProcessingResult()
object Success : BlockProcessingResult()
data class DownloadSuccess(val downloadedBlocks: List<JniBlockMeta>?) : BlockProcessingResult()
object Reconnecting : BlockProcessingResult()
data class FailedDownloadingBlocks(val failedAtHeight: BlockHeight) : BlockProcessingResult()
data class FailedScanningBlocks(val failedAtHeight: BlockHeight) : BlockProcessingResult()
data class FailedValidationBlocks(val failedAtHeight: BlockHeight) : BlockProcessingResult()
object FailedDeleteBlocks : BlockProcessingResult()
data class FailedDownloadBlocks(val failedAtHeight: BlockHeight) : BlockProcessingResult()
data class FailedScanBlocks(val failedAtHeight: BlockHeight) : BlockProcessingResult()
data class FailedValidateBlocks(val failedAtHeight: BlockHeight) : BlockProcessingResult()
data class FailedDeleteBlocks(val failedAtHeight: BlockHeight) : BlockProcessingResult()
object FailedEnhance : BlockProcessingResult()
}
@ -431,24 +452,11 @@ class CompactBlockProcessor internal constructor(
lastDownloadedHeight
}
ProcessorInfo(
updateProgress(
networkBlockHeight = networkBlockHeight,
lastSyncedHeight = lastSyncedHeight,
lastSyncRange = null
).let { initialInfo ->
updateProgress(
networkBlockHeight = initialInfo.networkBlockHeight,
lastSyncedHeight = initialInfo.lastSyncedHeight,
lastSyncRange = if (
initialInfo.lastSyncedHeight != null &&
initialInfo.networkBlockHeight != null
) {
initialInfo.lastSyncedHeight + 1..initialInfo.networkBlockHeight
} else {
null
}
)
}
lastSyncRange = lastSyncedHeight + 1..networkBlockHeight
)
return true
}
@ -704,14 +712,43 @@ class CompactBlockProcessor internal constructor(
companion object {
/**
* Request all blocks in the given range and persist them locally for processing, later.
* Default attempts at retrying.
*/
internal const val RETRIES = 5
/**
* The theoretical maximum number of blocks in a reorg, due to other bottlenecks in the protocol design.
*/
internal const val MAX_REORG_SIZE = 100
/**
* Default size of batches of blocks to request from the compact block service. Then it's also used as a default
* size of batches of blocks to scan via librustzcash. The smaller this number the more granular information can
* be provided about scan state. Unfortunately, it may also lead to a lot of overhead during scanning.
*/
internal const val SYNC_BATCH_SIZE = 10
/**
* Default number of blocks to rewind when a chain reorg is detected. This should be large enough to recover
* from the reorg but smaller than the theoretical max reorg size of 100.
*/
internal const val REWIND_DISTANCE = 10
/**
* Requests, processes and persists all blocks from the given range.
*
* @param syncRange the range of blocks to download.
* @param backend the Rust backend component
* @param downloader the compact block downloader component
* @param repository the derived data repository component
* @param network the network in which the sync mechanism operates
* @param syncRange the range of blocks to download
* @param withDownload the flag indicating whether the blocks should also be downloaded and processed, or
* processed existing blocks.
* processed existing blocks
* @return Flow of BatchSyncProgress sync results
*/
@VisibleForTesting
@Suppress("MagicNumber", "LongParameterList", "LongMethod")
@Suppress("LongParameterList", "LongMethod")
internal suspend fun syncNewBlocks(
backend: Backend,
downloader: CompactBlockDownloader,
@ -722,104 +759,104 @@ class CompactBlockProcessor internal constructor(
): Flow<BatchSyncProgress> = flow {
if (syncRange.isEmpty()) {
Twig.debug { "No blocks to sync" }
emit(
BatchSyncProgress(
percentage = PercentDecimal.ONE_HUNDRED_PERCENT,
lastSyncedHeight = getLastScannedHeight(repository),
result = BlockProcessingResult.Success
)
)
} else {
Twig.debug { "Syncing blocks in range $syncRange" }
val batches = getBatchedBlockList(syncRange, network)
// While we run the sync sub-phases for each batch serially now, we'd like to run them in
// parallel to speed up the overall sync process time
batches.forEach { batch ->
Twig.debug { "Sync batch: ${batch.index} of ${batches.size} - $batch" }
Twig.verbose { "Starting to sync batch: $batch" }
batches.asFlow().map {
Twig.debug { "Syncing process starts for batch: $it" }
// Download
if (withDownload) {
downloadBatchOfBlocks(
downloader = downloader,
batch = batch
).takeIf { it != BlockProcessingResult.Success }?.let { result ->
emit(
BatchSyncProgress(
percentage = (batch.index / batches.size.toFloat() * 100).roundToInt(),
lastSyncedHeight = getLastScannedHeight(repository),
result = result
)
// Run downloading stage
SyncStageResult(
batch = it,
stageResult = if (withDownload) {
downloadBatchOfBlocks(
downloader = downloader,
batch = it
)
return@flow
} else {
BlockProcessingResult.DownloadSuccess(null)
}
}
)
}.buffer(1).map { downloadStageResult ->
Twig.debug { "Download stage done with result: $downloadStageResult" }
// Validate
validateBatchOfBlocks(
backend = backend,
batch = batch
).takeIf { it != BlockProcessingResult.Success }?.let { result ->
emit(
BatchSyncProgress(
percentage = (batch.index / batches.size.toFloat() * 100).roundToInt(),
lastSyncedHeight = getLastScannedHeight(repository),
result = result
if (downloadStageResult.stageResult !is BlockProcessingResult.DownloadSuccess) {
// In case of any failure, we just propagate the result
downloadStageResult
} else {
// Enrich batch model with fetched blocks. It's useful for later blocks deletion
downloadStageResult.batch.blocks = downloadStageResult.stageResult.downloadedBlocks
// Run validation stage
SyncStageResult(
downloadStageResult.batch,
validateBatchOfBlocks(
backend = backend,
batch = downloadStageResult.batch
)
)
return@flow
}
}.map { validateResult ->
Twig.debug { "Validation stage done with result: $validateResult" }
// Scan
scanBatchOfBlocks(
backend = backend,
batch = batch
).takeIf { it != BlockProcessingResult.Success }?.let { result ->
emit(
BatchSyncProgress(
percentage = (batch.index / batches.size.toFloat() * 100).roundToInt(),
lastSyncedHeight = getLastScannedHeight(repository),
result = result
if (validateResult.stageResult != BlockProcessingResult.Success) {
validateResult
} else {
// Run scanning stage
SyncStageResult(
validateResult.batch,
scanBatchOfBlocks(
backend = backend,
batch = validateResult.batch
)
)
return@flow
}
}.map { scanResult ->
Twig.debug { "Scan stage done with result: $scanResult" }
// Delete
deleteAllBlockFiles(
downloader = downloader
).takeIf { it != BlockProcessingResult.Success }?.let { result ->
Twig.warn { "Delete batch block files failed with: $result" }
emit(
BatchSyncProgress(
percentage = (batch.index / batches.size.toFloat() * 100).roundToInt(),
lastSyncedHeight = getLastScannedHeight(repository),
result = result
if (scanResult.stageResult != BlockProcessingResult.Success) {
scanResult
} else {
// Run deletion stage
SyncStageResult(
scanResult.batch,
deleteFilesOfBatchOfBlocks(
downloader = downloader,
batch = scanResult.batch
)
)
// We intentionally do not exit the processing of blocks as the failed deletion phase is not
// critical
}
}.onEach { deleteResult ->
Twig.debug { "Deletion stage done with result: $deleteResult" }
Twig.verbose { "Done with batch: $batch" }
emit(
BatchSyncProgress(
percentage = (batch.index / batches.size.toFloat() * 100).roundToInt(),
percentage = PercentDecimal(deleteResult.batch.index / batches.size.toFloat()),
lastSyncedHeight = getLastScannedHeight(repository),
result = BlockProcessingResult.Success
result = deleteResult.stageResult
)
)
}
}
emit(
BatchSyncProgress(
percentage = 100,
lastSyncedHeight = getLastScannedHeight(repository),
result = BlockProcessingResult.Success
)
)
Twig.debug { "All sync stages done for the batch: ${deleteResult.batch}" }
}.takeWhile { continuousResult ->
continuousResult.stageResult == BlockProcessingResult.Success
}.collect()
}
}
private fun getBatchedBlockList(
syncRange: ClosedRange<BlockHeight>,
network: ZcashNetwork
): List<Batch> {
): List<BlockBatch> {
val missingBlockCount = syncRange.endInclusive.value - syncRange.start.value + 1
val batchCount = (
missingBlockCount / SYNC_BATCH_SIZE +
@ -831,7 +868,7 @@ class CompactBlockProcessor internal constructor(
}
var start = syncRange.start
return mutableListOf<Batch>().apply {
return buildList {
for (index in 1..batchCount) {
val end = BlockHeight.new(
network,
@ -841,7 +878,7 @@ class CompactBlockProcessor internal constructor(
)
) // subtract 1 on the first value because the range is inclusive
add(Batch(index, start..end))
add(BlockBatch(index, start..end))
start = end + 1
}
}
@ -857,9 +894,9 @@ class CompactBlockProcessor internal constructor(
@Suppress("MagicNumber")
internal suspend fun downloadBatchOfBlocks(
downloader: CompactBlockDownloader,
batch: Batch
batch: BlockBatch
): BlockProcessingResult {
var downloadedCount = 0
var downloadedBlocks = listOf<JniBlockMeta>()
retryUpTo(RETRIES, { CompactBlockProcessorException.FailedDownload(it) }) { failedAttempts ->
if (failedAttempts == 0) {
Twig.verbose { "Starting to download batch $batch" }
@ -867,19 +904,19 @@ class CompactBlockProcessor internal constructor(
Twig.verbose { "Retrying to download batch $batch after $failedAttempts failure(s)..." }
}
downloadedCount = downloader.downloadBlockRange(batch.range)
downloadedBlocks = downloader.downloadBlockRange(batch.range)
}
Twig.verbose { "Successfully downloaded batch: $batch of $downloadedCount blocks" }
Twig.verbose { "Successfully downloaded batch: $batch of $downloadedBlocks blocks" }
return if (downloadedCount > 0) {
BlockProcessingResult.Success
return if (downloadedBlocks.isNotEmpty()) {
BlockProcessingResult.DownloadSuccess(downloadedBlocks)
} else {
BlockProcessingResult.FailedDownloadingBlocks(batch.range.start)
BlockProcessingResult.FailedDownloadBlocks(batch.range.start)
}
}
@VisibleForTesting
internal suspend fun validateBatchOfBlocks(batch: Batch, backend: Backend): BlockProcessingResult {
internal suspend fun validateBatchOfBlocks(batch: BlockBatch, backend: Backend): BlockProcessingResult {
Twig.verbose { "Starting to validate batch $batch" }
val result = backend.validateCombinedChainOrErrorBlockHeight(batch.range.length())
@ -888,33 +925,53 @@ class CompactBlockProcessor internal constructor(
Twig.verbose { "Successfully validated batch $batch" }
BlockProcessingResult.Success
} else {
BlockProcessingResult.FailedValidationBlocks(result)
BlockProcessingResult.FailedValidateBlocks(result)
}
}
@VisibleForTesting
@Suppress("MagicNumber")
internal suspend fun scanBatchOfBlocks(batch: Batch, backend: Backend): BlockProcessingResult {
internal suspend fun scanBatchOfBlocks(batch: BlockBatch, backend: Backend): BlockProcessingResult {
val scanResult = backend.scanBlocks(batch.range.length())
return if (scanResult) {
Twig.verbose { "Successfully scanned batch $batch" }
BlockProcessingResult.Success
} else {
BlockProcessingResult.FailedScanningBlocks(batch.range.start)
BlockProcessingResult.FailedScanBlocks(batch.range.start)
}
}
@VisibleForTesting
internal suspend fun deleteAllBlockFiles(downloader: CompactBlockDownloader): BlockProcessingResult {
Twig.verbose { "Starting delete all temporary block files now" }
return if (downloader.compactBlockRepository.deleteCompactBlockFiles()) {
internal suspend fun deleteAllBlockFiles(
downloader: CompactBlockDownloader,
lastKnownHeight: BlockHeight
): BlockProcessingResult {
Twig.verbose { "Starting to delete all temporary block files" }
return if (downloader.compactBlockRepository.deleteAllCompactBlockFiles()) {
Twig.verbose { "Successfully deleted all temporary block files" }
BlockProcessingResult.Success
} else {
BlockProcessingResult.FailedDeleteBlocks
BlockProcessingResult.FailedDeleteBlocks(lastKnownHeight)
}
}
@VisibleForTesting
internal suspend fun deleteFilesOfBatchOfBlocks(
batch: BlockBatch,
downloader: CompactBlockDownloader
): BlockProcessingResult {
Twig.verbose { "Starting to delete temporary block files from batch: $batch" }
return batch.blocks?.let { blocks ->
val deleted = downloader.compactBlockRepository.deleteCompactBlockFiles(blocks)
if (deleted) {
Twig.verbose { "Successfully deleted all temporary batched block files" }
BlockProcessingResult.Success
} else {
BlockProcessingResult.FailedDeleteBlocks(batch.range.start)
}
} ?: BlockProcessingResult.Success
}
/**
* Get the height of the last block that was scanned by this processor.
*
@ -979,7 +1036,7 @@ class CompactBlockProcessor internal constructor(
* wanted to sync. In most cases, it will be an invalid range because we'd like to sync blocks
* that we don't yet have.
*/
private suspend fun updateProgress(
private fun updateProgress(
networkBlockHeight: BlockHeight? = _processorInfo.value.networkBlockHeight,
lastSyncedHeight: BlockHeight? = _processorInfo.value.lastSyncedHeight,
lastSyncRange: ClosedRange<BlockHeight>? = _processorInfo.value.lastSyncRange,
@ -1086,7 +1143,7 @@ class CompactBlockProcessor internal constructor(
lastSyncRange = (targetHeight + 1)..currentNetworkBlockHeight
)
}
_progress.value = 0
_progress.value = PercentDecimal.ZERO_PERCENT
} else {
if (null == currentNetworkBlockHeight) {
updateProgress(
@ -1100,7 +1157,7 @@ class CompactBlockProcessor internal constructor(
)
}
_progress.value = 0
_progress.value = PercentDecimal.ZERO_PERCENT
if (null != lastSyncedHeight) {
val range = (targetHeight + 1)..lastSyncedHeight
@ -1259,7 +1316,7 @@ class CompactBlockProcessor internal constructor(
interface ISyncing
/**
* [State] for common syncing phase. It starts with downloading new blocks, then validating these blocks
* [State] for common syncing stage. It starts with downloading new blocks, then validating these blocks
* and scanning them at the end.
*
* **Downloading** is when the wallet is actively downloading compact blocks because the latest
@ -1275,7 +1332,7 @@ class CompactBlockProcessor internal constructor(
object Syncing : IConnected, ISyncing, State()
/**
* [State] for when we are done with syncing the blocks, for now, i.e. all necessary phases done (download,
* [State] for when we are done with syncing the blocks, for now, i.e. all necessary stages done (download,
* validate, and scan).
*/
class Synced(val syncedRange: ClosedRange<BlockHeight>?) : IConnected, ISyncing, State()
@ -1306,17 +1363,23 @@ class CompactBlockProcessor internal constructor(
object Initialized : State()
}
internal data class Batch(
val index: Long,
val range: ClosedRange<BlockHeight>
)
/**
* Progress model class for sharing the whole batch sync progress out of the sync process.
*/
internal data class BatchSyncProgress(
val percentage: Int,
val percentage: PercentDecimal,
val lastSyncedHeight: BlockHeight?,
val result: BlockProcessingResult
)
/**
* Progress model class for sharing particular sync stage result internally in the sync process.
*/
private data class SyncStageResult(
val batch: BlockBatch,
val stageResult: BlockProcessingResult
)
/**
* Data class for holding detailed information about the processor.
*

View File

@ -16,11 +16,6 @@ object ZcashSdk {
*/
val MINERS_FEE = Zatoshi(1_000L)
/**
* The theoretical maximum number of blocks in a reorg, due to other bottlenecks in the protocol design.
*/
const val MAX_REORG_SIZE = 100
/**
* The maximum length of a memo.
*/
@ -32,13 +27,6 @@ object ZcashSdk {
*/
const val EXPIRY_OFFSET = 20
/**
* Default size of batches of blocks to request from the compact block service. Then it's also used as a default
* size of batches of blocks to scan via librustzcash. The smaller this number the more granular information can be
* provided about scan state. Unfortunately, it may also lead to a lot of overhead during scanning.
*/
const val SYNC_BATCH_SIZE = 10
/**
* Default amount of time, in milliseconds, to poll for new blocks. Typically, this should be about half the average
* block time.
@ -50,23 +38,12 @@ object ZcashSdk {
*/
const val BLOCK_INTERVAL_MILLIS = 75_000L
/**
* Default attempts at retrying.
*/
const val RETRIES = 5
/**
* The default maximum amount of time to wait during retry backoff intervals. Failed loops will never wait longer
* than this before retyring.
*/
const val MAX_BACKOFF_INTERVAL = 600_000L
/**
* Default number of blocks to rewind when a chain reorg is detected. This should be large enough to recover from
* the reorg but smaller than the theoretical max reorg size of 100.
*/
const val REWIND_DISTANCE = 10
/**
* The default memo to use when shielding transparent funds.
*/

View File

@ -3,6 +3,7 @@ package cash.z.ecc.android.sdk.internal.block
import cash.z.ecc.android.sdk.exception.LightWalletException
import cash.z.ecc.android.sdk.internal.Twig
import cash.z.ecc.android.sdk.internal.ext.retryUpTo
import cash.z.ecc.android.sdk.internal.model.JniBlockMeta
import cash.z.ecc.android.sdk.internal.model.ext.from
import cash.z.ecc.android.sdk.internal.repository.CompactBlockRepository
import cash.z.ecc.android.sdk.model.BlockHeight
@ -47,10 +48,11 @@ open class CompactBlockDownloader private constructor(val compactBlockRepository
* @param heightRange the inclusive range of heights to request. For example 10..20 would
* request 11 blocks (including block 10 and block 20).
* @throws LightWalletException.DownloadBlockException if any error while downloading the blocks occurs
* @return Number of blocks, which were successfully written to storage
* @return List of JniBlockMeta objects, which describe the original CompactBlock objects, which were just
* downloaded and persisted on the device disk
*/
@Throws(LightWalletException.DownloadBlockException::class)
suspend fun downloadBlockRange(heightRange: ClosedRange<BlockHeight>): Int {
suspend fun downloadBlockRange(heightRange: ClosedRange<BlockHeight>): List<JniBlockMeta> {
val filteredFlow = lightWalletClient.getBlockRange(
BlockHeightUnsafe.from(heightRange.start)..BlockHeightUnsafe.from(heightRange.endInclusive)
).onEach { response ->

View File

@ -0,0 +1,9 @@
package cash.z.ecc.android.sdk.internal.model
import cash.z.ecc.android.sdk.model.BlockHeight
internal data class BlockBatch(
val index: Long,
val range: ClosedRange<BlockHeight>,
var blocks: List<JniBlockMeta>? = null
)

View File

@ -12,32 +12,42 @@ interface CompactBlockRepository {
/**
* Gets the highest block that is currently stored.
*
* @return the latest block height.
* @return the latest block height
*/
suspend fun getLatestHeight(): BlockHeight?
/**
* Fetch the compact block for the given height, if it exists.
*
* @return the compact block or null when it did not exist.
* @param height of the block we are looking for
* @return the compact block summary object or null when it did not exist
*/
suspend fun findCompactBlock(height: BlockHeight): JniBlockMeta?
/**
* This function is supposed to be used once the whole blocks sync process done. It removes all the temporary
* block files from the device disk together with theirs parent directory.
* block files from the device disk.
*
* @return true when all block files are deleted, false only if the deletion fails
* @return true when all block files are deleted or do not exist, false only if the deletion fails
*/
suspend fun deleteCompactBlockFiles(): Boolean
suspend fun deleteAllCompactBlockFiles(): Boolean
/**
* This function is supposed to be used continuously while sync process is in progress. It removes all the temporary
* block files from the given list of blocks.
*
* @param blocks list of compact block summary objects of blocks to delete
* @return true when all block files from the list are deleted or do not exist, false only if the deletion fails
*/
suspend fun deleteCompactBlockFiles(blocks: List<JniBlockMeta>): Boolean
/**
* Write the given flow of blocks to this store, which may be anything from an in-memory cache to a DB.
*
* @param blocks Flow of compact blocks to persist.
* @return Flow of number of blocks that were written.
* @param blocks Flow of compact blocks to persist
* @return list of compact block summary objects of blocks that were written
*/
suspend fun write(blocks: Flow<CompactBlockUnsafe>): Int
suspend fun write(blocks: Flow<CompactBlockUnsafe>): List<JniBlockMeta>
/**
* Remove every block above the given height.

View File

@ -33,8 +33,8 @@ internal class FileCompactBlockRepository(
override suspend fun findCompactBlock(height: BlockHeight) = rustBackend.findBlockMetadata(height)
override suspend fun write(blocks: Flow<CompactBlockUnsafe>): Int {
var totalBlocksWritten = 0
override suspend fun write(blocks: Flow<CompactBlockUnsafe>): List<JniBlockMeta> {
val processingBlocks = mutableListOf<JniBlockMeta>()
val metaDataBuffer = mutableListOf<JniBlockMeta>()
blocks.collect { block ->
val tmpFile = block.createTemporaryFile(blocksDirectory)
@ -49,32 +49,30 @@ internal class FileCompactBlockRepository(
}
if (metaDataBuffer.isBufferFull()) {
val blocksWritten = writeAndClearBuffer(metaDataBuffer)
totalBlocksWritten += blocksWritten
processingBlocks.addAll(metaDataBuffer)
writeAndClearBuffer(metaDataBuffer)
}
}
if (metaDataBuffer.isNotEmpty()) {
val blocksWritten = writeAndClearBuffer(metaDataBuffer)
totalBlocksWritten += blocksWritten
processingBlocks.addAll(metaDataBuffer)
writeAndClearBuffer(metaDataBuffer)
}
return totalBlocksWritten
return processingBlocks
}
/*
* Write block metadata to storage when the buffer is full or when we reached the current range end.
*/
private suspend fun writeAndClearBuffer(metaDataBuffer: MutableList<JniBlockMeta>): Int {
private suspend fun writeAndClearBuffer(metaDataBuffer: MutableList<JniBlockMeta>) {
rustBackend.writeBlockMetadata(metaDataBuffer)
val blocksWrittenCount = metaDataBuffer.size
metaDataBuffer.clear()
return blocksWrittenCount
}
override suspend fun rewindTo(height: BlockHeight) = rustBackend.rewindBlockMetadataToHeight(height)
override suspend fun deleteCompactBlockFiles(): Boolean {
override suspend fun deleteAllCompactBlockFiles(): Boolean {
Twig.verbose { "Deleting all blocks from directory ${blocksDirectory.path}" }
if (blocksDirectory.existsSuspend()) {
@ -92,6 +90,24 @@ internal class FileCompactBlockRepository(
return true
}
override suspend fun deleteCompactBlockFiles(blocks: List<JniBlockMeta>): Boolean {
Twig.verbose { "Deleting ${blocks.size} blocks from directory ${blocksDirectory.path}" }
if (blocksDirectory.existsSuspend()) {
blocks.forEach { block ->
val blockFile = block.getFile(blocksDirectory)
if (!blockFile.existsSuspend()) {
return@forEach // aka continue
}
val deleted = blockFile.deleteSuspend()
if (!deleted) {
return false
}
}
}
return true
}
companion object {
/**
* The name of the directory for downloading blocks
@ -145,6 +161,16 @@ private fun CompactBlockUnsafe.toJniMetaData(): JniBlockMeta {
return JniBlockMeta.new(this)
}
private fun JniBlockMeta.createFilename(): String {
val hashHex = hash.toHexReversed()
return "$height-$hashHex${FileCompactBlockRepository.BLOCK_FILENAME_SUFFIX}"
}
@VisibleForTesting(otherwise = VisibleForTesting.PRIVATE)
private fun JniBlockMeta.getFile(blocksDirectory: File): File {
return File(blocksDirectory, createFilename())
}
private fun CompactBlockUnsafe.createFilename(): String {
val hashHex = hash.toHexReversed()
return "$height-$hashHex${FileCompactBlockRepository.BLOCK_FILENAME_SUFFIX}"

View File

@ -10,9 +10,16 @@ value class PercentDecimal(val decimal: Float) {
require(decimal <= MAX)
}
fun isLessThanHundredPercent(): Boolean = decimal < MAX
fun isMoreThanZeroPercent(): Boolean = decimal > MIN
@Suppress("MagicNumber")
fun toPercentage(): Int = (decimal * 100).toInt()
companion object {
const val MIN = 0.0f
const val MAX = 1.0f
private const val MIN = 0.0f
private const val MAX = 1.0f
val ZERO_PERCENT = PercentDecimal(MIN)
val ONE_HUNDRED_PERCENT = PercentDecimal(MAX)