[#700] Implement progress reporting in state machine CBP

This commit is contained in:
Michal Fousek 2023-05-11 12:31:02 +02:00
parent 52ef45970d
commit 98226d8a6d
18 changed files with 113 additions and 110 deletions

View File

@ -41,11 +41,6 @@ enum CBPState: CaseIterable {
case stopped
}
// this is replacement for CompactBlockProgress
enum ActionProgress {
case scan
}
protocol Action {
// When any action is created it can get `DIContainer` and resolve any depedencies it requires.
@ -58,7 +53,7 @@ protocol Action {
// Each action updates context accordingly. It should at least set new state. Reason for this is that action can return different states for
// different conditions. And action is the thing that knows these conditions.
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext
// Should be called on each existing action when processor wants to stop. Some actions may do it's own background work.
func stop() async

View File

@ -12,7 +12,7 @@ class ChecksBeforeSyncAction {
}
extension ChecksBeforeSyncAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
// clear any present cached state if needed.
// this checks if there was a sync in progress that was
// interrupted abruptly and cache was not able to be cleared

View File

@ -12,7 +12,7 @@ class ClearAlreadyScannedBlocksAction {
}
extension ClearAlreadyScannedBlocksAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
// clear storage but delete only blocks that were already scanned, when doing parallel download all blocks can't be deleted
await context.update(state: .enhance)

View File

@ -12,7 +12,7 @@ class ClearCacheAction {
}
extension ClearCacheAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
// clear storage
await context.update(state: .finished)
return context

View File

@ -12,7 +12,7 @@ class ComputeSyncRangesAction {
}
extension ComputeSyncRangesAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
// call internalSyncProgress and compute sync ranges and store them in context
// if there is nothing sync just switch to finished state

View File

@ -24,7 +24,7 @@ class DownloadAction {
}
extension DownloadAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
guard let downloadRange = await context.syncRanges.downloadAndScanRange else {
return await update(context: context)
}

View File

@ -12,7 +12,7 @@ class EnhanceAction {
}
extension EnhanceAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
// Use `BlockEnhancer` to enhance blocks.
// This action is executed on each downloaded and scanned batch (typically each 100 blocks). But we want to run enhancement each 1000 blocks.
// This action can use `InternalSyncProgress` and last scanned height to compute when it should do work.

View File

@ -12,7 +12,7 @@ class FetchUTXOsAction {
}
extension FetchUTXOsAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
// Use `UTXOFetcher` to fetch UTXOs.
await context.update(state: .handleSaplingParams)

View File

@ -27,7 +27,7 @@ class MigrateLegacyCacheDBAction {
}
extension MigrateLegacyCacheDBAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
guard let legacyCacheDbURL = config.cacheDbURL else {
return await updateState(context)
}

View File

@ -12,7 +12,7 @@ class SaplingParamsAction {
}
extension SaplingParamsAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
// Download files with sapling params.
await context.update(state: .scanDownloaded)

View File

@ -12,7 +12,7 @@ class ScanAction {
}
extension ScanAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
// Scan in range latestScannedHeight...latestScannedHeight+batchSize.
await context.update(state: .clearAlreadyScannedBlocks)

View File

@ -12,7 +12,7 @@ class ScanDownloadedButUnscannedAction {
}
extension ScanDownloadedButUnscannedAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
// if let range = ranges.downloadedButUnscannedRange {
// logger.debug("Starting scan with downloaded but not scanned blocks with range: \(range.lowerBound)...\(range.upperBound)")
// try await blockScanner.scanBlocks(at: range, totalProgressRange: totalProgressRange) { [weak self] lastScannedHeight in

View File

@ -15,7 +15,7 @@ class ValidateAction {
}
extension ValidateAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
try await validator.validate()
await context.update(state: .scan)
return context

View File

@ -20,7 +20,7 @@ class ValidateServerAction {
}
extension ValidateServerAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
let info = try await service.getInfo()
let localNetwork = config.network
let saplingActivation = config.saplingActivation

View File

@ -12,95 +12,6 @@ import Combine
public typealias RefreshedUTXOs = (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity])
public enum CompactBlockProgress {
case syncing(_ progress: BlockProgress)
case enhance(_ progress: EnhancementProgress)
case fetch(_ progress: Float)
public var progress: Float {
switch self {
case .syncing(let blockProgress):
return blockProgress.progress
case .enhance(let enhancementProgress):
return enhancementProgress.progress
case .fetch(let fetchingProgress):
return fetchingProgress
}
}
public var progressHeight: BlockHeight? {
switch self {
case .syncing(let blockProgress):
return blockProgress.progressHeight
case .enhance(let enhancementProgress):
return enhancementProgress.lastFoundTransaction?.minedHeight
default:
return 0
}
}
public var blockDate: Date? {
if case .enhance(let enhancementProgress) = self, let time = enhancementProgress.lastFoundTransaction?.blockTime {
return Date(timeIntervalSince1970: time)
}
return nil
}
public var targetHeight: BlockHeight? {
switch self {
case .syncing(let blockProgress):
return blockProgress.targetHeight
default:
return nil
}
}
}
public struct EnhancementProgress: Equatable {
/// total transactions that were detected in the `range`
public let totalTransactions: Int
/// enhanced transactions so far
public let enhancedTransactions: Int
/// last found transaction
public let lastFoundTransaction: ZcashTransaction.Overview?
/// block range that's being enhanced
public let range: CompactBlockRange
/// whether this transaction can be considered `newly mined` and not part of the
/// wallet catching up to stale and uneventful blocks.
public let newlyMined: Bool
public init(
totalTransactions: Int,
enhancedTransactions: Int,
lastFoundTransaction: ZcashTransaction.Overview?,
range: CompactBlockRange,
newlyMined: Bool
) {
self.totalTransactions = totalTransactions
self.enhancedTransactions = enhancedTransactions
self.lastFoundTransaction = lastFoundTransaction
self.range = range
self.newlyMined = newlyMined
}
public var progress: Float {
totalTransactions > 0 ? Float(enhancedTransactions) / Float(totalTransactions) : 0
}
public static var zero: EnhancementProgress {
EnhancementProgress(totalTransactions: 0, enhancedTransactions: 0, lastFoundTransaction: nil, range: 0...0, newlyMined: false)
}
public static func == (lhs: EnhancementProgress, rhs: EnhancementProgress) -> Bool {
return
lhs.totalTransactions == rhs.totalTransactions &&
lhs.enhancedTransactions == rhs.enhancedTransactions &&
lhs.lastFoundTransaction?.id == rhs.lastFoundTransaction?.id &&
lhs.range == rhs.range
}
}
/// The compact block processor is in charge of orchestrating the download and caching of compact blocks from a LightWalletEndpoint
/// when started the processor downloads does a download - validate - scan cycle until it reaches latest height on the blockchain.
actor CompactBlockProcessor {

View File

@ -446,8 +446,8 @@ extension CompactBlockProcessorNG {
}
}
private func update(progress: ActionProgress) async {
// handle update of the progree
private func update(progress: CompactBlockProgress) async {
await send(event: .progressUpdated(progress))
}
private func syncStopped() async {

View File

@ -7,6 +7,50 @@
import Foundation
public struct EnhancementProgress: Equatable {
/// total transactions that were detected in the `range`
public let totalTransactions: Int
/// enhanced transactions so far
public let enhancedTransactions: Int
/// last found transaction
public let lastFoundTransaction: ZcashTransaction.Overview?
/// block range that's being enhanced
public let range: CompactBlockRange
/// whether this transaction can be considered `newly mined` and not part of the
/// wallet catching up to stale and uneventful blocks.
public let newlyMined: Bool
public init(
totalTransactions: Int,
enhancedTransactions: Int,
lastFoundTransaction: ZcashTransaction.Overview?,
range: CompactBlockRange,
newlyMined: Bool
) {
self.totalTransactions = totalTransactions
self.enhancedTransactions = enhancedTransactions
self.lastFoundTransaction = lastFoundTransaction
self.range = range
self.newlyMined = newlyMined
}
public var progress: Float {
totalTransactions > 0 ? Float(enhancedTransactions) / Float(totalTransactions) : 0
}
public static var zero: EnhancementProgress {
EnhancementProgress(totalTransactions: 0, enhancedTransactions: 0, lastFoundTransaction: nil, range: 0...0, newlyMined: false)
}
public static func == (lhs: EnhancementProgress, rhs: EnhancementProgress) -> Bool {
return
lhs.totalTransactions == rhs.totalTransactions &&
lhs.enhancedTransactions == rhs.enhancedTransactions &&
lhs.lastFoundTransaction?.id == rhs.lastFoundTransaction?.id &&
lhs.range == rhs.range
}
}
protocol BlockEnhancer {
func enhance(at range: CompactBlockRange, didEnhance: (EnhancementProgress) async -> Void) async throws -> [ZcashTransaction.Overview]?
}

View File

@ -0,0 +1,53 @@
//
// CompactBlockProgress.swift
//
//
// Created by Michal Fousek on 11.05.2023.
//
import Foundation
public enum CompactBlockProgress {
case syncing(_ progress: BlockProgress)
case enhance(_ progress: EnhancementProgress)
case fetch(_ progress: Float)
public var progress: Float {
switch self {
case .syncing(let blockProgress):
return blockProgress.progress
case .enhance(let enhancementProgress):
return enhancementProgress.progress
case .fetch(let fetchingProgress):
return fetchingProgress
}
}
public var progressHeight: BlockHeight? {
switch self {
case .syncing(let blockProgress):
return blockProgress.progressHeight
case .enhance(let enhancementProgress):
return enhancementProgress.lastFoundTransaction?.minedHeight
default:
return 0
}
}
public var blockDate: Date? {
if case .enhance(let enhancementProgress) = self, let time = enhancementProgress.lastFoundTransaction?.blockTime {
return Date(timeIntervalSince1970: time)
}
return nil
}
public var targetHeight: BlockHeight? {
switch self {
case .syncing(let blockProgress):
return blockProgress.targetHeight
default:
return nil
}
}
}