diff --git a/Sources/ZcashLightClientKit/Block/CompactBlockProcessor.swift b/Sources/ZcashLightClientKit/Block/CompactBlockProcessor.swift index fc0cae61..6e5e26ca 100644 --- a/Sources/ZcashLightClientKit/Block/CompactBlockProcessor.swift +++ b/Sources/ZcashLightClientKit/Block/CompactBlockProcessor.swift @@ -681,8 +681,10 @@ actor CompactBlockProcessor { if let range = ranges.downloadAndScanRange { logger.debug("Starting sync with range: \(range.lowerBound)...\(range.upperBound)") - try await blockDownloader.setSyncRange(range) + try await blockDownloader.setSyncRange(range, batchSize: batchSize) try await downloadAndScanBlocks(at: range, totalProgressRange: totalProgressRange) + // Side effect of calling stop is to delete last used download stream. To be sure that it doesn't keep any data in memory. + await blockDownloader.stopDownload() } if let range = ranges.enhanceRange { @@ -691,7 +693,7 @@ actor CompactBlockProcessor { await updateState(.enhancing) if let transactions = try await blockEnhancer.enhance( at: range, - didEnhance: { [weak self] progress in + didEnhance: { [weak self] progress in await self?.notifyProgress(.enhance(progress)) if let foundTx = progress.lastFoundTransaction, @@ -726,6 +728,8 @@ actor CompactBlockProcessor { await processBatchFinished(height: (anyActionExecuted && !newBlocksMined) ? ranges.latestBlockHeight : nil) } } catch { + // Side effect of calling stop is to delete last used download stream. To be sure that it doesn't keep any data in memory. + await blockDownloader.stopDownload() logger.error("Sync failed with error: \(error)") if Task.isCancelled { diff --git a/Sources/ZcashLightClientKit/Block/Download/BlockDownloader.swift b/Sources/ZcashLightClientKit/Block/Download/BlockDownloader.swift index 6699373e..41940a73 100644 --- a/Sources/ZcashLightClientKit/Block/Download/BlockDownloader.swift +++ b/Sources/ZcashLightClientKit/Block/Download/BlockDownloader.swift @@ -31,7 +31,7 @@ protocol BlockDownloader { /// Set the range for the whole sync process. This method creates stream that is used to download blocks. /// This method must be called before `startDownload()` is called. And it can't be called while download is in progress otherwise bad things /// happen. - func setSyncRange(_ range: CompactBlockRange) async throws + func setSyncRange(_ range: CompactBlockRange, batchSize: Int) async throws /// Start downloading blocks. /// @@ -55,6 +55,10 @@ protocol BlockDownloader { } actor BlockDownloaderImpl { + private enum Constants { + static let rebuildStreamAfterBatchesCount = 3 + } + let service: LightWalletService let downloaderService: BlockDownloaderService let storage: CompactBlockRepository @@ -62,8 +66,10 @@ actor BlockDownloaderImpl { let metrics: SDKMetrics let logger: Logger + private var downloadStreamCreatedAtRange: CompactBlockRange = 0...0 private var downloadStream: BlockDownloaderStream? private var syncRange: CompactBlockRange? + private var batchSize: Int? private var downloadToHeight: BlockHeight = 0 private var isDownloading = false @@ -89,7 +95,7 @@ actor BlockDownloaderImpl { private func doDownload(maxBlockBufferSize: Int) async { lastError = nil do { - guard let downloadStream = self.downloadStream, let syncRange = self.syncRange else { + guard let batchSize = self.batchSize, let syncRange = self.syncRange else { logger.error("Dont have downloadStream. Trying to download blocks before sync range is not set.") throw ZcashError.blockDownloadSyncRangeNotSet } @@ -109,6 +115,26 @@ actor BlockDownloaderImpl { } let range = downloadFrom...downloadTo + let maxAmountBlocksDownloadedByStream = Constants.rebuildStreamAfterBatchesCount * batchSize + let createNewStream = + self.downloadStream == nil || + range.lowerBound - downloadStreamCreatedAtRange.lowerBound >= maxAmountBlocksDownloadedByStream || + downloadTo >= downloadStreamCreatedAtRange.upperBound + + let downloadStream: BlockDownloaderStream + if let stream = self.downloadStream, !createNewStream { + downloadStream = stream + } else { + // In case that limit is larger than Constants.rebuildStreamAfterBatchesCount * batchSize we need to set upper bound of the range like + // this. This is not normal operational mode but something can request to download whole sync range at one go for example. + let streamRange = range.lowerBound...max(downloadToHeight, range.lowerBound + maxAmountBlocksDownloadedByStream) + logger.debug("Creating new stream for range \(streamRange.lowerBound)...\(streamRange.upperBound)") + + downloadStreamCreatedAtRange = streamRange + let stream = service.blockStream(startHeight: streamRange.lowerBound, endHeight: streamRange.upperBound) + downloadStream = BlockDownloaderStream(stream: stream) + self.downloadStream = downloadStream + } logger.debug(""" Starting downloading blocks. @@ -223,8 +249,9 @@ extension BlockDownloaderImpl: BlockDownloader { downloadToHeight = limit } - func setSyncRange(_ range: CompactBlockRange) async throws { - downloadStream = try await compactBlocksDownloadStream(startHeight: range.lowerBound, targetHeight: range.upperBound) + func setSyncRange(_ range: CompactBlockRange, batchSize: Int) async throws { + downloadStream = nil + self.batchSize = batchSize syncRange = range } @@ -250,6 +277,7 @@ extension BlockDownloaderImpl: BlockDownloader { break } } + downloadStream = nil } func waitUntilRequestedBlocksAreDownloaded(in range: CompactBlockRange) async throws { diff --git a/Sources/ZcashLightClientKit/Block/Enhance/BlockEnhancer.swift b/Sources/ZcashLightClientKit/Block/Enhance/BlockEnhancer.swift index 3b8ee924..638c6c2d 100644 --- a/Sources/ZcashLightClientKit/Block/Enhance/BlockEnhancer.swift +++ b/Sources/ZcashLightClientKit/Block/Enhance/BlockEnhancer.swift @@ -63,7 +63,6 @@ extension BlockEnhancerImpl: BlockEnhancer { let newlyMinedRange = newlyMinedLowerBound...chainTipHeight - for index in 0 ..< transactions.count { let transaction = transactions[index] var retry = true diff --git a/Sources/ZcashLightClientKit/Block/FetchUnspentTxOutputs/UTXOFetcher.swift b/Sources/ZcashLightClientKit/Block/FetchUnspentTxOutputs/UTXOFetcher.swift index 95ad08df..f9f2466c 100644 --- a/Sources/ZcashLightClientKit/Block/FetchUnspentTxOutputs/UTXOFetcher.swift +++ b/Sources/ZcashLightClientKit/Block/FetchUnspentTxOutputs/UTXOFetcher.swift @@ -17,7 +17,10 @@ struct UTXOFetcherConfig { } protocol UTXOFetcher { - func fetch(at range: CompactBlockRange, didFetch: (Float) async -> ()) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]) + func fetch( + at range: CompactBlockRange, + didFetch: (Float) async -> Void + ) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]) } struct UTXOFetcherImpl { @@ -31,7 +34,10 @@ struct UTXOFetcherImpl { } extension UTXOFetcherImpl: UTXOFetcher { - func fetch(at range: CompactBlockRange, didFetch: (Float) async -> ()) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]) { + func fetch( + at range: CompactBlockRange, + didFetch: (Float) async -> Void + ) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]) { try Task.checkCancellation() let accounts = try accountRepository.getAll() diff --git a/Sources/ZcashLightClientKit/Block/Validate/BlockValidator.swift b/Sources/ZcashLightClientKit/Block/Validate/BlockValidator.swift index c2dedfcc..1b9c71eb 100644 --- a/Sources/ZcashLightClientKit/Block/Validate/BlockValidator.swift +++ b/Sources/ZcashLightClientKit/Block/Validate/BlockValidator.swift @@ -33,6 +33,7 @@ extension BlockValidatorImpl: BlockValidator { pushProgressReport(startTime: startTime, finishTime: Date()) logger.debug("validateChainFinished") } catch { + logger.debug("Validate chain failed with \(error)") pushProgressReport(startTime: startTime, finishTime: Date()) throw error } diff --git a/Sources/ZcashLightClientKit/Rust/ZcashRustBackend.swift b/Sources/ZcashLightClientKit/Rust/ZcashRustBackend.swift index b0606b39..35ac1d53 100644 --- a/Sources/ZcashLightClientKit/Rust/ZcashRustBackend.swift +++ b/Sources/ZcashLightClientKit/Rust/ZcashRustBackend.swift @@ -483,7 +483,7 @@ actor ZcashRustBackend: ZcashRustBackendWelding { } func validateCombinedChain(limit: UInt32 = 0) async throws { - let result = zcashlc_validate_combined_chain(fsBlockDbRoot.0, fsBlockDbRoot.1, dbData.0, dbData.1, networkType.networkId, limit) + let result = zcashlc_validate_combined_chain(fsBlockDbRoot.0, fsBlockDbRoot.1, dbData.0, dbData.1, limit, networkType.networkId) switch result { case -1: diff --git a/Sources/ZcashLightClientKit/Synchronizer/Dependencies.swift b/Sources/ZcashLightClientKit/Synchronizer/Dependencies.swift index d51963de..b8c36fb5 100644 --- a/Sources/ZcashLightClientKit/Synchronizer/Dependencies.swift +++ b/Sources/ZcashLightClientKit/Synchronizer/Dependencies.swift @@ -8,7 +8,6 @@ import Foundation enum Dependencies { - // swiftlint:disable:next function_parameter_count static func setup( in container: DIContainer, urls: Initializer.URLs, diff --git a/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift b/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift index 47d788f5..c3f0bd74 100644 --- a/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift +++ b/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift @@ -197,6 +197,7 @@ public class SDKSynchronizer: Synchronizer { // MARK: Handle CompactBlockProcessor.Flow + // swiftlint:disable:next cyclomatic_complexity private func subscribeToProcessorEvents(_ processor: CompactBlockProcessor) async { let eventClosure: CompactBlockProcessor.EventClosure = { [weak self] event in switch event { diff --git a/Tests/DarksideTests/BalanceTests.swift b/Tests/DarksideTests/BalanceTests.swift index d45c4d4c..95553f0f 100644 --- a/Tests/DarksideTests/BalanceTests.swift +++ b/Tests/DarksideTests/BalanceTests.swift @@ -905,7 +905,6 @@ class BalanceTests: ZcashTestCase { let memo = try Memo(string: "shielding is fun!") var pendingTx: ZcashTransaction.Overview? - let transaction = try await coordinator.synchronizer.sendToAddress( spendingKey: spendingKey, zatoshi: sendAmount, diff --git a/Tests/NetworkTests/BlockScanTests.swift b/Tests/NetworkTests/BlockScanTests.swift index 94d210f3..fc5eb961 100644 --- a/Tests/NetworkTests/BlockScanTests.swift +++ b/Tests/NetworkTests/BlockScanTests.swift @@ -197,7 +197,7 @@ class BlockScanTests: ZcashTestCase { do { let blockDownloader = await compactBlockProcessor.blockDownloader await blockDownloader.setDownloadLimit(range.upperBound) - try await blockDownloader.setSyncRange(range) + try await blockDownloader.setSyncRange(range, batchSize: 100) await blockDownloader.startDownload(maxBlockBufferSize: 10) try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: range) diff --git a/Tests/NetworkTests/BlockStreamingTest.swift b/Tests/NetworkTests/BlockStreamingTest.swift index 972a2584..2dbd1601 100644 --- a/Tests/NetworkTests/BlockStreamingTest.swift +++ b/Tests/NetworkTests/BlockStreamingTest.swift @@ -106,7 +106,7 @@ class BlockStreamingTest: ZcashTestCase { do { let blockDownloader = await compactBlockProcessor.blockDownloader await blockDownloader.setDownloadLimit(latestBlockHeight) - try await blockDownloader.setSyncRange(startHeight...latestBlockHeight) + try await blockDownloader.setSyncRange(startHeight...latestBlockHeight, batchSize: 100) await blockDownloader.startDownload(maxBlockBufferSize: 10) try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: startHeight...latestBlockHeight) } catch { @@ -149,7 +149,7 @@ class BlockStreamingTest: ZcashTestCase { do { let blockDownloader = await compactBlockProcessor.blockDownloader await blockDownloader.setDownloadLimit(latestBlockHeight) - try await blockDownloader.setSyncRange(startHeight...latestBlockHeight) + try await blockDownloader.setSyncRange(startHeight...latestBlockHeight, batchSize: 100) await blockDownloader.startDownload(maxBlockBufferSize: 10) try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: startHeight...latestBlockHeight) } catch { diff --git a/Tests/TestUtils/Tests+Utils.swift b/Tests/TestUtils/Tests+Utils.swift index 66b09126..bdfebd9f 100644 --- a/Tests/TestUtils/Tests+Utils.swift +++ b/Tests/TestUtils/Tests+Utils.swift @@ -152,7 +152,6 @@ extension ZcashRustBackend { } } - extension Zatoshi: CustomDebugStringConvertible { public var debugDescription: String { "Zatoshi(\(self.amount))"