[#700] Add idle state to CBP state machine

This is required so the CBP can detect start of the sync process.
This commit is contained in:
Michal Fousek 2023-05-11 10:26:33 +02:00
parent 3f513e9c6d
commit bc9fbe7e7d
6 changed files with 59 additions and 61 deletions

View File

@ -22,6 +22,7 @@ actor ActionContext {
} }
enum CBPState: CaseIterable { enum CBPState: CaseIterable {
case idle
case migrateLegacyCacheDB case migrateLegacyCacheDB
case validateServer case validateServer
case computeSyncRanges case computeSyncRanges
@ -38,11 +39,8 @@ enum CBPState: CaseIterable {
case finished case finished
case failed case failed
case stopped case stopped
static let initialState: CBPState = .migrateLegacyCacheDB
} }
// this is replacement for CompactBlockProgress // this is replacement for CompactBlockProgress
enum ActionProgress { enum ActionProgress {
case scan case scan

View File

@ -13,7 +13,6 @@ class SaplingParamsAction {
extension SaplingParamsAction: Action { extension SaplingParamsAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext { func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// Download files with sapling params. // Download files with sapling params.
await context.update(state: .scanDownloaded) await context.update(state: .scanDownloaded)

View File

@ -21,4 +21,3 @@ extension ScanAction: Action {
func stop() async { } func stop() async { }
} }

View File

@ -185,7 +185,7 @@ actor CompactBlockProcessorNG {
} }
// swiftlint:disable:next cyclomatic_complexity // swiftlint:disable:next cyclomatic_complexity
static func makeActions(container: DIContainer, config: Configuration) -> [CBPState: Action] { private static func makeActions(container: DIContainer, config: Configuration) -> [CBPState: Action] {
let actionsDefinition = CBPState.allCases.compactMap { state -> (CBPState, Action)? in let actionsDefinition = CBPState.allCases.compactMap { state -> (CBPState, Action)? in
let action: Action let action: Action
switch state { switch state {
@ -215,7 +215,7 @@ actor CompactBlockProcessorNG {
action = SaplingParamsAction(container: container) action = SaplingParamsAction(container: container)
case .clearCache: case .clearCache:
action = ClearCacheAction(container: container) action = ClearCacheAction(container: container)
case .finished, .failed, .stopped: case .finished, .failed, .stopped, .idle:
return nil return nil
} }
@ -237,25 +237,10 @@ extension CompactBlockProcessorNG {
} }
guard await canStartSync() else { guard await canStartSync() else {
switch await context.state { if await isIdle() {
case .migrateLegacyCacheDB: logger.warn("max retry attempts reached on \(await context.state) state")
// max attempts have been reached
logger.warn("max retry attempts reached on \(CBPState.initialState) state")
await send(event: .failed(ZcashError.compactBlockProcessorMaxAttemptsReached(config.retries))) await send(event: .failed(ZcashError.compactBlockProcessorMaxAttemptsReached(config.retries)))
case .finished: } else {
// max attempts have been reached
logger.warn("max retry attempts reached on synced state, this indicates malfunction")
await send(event: .failed(ZcashError.compactBlockProcessorMaxAttemptsReached(config.retries)))
case .failed:
// max attempts have been reached
logger.info("max retry attempts reached with failed state")
await send(event: .failed(ZcashError.compactBlockProcessorMaxAttemptsReached(config.retries)))
case .stopped:
// max attempts have been reached
logger.info("max retry attempts reached")
await send(event: .failed(ZcashError.compactBlockProcessorMaxAttemptsReached(config.retries)))
case .computeSyncRanges, .checksBeforeSync, .scanDownloaded, .download, .validate, .scan, .clearAlreadyScannedBlocks, .enhance,
.fetchUTXO, .handleSaplingParams, .clearCache, .validateServer:
logger.debug("Warning: compact block processor was started while busy!!!!") logger.debug("Warning: compact block processor was started while busy!!!!")
afterSyncHooksManager.insert(hook: .anotherSync) afterSyncHooksManager.insert(hook: .anotherSync)
} }
@ -316,7 +301,7 @@ extension CompactBlockProcessorNG {
eventClosures[identifier] = closure eventClosures[identifier] = closure
} }
func send(event: Event) async { private func send(event: Event) async {
for item in eventClosures { for item in eventClosures {
await item.value(event) await item.value(event)
} }
@ -329,17 +314,20 @@ extension CompactBlockProcessorNG {
// This is main loop of the sync process. It simply takes state and try to find action which handles it. If action is found it executes the // This is main loop of the sync process. It simply takes state and try to find action which handles it. If action is found it executes the
// action. If action is not found then loop finishes. Thanks to this it's super easy to identify start point of sync process and end points // action. If action is not found then loop finishes. Thanks to this it's super easy to identify start point of sync process and end points
// of sync process without any side effects. // of sync process without any side effects.
func run() async { private func run() async {
// Prepare for sync and set everything to default values. resetContext()
await context.update(state: CBPState.initialState)
await syncStarted()
if backoffTimer == nil {
await setTimer()
}
// Try to find action for state.
while true { while true {
// Sync is starting when the state is `idle`.
if await context.state == .idle {
await syncStarted()
if backoffTimer == nil {
await setTimer()
}
}
// Try to find action for state.
guard let action = actions[await context.state] else { guard let action = actions[await context.state] else {
await syncFinished() await syncFinished()
break break
@ -357,7 +345,7 @@ extension CompactBlockProcessorNG {
await syncStopped() await syncStopped()
if await handleAfterSyncHooks() { if await handleAfterSyncHooks() {
// Start sync all over again // Start sync all over again
await context.update(state: CBPState.initialState) resetContext()
} else { } else {
break break
} }
@ -367,7 +355,7 @@ extension CompactBlockProcessorNG {
do { do {
try await validationFailed(at: BlockHeight(height)) try await validationFailed(at: BlockHeight(height))
// Start sync all over again // Start sync all over again
await context.update(state: CBPState.initialState) resetContext()
} catch { } catch {
await failure(error) await failure(error)
break break
@ -382,29 +370,33 @@ extension CompactBlockProcessorNG {
} }
} }
func syncStarted() async { private func resetContext() {
context = ActionContext(state: .idle)
}
private func syncStarted() async {
// handle start of the sync process // handle start of the sync process
await send(event: .startedSyncing) await send(event: .startedSyncing)
} }
func syncFinished() async { private func syncFinished() async {
syncTask = nil syncTask = nil
// handle finish of the sync // handle finish of the sync
// await send(event: .finished(<#T##lastScannedHeight: BlockHeight##BlockHeight#>, <#T##foundBlocks: Bool##Bool#>)) // await send(event: .finished(<#T##lastScannedHeight: BlockHeight##BlockHeight#>, <#T##foundBlocks: Bool##Bool#>))
} }
func update(progress: ActionProgress) async { private func update(progress: ActionProgress) async {
// handle update of the progree // handle update of the progree
} }
func syncStopped() async { private func syncStopped() async {
syncTask = nil syncTask = nil
await context.update(state: .stopped) await context.update(state: .stopped)
await send(event: .stopped) await send(event: .stopped)
// await handleAfterSyncHooks() // await handleAfterSyncHooks()
} }
func validationFailed(at height: BlockHeight) async throws { private func validationFailed(at height: BlockHeight) async throws {
// cancel all Tasks // cancel all Tasks
await rawStop() await rawStop()
@ -425,7 +417,7 @@ extension CompactBlockProcessorNG {
await send(event: .handledReorg(height, rewindHeight)) await send(event: .handledReorg(height, rewindHeight))
} }
func failure(_ error: Error) async { private func failure(_ error: Error) async {
await context.update(state: .failed) await context.update(state: .failed)
logger.error("Fail with error: \(error)") logger.error("Fail with error: \(error)")
@ -467,12 +459,10 @@ extension CompactBlockProcessorNG {
/// - Note: If this is called while sync is in progress then the sync process is stopped first and then rewind is executed. /// - Note: If this is called while sync is in progress then the sync process is stopped first and then rewind is executed.
func rewind(context: AfterSyncHooksManager.RewindContext) async { func rewind(context: AfterSyncHooksManager.RewindContext) async {
logger.debug("Starting rewind") logger.debug("Starting rewind")
switch await self.context.state { if await isIdle() {
case .stopped, .failed, .finished, .migrateLegacyCacheDB:
logger.debug("Sync doesn't run. Executing rewind.") logger.debug("Sync doesn't run. Executing rewind.")
await doRewind(context: context) await doRewind(context: context)
case .computeSyncRanges, .checksBeforeSync, .download, .validate, .scan, .enhance, .fetchUTXO, .handleSaplingParams, .clearCache, } else {
.scanDownloaded, .clearAlreadyScannedBlocks, .validateServer:
logger.debug("Stopping sync because of rewind") logger.debug("Stopping sync because of rewind")
afterSyncHooksManager.insert(hook: .rewind(context)) afterSyncHooksManager.insert(hook: .rewind(context))
await stop() await stop()
@ -521,12 +511,10 @@ extension CompactBlockProcessorNG {
extension CompactBlockProcessorNG { extension CompactBlockProcessorNG {
func wipe(context: AfterSyncHooksManager.WipeContext) async { func wipe(context: AfterSyncHooksManager.WipeContext) async {
logger.debug("Starting wipe") logger.debug("Starting wipe")
switch await self.context.state { if await isIdle() {
case .stopped, .failed, .finished, .migrateLegacyCacheDB:
logger.debug("Sync doesn't run. Executing wipe.") logger.debug("Sync doesn't run. Executing wipe.")
await doWipe(context: context) await doWipe(context: context)
case .computeSyncRanges, .checksBeforeSync, .download, .validate, .scan, .enhance, .fetchUTXO, .handleSaplingParams, .clearCache, } else {
.scanDownloaded, .clearAlreadyScannedBlocks, .validateServer:
logger.debug("Stopping sync because of wipe") logger.debug("Stopping sync because of wipe")
afterSyncHooksManager.insert(hook: .wipe(context)) afterSyncHooksManager.insert(hook: .wipe(context))
await stop() await stop()
@ -573,8 +561,7 @@ extension CompactBlockProcessorNG {
block: { [weak self] _ in block: { [weak self] _ in
Task { [weak self] in Task { [weak self] in
guard let self else { return } guard let self else { return }
switch await self.context.state { if await self.isIdle() {
case .stopped, .failed, .finished, .validateServer:
if await self.canStartSync() { if await self.canStartSync() {
self.logger.debug( self.logger.debug(
""" """
@ -588,8 +575,7 @@ extension CompactBlockProcessorNG {
} else if await hasRetryAttempt() { } else if await hasRetryAttempt() {
await self.failure(ZcashError.compactBlockProcessorMaxAttemptsReached(self.config.retries)) await self.failure(ZcashError.compactBlockProcessorMaxAttemptsReached(self.config.retries))
} }
case .computeSyncRanges, .checksBeforeSync, .download, .validate, .scan, .enhance, .fetchUTXO, .handleSaplingParams, .clearCache, } else {
.scanDownloaded, .clearAlreadyScannedBlocks, .migrateLegacyCacheDB:
await self.latestBlocksDataProvider.updateBlockData() await self.latestBlocksDataProvider.updateBlockData()
} }
} }
@ -600,16 +586,31 @@ extension CompactBlockProcessorNG {
self.backoffTimer = timer self.backoffTimer = timer
} }
private func canStartSync() async -> Bool { private func isIdle() async -> Bool {
switch await context.state { switch await context.state {
case .stopped, .failed, .finished, .migrateLegacyCacheDB: case .stopped, .failed, .finished, .idle:
return hasRetryAttempt() return true
case .computeSyncRanges, .checksBeforeSync, .download, .validate, .scan, .enhance, .fetchUTXO, .handleSaplingParams, .clearCache, case .computeSyncRanges,
.scanDownloaded, .clearAlreadyScannedBlocks, .validateServer: .checksBeforeSync,
.download,
.validate,
.scan,
.enhance,
.fetchUTXO,
.handleSaplingParams,
.clearCache,
.scanDownloaded,
.clearAlreadyScannedBlocks,
.validateServer,
.migrateLegacyCacheDB:
return false return false
} }
} }
private func canStartSync() async -> Bool {
return await isIdle() && hasRetryAttempt()
}
private func hasRetryAttempt() -> Bool { private func hasRetryAttempt() -> Bool {
retryAttempts < config.retries retryAttempts < config.retries
} }

View File

@ -11,8 +11,9 @@ note as Lines
end note end note
[*] -> migrateLegacyCacheDB [*] -> idle
idle -[#green,bold]-> migrateLegacyCacheDB
migrateLegacyCacheDB : MigrateLegacyCacheDBAction migrateLegacyCacheDB : MigrateLegacyCacheDBAction
migrateLegacyCacheDB -[#green,bold]-> validateServer migrateLegacyCacheDB -[#green,bold]-> validateServer

Binary file not shown.

Before

Width:  |  Height:  |  Size: 276 KiB

After

Width:  |  Height:  |  Size: 278 KiB