[#1016] Rebuild download stream periodically while downloading

- Rebuild download stream in `BlockDownloaderImpl` each 10 batches. So
  currently stream downloads 1000 blocks and then new stream is created.
- Memory stays between 60MB and 70MB during sync in the Sample app.
This commit is contained in:
Michal Fousek 2023-05-16 11:03:22 +02:00
parent bb3d7d06c5
commit d091660b79
12 changed files with 52 additions and 16 deletions

View File

@ -681,8 +681,10 @@ actor CompactBlockProcessor {
if let range = ranges.downloadAndScanRange {
logger.debug("Starting sync with range: \(range.lowerBound)...\(range.upperBound)")
try await blockDownloader.setSyncRange(range)
try await blockDownloader.setSyncRange(range, batchSize: batchSize)
try await downloadAndScanBlocks(at: range, totalProgressRange: totalProgressRange)
// Side effect of calling stop is to delete last used download stream. To be sure that it doesn't keep any data in memory.
await blockDownloader.stopDownload()
}
if let range = ranges.enhanceRange {
@ -691,7 +693,7 @@ actor CompactBlockProcessor {
await updateState(.enhancing)
if let transactions = try await blockEnhancer.enhance(
at: range,
didEnhance: { [weak self] progress in
didEnhance: { [weak self] progress in
await self?.notifyProgress(.enhance(progress))
if
let foundTx = progress.lastFoundTransaction,
@ -726,6 +728,8 @@ actor CompactBlockProcessor {
await processBatchFinished(height: (anyActionExecuted && !newBlocksMined) ? ranges.latestBlockHeight : nil)
}
} catch {
// Side effect of calling stop is to delete last used download stream. To be sure that it doesn't keep any data in memory.
await blockDownloader.stopDownload()
logger.error("Sync failed with error: \(error)")
if Task.isCancelled {

View File

@ -31,7 +31,7 @@ protocol BlockDownloader {
/// Set the range for the whole sync process. This method creates stream that is used to download blocks.
/// This method must be called before `startDownload()` is called. And it can't be called while download is in progress otherwise bad things
/// happen.
func setSyncRange(_ range: CompactBlockRange) async throws
func setSyncRange(_ range: CompactBlockRange, batchSize: Int) async throws
/// Start downloading blocks.
///
@ -55,6 +55,10 @@ protocol BlockDownloader {
}
actor BlockDownloaderImpl {
private enum Constants {
static let rebuildStreamAfterBatchesCount = 3
}
let service: LightWalletService
let downloaderService: BlockDownloaderService
let storage: CompactBlockRepository
@ -62,8 +66,10 @@ actor BlockDownloaderImpl {
let metrics: SDKMetrics
let logger: Logger
private var downloadStreamCreatedAtRange: CompactBlockRange = 0...0
private var downloadStream: BlockDownloaderStream?
private var syncRange: CompactBlockRange?
private var batchSize: Int?
private var downloadToHeight: BlockHeight = 0
private var isDownloading = false
@ -89,7 +95,7 @@ actor BlockDownloaderImpl {
private func doDownload(maxBlockBufferSize: Int) async {
lastError = nil
do {
guard let downloadStream = self.downloadStream, let syncRange = self.syncRange else {
guard let batchSize = self.batchSize, let syncRange = self.syncRange else {
logger.error("Dont have downloadStream. Trying to download blocks before sync range is not set.")
throw ZcashError.blockDownloadSyncRangeNotSet
}
@ -109,6 +115,26 @@ actor BlockDownloaderImpl {
}
let range = downloadFrom...downloadTo
let maxAmountBlocksDownloadedByStream = Constants.rebuildStreamAfterBatchesCount * batchSize
let createNewStream =
self.downloadStream == nil ||
range.lowerBound - downloadStreamCreatedAtRange.lowerBound >= maxAmountBlocksDownloadedByStream ||
downloadTo >= downloadStreamCreatedAtRange.upperBound
let downloadStream: BlockDownloaderStream
if let stream = self.downloadStream, !createNewStream {
downloadStream = stream
} else {
// In case that limit is larger than Constants.rebuildStreamAfterBatchesCount * batchSize we need to set upper bound of the range like
// this. This is not normal operational mode but something can request to download whole sync range at one go for example.
let streamRange = range.lowerBound...max(downloadToHeight, range.lowerBound + maxAmountBlocksDownloadedByStream)
logger.debug("Creating new stream for range \(streamRange.lowerBound)...\(streamRange.upperBound)")
downloadStreamCreatedAtRange = streamRange
let stream = service.blockStream(startHeight: streamRange.lowerBound, endHeight: streamRange.upperBound)
downloadStream = BlockDownloaderStream(stream: stream)
self.downloadStream = downloadStream
}
logger.debug("""
Starting downloading blocks.
@ -223,8 +249,9 @@ extension BlockDownloaderImpl: BlockDownloader {
downloadToHeight = limit
}
func setSyncRange(_ range: CompactBlockRange) async throws {
downloadStream = try await compactBlocksDownloadStream(startHeight: range.lowerBound, targetHeight: range.upperBound)
func setSyncRange(_ range: CompactBlockRange, batchSize: Int) async throws {
downloadStream = nil
self.batchSize = batchSize
syncRange = range
}
@ -250,6 +277,7 @@ extension BlockDownloaderImpl: BlockDownloader {
break
}
}
downloadStream = nil
}
func waitUntilRequestedBlocksAreDownloaded(in range: CompactBlockRange) async throws {

View File

@ -63,7 +63,6 @@ extension BlockEnhancerImpl: BlockEnhancer {
let newlyMinedRange = newlyMinedLowerBound...chainTipHeight
for index in 0 ..< transactions.count {
let transaction = transactions[index]
var retry = true

View File

@ -17,7 +17,10 @@ struct UTXOFetcherConfig {
}
protocol UTXOFetcher {
func fetch(at range: CompactBlockRange, didFetch: (Float) async -> ()) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity])
func fetch(
at range: CompactBlockRange,
didFetch: (Float) async -> Void
) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity])
}
struct UTXOFetcherImpl {
@ -31,7 +34,10 @@ struct UTXOFetcherImpl {
}
extension UTXOFetcherImpl: UTXOFetcher {
func fetch(at range: CompactBlockRange, didFetch: (Float) async -> ()) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]) {
func fetch(
at range: CompactBlockRange,
didFetch: (Float) async -> Void
) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]) {
try Task.checkCancellation()
let accounts = try accountRepository.getAll()

View File

@ -33,6 +33,7 @@ extension BlockValidatorImpl: BlockValidator {
pushProgressReport(startTime: startTime, finishTime: Date())
logger.debug("validateChainFinished")
} catch {
logger.debug("Validate chaing failed with \(error)")
pushProgressReport(startTime: startTime, finishTime: Date())
throw error
}

View File

@ -483,7 +483,7 @@ actor ZcashRustBackend: ZcashRustBackendWelding {
}
func validateCombinedChain(limit: UInt32 = 0) async throws {
let result = zcashlc_validate_combined_chain(fsBlockDbRoot.0, fsBlockDbRoot.1, dbData.0, dbData.1, networkType.networkId, limit)
let result = zcashlc_validate_combined_chain(fsBlockDbRoot.0, fsBlockDbRoot.1, dbData.0, dbData.1, limit, networkType.networkId)
switch result {
case -1:

View File

@ -8,7 +8,6 @@
import Foundation
enum Dependencies {
// swiftlint:disable:next function_parameter_count
static func setup(
in container: DIContainer,
urls: Initializer.URLs,

View File

@ -197,6 +197,7 @@ public class SDKSynchronizer: Synchronizer {
// MARK: Handle CompactBlockProcessor.Flow
// swiftlint:disable:next cyclomatic_complexity
private func subscribeToProcessorEvents(_ processor: CompactBlockProcessor) async {
let eventClosure: CompactBlockProcessor.EventClosure = { [weak self] event in
switch event {

View File

@ -905,7 +905,6 @@ class BalanceTests: ZcashTestCase {
let memo = try Memo(string: "shielding is fun!")
var pendingTx: ZcashTransaction.Overview?
let transaction = try await coordinator.synchronizer.sendToAddress(
spendingKey: spendingKey,
zatoshi: sendAmount,

View File

@ -197,7 +197,7 @@ class BlockScanTests: ZcashTestCase {
do {
let blockDownloader = await compactBlockProcessor.blockDownloader
await blockDownloader.setDownloadLimit(range.upperBound)
try await blockDownloader.setSyncRange(range)
try await blockDownloader.setSyncRange(range, batchSize: 100)
await blockDownloader.startDownload(maxBlockBufferSize: 10)
try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: range)

View File

@ -106,7 +106,7 @@ class BlockStreamingTest: ZcashTestCase {
do {
let blockDownloader = await compactBlockProcessor.blockDownloader
await blockDownloader.setDownloadLimit(latestBlockHeight)
try await blockDownloader.setSyncRange(startHeight...latestBlockHeight)
try await blockDownloader.setSyncRange(startHeight...latestBlockHeight, batchSize: 100)
await blockDownloader.startDownload(maxBlockBufferSize: 10)
try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: startHeight...latestBlockHeight)
} catch {
@ -149,7 +149,7 @@ class BlockStreamingTest: ZcashTestCase {
do {
let blockDownloader = await compactBlockProcessor.blockDownloader
await blockDownloader.setDownloadLimit(latestBlockHeight)
try await blockDownloader.setSyncRange(startHeight...latestBlockHeight)
try await blockDownloader.setSyncRange(startHeight...latestBlockHeight, batchSize: 100)
await blockDownloader.startDownload(maxBlockBufferSize: 10)
try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: startHeight...latestBlockHeight)
} catch {

View File

@ -152,7 +152,6 @@ extension ZcashRustBackend {
}
}
extension Zatoshi: CustomDebugStringConvertible {
public var debugDescription: String {
"Zatoshi(\(self.amount))"