From 841f42ab6e107af1eb5fc2080dee0ba33a68ca38 Mon Sep 17 00:00:00 2001 From: Lukas Korba Date: Mon, 15 May 2023 16:20:09 +0200 Subject: [PATCH] [#1051] Update how progress is computed after switch to state machine Closes #1051 - new proposal for the progress computation - OverallProgress value is passed to the Synchronizer as a Float - OverallProgress is a result of fetch, scan and enhance operations - Order of actions no longer influences the computation - Actions report partial updates, CompactBlockProgress actor holds current state and computes the OverallProgress --- .../Block/Actions/EnhanceAction.swift | 1 - .../Block/Actions/FetchUTXOsAction.swift | 2 +- .../Block/Actions/ScanAction.swift | 2 +- .../ScanDownloadedButUnscannedAction.swift | 9 +- .../Block/CompactBlockProcessor.swift | 25 +++-- .../Block/Utils/CompactBlockProgress.swift | 91 +++++++++++-------- .../ZcashLightClientKit/Synchronizer.swift | 35 ++----- .../Synchronizer/SDKSynchronizer.swift | 22 ++--- 8 files changed, 87 insertions(+), 100 deletions(-) diff --git a/Sources/ZcashLightClientKit/Block/Actions/EnhanceAction.swift b/Sources/ZcashLightClientKit/Block/Actions/EnhanceAction.swift index e9262419..ea934035 100644 --- a/Sources/ZcashLightClientKit/Block/Actions/EnhanceAction.swift +++ b/Sources/ZcashLightClientKit/Block/Actions/EnhanceAction.swift @@ -64,7 +64,6 @@ extension EnhanceAction: Action { let transactions = try await blockEnhancer.enhance( at: enhanceRange, didEnhance: { progress in - await didUpdate(.progressUpdated(.enhance(progress))) if let foundTx = progress.lastFoundTransaction, progress.newlyMined { await didUpdate(.minedTransaction(foundTx)) } diff --git a/Sources/ZcashLightClientKit/Block/Actions/FetchUTXOsAction.swift b/Sources/ZcashLightClientKit/Block/Actions/FetchUTXOsAction.swift index 1d9cb8c6..f218a1a3 100644 --- a/Sources/ZcashLightClientKit/Block/Actions/FetchUTXOsAction.swift +++ b/Sources/ZcashLightClientKit/Block/Actions/FetchUTXOsAction.swift @@ -24,7 +24,7 @@ extension FetchUTXOsAction: Action { if let range = await context.syncRanges.fetchUTXORange { logger.debug("Fetching UTXO with range: \(range.lowerBound)...\(range.upperBound)") let result = try await utxoFetcher.fetch(at: range) { fetchProgress in - await didUpdate(.progressUpdated(.fetch(fetchProgress))) + await didUpdate(.progressPartialUpdate(.fetch(fetchProgress))) } await didUpdate(.storedUTXOs(result)) } diff --git a/Sources/ZcashLightClientKit/Block/Actions/ScanAction.swift b/Sources/ZcashLightClientKit/Block/Actions/ScanAction.swift index b19b6278..3310697d 100644 --- a/Sources/ZcashLightClientKit/Block/Actions/ScanAction.swift +++ b/Sources/ZcashLightClientKit/Block/Actions/ScanAction.swift @@ -55,7 +55,7 @@ extension ScanAction: Action { progressHeight: lastScannedHeight ) self?.logger.debug("progress: \(progress)") - await didUpdate(.progressUpdated(.syncing(progress))) + await didUpdate(.progressPartialUpdate(.syncing(progress))) } return await update(context: context) diff --git a/Sources/ZcashLightClientKit/Block/Actions/ScanDownloadedButUnscannedAction.swift b/Sources/ZcashLightClientKit/Block/Actions/ScanDownloadedButUnscannedAction.swift index 52cd0801..fca7b6a8 100644 --- a/Sources/ZcashLightClientKit/Block/Actions/ScanDownloadedButUnscannedAction.swift +++ b/Sources/ZcashLightClientKit/Block/Actions/ScanDownloadedButUnscannedAction.swift @@ -24,14 +24,7 @@ extension ScanDownloadedButUnscannedAction: Action { if let range = await context.syncRanges.downloadedButUnscannedRange { logger.debug("Starting scan with downloaded but not scanned blocks with range: \(range.lowerBound)...\(range.upperBound)") let totalProgressRange = await context.totalProgressRange - try await blockScanner.scanBlocks(at: range, totalProgressRange: totalProgressRange) { lastScannedHeight in - let progress = BlockProgress( - startHeight: totalProgressRange.lowerBound, - targetHeight: totalProgressRange.upperBound, - progressHeight: lastScannedHeight - ) - await didUpdate(.progressUpdated(.syncing(progress))) - } + try await blockScanner.scanBlocks(at: range, totalProgressRange: totalProgressRange) { _ in } } await context.update(state: .download) return context diff --git a/Sources/ZcashLightClientKit/Block/CompactBlockProcessor.swift b/Sources/ZcashLightClientKit/Block/CompactBlockProcessor.swift index b8ff223a..95bc42e4 100644 --- a/Sources/ZcashLightClientKit/Block/CompactBlockProcessor.swift +++ b/Sources/ZcashLightClientKit/Block/CompactBlockProcessor.swift @@ -41,7 +41,9 @@ actor CompactBlockProcessor { private var retryAttempts: Int = 0 private var backoffTimer: Timer? private var consecutiveChainValidationErrors: Int = 0 - + + private var compactBlockProgress: CompactBlockProgress = .zero + /// Compact Block Processor configuration /// /// - parameter fsBlockCacheRoot: absolute root path where the filesystem block cache will be stored. @@ -400,8 +402,11 @@ extension CompactBlockProcessor { /// `rewindHeight` is the height that the processor backed to in order to solve the Reorg. case handledReorg(_ reorgHeight: BlockHeight, _ rewindHeight: BlockHeight) + /// Event sent when progress of some specific action happened. + case progressPartialUpdate(CompactBlockProgressUpdate) + /// Event sent when progress of the sync process changes. - case progressUpdated(CompactBlockProgress) + case progressUpdated(Float) /// Event sent when the CompactBlockProcessor fetched utxos from lightwalletd attempted to store them. case storedUTXOs((inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity])) @@ -438,7 +443,7 @@ extension CompactBlockProcessor { // of sync process without any side effects. private func run() async { logger.debug("Starting run") - resetContext() + await resetContext() while true { // Sync is starting when the state is `idle`. @@ -457,7 +462,7 @@ extension CompactBlockProcessor { // Try to find action for state. guard let action = actions[state] else { if await syncFinished() { - resetContext() + await resetContext() continue } else { break @@ -470,6 +475,11 @@ extension CompactBlockProcessor { // Execute action. context = try await action.run(with: context) { [weak self] event in await self?.send(event: event) + if let progressChanged = await self?.compactBlockProgress.event(event), progressChanged { + if let progress = await self?.compactBlockProgress.progress { + await self?.send(event: .progressUpdated(progress)) + } + } } await didFinishAction() @@ -477,7 +487,7 @@ extension CompactBlockProcessor { if Task.isCancelled { if await syncTaskWasCancelled() { // Start sync all over again - resetContext() + await resetContext() } else { // end the sync loop break @@ -485,7 +495,7 @@ extension CompactBlockProcessor { } else { if await handleSyncFailure(action: action, error: error) { // Start sync all over again - resetContext() + await resetContext() } else { // end the sync loop break @@ -568,8 +578,9 @@ extension CompactBlockProcessor { } } - private func resetContext() { + private func resetContext() async { context = ActionContext(state: .idle) + await compactBlockProgress.reset() } private func syncStarted() async { diff --git a/Sources/ZcashLightClientKit/Block/Utils/CompactBlockProgress.swift b/Sources/ZcashLightClientKit/Block/Utils/CompactBlockProgress.swift index 449c95e4..78f68495 100644 --- a/Sources/ZcashLightClientKit/Block/Utils/CompactBlockProgress.swift +++ b/Sources/ZcashLightClientKit/Block/Utils/CompactBlockProgress.swift @@ -7,47 +7,58 @@ import Foundation -public enum CompactBlockProgress { +final actor CompactBlockProgress { + static let zero = CompactBlockProgress() + + enum Action: Equatable { + case enhance + case fetch + case scan + + func weight() -> Float { + switch self { + case .enhance: return 0.08 + case .fetch: return 0.02 + case .scan: return 0.9 + } + } + } + + var actionProgresses: [Action: Float] = [:] + + var progress: Float { + var overallProgress = Float(0) + actionProgresses.forEach { key, value in + overallProgress += value * key.weight() + } + + return overallProgress + } + + func event(_ event: CompactBlockProcessor.Event) -> Bool { + guard case .progressPartialUpdate(let update) = event else { + return false + } + + switch update { + case .syncing(let progress): + actionProgresses[.scan] = progress.progress + case .enhance(let progress): + actionProgresses[.enhance] = progress.progress + case .fetch(let progress): + actionProgresses[.fetch] = progress + } + + return true + } + + func reset() { + actionProgresses.removeAll() + } +} + +enum CompactBlockProgressUpdate: Equatable { case syncing(_ progress: BlockProgress) case enhance(_ progress: EnhancementProgress) case fetch(_ progress: Float) - - public var progress: Float { - switch self { - case .syncing(let blockProgress): - return blockProgress.progress - case .enhance(let enhancementProgress): - return enhancementProgress.progress - case .fetch(let fetchingProgress): - return fetchingProgress - } - } - - public var progressHeight: BlockHeight? { - switch self { - case .syncing(let blockProgress): - return blockProgress.progressHeight - case .enhance(let enhancementProgress): - return enhancementProgress.lastFoundTransaction?.minedHeight - default: - return 0 - } - } - - public var blockDate: Date? { - if case .enhance(let enhancementProgress) = self, let time = enhancementProgress.lastFoundTransaction?.blockTime { - return Date(timeIntervalSince1970: time) - } - - return nil - } - - public var targetHeight: BlockHeight? { - switch self { - case .syncing(let blockProgress): - return blockProgress.targetHeight - default: - return nil - } - } } diff --git a/Sources/ZcashLightClientKit/Synchronizer.swift b/Sources/ZcashLightClientKit/Synchronizer.swift index dcda5c36..1e505e48 100644 --- a/Sources/ZcashLightClientKit/Synchronizer.swift +++ b/Sources/ZcashLightClientKit/Synchronizer.swift @@ -366,15 +366,9 @@ enum InternalSyncStatus: Equatable { /// taking other maintenance steps that need to occur after an upgrade. case unprepared - case syncing(_ progress: BlockProgress) - - /// Indicates that this Synchronizer is actively enhancing newly scanned blocks - /// with additional transaction details, fetched from the server. - case enhancing(_ progress: EnhancementProgress) - - /// fetches the transparent balance and stores it locally - case fetching(_ progress: Float) - + /// Indicates that this Synchronizer is actively processing new blocks (consists of fetch, scan and enhance operations) + case syncing(Float) + /// Indicates that this Synchronizer is fully up to date and ready for all wallet functions. /// When set, a UI element may want to turn green. case synced @@ -390,7 +384,7 @@ enum InternalSyncStatus: Equatable { public var isSyncing: Bool { switch self { - case .syncing, .enhancing, .fetching: + case .syncing: return true default: return false @@ -416,8 +410,6 @@ enum InternalSyncStatus: Equatable { switch self { case .unprepared: return "unprepared" case .syncing: return "syncing" - case .enhancing: return "enhancing" - case .fetching: return "fetching" case .synced: return "synced" case .stopped: return "stopped" case .disconnected: return "disconnected" @@ -449,8 +441,6 @@ extension InternalSyncStatus { switch (lhs, rhs) { case (.unprepared, .unprepared): return true case let (.syncing(lhsProgress), .syncing(rhsProgress)): return lhsProgress == rhsProgress - case let (.enhancing(lhsProgress), .enhancing(rhsProgress)): return lhsProgress == rhsProgress - case (.fetching, .fetching): return true case (.synced, .synced): return true case (.stopped, .stopped): return true case (.disconnected, .disconnected): return true @@ -461,15 +451,8 @@ extension InternalSyncStatus { } extension InternalSyncStatus { - init(_ blockProcessorProgress: CompactBlockProgress) { - switch blockProcessorProgress { - case .syncing(let progressReport): - self = .syncing(progressReport) - case .enhance(let enhancingReport): - self = .enhancing(enhancingReport) - case .fetch(let fetchingProgress): - self = .fetching(fetchingProgress) - } + init(_ blockProcessorProgress: Float) { + self = .syncing(blockProcessorProgress) } } @@ -479,11 +462,7 @@ extension InternalSyncStatus { case .unprepared: return .unprepared case .syncing(let progress): - return .syncing(0.9 * progress.progress) - case .enhancing(let progress): - return .syncing(0.9 + 0.08 * progress.progress) - case .fetching(let progress): - return .syncing(0.98 + 0.02 * progress) + return .syncing(progress) case .synced: return .upToDate case .stopped: diff --git a/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift b/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift index 9408be02..3e47eba8 100644 --- a/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift +++ b/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift @@ -157,14 +157,14 @@ public class SDKSynchronizer: Synchronizer { case .unprepared: throw ZcashError.synchronizerNotPrepared - case .syncing, .enhancing, .fetching: + case .syncing: logger.warn("warning: Synchronizer started when already running. Next sync process will be started when the current one stops.") /// This may look strange but `CompactBlockProcessor` has mechanisms which can handle this situation. So we are fine with calling /// it's start here. await blockProcessor.start(retry: retry) case .stopped, .synced, .disconnected, .error: - await updateStatus(.syncing(.nullProgress)) + await updateStatus(.syncing(0)) syncStartDate = Date() await blockProcessor.start(retry: retry) } @@ -197,7 +197,6 @@ public class SDKSynchronizer: Synchronizer { // MARK: Handle CompactBlockProcessor.Flow - // swiftlint:disable:next cyclomatic_complexity private func subscribeToProcessorEvents(_ processor: CompactBlockProcessor) async { let eventClosure: CompactBlockProcessor.EventClosure = { [weak self] event in switch event { @@ -217,17 +216,14 @@ public class SDKSynchronizer: Synchronizer { case let .progressUpdated(progress): await self?.progressUpdated(progress: progress) + case .progressPartialUpdate: + break + case let .storedUTXOs(utxos): self?.storedUTXOs(utxos: utxos) - case .startedEnhancing: - await self?.updateStatus(.enhancing(.zero)) - - case .startedFetching: - await self?.updateStatus(.fetching(0)) - - case .startedSyncing: - await self?.updateStatus(.syncing(.nullProgress)) + case .startedEnhancing, .startedFetching, .startedSyncing: + break case .stopped: await self?.updateStatus(.stopped) @@ -263,7 +259,7 @@ public class SDKSynchronizer: Synchronizer { } } - private func progressUpdated(progress: CompactBlockProgress) async { + private func progressUpdated(progress: Float) async { let newStatus = InternalSyncStatus(progress) await updateStatus(newStatus) } @@ -626,8 +622,6 @@ extension InternalSyncStatus { switch (self, otherStatus) { case (.unprepared, .unprepared): return false case (.syncing, .syncing): return false - case (.enhancing, .enhancing): return false - case (.fetching, .fetching): return false case (.synced, .synced): return false case (.stopped, .stopped): return false case (.disconnected, .disconnected): return false