Merge pull request #1093 from Chlup/1016_stream_per_batch_parallel

[#1016] Rebuild download stream periodically while downloading
This commit is contained in:
Michal Fousek 2023-05-17 17:00:55 +02:00 committed by GitHub
commit ea4da6e0e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 52 additions and 16 deletions

View File

@ -681,8 +681,10 @@ actor CompactBlockProcessor {
if let range = ranges.downloadAndScanRange { if let range = ranges.downloadAndScanRange {
logger.debug("Starting sync with range: \(range.lowerBound)...\(range.upperBound)") logger.debug("Starting sync with range: \(range.lowerBound)...\(range.upperBound)")
try await blockDownloader.setSyncRange(range) try await blockDownloader.setSyncRange(range, batchSize: batchSize)
try await downloadAndScanBlocks(at: range, totalProgressRange: totalProgressRange) try await downloadAndScanBlocks(at: range, totalProgressRange: totalProgressRange)
// Side effect of calling stop is to delete last used download stream. To be sure that it doesn't keep any data in memory.
await blockDownloader.stopDownload()
} }
if let range = ranges.enhanceRange { if let range = ranges.enhanceRange {
@ -691,7 +693,7 @@ actor CompactBlockProcessor {
await updateState(.enhancing) await updateState(.enhancing)
if let transactions = try await blockEnhancer.enhance( if let transactions = try await blockEnhancer.enhance(
at: range, at: range,
didEnhance: { [weak self] progress in didEnhance: { [weak self] progress in
await self?.notifyProgress(.enhance(progress)) await self?.notifyProgress(.enhance(progress))
if if
let foundTx = progress.lastFoundTransaction, let foundTx = progress.lastFoundTransaction,
@ -726,6 +728,8 @@ actor CompactBlockProcessor {
await processBatchFinished(height: (anyActionExecuted && !newBlocksMined) ? ranges.latestBlockHeight : nil) await processBatchFinished(height: (anyActionExecuted && !newBlocksMined) ? ranges.latestBlockHeight : nil)
} }
} catch { } catch {
// Side effect of calling stop is to delete last used download stream. To be sure that it doesn't keep any data in memory.
await blockDownloader.stopDownload()
logger.error("Sync failed with error: \(error)") logger.error("Sync failed with error: \(error)")
if Task.isCancelled { if Task.isCancelled {

View File

@ -31,7 +31,7 @@ protocol BlockDownloader {
/// Set the range for the whole sync process. This method creates stream that is used to download blocks. /// Set the range for the whole sync process. This method creates stream that is used to download blocks.
/// This method must be called before `startDownload()` is called. And it can't be called while download is in progress otherwise bad things /// This method must be called before `startDownload()` is called. And it can't be called while download is in progress otherwise bad things
/// happen. /// happen.
func setSyncRange(_ range: CompactBlockRange) async throws func setSyncRange(_ range: CompactBlockRange, batchSize: Int) async throws
/// Start downloading blocks. /// Start downloading blocks.
/// ///
@ -55,6 +55,10 @@ protocol BlockDownloader {
} }
actor BlockDownloaderImpl { actor BlockDownloaderImpl {
private enum Constants {
static let rebuildStreamAfterBatchesCount = 3
}
let service: LightWalletService let service: LightWalletService
let downloaderService: BlockDownloaderService let downloaderService: BlockDownloaderService
let storage: CompactBlockRepository let storage: CompactBlockRepository
@ -62,8 +66,10 @@ actor BlockDownloaderImpl {
let metrics: SDKMetrics let metrics: SDKMetrics
let logger: Logger let logger: Logger
private var downloadStreamCreatedAtRange: CompactBlockRange = 0...0
private var downloadStream: BlockDownloaderStream? private var downloadStream: BlockDownloaderStream?
private var syncRange: CompactBlockRange? private var syncRange: CompactBlockRange?
private var batchSize: Int?
private var downloadToHeight: BlockHeight = 0 private var downloadToHeight: BlockHeight = 0
private var isDownloading = false private var isDownloading = false
@ -89,7 +95,7 @@ actor BlockDownloaderImpl {
private func doDownload(maxBlockBufferSize: Int) async { private func doDownload(maxBlockBufferSize: Int) async {
lastError = nil lastError = nil
do { do {
guard let downloadStream = self.downloadStream, let syncRange = self.syncRange else { guard let batchSize = self.batchSize, let syncRange = self.syncRange else {
logger.error("Dont have downloadStream. Trying to download blocks before sync range is not set.") logger.error("Dont have downloadStream. Trying to download blocks before sync range is not set.")
throw ZcashError.blockDownloadSyncRangeNotSet throw ZcashError.blockDownloadSyncRangeNotSet
} }
@ -109,6 +115,26 @@ actor BlockDownloaderImpl {
} }
let range = downloadFrom...downloadTo let range = downloadFrom...downloadTo
let maxAmountBlocksDownloadedByStream = Constants.rebuildStreamAfterBatchesCount * batchSize
let createNewStream =
self.downloadStream == nil ||
range.lowerBound - downloadStreamCreatedAtRange.lowerBound >= maxAmountBlocksDownloadedByStream ||
downloadTo >= downloadStreamCreatedAtRange.upperBound
let downloadStream: BlockDownloaderStream
if let stream = self.downloadStream, !createNewStream {
downloadStream = stream
} else {
// In case that limit is larger than Constants.rebuildStreamAfterBatchesCount * batchSize we need to set upper bound of the range like
// this. This is not normal operational mode but something can request to download whole sync range at one go for example.
let streamRange = range.lowerBound...max(downloadToHeight, range.lowerBound + maxAmountBlocksDownloadedByStream)
logger.debug("Creating new stream for range \(streamRange.lowerBound)...\(streamRange.upperBound)")
downloadStreamCreatedAtRange = streamRange
let stream = service.blockStream(startHeight: streamRange.lowerBound, endHeight: streamRange.upperBound)
downloadStream = BlockDownloaderStream(stream: stream)
self.downloadStream = downloadStream
}
logger.debug(""" logger.debug("""
Starting downloading blocks. Starting downloading blocks.
@ -223,8 +249,9 @@ extension BlockDownloaderImpl: BlockDownloader {
downloadToHeight = limit downloadToHeight = limit
} }
func setSyncRange(_ range: CompactBlockRange) async throws { func setSyncRange(_ range: CompactBlockRange, batchSize: Int) async throws {
downloadStream = try await compactBlocksDownloadStream(startHeight: range.lowerBound, targetHeight: range.upperBound) downloadStream = nil
self.batchSize = batchSize
syncRange = range syncRange = range
} }
@ -250,6 +277,7 @@ extension BlockDownloaderImpl: BlockDownloader {
break break
} }
} }
downloadStream = nil
} }
func waitUntilRequestedBlocksAreDownloaded(in range: CompactBlockRange) async throws { func waitUntilRequestedBlocksAreDownloaded(in range: CompactBlockRange) async throws {

View File

@ -63,7 +63,6 @@ extension BlockEnhancerImpl: BlockEnhancer {
let newlyMinedRange = newlyMinedLowerBound...chainTipHeight let newlyMinedRange = newlyMinedLowerBound...chainTipHeight
for index in 0 ..< transactions.count { for index in 0 ..< transactions.count {
let transaction = transactions[index] let transaction = transactions[index]
var retry = true var retry = true

View File

@ -17,7 +17,10 @@ struct UTXOFetcherConfig {
} }
protocol UTXOFetcher { protocol UTXOFetcher {
func fetch(at range: CompactBlockRange, didFetch: (Float) async -> ()) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]) func fetch(
at range: CompactBlockRange,
didFetch: (Float) async -> Void
) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity])
} }
struct UTXOFetcherImpl { struct UTXOFetcherImpl {
@ -31,7 +34,10 @@ struct UTXOFetcherImpl {
} }
extension UTXOFetcherImpl: UTXOFetcher { extension UTXOFetcherImpl: UTXOFetcher {
func fetch(at range: CompactBlockRange, didFetch: (Float) async -> ()) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]) { func fetch(
at range: CompactBlockRange,
didFetch: (Float) async -> Void
) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]) {
try Task.checkCancellation() try Task.checkCancellation()
let accounts = try accountRepository.getAll() let accounts = try accountRepository.getAll()

View File

@ -33,6 +33,7 @@ extension BlockValidatorImpl: BlockValidator {
pushProgressReport(startTime: startTime, finishTime: Date()) pushProgressReport(startTime: startTime, finishTime: Date())
logger.debug("validateChainFinished") logger.debug("validateChainFinished")
} catch { } catch {
logger.debug("Validate chain failed with \(error)")
pushProgressReport(startTime: startTime, finishTime: Date()) pushProgressReport(startTime: startTime, finishTime: Date())
throw error throw error
} }

View File

@ -483,7 +483,7 @@ actor ZcashRustBackend: ZcashRustBackendWelding {
} }
func validateCombinedChain(limit: UInt32 = 0) async throws { func validateCombinedChain(limit: UInt32 = 0) async throws {
let result = zcashlc_validate_combined_chain(fsBlockDbRoot.0, fsBlockDbRoot.1, dbData.0, dbData.1, networkType.networkId, limit) let result = zcashlc_validate_combined_chain(fsBlockDbRoot.0, fsBlockDbRoot.1, dbData.0, dbData.1, limit, networkType.networkId)
switch result { switch result {
case -1: case -1:

View File

@ -8,7 +8,6 @@
import Foundation import Foundation
enum Dependencies { enum Dependencies {
// swiftlint:disable:next function_parameter_count
static func setup( static func setup(
in container: DIContainer, in container: DIContainer,
urls: Initializer.URLs, urls: Initializer.URLs,

View File

@ -197,6 +197,7 @@ public class SDKSynchronizer: Synchronizer {
// MARK: Handle CompactBlockProcessor.Flow // MARK: Handle CompactBlockProcessor.Flow
// swiftlint:disable:next cyclomatic_complexity
private func subscribeToProcessorEvents(_ processor: CompactBlockProcessor) async { private func subscribeToProcessorEvents(_ processor: CompactBlockProcessor) async {
let eventClosure: CompactBlockProcessor.EventClosure = { [weak self] event in let eventClosure: CompactBlockProcessor.EventClosure = { [weak self] event in
switch event { switch event {

View File

@ -905,7 +905,6 @@ class BalanceTests: ZcashTestCase {
let memo = try Memo(string: "shielding is fun!") let memo = try Memo(string: "shielding is fun!")
var pendingTx: ZcashTransaction.Overview? var pendingTx: ZcashTransaction.Overview?
let transaction = try await coordinator.synchronizer.sendToAddress( let transaction = try await coordinator.synchronizer.sendToAddress(
spendingKey: spendingKey, spendingKey: spendingKey,
zatoshi: sendAmount, zatoshi: sendAmount,

View File

@ -197,7 +197,7 @@ class BlockScanTests: ZcashTestCase {
do { do {
let blockDownloader = await compactBlockProcessor.blockDownloader let blockDownloader = await compactBlockProcessor.blockDownloader
await blockDownloader.setDownloadLimit(range.upperBound) await blockDownloader.setDownloadLimit(range.upperBound)
try await blockDownloader.setSyncRange(range) try await blockDownloader.setSyncRange(range, batchSize: 100)
await blockDownloader.startDownload(maxBlockBufferSize: 10) await blockDownloader.startDownload(maxBlockBufferSize: 10)
try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: range) try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: range)

View File

@ -106,7 +106,7 @@ class BlockStreamingTest: ZcashTestCase {
do { do {
let blockDownloader = await compactBlockProcessor.blockDownloader let blockDownloader = await compactBlockProcessor.blockDownloader
await blockDownloader.setDownloadLimit(latestBlockHeight) await blockDownloader.setDownloadLimit(latestBlockHeight)
try await blockDownloader.setSyncRange(startHeight...latestBlockHeight) try await blockDownloader.setSyncRange(startHeight...latestBlockHeight, batchSize: 100)
await blockDownloader.startDownload(maxBlockBufferSize: 10) await blockDownloader.startDownload(maxBlockBufferSize: 10)
try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: startHeight...latestBlockHeight) try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: startHeight...latestBlockHeight)
} catch { } catch {
@ -149,7 +149,7 @@ class BlockStreamingTest: ZcashTestCase {
do { do {
let blockDownloader = await compactBlockProcessor.blockDownloader let blockDownloader = await compactBlockProcessor.blockDownloader
await blockDownloader.setDownloadLimit(latestBlockHeight) await blockDownloader.setDownloadLimit(latestBlockHeight)
try await blockDownloader.setSyncRange(startHeight...latestBlockHeight) try await blockDownloader.setSyncRange(startHeight...latestBlockHeight, batchSize: 100)
await blockDownloader.startDownload(maxBlockBufferSize: 10) await blockDownloader.startDownload(maxBlockBufferSize: 10)
try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: startHeight...latestBlockHeight) try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: startHeight...latestBlockHeight)
} catch { } catch {

View File

@ -152,7 +152,6 @@ extension ZcashRustBackend {
} }
} }
extension Zatoshi: CustomDebugStringConvertible { extension Zatoshi: CustomDebugStringConvertible {
public var debugDescription: String { public var debugDescription: String {
"Zatoshi(\(self.amount))" "Zatoshi(\(self.amount))"