[#700] CompactBlockProcessor as state machine proof of concept

- For now I created `CompactBlockProcessorNG` where I started with
  implementation of the state machine. I did it to not break the rest of
  the SDK. This change will be merged to the feature branch. And before
  it is merged to `main` branch code from `CompactBlockProcessorNG` will
  be moved to `CompactBlockProcessor`.
- The new code is not used. It just shows and explains how it is done.
  It is proof of concept.
- I did put either commented current code or comment to some places to
  explain what should be done there.
- New important data types:
  - `ActionContext` is context that can hold any data that needs to be
    shared between actions. For example sync ranges or current state.
  - `CBPState` is state of the `CompactBlockProcessor`. Each state is
    handled by one action. This doesn't apply to terminal states like
    `finished` or `failed`.
  - `ActionProgress` is very similar to `CompactBlockProgress`.
    Different actions reports progress differently and `ActionProgress`
    represents this.
  - `Action` is protocol that defines API of an action. It has one run
    method that executes the code of the action
- CBP first creates actions for (almost) each state in `makeActions()`
  method. Then the "magic" is done in `CompactBlockProcessorNG.run()` method.
  Here is main loop which takes action for current state and execute it.
  It's expected that action does it's work and then updates the context
  with new state. And this happens until some terminal state
  (`finished`, `failed`, `stopped`) is reached.
- After the transition to state machine API of the
  `CompactBlockProcessor` should stay the same. No changes should be
  required in `SDKSynchronizer`.
This commit is contained in:
Michal Fousek 2023-05-05 17:04:13 +02:00
parent ea4da6e0e6
commit 92994b067e
15 changed files with 625 additions and 0 deletions

View File

@ -0,0 +1,59 @@
//
// Action.swift
//
//
// Created by Michal Fousek on 05.05.2023.
//
import Foundation
actor ActionContext {
var state: CBPState
var syncRanges: SyncRanges
init(state: CBPState) {
self.state = state
syncRanges = SyncRanges.empty
}
func update(state: CBPState) async {
self.state = state
}
}
enum CBPState: CaseIterable {
case validateServer
case computeSyncRanges
case checksBeforeSync
case scanDownloaded
case download
case validate
case scan
case clearAlreadyScannedBlocks
case enhance
case fetchUTXO
case handleSaplingParams
case clearCache
case finished
case failed
case stopped
}
enum ActionProgress {
case scan
}
protocol Action {
// When any action is created it can get `DIContainer` and resolve any depedencies it requires.
// Every action uses `context` to get some informartion like download range.
//
// `didUpdate` is closure that action use to tell CBP that some part of the work is done. For example if download action would like to
// update progress on every block downloaded it can use this closure. Also if action doesn't need to update progress on partial work it doesn't
// need to use this closure at all.
//
// Each action updates context accordingly. It should at least set new state. Reason for this is that action can return different states for
// different conditions. And action is the thing that knows these conditions.
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext
}

View File

@ -0,0 +1,30 @@
//
// ChecksBeforeSyncAction.swift
//
//
// Created by Michal Fousek on 05.05.2023.
//
import Foundation
class ChecksBeforeSyncAction {
init() { }
}
extension ChecksBeforeSyncAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// clear any present cached state if needed.
// this checks if there was a sync in progress that was
// interrupted abruptly and cache was not able to be cleared
// properly and internal state set to the appropriate value
// if let newLatestDownloadedHeight = ranges.shouldClearBlockCacheAndUpdateInternalState() {
// try await storage.clear()
// await internalSyncProgress.set(newLatestDownloadedHeight, .latestDownloadedBlockHeight)
// } else {
// try await storage.create()
// }
await context.update(state: .scan)
return context
}
}

View File

@ -0,0 +1,25 @@
//
// ClearCacheForLastScannedBatch.swift
//
//
// Created by Michal Fousek on 08.05.2023.
//
import Foundation
class ClearAlreadyScannedBlocksAction {
init() { }
}
extension ClearAlreadyScannedBlocksAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// clear storage but delete only blocks that were already scanned, when doing parallel download all blocks can't be deleted
// if latestScannedHeight == context.scanRanges.downloadAndScanRange?.upperBound then set state `enhance`. Everything is scanned.
// If latestScannedHeight < context.scanRanges.downloadAndScanRange?.upperBound thne set state to `download` because there are blocks to
// download and scan.
await context.update(state: .enhance)
return context
}
}

View File

@ -0,0 +1,20 @@
//
// ClearCacheAction.swift
//
//
// Created by Michal Fousek on 05.05.2023.
//
import Foundation
class ClearCacheAction {
init() { }
}
extension ClearCacheAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// clear storage
await context.update(state: .finished)
return context
}
}

View File

@ -0,0 +1,20 @@
//
// ComputeSyncRangesAction.swift
//
//
// Created by Michal Fousek on 05.05.2023.
//
import Foundation
class ComputeSyncRangesAction {
init() { }
}
extension ComputeSyncRangesAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// call internalSyncProgress and compute sync ranges and store them in context
await context.update(state: .checksBeforeSync)
return context
}
}

View File

@ -0,0 +1,22 @@
//
// DownloadAction.swift
//
//
// Created by Michal Fousek on 05.05.2023.
//
import Foundation
class DownloadAction {
init() { }
}
extension DownloadAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// Use `BlockDownloader` to set download limit to latestScannedHeight + (2*batchSize) (after parallel is merged).
// And start download.
await context.update(state: .validate)
return context
}
}

View File

@ -0,0 +1,21 @@
//
// EnhanceAction.swift
//
//
// Created by Michal Fousek on 05.05.2023.
//
import Foundation
class EnhanceAction {
init() { }
}
extension EnhanceAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// Use `BlockEnhancer` to enhance blocks.
await context.update(state: .fetchUTXO)
return context
}
}

View File

@ -0,0 +1,21 @@
//
// FetchUTXOsAction.swift
//
//
// Created by Michal Fousek on 05.05.2023.
//
import Foundation
class FetchUTXOsAction {
init() { }
}
extension FetchUTXOsAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// Use `UTXOFetcher` to fetch UTXOs.
await context.update(state: .handleSaplingParams)
return context
}
}

View File

@ -0,0 +1,22 @@
//
// SaplingParamsAction.swift
//
//
// Created by Michal Fousek on 05.05.2023.
//
import Foundation
class SaplingParamsAction {
init() { }
}
extension SaplingParamsAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// Download files with sapling params.
await context.update(state: .clearCache)
return context
}
}

View File

@ -0,0 +1,21 @@
//
// ScanAction.swift
//
//
// Created by Michal Fousek on 05.05.2023.
//
import Foundation
class ScanAction {
init() { }
}
extension ScanAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// Scan in range latestScannedHeight...latestScannedHeight+batchSize.
await context.update(state: .clearAlreadyScannedBlocks)
return context
}
}

View File

@ -0,0 +1,23 @@
//
// ScandownloadedButUnscannedAction.swift
//
//
// Created by Michal Fousek on 05.05.2023.
//
import Foundation
class ScandownloadedButUnscannedAction {
init() { }
}
extension ScandownloadedButUnscannedAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
if let downloadedButUnscannedRange = await context.syncRanges.downloadedButUnscannedRange {
// Use `BlockScanner` to do the scanning in this range.
}
await context.update(state: .download)
return context
}
}

View File

@ -0,0 +1,22 @@
//
// ValidateAction.swift
//
//
// Created by Michal Fousek on 05.05.2023.
//
import Foundation
class ValidateAction {
init() { }
}
extension ValidateAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// Wait until all blocks in range latestScannedHeight...latestScannedHeight+batchSize are downloaded and then run validation.
await context.update(state: .scan)
return context
}
}

View File

@ -0,0 +1,44 @@
//
// ValidateServerAction.swift
//
//
// Created by Michal Fousek on 05.05.2023.
//
import Foundation
class ValidateServerAction {
init() { }
}
extension ValidateServerAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// // check network types
// guard let remoteNetworkType = NetworkType.forChainName(info.chainName) else {
// throw ZcashError.compactBlockProcessorChainName(info.chainName)
// }
//
// guard remoteNetworkType == localNetwork.networkType else {
// throw ZcashError.compactBlockProcessorNetworkMismatch(localNetwork.networkType, remoteNetworkType)
// }
//
// guard saplingActivation == info.saplingActivationHeight else {
// throw ZcashError.compactBlockProcessorSaplingActivationMismatch(saplingActivation, BlockHeight(info.saplingActivationHeight))
// }
//
// // check branch id
// let localBranch = try rustBackend.consensusBranchIdFor(height: Int32(info.blockHeight))
//
// guard let remoteBranchID = ConsensusBranchID.fromString(info.consensusBranchID) else {
// throw ZcashError.compactBlockProcessorConsensusBranchID
// }
//
// guard remoteBranchID == localBranch else {
// throw ZcashError.compactBlockProcessorWrongConsensusBranchId(localBranch, remoteBranchID)
// }
await context.update(state: .computeSyncRanges)
return context
}
}

View File

@ -0,0 +1,263 @@
//
// CompactBlockProcessor.swift
//
//
// Created by Michal Fousek on 05.05.2023.
//
import Foundation
class CompactBlockProcessorNG {
// It would be better to use Combine here but Combine doesn't work great with async. When this runs regularly only one closure is stored here
// and that is one provided by `SDKSynchronizer`. But while running tests more "subscribers" is required here. Therefore it's required to handle
// more closures here.
private var eventClosures: [String: EventClosure] = [:]
private var syncTask: Task<Void, Error>?
private let actions: [CBPState: Action]
private var context: ActionContext
let logger: Logger
init(logger: Logger) {
context = ActionContext(state: .validateServer)
actions = Self.makeActions()
self.logger = logger
}
// swiftlint:disable:next cyclomatic_complexity
static func makeActions() -> [CBPState: Action] {
let actionsDefinition = CBPState.allCases.compactMap { state -> (CBPState, Action)? in
let action: Action
switch state {
case .validateServer:
action = ValidateServerAction()
case .computeSyncRanges:
action = ComputeSyncRangesAction()
case .checksBeforeSync:
action = ChecksBeforeSyncAction()
case .scanDownloaded:
action = ScandownloadedButUnscannedAction()
case .download:
action = DownloadAction()
case .validate:
action = ValidateAction()
case .scan:
action = ScanAction()
case .clearAlreadyScannedBlocks:
action = ClearAlreadyScannedBlocksAction()
case .enhance:
action = EnhanceAction()
case .fetchUTXO:
action = FetchUTXOsAction()
case .handleSaplingParams:
action = SaplingParamsAction()
case .clearCache:
action = ClearCacheAction()
case .finished, .failed, .stopped:
return nil
}
return (state, action)
}
return Dictionary(uniqueKeysWithValues: actionsDefinition)
}
}
// MARK: - "Public" API
extension CompactBlockProcessorNG {
func start(retry: Bool = false) async {
// if retry {
// self.retryAttempts = 0
// self.processingError = nil
// self.backoffTimer?.invalidate()
// self.backoffTimer = nil
// }
guard await canStartSync() else {
// switch self.state {
// case .error(let error):
// // max attempts have been reached
// logger.info("max retry attempts reached with error: \(error)")
// await notifyError(ZcashError.compactBlockProcessorMaxAttemptsReached(self.maxAttempts))
// await updateState(.stopped)
// case .stopped:
// // max attempts have been reached
// logger.info("max retry attempts reached")
// await notifyError(ZcashError.compactBlockProcessorMaxAttemptsReached(self.maxAttempts))
// case .synced:
// // max attempts have been reached
// logger.warn("max retry attempts reached on synced state, this indicates malfunction")
// await notifyError(ZcashError.compactBlockProcessorMaxAttemptsReached(self.maxAttempts))
// case .syncing, .enhancing, .fetching, .handlingSaplingFiles:
// logger.debug("Warning: compact block processor was started while busy!!!!")
// afterSyncHooksManager.insert(hook: .anotherSync)
// }
// return
return
}
//
// do {
// if let legacyCacheDbURL = self.config.cacheDbURL {
// try await self.migrateCacheDb(legacyCacheDbURL)
// }
// } catch {
// await self.fail(error)
// }
syncTask = Task(priority: .userInitiated) {
await run()
}
}
func stop() {
syncTask?.cancel()
// self.backoffTimer?.invalidate()
// self.backoffTimer = nil
//
// cancelableTask?.cancel()
//
// self.retryAttempts = 0
}
}
// MARK: - Events
extension CompactBlockProcessorNG {
typealias EventClosure = (Event) async -> Void
enum Event {
/// Event sent when the CompactBlockProcessor presented an error.
case failed (Error)
/// Event sent when the CompactBlockProcessor has finished syncing the blockchain to latest height
case finished (_ lastScannedHeight: BlockHeight, _ foundBlocks: Bool)
/// Event sent when the CompactBlockProcessor enhanced a bunch of transactions in some range.
case foundTransactions ([ZcashTransaction.Overview], CompactBlockRange)
/// Event sent when the CompactBlockProcessor handled a ReOrg.
/// `reorgHeight` is the height on which the reorg was detected.
/// `rewindHeight` is the height that the processor backed to in order to solve the Reorg.
case handledReorg (_ reorgHeight: BlockHeight, _ rewindHeight: BlockHeight)
/// Event sent when progress of the sync process changes.
case progressUpdated (CompactBlockProgress)
/// Event sent when the CompactBlockProcessor fetched utxos from lightwalletd attempted to store them.
case storedUTXOs ((inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]))
/// Event sent when the CompactBlockProcessor starts enhancing of the transactions.
case startedEnhancing
/// Event sent when the CompactBlockProcessor starts fetching of the UTXOs.
case startedFetching
/// Event sent when the CompactBlockProcessor starts syncing.
case startedSyncing
/// Event sent when the CompactBlockProcessor stops syncing.
case stopped
}
func updateEventClosure(identifier: String, closure: @escaping (Event) async -> Void) async {
eventClosures[identifier] = closure
}
func send(event: Event) async {
for item in eventClosures {
await item.value(event)
}
}
}
// MARK: - Main loop
extension CompactBlockProcessorNG {
// This is main loop of the sync process. It simply takes state and try to find action which handles it. If action is found it executes the
// action. If action is not found then loop finishes. Thanks to this it's super easy to identify start point of sync process and end points
// of sync process without any side effects.
func run() async {
// Prepare for sync and set everything to default values.
await context.update(state: .validateServer)
await syncStarted()
// Try to find action for state.
while true {
guard let action = actions[await context.state] else {
await syncFinished()
break
}
do {
try Task.checkCancellation()
// Execute action.
context = try await action.run(with: context) { [weak self] progress in
await self?.update(progress: progress)
}
} catch {
logger.error("Sync failed with error: \(error)")
if Task.isCancelled {
logger.info("Processing cancelled.")
await syncStopped()
} else {
if case let ZcashError.rustValidateCombinedChainInvalidChain(height) = error {
await validationFailed(at: BlockHeight(height))
} else {
logger.error("processing failed with error: \(error)")
await fail(error)
}
}
}
}
}
func syncStarted() async {
// handle start of the sync process
await send(event: .startedSyncing)
}
func syncFinished() async {
// handle finish of the sync
// await send(event: .finished(<#T##lastScannedHeight: BlockHeight##BlockHeight#>, <#T##foundBlocks: Bool##Bool#>))
}
func update(progress: ActionProgress) async {
// handle update of the progree
}
func syncStopped() async {
await context.update(state: .stopped)
await send(event: .stopped)
// await handleAfterSyncHooks()
}
func validationFailed(at height: BlockHeight) async {
// handle validation failure
}
func fail(_ error: Error) async {
// handle failure
}
}
// MARK: - Utils
extension CompactBlockProcessorNG {
func canStartSync() async -> Bool {
switch await context.state {
case .stopped, .failed, .finished, .validateServer:
return true
case .computeSyncRanges, .checksBeforeSync, .download, .validate, .scan, .enhance, .fetchUTXO, .handleSaplingParams, .clearCache,
.scanDownloaded, .clearAlreadyScannedBlocks:
return false
}
}
}

View File

@ -23,6 +23,18 @@ struct SyncRanges: Equatable {
let latestScannedHeight: BlockHeight?
let latestDownloadedBlockHeight: BlockHeight?
static var empty: SyncRanges {
SyncRanges(
latestBlockHeight: 0,
downloadedButUnscannedRange: nil,
downloadAndScanRange: nil,
enhanceRange: nil,
fetchUTXORange: nil,
latestScannedHeight: nil,
latestDownloadedBlockHeight: nil
)
}
}
protocol InternalSyncProgressStorage {