diff --git a/Sources/ZcashLightClientKit/Block/Processor/CompactBlockDownloadOperation.swift b/Sources/ZcashLightClientKit/Block/Processor/CompactBlockDownloadOperation.swift index 93ec2412..61d1734c 100644 --- a/Sources/ZcashLightClientKit/Block/Processor/CompactBlockDownloadOperation.swift +++ b/Sources/ZcashLightClientKit/Block/Processor/CompactBlockDownloadOperation.swift @@ -194,10 +194,11 @@ class CompactBlockBatchDownloadOperation: ZcashOperation { override var isAsynchronous: Bool { false } private var batch: Int + private var done = false private var maxRetries: Int private var storage: CompactBlockStorage private var service: LightWalletService - private var cancelable: CancellableCall? + private var cancelableTask: Task? private var startHeight: BlockHeight private var targetHeight: BlockHeight @@ -229,72 +230,85 @@ class CompactBlockBatchDownloadOperation: ZcashOperation { return } self.startedHandler?() - do { - let localDownloadedHeight = try self.storage.latestHeight() - - if localDownloadedHeight != BlockHeight.empty() && localDownloadedHeight > startHeight { - LoggerProxy.warn("provided startHeight (\(startHeight)) differs from local latest downloaded height (\(localDownloadedHeight))") - startHeight = localDownloadedHeight + 1 - } - - var currentHeight = startHeight - self.progressDelegate?.progressUpdated( - .download( - BlockProgress( - startHeight: currentHeight, - targetHeight: targetHeight, - progressHeight: currentHeight - ) - ) - ) - - while !isCancelled && currentHeight <= targetHeight { - var retries = 0 - var success = true - var localError: Error? - - let range = nextRange(currentHeight: currentHeight, targetHeight: targetHeight) + + cancelableTask = Task { + do { + let localDownloadedHeight = try await self.storage.latestHeightAsync() - repeat { - do { - let blocks = try service.blockRange(range) - try storage.insert(blocks) - success = true - } catch { - success = false - localError = error - retries += 1 - } - } while !isCancelled && !success && retries < maxRetries - - if retries >= maxRetries { - throw CompactBlockBatchDownloadOperationError.batchDownloadFailed(range: range, error: localError) + if localDownloadedHeight != BlockHeight.empty() && localDownloadedHeight > startHeight { + LoggerProxy.warn("provided startHeight (\(startHeight)) differs from local latest downloaded height (\(localDownloadedHeight))") + startHeight = localDownloadedHeight + 1 } + var currentHeight = startHeight self.progressDelegate?.progressUpdated( .download( BlockProgress( - startHeight: startHeight, + startHeight: currentHeight, targetHeight: targetHeight, - progressHeight: range.upperBound + progressHeight: currentHeight ) ) ) + + while !isCancelled && currentHeight <= targetHeight { + var retries = 0 + var success = true + var localError: Error? + + let range = nextRange(currentHeight: currentHeight, targetHeight: targetHeight) + + repeat { + do { + let stream: AsyncThrowingStream = service.blockRange(range) - currentHeight = range.upperBound + 1 + var blocks: [ZcashCompactBlock] = [] + for try await compactBlock in stream { + blocks.append(compactBlock) + } + try storage.insert(blocks) + success = true + } catch { + success = false + localError = error + retries += 1 + } + } while !isCancelled && !success && retries < maxRetries + + if retries >= maxRetries { + throw CompactBlockBatchDownloadOperationError.batchDownloadFailed(range: range, error: localError) + } + + self.progressDelegate?.progressUpdated( + .download( + BlockProgress( + startHeight: startHeight, + targetHeight: targetHeight, + progressHeight: range.upperBound + ) + ) + ) + + currentHeight = range.upperBound + 1 + } + self.done = true + } catch { + self.fail(error: error) } - } catch { - self.fail(error: error) + } + + while !done && !isCancelled { + sleep(1) } } override func fail(error: Error? = nil) { - self.cancelable?.cancel() + self.cancelableTask?.cancel() super.fail(error: error) } override func cancel() { - self.cancelable?.cancel() + self.cancelableTask?.cancel() super.cancel() } diff --git a/Sources/ZcashLightClientKit/Block/Processor/CompactBlockScanningOperation.swift b/Sources/ZcashLightClientKit/Block/Processor/CompactBlockScanningOperation.swift index 1ba32318..5afc17d3 100644 --- a/Sources/ZcashLightClientKit/Block/Processor/CompactBlockScanningOperation.swift +++ b/Sources/ZcashLightClientKit/Block/Processor/CompactBlockScanningOperation.swift @@ -125,6 +125,8 @@ class CompactBlockBatchScanningOperation: ZcashOperation { private var blockRange: CompactBlockRange private var transactionRepository: TransactionRepository private var network: NetworkType + private var cancelableTask: Task? + private var done = false private weak var progressDelegate: CompactBlockProgressDelegate? @@ -157,82 +159,94 @@ class CompactBlockBatchScanningOperation: ZcashOperation { self.startedHandler?() - do { - if batchSize == 0 { - let scanStartTime = Date() - guard self.rustBackend.scanBlocks(dbCache: self.cacheDb, dbData: self.dataDb, limit: batchSize, networkType: network) else { - self.scanFailed(self.rustBackend.lastError() ?? ZcashOperationError.unknown) - return - } - let scanFinishTime = Date() - NotificationCenter.default.post( - SDKMetrics.progressReportNotification( - progress: BlockProgress( - startHeight: self.blockRange.lowerBound, - targetHeight: self.blockRange.upperBound, - progressHeight: self.blockRange.upperBound - ), - start: scanStartTime, - end: scanFinishTime, - task: .scanBlocks - ) - ) - let seconds = scanFinishTime.timeIntervalSinceReferenceDate - scanStartTime.timeIntervalSinceReferenceDate - LoggerProxy.debug("Scanned \(blockRange.count) blocks in \(seconds) seconds") - } else { - let scanStartHeight = try transactionRepository.lastScannedHeight() - let targetScanHeight = blockRange.upperBound - - var scannedNewBlocks = false - var lastScannedHeight = scanStartHeight - - repeat { - guard !shouldCancel() else { - cancel() - return - } - let previousScannedHeight = lastScannedHeight + cancelableTask = Task { + do { + if batchSize == 0 { let scanStartTime = Date() - guard self.rustBackend.scanBlocks( - dbCache: self.cacheDb, - dbData: self.dataDb, - limit: batchSize, - networkType: network - ) else { + guard self.rustBackend.scanBlocks(dbCache: self.cacheDb, dbData: self.dataDb, limit: batchSize, networkType: network) else { self.scanFailed(self.rustBackend.lastError() ?? ZcashOperationError.unknown) return } let scanFinishTime = Date() - - lastScannedHeight = try transactionRepository.lastScannedHeight() - - scannedNewBlocks = previousScannedHeight != lastScannedHeight - if scannedNewBlocks { - let progress = BlockProgress(startHeight: scanStartHeight, targetHeight: targetScanHeight, progressHeight: lastScannedHeight) - progressDelegate?.progressUpdated(.scan(progress)) - NotificationCenter.default.post( - SDKMetrics.progressReportNotification( - progress: progress, - start: scanStartTime, - end: scanFinishTime, - task: .scanBlocks - ) + NotificationCenter.default.post( + SDKMetrics.progressReportNotification( + progress: BlockProgress( + startHeight: self.blockRange.lowerBound, + targetHeight: self.blockRange.upperBound, + progressHeight: self.blockRange.upperBound + ), + start: scanStartTime, + end: scanFinishTime, + task: .scanBlocks ) - - let heightCount = lastScannedHeight - previousScannedHeight - let seconds = scanFinishTime.timeIntervalSinceReferenceDate - scanStartTime.timeIntervalSinceReferenceDate - LoggerProxy.debug("Scanned \(heightCount) blocks in \(seconds) seconds") - } - } while !self.isCancelled && scannedNewBlocks && lastScannedHeight < targetScanHeight + ) + let seconds = scanFinishTime.timeIntervalSinceReferenceDate - scanStartTime.timeIntervalSinceReferenceDate + LoggerProxy.debug("Scanned \(blockRange.count) blocks in \(seconds) seconds") + } else { + let scanStartHeight = try transactionRepository.lastScannedHeight() + let targetScanHeight = blockRange.upperBound + + var scannedNewBlocks = false + var lastScannedHeight = scanStartHeight + + repeat { + guard !shouldCancel() else { + cancel() + return + } + let previousScannedHeight = lastScannedHeight + let scanStartTime = Date() + guard self.rustBackend.scanBlocks( + dbCache: self.cacheDb, + dbData: self.dataDb, + limit: batchSize, + networkType: network + ) else { + self.scanFailed(self.rustBackend.lastError() ?? ZcashOperationError.unknown) + return + } + let scanFinishTime = Date() + + lastScannedHeight = try transactionRepository.lastScannedHeight() + + scannedNewBlocks = previousScannedHeight != lastScannedHeight + if scannedNewBlocks { + let progress = BlockProgress(startHeight: scanStartHeight, targetHeight: targetScanHeight, progressHeight: lastScannedHeight) + progressDelegate?.progressUpdated(.scan(progress)) + NotificationCenter.default.post( + SDKMetrics.progressReportNotification( + progress: progress, + start: scanStartTime, + end: scanFinishTime, + task: .scanBlocks + ) + ) + + let heightCount = lastScannedHeight - previousScannedHeight + let seconds = scanFinishTime.timeIntervalSinceReferenceDate - scanStartTime.timeIntervalSinceReferenceDate + LoggerProxy.debug("Scanned \(heightCount) blocks in \(seconds) seconds") + } + } while !self.isCancelled && scannedNewBlocks && lastScannedHeight < targetScanHeight + self.done = true + } + } catch { + scanFailed(error) } - } catch { - scanFailed(error) + } + + while !done && !isCancelled { + sleep(1) } } func scanFailed(_ error: Error) { - self.error = error + self.cancelableTask?.cancel() LoggerProxy.debug("block scanning failed with error: \(String(describing: self.error))") - self.fail() + super.fail(error: error) + } + + override func cancel() { + self.cancelableTask?.cancel() + super.cancel() } }