[#1051] Update how progress is computed after switch to state machine
Closes #1051 - new proposal for the progress computation - OverallProgress value is passed to the Synchronizer as a Float - OverallProgress is a result of fetch, scan and enhance operations - Order of actions no longer influences the computation - Actions report partial updates, CompactBlockProgress actor holds current state and computes the OverallProgress
This commit is contained in:
parent
c3e11989dc
commit
841f42ab6e
|
@ -64,7 +64,6 @@ extension EnhanceAction: Action {
|
|||
let transactions = try await blockEnhancer.enhance(
|
||||
at: enhanceRange,
|
||||
didEnhance: { progress in
|
||||
await didUpdate(.progressUpdated(.enhance(progress)))
|
||||
if let foundTx = progress.lastFoundTransaction, progress.newlyMined {
|
||||
await didUpdate(.minedTransaction(foundTx))
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ extension FetchUTXOsAction: Action {
|
|||
if let range = await context.syncRanges.fetchUTXORange {
|
||||
logger.debug("Fetching UTXO with range: \(range.lowerBound)...\(range.upperBound)")
|
||||
let result = try await utxoFetcher.fetch(at: range) { fetchProgress in
|
||||
await didUpdate(.progressUpdated(.fetch(fetchProgress)))
|
||||
await didUpdate(.progressPartialUpdate(.fetch(fetchProgress)))
|
||||
}
|
||||
await didUpdate(.storedUTXOs(result))
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ extension ScanAction: Action {
|
|||
progressHeight: lastScannedHeight
|
||||
)
|
||||
self?.logger.debug("progress: \(progress)")
|
||||
await didUpdate(.progressUpdated(.syncing(progress)))
|
||||
await didUpdate(.progressPartialUpdate(.syncing(progress)))
|
||||
}
|
||||
|
||||
return await update(context: context)
|
||||
|
|
|
@ -24,14 +24,7 @@ extension ScanDownloadedButUnscannedAction: Action {
|
|||
if let range = await context.syncRanges.downloadedButUnscannedRange {
|
||||
logger.debug("Starting scan with downloaded but not scanned blocks with range: \(range.lowerBound)...\(range.upperBound)")
|
||||
let totalProgressRange = await context.totalProgressRange
|
||||
try await blockScanner.scanBlocks(at: range, totalProgressRange: totalProgressRange) { lastScannedHeight in
|
||||
let progress = BlockProgress(
|
||||
startHeight: totalProgressRange.lowerBound,
|
||||
targetHeight: totalProgressRange.upperBound,
|
||||
progressHeight: lastScannedHeight
|
||||
)
|
||||
await didUpdate(.progressUpdated(.syncing(progress)))
|
||||
}
|
||||
try await blockScanner.scanBlocks(at: range, totalProgressRange: totalProgressRange) { _ in }
|
||||
}
|
||||
await context.update(state: .download)
|
||||
return context
|
||||
|
|
|
@ -41,7 +41,9 @@ actor CompactBlockProcessor {
|
|||
private var retryAttempts: Int = 0
|
||||
private var backoffTimer: Timer?
|
||||
private var consecutiveChainValidationErrors: Int = 0
|
||||
|
||||
|
||||
private var compactBlockProgress: CompactBlockProgress = .zero
|
||||
|
||||
/// Compact Block Processor configuration
|
||||
///
|
||||
/// - parameter fsBlockCacheRoot: absolute root path where the filesystem block cache will be stored.
|
||||
|
@ -400,8 +402,11 @@ extension CompactBlockProcessor {
|
|||
/// `rewindHeight` is the height that the processor backed to in order to solve the Reorg.
|
||||
case handledReorg(_ reorgHeight: BlockHeight, _ rewindHeight: BlockHeight)
|
||||
|
||||
/// Event sent when progress of some specific action happened.
|
||||
case progressPartialUpdate(CompactBlockProgressUpdate)
|
||||
|
||||
/// Event sent when progress of the sync process changes.
|
||||
case progressUpdated(CompactBlockProgress)
|
||||
case progressUpdated(Float)
|
||||
|
||||
/// Event sent when the CompactBlockProcessor fetched utxos from lightwalletd attempted to store them.
|
||||
case storedUTXOs((inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]))
|
||||
|
@ -438,7 +443,7 @@ extension CompactBlockProcessor {
|
|||
// of sync process without any side effects.
|
||||
private func run() async {
|
||||
logger.debug("Starting run")
|
||||
resetContext()
|
||||
await resetContext()
|
||||
|
||||
while true {
|
||||
// Sync is starting when the state is `idle`.
|
||||
|
@ -457,7 +462,7 @@ extension CompactBlockProcessor {
|
|||
// Try to find action for state.
|
||||
guard let action = actions[state] else {
|
||||
if await syncFinished() {
|
||||
resetContext()
|
||||
await resetContext()
|
||||
continue
|
||||
} else {
|
||||
break
|
||||
|
@ -470,6 +475,11 @@ extension CompactBlockProcessor {
|
|||
// Execute action.
|
||||
context = try await action.run(with: context) { [weak self] event in
|
||||
await self?.send(event: event)
|
||||
if let progressChanged = await self?.compactBlockProgress.event(event), progressChanged {
|
||||
if let progress = await self?.compactBlockProgress.progress {
|
||||
await self?.send(event: .progressUpdated(progress))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
await didFinishAction()
|
||||
|
@ -477,7 +487,7 @@ extension CompactBlockProcessor {
|
|||
if Task.isCancelled {
|
||||
if await syncTaskWasCancelled() {
|
||||
// Start sync all over again
|
||||
resetContext()
|
||||
await resetContext()
|
||||
} else {
|
||||
// end the sync loop
|
||||
break
|
||||
|
@ -485,7 +495,7 @@ extension CompactBlockProcessor {
|
|||
} else {
|
||||
if await handleSyncFailure(action: action, error: error) {
|
||||
// Start sync all over again
|
||||
resetContext()
|
||||
await resetContext()
|
||||
} else {
|
||||
// end the sync loop
|
||||
break
|
||||
|
@ -568,8 +578,9 @@ extension CompactBlockProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
private func resetContext() {
|
||||
private func resetContext() async {
|
||||
context = ActionContext(state: .idle)
|
||||
await compactBlockProgress.reset()
|
||||
}
|
||||
|
||||
private func syncStarted() async {
|
||||
|
|
|
@ -7,47 +7,58 @@
|
|||
|
||||
import Foundation
|
||||
|
||||
public enum CompactBlockProgress {
|
||||
final actor CompactBlockProgress {
|
||||
static let zero = CompactBlockProgress()
|
||||
|
||||
enum Action: Equatable {
|
||||
case enhance
|
||||
case fetch
|
||||
case scan
|
||||
|
||||
func weight() -> Float {
|
||||
switch self {
|
||||
case .enhance: return 0.08
|
||||
case .fetch: return 0.02
|
||||
case .scan: return 0.9
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var actionProgresses: [Action: Float] = [:]
|
||||
|
||||
var progress: Float {
|
||||
var overallProgress = Float(0)
|
||||
actionProgresses.forEach { key, value in
|
||||
overallProgress += value * key.weight()
|
||||
}
|
||||
|
||||
return overallProgress
|
||||
}
|
||||
|
||||
func event(_ event: CompactBlockProcessor.Event) -> Bool {
|
||||
guard case .progressPartialUpdate(let update) = event else {
|
||||
return false
|
||||
}
|
||||
|
||||
switch update {
|
||||
case .syncing(let progress):
|
||||
actionProgresses[.scan] = progress.progress
|
||||
case .enhance(let progress):
|
||||
actionProgresses[.enhance] = progress.progress
|
||||
case .fetch(let progress):
|
||||
actionProgresses[.fetch] = progress
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func reset() {
|
||||
actionProgresses.removeAll()
|
||||
}
|
||||
}
|
||||
|
||||
enum CompactBlockProgressUpdate: Equatable {
|
||||
case syncing(_ progress: BlockProgress)
|
||||
case enhance(_ progress: EnhancementProgress)
|
||||
case fetch(_ progress: Float)
|
||||
|
||||
public var progress: Float {
|
||||
switch self {
|
||||
case .syncing(let blockProgress):
|
||||
return blockProgress.progress
|
||||
case .enhance(let enhancementProgress):
|
||||
return enhancementProgress.progress
|
||||
case .fetch(let fetchingProgress):
|
||||
return fetchingProgress
|
||||
}
|
||||
}
|
||||
|
||||
public var progressHeight: BlockHeight? {
|
||||
switch self {
|
||||
case .syncing(let blockProgress):
|
||||
return blockProgress.progressHeight
|
||||
case .enhance(let enhancementProgress):
|
||||
return enhancementProgress.lastFoundTransaction?.minedHeight
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
public var blockDate: Date? {
|
||||
if case .enhance(let enhancementProgress) = self, let time = enhancementProgress.lastFoundTransaction?.blockTime {
|
||||
return Date(timeIntervalSince1970: time)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
public var targetHeight: BlockHeight? {
|
||||
switch self {
|
||||
case .syncing(let blockProgress):
|
||||
return blockProgress.targetHeight
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -366,15 +366,9 @@ enum InternalSyncStatus: Equatable {
|
|||
/// taking other maintenance steps that need to occur after an upgrade.
|
||||
case unprepared
|
||||
|
||||
case syncing(_ progress: BlockProgress)
|
||||
|
||||
/// Indicates that this Synchronizer is actively enhancing newly scanned blocks
|
||||
/// with additional transaction details, fetched from the server.
|
||||
case enhancing(_ progress: EnhancementProgress)
|
||||
|
||||
/// fetches the transparent balance and stores it locally
|
||||
case fetching(_ progress: Float)
|
||||
|
||||
/// Indicates that this Synchronizer is actively processing new blocks (consists of fetch, scan and enhance operations)
|
||||
case syncing(Float)
|
||||
|
||||
/// Indicates that this Synchronizer is fully up to date and ready for all wallet functions.
|
||||
/// When set, a UI element may want to turn green.
|
||||
case synced
|
||||
|
@ -390,7 +384,7 @@ enum InternalSyncStatus: Equatable {
|
|||
|
||||
public var isSyncing: Bool {
|
||||
switch self {
|
||||
case .syncing, .enhancing, .fetching:
|
||||
case .syncing:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
|
@ -416,8 +410,6 @@ enum InternalSyncStatus: Equatable {
|
|||
switch self {
|
||||
case .unprepared: return "unprepared"
|
||||
case .syncing: return "syncing"
|
||||
case .enhancing: return "enhancing"
|
||||
case .fetching: return "fetching"
|
||||
case .synced: return "synced"
|
||||
case .stopped: return "stopped"
|
||||
case .disconnected: return "disconnected"
|
||||
|
@ -449,8 +441,6 @@ extension InternalSyncStatus {
|
|||
switch (lhs, rhs) {
|
||||
case (.unprepared, .unprepared): return true
|
||||
case let (.syncing(lhsProgress), .syncing(rhsProgress)): return lhsProgress == rhsProgress
|
||||
case let (.enhancing(lhsProgress), .enhancing(rhsProgress)): return lhsProgress == rhsProgress
|
||||
case (.fetching, .fetching): return true
|
||||
case (.synced, .synced): return true
|
||||
case (.stopped, .stopped): return true
|
||||
case (.disconnected, .disconnected): return true
|
||||
|
@ -461,15 +451,8 @@ extension InternalSyncStatus {
|
|||
}
|
||||
|
||||
extension InternalSyncStatus {
|
||||
init(_ blockProcessorProgress: CompactBlockProgress) {
|
||||
switch blockProcessorProgress {
|
||||
case .syncing(let progressReport):
|
||||
self = .syncing(progressReport)
|
||||
case .enhance(let enhancingReport):
|
||||
self = .enhancing(enhancingReport)
|
||||
case .fetch(let fetchingProgress):
|
||||
self = .fetching(fetchingProgress)
|
||||
}
|
||||
init(_ blockProcessorProgress: Float) {
|
||||
self = .syncing(blockProcessorProgress)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -479,11 +462,7 @@ extension InternalSyncStatus {
|
|||
case .unprepared:
|
||||
return .unprepared
|
||||
case .syncing(let progress):
|
||||
return .syncing(0.9 * progress.progress)
|
||||
case .enhancing(let progress):
|
||||
return .syncing(0.9 + 0.08 * progress.progress)
|
||||
case .fetching(let progress):
|
||||
return .syncing(0.98 + 0.02 * progress)
|
||||
return .syncing(progress)
|
||||
case .synced:
|
||||
return .upToDate
|
||||
case .stopped:
|
||||
|
|
|
@ -157,14 +157,14 @@ public class SDKSynchronizer: Synchronizer {
|
|||
case .unprepared:
|
||||
throw ZcashError.synchronizerNotPrepared
|
||||
|
||||
case .syncing, .enhancing, .fetching:
|
||||
case .syncing:
|
||||
logger.warn("warning: Synchronizer started when already running. Next sync process will be started when the current one stops.")
|
||||
/// This may look strange but `CompactBlockProcessor` has mechanisms which can handle this situation. So we are fine with calling
|
||||
/// it's start here.
|
||||
await blockProcessor.start(retry: retry)
|
||||
|
||||
case .stopped, .synced, .disconnected, .error:
|
||||
await updateStatus(.syncing(.nullProgress))
|
||||
await updateStatus(.syncing(0))
|
||||
syncStartDate = Date()
|
||||
await blockProcessor.start(retry: retry)
|
||||
}
|
||||
|
@ -197,7 +197,6 @@ public class SDKSynchronizer: Synchronizer {
|
|||
|
||||
// MARK: Handle CompactBlockProcessor.Flow
|
||||
|
||||
// swiftlint:disable:next cyclomatic_complexity
|
||||
private func subscribeToProcessorEvents(_ processor: CompactBlockProcessor) async {
|
||||
let eventClosure: CompactBlockProcessor.EventClosure = { [weak self] event in
|
||||
switch event {
|
||||
|
@ -217,17 +216,14 @@ public class SDKSynchronizer: Synchronizer {
|
|||
case let .progressUpdated(progress):
|
||||
await self?.progressUpdated(progress: progress)
|
||||
|
||||
case .progressPartialUpdate:
|
||||
break
|
||||
|
||||
case let .storedUTXOs(utxos):
|
||||
self?.storedUTXOs(utxos: utxos)
|
||||
|
||||
case .startedEnhancing:
|
||||
await self?.updateStatus(.enhancing(.zero))
|
||||
|
||||
case .startedFetching:
|
||||
await self?.updateStatus(.fetching(0))
|
||||
|
||||
case .startedSyncing:
|
||||
await self?.updateStatus(.syncing(.nullProgress))
|
||||
case .startedEnhancing, .startedFetching, .startedSyncing:
|
||||
break
|
||||
|
||||
case .stopped:
|
||||
await self?.updateStatus(.stopped)
|
||||
|
@ -263,7 +259,7 @@ public class SDKSynchronizer: Synchronizer {
|
|||
}
|
||||
}
|
||||
|
||||
private func progressUpdated(progress: CompactBlockProgress) async {
|
||||
private func progressUpdated(progress: Float) async {
|
||||
let newStatus = InternalSyncStatus(progress)
|
||||
await updateStatus(newStatus)
|
||||
}
|
||||
|
@ -626,8 +622,6 @@ extension InternalSyncStatus {
|
|||
switch (self, otherStatus) {
|
||||
case (.unprepared, .unprepared): return false
|
||||
case (.syncing, .syncing): return false
|
||||
case (.enhancing, .enhancing): return false
|
||||
case (.fetching, .fetching): return false
|
||||
case (.synced, .synced): return false
|
||||
case (.stopped, .stopped): return false
|
||||
case (.disconnected, .disconnected): return false
|
||||
|
|
Loading…
Reference in New Issue