Small updates

This commit is contained in:
Michal Fousek 2023-05-10 08:47:03 +02:00
parent 92994b067e
commit 2501cea4b4
15 changed files with 134 additions and 57 deletions

View File

@ -39,6 +39,8 @@ enum CBPState: CaseIterable {
case stopped
}
// this is replacement for CompactBlockProgress
enum ActionProgress {
case scan
}
@ -56,4 +58,7 @@ protocol Action {
// different conditions. And action is the thing that knows these conditions.
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext
// Should be called on each existing action when processor wants to stop. Some actions may do it's own background work.
func stop()
}

View File

@ -8,7 +8,7 @@
import Foundation
class ChecksBeforeSyncAction {
init() { }
init(container: DIContainer) { }
}
extension ChecksBeforeSyncAction: Action {
@ -24,7 +24,11 @@ extension ChecksBeforeSyncAction: Action {
// try await storage.create()
// }
await context.update(state: .scan)
await context.update(state: .fetchUTXO)
return context
}
func stop() {
}
}

View File

@ -8,18 +8,18 @@
import Foundation
class ClearAlreadyScannedBlocksAction {
init() { }
init(container: DIContainer) { }
}
extension ClearAlreadyScannedBlocksAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// clear storage but delete only blocks that were already scanned, when doing parallel download all blocks can't be deleted
// if latestScannedHeight == context.scanRanges.downloadAndScanRange?.upperBound then set state `enhance`. Everything is scanned.
// If latestScannedHeight < context.scanRanges.downloadAndScanRange?.upperBound thne set state to `download` because there are blocks to
// download and scan.
await context.update(state: .enhance)
return context
}
func stop() {
}
}

View File

@ -8,7 +8,7 @@
import Foundation
class ClearCacheAction {
init() { }
init(container: DIContainer) { }
}
extension ClearCacheAction: Action {
@ -17,4 +17,8 @@ extension ClearCacheAction: Action {
await context.update(state: .finished)
return context
}
func stop() {
}
}

View File

@ -8,13 +8,19 @@
import Foundation
class ComputeSyncRangesAction {
init() { }
init(container: DIContainer) { }
}
extension ComputeSyncRangesAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// call internalSyncProgress and compute sync ranges and store them in context
// if there is nothing sync just switch to finished state
await context.update(state: .checksBeforeSync)
return context
}
func stop() {
}
}

View File

@ -8,15 +8,30 @@
import Foundation
class DownloadAction {
init() { }
init(container: DIContainer) { }
}
extension DownloadAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// Use `BlockDownloader` to set download limit to latestScannedHeight + (2*batchSize) (after parallel is merged).
// And start download.
// Compute batch sync range (range used by one loop in `downloadAndScanBlocks` method) and wait until blocks in this range are downloaded.
// do {
// await blockDownloader.setDownloadLimit(processingRange.upperBound + (2 * batchSize))
// await blockDownloader.startDownload(maxBlockBufferSize: config.downloadBufferSize)
//
// try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: processingRange)
// } catch {
// await ifTaskIsNotCanceledClearCompactBlockCache(lastScannedHeight: lastScannedHeight)
// throw error
// }
await context.update(state: .validate)
return context
}
func stop() {
}
}

View File

@ -8,14 +8,24 @@
import Foundation
class EnhanceAction {
init() { }
init(container: DIContainer) { }
}
extension EnhanceAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// Use `BlockEnhancer` to enhance blocks.
// This action is executed on each downloaded and scanned batch (typically each 100 blocks). But we want to run enhancement each 1000 blocks.
// This action can use `InternalSyncProgress` and last scanned height to compute when it should do work.
await context.update(state: .fetchUTXO)
// if latestScannedHeight == context.scanRanges.downloadAndScanRange?.upperBound then set state `enhance`. Everything is scanned.
// If latestScannedHeight < context.scanRanges.downloadAndScanRange?.upperBound thne set state to `download` because there are blocks to
// download and scan.
await context.update(state: .clearCache)
return context
}
func stop() {
}
}

View File

@ -8,7 +8,7 @@
import Foundation
class FetchUTXOsAction {
init() { }
init(container: DIContainer) { }
}
extension FetchUTXOsAction: Action {
@ -18,4 +18,8 @@ extension FetchUTXOsAction: Action {
await context.update(state: .handleSaplingParams)
return context
}
func stop() {
}
}

View File

@ -8,7 +8,7 @@
import Foundation
class SaplingParamsAction {
init() { }
init(container: DIContainer) { }
}
extension SaplingParamsAction: Action {
@ -16,7 +16,11 @@ extension SaplingParamsAction: Action {
// Download files with sapling params.
await context.update(state: .clearCache)
await context.update(state: .scanDownloaded)
return context
}
func stop() {
}
}

View File

@ -8,7 +8,7 @@
import Foundation
class ScanAction {
init() { }
init(container: DIContainer) { }
}
extension ScanAction: Action {
@ -18,4 +18,9 @@ extension ScanAction: Action {
await context.update(state: .clearAlreadyScannedBlocks)
return context
}
func stop() {
}
}

View File

@ -0,0 +1,35 @@
//
// ScandownloadedButUnscannedAction.swift
//
//
// Created by Michal Fousek on 05.05.2023.
//
import Foundation
class ScanDownloadedButUnscannedAction {
init(container: DIContainer) { }
}
extension ScanDownloadedButUnscannedAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// if let range = ranges.downloadedButUnscannedRange {
// logger.debug("Starting scan with downloaded but not scanned blocks with range: \(range.lowerBound)...\(range.upperBound)")
// try await blockScanner.scanBlocks(at: range, totalProgressRange: totalProgressRange) { [weak self] lastScannedHeight in
// let progress = BlockProgress(
// startHeight: totalProgressRange.lowerBound,
// targetHeight: totalProgressRange.upperBound,
// progressHeight: lastScannedHeight
// )
// await self?.notifyProgress(.syncing(progress))
// }
// }
await context.update(state: .download)
return context
}
func stop() {
}
}

View File

@ -1,23 +0,0 @@
//
// ScandownloadedButUnscannedAction.swift
//
//
// Created by Michal Fousek on 05.05.2023.
//
import Foundation
class ScandownloadedButUnscannedAction {
init() { }
}
extension ScandownloadedButUnscannedAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
if let downloadedButUnscannedRange = await context.syncRanges.downloadedButUnscannedRange {
// Use `BlockScanner` to do the scanning in this range.
}
await context.update(state: .download)
return context
}
}

View File

@ -8,7 +8,7 @@
import Foundation
class ValidateAction {
init() { }
init(container: DIContainer) { }
}
extension ValidateAction: Action {
@ -19,4 +19,8 @@ extension ValidateAction: Action {
await context.update(state: .scan)
return context
}
func stop() {
}
}

View File

@ -8,7 +8,7 @@
import Foundation
class ValidateServerAction {
init() { }
init(container: DIContainer) { }
}
extension ValidateServerAction: Action {
@ -41,4 +41,8 @@ extension ValidateServerAction: Action {
await context.update(state: .computeSyncRanges)
return context
}
func stop() {
}
}

View File

@ -20,41 +20,41 @@ class CompactBlockProcessorNG {
let logger: Logger
init(logger: Logger) {
init(container: DIContainer) {
context = ActionContext(state: .validateServer)
actions = Self.makeActions()
self.logger = logger
actions = Self.makeActions(container: container)
self.logger = container.resolve(Logger.self)
}
// swiftlint:disable:next cyclomatic_complexity
static func makeActions() -> [CBPState: Action] {
static func makeActions(container: DIContainer) -> [CBPState: Action] {
let actionsDefinition = CBPState.allCases.compactMap { state -> (CBPState, Action)? in
let action: Action
switch state {
case .validateServer:
action = ValidateServerAction()
action = ValidateServerAction(container: container)
case .computeSyncRanges:
action = ComputeSyncRangesAction()
action = ComputeSyncRangesAction(container: container)
case .checksBeforeSync:
action = ChecksBeforeSyncAction()
action = ChecksBeforeSyncAction(container: container)
case .scanDownloaded:
action = ScandownloadedButUnscannedAction()
action = ScanDownloadedButUnscannedAction(container: container)
case .download:
action = DownloadAction()
action = DownloadAction(container: container)
case .validate:
action = ValidateAction()
action = ValidateAction(container: container)
case .scan:
action = ScanAction()
action = ScanAction(container: container)
case .clearAlreadyScannedBlocks:
action = ClearAlreadyScannedBlocksAction()
action = ClearAlreadyScannedBlocksAction(container: container)
case .enhance:
action = EnhanceAction()
action = EnhanceAction(container: container)
case .fetchUTXO:
action = FetchUTXOsAction()
action = FetchUTXOsAction(container: container)
case .handleSaplingParams:
action = SaplingParamsAction()
action = SaplingParamsAction(container: container)
case .clearCache:
action = ClearCacheAction()
action = ClearCacheAction(container: container)
case .finished, .failed, .stopped:
return nil
}