[#1057] Implement start for state machine CBP

Closes #1057
This commit is contained in:
Michal Fousek 2023-05-10 12:57:46 +02:00
parent f8030e0a73
commit 1d029ff90f
5 changed files with 124 additions and 44 deletions

View File

@ -22,6 +22,7 @@ actor ActionContext {
}
enum CBPState: CaseIterable {
case migrateLegacyCacheDB
case validateServer
case computeSyncRanges
case checksBeforeSync
@ -37,6 +38,8 @@ enum CBPState: CaseIterable {
case finished
case failed
case stopped
static let initialState: CBPState = .migrateLegacyCacheDB
}

View File

@ -0,0 +1,74 @@
//
// MigrateLegacyCacheDB.swift
//
//
// Created by Michal Fousek on 10.05.2023.
//
import Foundation
class MigrateLegacyCacheDBAction {
private let config: CompactBlockProcessorNG.Configuration
private let internalSyncProgress: InternalSyncProgress
private let storage: CompactBlockRepository
private let transactionRepository: TransactionRepository
init(container: DIContainer, config: CompactBlockProcessorNG.Configuration) {
self.config = config
internalSyncProgress = container.resolve(InternalSyncProgress.self)
storage = container.resolve(CompactBlockRepository.self)
transactionRepository = container.resolve(TransactionRepository.self)
}
private func updateState(_ context: ActionContext) async -> ActionContext {
await context.update(state: .validateServer)
return context
}
}
extension MigrateLegacyCacheDBAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
guard let legacyCacheDbURL = config.cacheDbURL else {
return await updateState(context)
}
guard legacyCacheDbURL != config.fsBlockCacheRoot else {
throw ZcashError.compactBlockProcessorCacheDbMigrationFsCacheMigrationFailedSameURL
}
// Instance with alias `default` is same as instance before the Alias was introduced. So it makes sense that only this instance handles
// legacy cache DB. Any instance with different than `default` alias was created after the Alias was introduced and at this point legacy
// cache DB is't anymore. So there is nothing to migrate for instances with not default Alias.
guard config.alias == .default else {
return await updateState(context)
}
// if the URL provided is not readable, it means that the client has a reference
// to the cacheDb file but it has been deleted in a prior sync cycle. there's
// nothing to do here.
guard FileManager.default.isReadableFile(atPath: legacyCacheDbURL.path) else {
return await updateState(context)
}
do {
// if there's a readable file at the provided URL, delete it.
try FileManager.default.removeItem(at: legacyCacheDbURL)
} catch {
throw ZcashError.compactBlockProcessorCacheDbMigrationFailedToDeleteLegacyDb(error)
}
// create the storage
try await self.storage.create()
// The database has been deleted, so we have adjust the internal state of the
// `CompactBlockProcessor` so that it doesn't rely on download heights set
// by a previous processing cycle.
let lastScannedHeight = try await transactionRepository.lastScannedHeight()
await internalSyncProgress.set(lastScannedHeight, .latestDownloadedBlockHeight)
return await updateState(context)
}
func stop() { }
}

View File

@ -171,7 +171,7 @@ actor CompactBlockProcessorNG {
context = ActionContext(state: .validateServer)
actions = Self.makeActions(container: container, config: config)
self.metrics = container.resolve(SDKMetrics.self)
self.logger = container.resolve(Logger.self)
self.latestBlocksDataProvider = container.resolve(LatestBlocksDataProvider.self)
@ -190,6 +190,8 @@ actor CompactBlockProcessorNG {
let actionsDefinition = CBPState.allCases.compactMap { state -> (CBPState, Action)? in
let action: Action
switch state {
case .migrateLegacyCacheDB:
action = MigrateLegacyCacheDBAction(container: container, config: config)
case .validateServer:
action = ValidateServerAction(container: container, config: config)
case .computeSyncRanges:
@ -229,45 +231,37 @@ actor CompactBlockProcessorNG {
extension CompactBlockProcessorNG {
func start(retry: Bool = false) async {
// if retry {
// self.retryAttempts = 0
// self.processingError = nil
// self.backoffTimer?.invalidate()
// self.backoffTimer = nil
// }
guard await canStartSync() else {
// switch self.state {
// case .error(let error):
// // max attempts have been reached
// logger.info("max retry attempts reached with error: \(error)")
// await notifyError(ZcashError.compactBlockProcessorMaxAttemptsReached(self.maxAttempts))
// await updateState(.stopped)
// case .stopped:
// // max attempts have been reached
// logger.info("max retry attempts reached")
// await notifyError(ZcashError.compactBlockProcessorMaxAttemptsReached(self.maxAttempts))
// case .synced:
// // max attempts have been reached
// logger.warn("max retry attempts reached on synced state, this indicates malfunction")
// await notifyError(ZcashError.compactBlockProcessorMaxAttemptsReached(self.maxAttempts))
// case .syncing, .enhancing, .fetching, .handlingSaplingFiles:
// logger.debug("Warning: compact block processor was started while busy!!!!")
// afterSyncHooksManager.insert(hook: .anotherSync)
// }
// return
return
if retry {
self.retryAttempts = 0
self.backoffTimer?.invalidate()
self.backoffTimer = nil
}
//
// do {
// if let legacyCacheDbURL = self.config.cacheDbURL {
// try await self.migrateCacheDb(legacyCacheDbURL)
// }
// } catch {
// await self.fail(error)
// }
guard await canStartSync() else {
switch await context.state {
case .migrateLegacyCacheDB:
// max attempts have been reached
logger.warn("max retry attempts reached on \(CBPState.initialState) state")
await send(event: .failed(ZcashError.compactBlockProcessorMaxAttemptsReached(config.retries)))
case .finished:
// max attempts have been reached
logger.warn("max retry attempts reached on synced state, this indicates malfunction")
await send(event: .failed(ZcashError.compactBlockProcessorMaxAttemptsReached(config.retries)))
case .failed:
// max attempts have been reached
logger.info("max retry attempts reached with failed state")
await send(event: .failed(ZcashError.compactBlockProcessorMaxAttemptsReached(config.retries)))
case .stopped:
// max attempts have been reached
logger.info("max retry attempts reached")
await send(event: .failed(ZcashError.compactBlockProcessorMaxAttemptsReached(config.retries)))
case .computeSyncRanges, .checksBeforeSync, .scanDownloaded, .download, .validate, .scan, .clearAlreadyScannedBlocks, .enhance,
.fetchUTXO, .handleSaplingParams, .clearCache, .validateServer:
logger.debug("Warning: compact block processor was started while busy!!!!")
// afterSyncHooksManager.insert(hook: .anotherSync)
}
return
}
syncTask = Task(priority: .userInitiated) {
await run()
@ -344,7 +338,7 @@ extension CompactBlockProcessorNG {
// of sync process without any side effects.
func run() async {
// Prepare for sync and set everything to default values.
await context.update(state: .validateServer)
await context.update(state: CBPState.initialState)
await syncStarted()
if backoffTimer == nil {
@ -375,7 +369,7 @@ extension CompactBlockProcessorNG {
do {
try await validationFailed(at: BlockHeight(height))
// Start sync all over again
await context.update(state: .validateServer)
await context.update(state: CBPState.initialState)
} catch {
await failure(error)
break
@ -396,6 +390,7 @@ extension CompactBlockProcessorNG {
}
func syncFinished() async {
syncTask = nil
// handle finish of the sync
// await send(event: .finished(<#T##lastScannedHeight: BlockHeight##BlockHeight#>, <#T##foundBlocks: Bool##Bool#>))
}
@ -405,6 +400,7 @@ extension CompactBlockProcessorNG {
}
func syncStopped() async {
syncTask = nil
await context.update(state: .stopped)
await send(event: .stopped)
// await handleAfterSyncHooks()
@ -437,6 +433,7 @@ extension CompactBlockProcessorNG {
logger.error("Fail with error: \(error)")
syncTask?.cancel()
syncTask = nil
backoffTimer?.invalidate()
backoffTimer = nil
// await blockDownloader.stopDownload()
@ -491,10 +488,10 @@ extension CompactBlockProcessorNG {
func canStartSync() async -> Bool {
switch await context.state {
case .stopped, .failed, .finished, .validateServer:
return true
case .stopped, .failed, .finished, .migrateLegacyCacheDB:
return hasRetryAttempt()
case .computeSyncRanges, .checksBeforeSync, .download, .validate, .scan, .enhance, .fetchUTXO, .handleSaplingParams, .clearCache,
.scanDownloaded, .clearAlreadyScannedBlocks:
.scanDownloaded, .clearAlreadyScannedBlocks, .validateServer:
return false
}
}

View File

@ -11,7 +11,13 @@ note as Lines
end note
[*] -> validateServer
[*] -> migrateLegacyCacheDB
migrateLegacyCacheDB : MigrateLegacyCacheDBAction
migrateLegacyCacheDB -[#green,bold]-> validateServer
migrateLegacyCacheDB -[#red]-> failed : Error occured.
migrateLegacyCacheDB -[#blue]-> stopped : Sync was stopped.
validateServer : ValidateServerAction
validateServer -[#green,bold]-> computeSyncRanges

Binary file not shown.

Before

Width:  |  Height:  |  Size: 249 KiB

After

Width:  |  Height:  |  Size: 276 KiB