diff --git a/Sources/ZcashLightClientKit/Block/Actions/Action.swift b/Sources/ZcashLightClientKit/Block/Actions/Action.swift index 68c79665..c53a5510 100644 --- a/Sources/ZcashLightClientKit/Block/Actions/Action.swift +++ b/Sources/ZcashLightClientKit/Block/Actions/Action.swift @@ -22,6 +22,7 @@ actor ActionContext { } enum CBPState: CaseIterable { + case idle case migrateLegacyCacheDB case validateServer case computeSyncRanges @@ -38,11 +39,8 @@ enum CBPState: CaseIterable { case finished case failed case stopped - - static let initialState: CBPState = .migrateLegacyCacheDB } - // this is replacement for CompactBlockProgress enum ActionProgress { case scan diff --git a/Sources/ZcashLightClientKit/Block/Actions/SaplingParamsAction.swift b/Sources/ZcashLightClientKit/Block/Actions/SaplingParamsAction.swift index b6020bc8..763a6e4f 100644 --- a/Sources/ZcashLightClientKit/Block/Actions/SaplingParamsAction.swift +++ b/Sources/ZcashLightClientKit/Block/Actions/SaplingParamsAction.swift @@ -13,7 +13,6 @@ class SaplingParamsAction { extension SaplingParamsAction: Action { func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext { - // Download files with sapling params. await context.update(state: .scanDownloaded) diff --git a/Sources/ZcashLightClientKit/Block/Actions/ScanAction.swift b/Sources/ZcashLightClientKit/Block/Actions/ScanAction.swift index 01c8b8da..92f4f977 100644 --- a/Sources/ZcashLightClientKit/Block/Actions/ScanAction.swift +++ b/Sources/ZcashLightClientKit/Block/Actions/ScanAction.swift @@ -21,4 +21,3 @@ extension ScanAction: Action { func stop() async { } } - diff --git a/Sources/ZcashLightClientKit/Block/CompactBlockProcessorNG.swift b/Sources/ZcashLightClientKit/Block/CompactBlockProcessorNG.swift index 748def29..cecaf70d 100644 --- a/Sources/ZcashLightClientKit/Block/CompactBlockProcessorNG.swift +++ b/Sources/ZcashLightClientKit/Block/CompactBlockProcessorNG.swift @@ -185,7 +185,7 @@ actor CompactBlockProcessorNG { } // swiftlint:disable:next cyclomatic_complexity - static func makeActions(container: DIContainer, config: Configuration) -> [CBPState: Action] { + private static func makeActions(container: DIContainer, config: Configuration) -> [CBPState: Action] { let actionsDefinition = CBPState.allCases.compactMap { state -> (CBPState, Action)? in let action: Action switch state { @@ -215,7 +215,7 @@ actor CompactBlockProcessorNG { action = SaplingParamsAction(container: container) case .clearCache: action = ClearCacheAction(container: container) - case .finished, .failed, .stopped: + case .finished, .failed, .stopped, .idle: return nil } @@ -237,25 +237,10 @@ extension CompactBlockProcessorNG { } guard await canStartSync() else { - switch await context.state { - case .migrateLegacyCacheDB: - // max attempts have been reached - logger.warn("max retry attempts reached on \(CBPState.initialState) state") + if await isIdle() { + logger.warn("max retry attempts reached on \(await context.state) state") await send(event: .failed(ZcashError.compactBlockProcessorMaxAttemptsReached(config.retries))) - case .finished: - // max attempts have been reached - logger.warn("max retry attempts reached on synced state, this indicates malfunction") - await send(event: .failed(ZcashError.compactBlockProcessorMaxAttemptsReached(config.retries))) - case .failed: - // max attempts have been reached - logger.info("max retry attempts reached with failed state") - await send(event: .failed(ZcashError.compactBlockProcessorMaxAttemptsReached(config.retries))) - case .stopped: - // max attempts have been reached - logger.info("max retry attempts reached") - await send(event: .failed(ZcashError.compactBlockProcessorMaxAttemptsReached(config.retries))) - case .computeSyncRanges, .checksBeforeSync, .scanDownloaded, .download, .validate, .scan, .clearAlreadyScannedBlocks, .enhance, - .fetchUTXO, .handleSaplingParams, .clearCache, .validateServer: + } else { logger.debug("Warning: compact block processor was started while busy!!!!") afterSyncHooksManager.insert(hook: .anotherSync) } @@ -316,7 +301,7 @@ extension CompactBlockProcessorNG { eventClosures[identifier] = closure } - func send(event: Event) async { + private func send(event: Event) async { for item in eventClosures { await item.value(event) } @@ -329,17 +314,20 @@ extension CompactBlockProcessorNG { // This is main loop of the sync process. It simply takes state and try to find action which handles it. If action is found it executes the // action. If action is not found then loop finishes. Thanks to this it's super easy to identify start point of sync process and end points // of sync process without any side effects. - func run() async { - // Prepare for sync and set everything to default values. - await context.update(state: CBPState.initialState) - await syncStarted() + private func run() async { + resetContext() - if backoffTimer == nil { - await setTimer() - } - - // Try to find action for state. while true { + // Sync is starting when the state is `idle`. + if await context.state == .idle { + await syncStarted() + + if backoffTimer == nil { + await setTimer() + } + } + + // Try to find action for state. guard let action = actions[await context.state] else { await syncFinished() break @@ -357,7 +345,7 @@ extension CompactBlockProcessorNG { await syncStopped() if await handleAfterSyncHooks() { // Start sync all over again - await context.update(state: CBPState.initialState) + resetContext() } else { break } @@ -367,7 +355,7 @@ extension CompactBlockProcessorNG { do { try await validationFailed(at: BlockHeight(height)) // Start sync all over again - await context.update(state: CBPState.initialState) + resetContext() } catch { await failure(error) break @@ -382,29 +370,33 @@ extension CompactBlockProcessorNG { } } - func syncStarted() async { + private func resetContext() { + context = ActionContext(state: .idle) + } + + private func syncStarted() async { // handle start of the sync process await send(event: .startedSyncing) } - func syncFinished() async { + private func syncFinished() async { syncTask = nil // handle finish of the sync // await send(event: .finished(<#T##lastScannedHeight: BlockHeight##BlockHeight#>, <#T##foundBlocks: Bool##Bool#>)) } - func update(progress: ActionProgress) async { + private func update(progress: ActionProgress) async { // handle update of the progree } - func syncStopped() async { + private func syncStopped() async { syncTask = nil await context.update(state: .stopped) await send(event: .stopped) // await handleAfterSyncHooks() } - func validationFailed(at height: BlockHeight) async throws { + private func validationFailed(at height: BlockHeight) async throws { // cancel all Tasks await rawStop() @@ -425,7 +417,7 @@ extension CompactBlockProcessorNG { await send(event: .handledReorg(height, rewindHeight)) } - func failure(_ error: Error) async { + private func failure(_ error: Error) async { await context.update(state: .failed) logger.error("Fail with error: \(error)") @@ -467,12 +459,10 @@ extension CompactBlockProcessorNG { /// - Note: If this is called while sync is in progress then the sync process is stopped first and then rewind is executed. func rewind(context: AfterSyncHooksManager.RewindContext) async { logger.debug("Starting rewind") - switch await self.context.state { - case .stopped, .failed, .finished, .migrateLegacyCacheDB: + if await isIdle() { logger.debug("Sync doesn't run. Executing rewind.") await doRewind(context: context) - case .computeSyncRanges, .checksBeforeSync, .download, .validate, .scan, .enhance, .fetchUTXO, .handleSaplingParams, .clearCache, - .scanDownloaded, .clearAlreadyScannedBlocks, .validateServer: + } else { logger.debug("Stopping sync because of rewind") afterSyncHooksManager.insert(hook: .rewind(context)) await stop() @@ -521,12 +511,10 @@ extension CompactBlockProcessorNG { extension CompactBlockProcessorNG { func wipe(context: AfterSyncHooksManager.WipeContext) async { logger.debug("Starting wipe") - switch await self.context.state { - case .stopped, .failed, .finished, .migrateLegacyCacheDB: + if await isIdle() { logger.debug("Sync doesn't run. Executing wipe.") await doWipe(context: context) - case .computeSyncRanges, .checksBeforeSync, .download, .validate, .scan, .enhance, .fetchUTXO, .handleSaplingParams, .clearCache, - .scanDownloaded, .clearAlreadyScannedBlocks, .validateServer: + } else { logger.debug("Stopping sync because of wipe") afterSyncHooksManager.insert(hook: .wipe(context)) await stop() @@ -573,8 +561,7 @@ extension CompactBlockProcessorNG { block: { [weak self] _ in Task { [weak self] in guard let self else { return } - switch await self.context.state { - case .stopped, .failed, .finished, .validateServer: + if await self.isIdle() { if await self.canStartSync() { self.logger.debug( """ @@ -588,8 +575,7 @@ extension CompactBlockProcessorNG { } else if await hasRetryAttempt() { await self.failure(ZcashError.compactBlockProcessorMaxAttemptsReached(self.config.retries)) } - case .computeSyncRanges, .checksBeforeSync, .download, .validate, .scan, .enhance, .fetchUTXO, .handleSaplingParams, .clearCache, - .scanDownloaded, .clearAlreadyScannedBlocks, .migrateLegacyCacheDB: + } else { await self.latestBlocksDataProvider.updateBlockData() } } @@ -600,16 +586,31 @@ extension CompactBlockProcessorNG { self.backoffTimer = timer } - private func canStartSync() async -> Bool { + private func isIdle() async -> Bool { switch await context.state { - case .stopped, .failed, .finished, .migrateLegacyCacheDB: - return hasRetryAttempt() - case .computeSyncRanges, .checksBeforeSync, .download, .validate, .scan, .enhance, .fetchUTXO, .handleSaplingParams, .clearCache, - .scanDownloaded, .clearAlreadyScannedBlocks, .validateServer: + case .stopped, .failed, .finished, .idle: + return true + case .computeSyncRanges, + .checksBeforeSync, + .download, + .validate, + .scan, + .enhance, + .fetchUTXO, + .handleSaplingParams, + .clearCache, + .scanDownloaded, + .clearAlreadyScannedBlocks, + .validateServer, + .migrateLegacyCacheDB: return false } } + private func canStartSync() async -> Bool { + return await isIdle() && hasRetryAttempt() + } + private func hasRetryAttempt() -> Bool { retryAttempts < config.retries } diff --git a/docs/cbp_state_machine.puml b/docs/cbp_state_machine.puml index bc857874..a8845014 100644 --- a/docs/cbp_state_machine.puml +++ b/docs/cbp_state_machine.puml @@ -11,8 +11,9 @@ note as Lines end note -[*] -> migrateLegacyCacheDB +[*] -> idle +idle -[#green,bold]-> migrateLegacyCacheDB migrateLegacyCacheDB : MigrateLegacyCacheDBAction migrateLegacyCacheDB -[#green,bold]-> validateServer diff --git a/docs/images/cbp_state_machine.png b/docs/images/cbp_state_machine.png index 0d060214..93263d51 100644 Binary files a/docs/images/cbp_state_machine.png and b/docs/images/cbp_state_machine.png differ