[#1051] Update how progress is computed after switch to state machine
Closes #1051 - new proposal for the progress computation - OverallProgress value is passed to the Synchronizer as a Float - OverallProgress is a result of fetch, scan and enhance operations - Order of actions no longer influences the computation - Actions report partial updates, CompactBlockProgress actor holds current state and computes the OverallProgress
This commit is contained in:
parent
1d1dedd26e
commit
1b0e9e00f5
|
@ -64,7 +64,6 @@ extension EnhanceAction: Action {
|
||||||
let transactions = try await blockEnhancer.enhance(
|
let transactions = try await blockEnhancer.enhance(
|
||||||
at: enhanceRange,
|
at: enhanceRange,
|
||||||
didEnhance: { progress in
|
didEnhance: { progress in
|
||||||
await didUpdate(.progressUpdated(.enhance(progress)))
|
|
||||||
if let foundTx = progress.lastFoundTransaction, progress.newlyMined {
|
if let foundTx = progress.lastFoundTransaction, progress.newlyMined {
|
||||||
await didUpdate(.minedTransaction(foundTx))
|
await didUpdate(.minedTransaction(foundTx))
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,7 @@ extension FetchUTXOsAction: Action {
|
||||||
if let range = await context.syncRanges.fetchUTXORange {
|
if let range = await context.syncRanges.fetchUTXORange {
|
||||||
logger.debug("Fetching UTXO with range: \(range.lowerBound)...\(range.upperBound)")
|
logger.debug("Fetching UTXO with range: \(range.lowerBound)...\(range.upperBound)")
|
||||||
let result = try await utxoFetcher.fetch(at: range) { fetchProgress in
|
let result = try await utxoFetcher.fetch(at: range) { fetchProgress in
|
||||||
await didUpdate(.progressUpdated(.fetch(fetchProgress)))
|
await didUpdate(.progressPartialUpdate(.fetch(fetchProgress)))
|
||||||
}
|
}
|
||||||
await didUpdate(.storedUTXOs(result))
|
await didUpdate(.storedUTXOs(result))
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,7 +55,7 @@ extension ScanAction: Action {
|
||||||
progressHeight: lastScannedHeight
|
progressHeight: lastScannedHeight
|
||||||
)
|
)
|
||||||
self?.logger.debug("progress: \(progress)")
|
self?.logger.debug("progress: \(progress)")
|
||||||
await didUpdate(.progressUpdated(.syncing(progress)))
|
await didUpdate(.progressPartialUpdate(.syncing(progress)))
|
||||||
}
|
}
|
||||||
|
|
||||||
return await update(context: context)
|
return await update(context: context)
|
||||||
|
|
|
@ -24,14 +24,7 @@ extension ScanDownloadedButUnscannedAction: Action {
|
||||||
if let range = await context.syncRanges.downloadedButUnscannedRange {
|
if let range = await context.syncRanges.downloadedButUnscannedRange {
|
||||||
logger.debug("Starting scan with downloaded but not scanned blocks with range: \(range.lowerBound)...\(range.upperBound)")
|
logger.debug("Starting scan with downloaded but not scanned blocks with range: \(range.lowerBound)...\(range.upperBound)")
|
||||||
let totalProgressRange = await context.totalProgressRange
|
let totalProgressRange = await context.totalProgressRange
|
||||||
try await blockScanner.scanBlocks(at: range, totalProgressRange: totalProgressRange) { lastScannedHeight in
|
try await blockScanner.scanBlocks(at: range, totalProgressRange: totalProgressRange) { _ in }
|
||||||
let progress = BlockProgress(
|
|
||||||
startHeight: totalProgressRange.lowerBound,
|
|
||||||
targetHeight: totalProgressRange.upperBound,
|
|
||||||
progressHeight: lastScannedHeight
|
|
||||||
)
|
|
||||||
await didUpdate(.progressUpdated(.syncing(progress)))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
await context.update(state: .download)
|
await context.update(state: .download)
|
||||||
return context
|
return context
|
||||||
|
|
|
@ -41,7 +41,9 @@ actor CompactBlockProcessor {
|
||||||
private var retryAttempts: Int = 0
|
private var retryAttempts: Int = 0
|
||||||
private var backoffTimer: Timer?
|
private var backoffTimer: Timer?
|
||||||
private var consecutiveChainValidationErrors: Int = 0
|
private var consecutiveChainValidationErrors: Int = 0
|
||||||
|
|
||||||
|
private var compactBlockProgress: CompactBlockProgress = .zero
|
||||||
|
|
||||||
/// Compact Block Processor configuration
|
/// Compact Block Processor configuration
|
||||||
///
|
///
|
||||||
/// - parameter fsBlockCacheRoot: absolute root path where the filesystem block cache will be stored.
|
/// - parameter fsBlockCacheRoot: absolute root path where the filesystem block cache will be stored.
|
||||||
|
@ -400,8 +402,11 @@ extension CompactBlockProcessor {
|
||||||
/// `rewindHeight` is the height that the processor backed to in order to solve the Reorg.
|
/// `rewindHeight` is the height that the processor backed to in order to solve the Reorg.
|
||||||
case handledReorg(_ reorgHeight: BlockHeight, _ rewindHeight: BlockHeight)
|
case handledReorg(_ reorgHeight: BlockHeight, _ rewindHeight: BlockHeight)
|
||||||
|
|
||||||
|
/// Event sent when progress of some specific action happened.
|
||||||
|
case progressPartialUpdate(CompactBlockProgressUpdate)
|
||||||
|
|
||||||
/// Event sent when progress of the sync process changes.
|
/// Event sent when progress of the sync process changes.
|
||||||
case progressUpdated(CompactBlockProgress)
|
case progressUpdated(Float)
|
||||||
|
|
||||||
/// Event sent when the CompactBlockProcessor fetched utxos from lightwalletd attempted to store them.
|
/// Event sent when the CompactBlockProcessor fetched utxos from lightwalletd attempted to store them.
|
||||||
case storedUTXOs((inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]))
|
case storedUTXOs((inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]))
|
||||||
|
@ -438,7 +443,7 @@ extension CompactBlockProcessor {
|
||||||
// of sync process without any side effects.
|
// of sync process without any side effects.
|
||||||
private func run() async {
|
private func run() async {
|
||||||
logger.debug("Starting run")
|
logger.debug("Starting run")
|
||||||
resetContext()
|
await resetContext()
|
||||||
|
|
||||||
while true {
|
while true {
|
||||||
// Sync is starting when the state is `idle`.
|
// Sync is starting when the state is `idle`.
|
||||||
|
@ -461,7 +466,7 @@ extension CompactBlockProcessor {
|
||||||
// Side effect of calling stop is to delete last used download stream. To be sure that it doesn't keep any data in memory.
|
// Side effect of calling stop is to delete last used download stream. To be sure that it doesn't keep any data in memory.
|
||||||
await stopAllActions()
|
await stopAllActions()
|
||||||
if await syncFinished() {
|
if await syncFinished() {
|
||||||
resetContext()
|
await resetContext()
|
||||||
continue
|
continue
|
||||||
} else {
|
} else {
|
||||||
break
|
break
|
||||||
|
@ -474,6 +479,11 @@ extension CompactBlockProcessor {
|
||||||
// Execute action.
|
// Execute action.
|
||||||
context = try await action.run(with: context) { [weak self] event in
|
context = try await action.run(with: context) { [weak self] event in
|
||||||
await self?.send(event: event)
|
await self?.send(event: event)
|
||||||
|
if let progressChanged = await self?.compactBlockProgress.event(event), progressChanged {
|
||||||
|
if let progress = await self?.compactBlockProgress.progress {
|
||||||
|
await self?.send(event: .progressUpdated(progress))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
await didFinishAction()
|
await didFinishAction()
|
||||||
|
@ -485,7 +495,7 @@ extension CompactBlockProcessor {
|
||||||
if Task.isCancelled {
|
if Task.isCancelled {
|
||||||
if await syncTaskWasCancelled() {
|
if await syncTaskWasCancelled() {
|
||||||
// Start sync all over again
|
// Start sync all over again
|
||||||
resetContext()
|
await resetContext()
|
||||||
} else {
|
} else {
|
||||||
// end the sync loop
|
// end the sync loop
|
||||||
break
|
break
|
||||||
|
@ -493,7 +503,7 @@ extension CompactBlockProcessor {
|
||||||
} else {
|
} else {
|
||||||
if await handleSyncFailure(action: action, error: error) {
|
if await handleSyncFailure(action: action, error: error) {
|
||||||
// Start sync all over again
|
// Start sync all over again
|
||||||
resetContext()
|
await resetContext()
|
||||||
} else {
|
} else {
|
||||||
// end the sync loop
|
// end the sync loop
|
||||||
break
|
break
|
||||||
|
@ -576,8 +586,9 @@ extension CompactBlockProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func resetContext() {
|
private func resetContext() async {
|
||||||
context = ActionContext(state: .idle)
|
context = ActionContext(state: .idle)
|
||||||
|
await compactBlockProgress.reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
private func syncStarted() async {
|
private func syncStarted() async {
|
||||||
|
|
|
@ -7,47 +7,58 @@
|
||||||
|
|
||||||
import Foundation
|
import Foundation
|
||||||
|
|
||||||
public enum CompactBlockProgress {
|
final actor CompactBlockProgress {
|
||||||
|
static let zero = CompactBlockProgress()
|
||||||
|
|
||||||
|
enum Action: Equatable {
|
||||||
|
case enhance
|
||||||
|
case fetch
|
||||||
|
case scan
|
||||||
|
|
||||||
|
func weight() -> Float {
|
||||||
|
switch self {
|
||||||
|
case .enhance: return 0.08
|
||||||
|
case .fetch: return 0.02
|
||||||
|
case .scan: return 0.9
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var actionProgresses: [Action: Float] = [:]
|
||||||
|
|
||||||
|
var progress: Float {
|
||||||
|
var overallProgress = Float(0)
|
||||||
|
actionProgresses.forEach { key, value in
|
||||||
|
overallProgress += value * key.weight()
|
||||||
|
}
|
||||||
|
|
||||||
|
return overallProgress
|
||||||
|
}
|
||||||
|
|
||||||
|
func event(_ event: CompactBlockProcessor.Event) -> Bool {
|
||||||
|
guard case .progressPartialUpdate(let update) = event else {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
switch update {
|
||||||
|
case .syncing(let progress):
|
||||||
|
actionProgresses[.scan] = progress.progress
|
||||||
|
case .enhance(let progress):
|
||||||
|
actionProgresses[.enhance] = progress.progress
|
||||||
|
case .fetch(let progress):
|
||||||
|
actionProgresses[.fetch] = progress
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func reset() {
|
||||||
|
actionProgresses.removeAll()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum CompactBlockProgressUpdate: Equatable {
|
||||||
case syncing(_ progress: BlockProgress)
|
case syncing(_ progress: BlockProgress)
|
||||||
case enhance(_ progress: EnhancementProgress)
|
case enhance(_ progress: EnhancementProgress)
|
||||||
case fetch(_ progress: Float)
|
case fetch(_ progress: Float)
|
||||||
|
|
||||||
public var progress: Float {
|
|
||||||
switch self {
|
|
||||||
case .syncing(let blockProgress):
|
|
||||||
return blockProgress.progress
|
|
||||||
case .enhance(let enhancementProgress):
|
|
||||||
return enhancementProgress.progress
|
|
||||||
case .fetch(let fetchingProgress):
|
|
||||||
return fetchingProgress
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public var progressHeight: BlockHeight? {
|
|
||||||
switch self {
|
|
||||||
case .syncing(let blockProgress):
|
|
||||||
return blockProgress.progressHeight
|
|
||||||
case .enhance(let enhancementProgress):
|
|
||||||
return enhancementProgress.lastFoundTransaction?.minedHeight
|
|
||||||
default:
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public var blockDate: Date? {
|
|
||||||
if case .enhance(let enhancementProgress) = self, let time = enhancementProgress.lastFoundTransaction?.blockTime {
|
|
||||||
return Date(timeIntervalSince1970: time)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
public var targetHeight: BlockHeight? {
|
|
||||||
switch self {
|
|
||||||
case .syncing(let blockProgress):
|
|
||||||
return blockProgress.targetHeight
|
|
||||||
default:
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -366,15 +366,9 @@ enum InternalSyncStatus: Equatable {
|
||||||
/// taking other maintenance steps that need to occur after an upgrade.
|
/// taking other maintenance steps that need to occur after an upgrade.
|
||||||
case unprepared
|
case unprepared
|
||||||
|
|
||||||
case syncing(_ progress: BlockProgress)
|
/// Indicates that this Synchronizer is actively processing new blocks (consists of fetch, scan and enhance operations)
|
||||||
|
case syncing(Float)
|
||||||
/// Indicates that this Synchronizer is actively enhancing newly scanned blocks
|
|
||||||
/// with additional transaction details, fetched from the server.
|
|
||||||
case enhancing(_ progress: EnhancementProgress)
|
|
||||||
|
|
||||||
/// fetches the transparent balance and stores it locally
|
|
||||||
case fetching(_ progress: Float)
|
|
||||||
|
|
||||||
/// Indicates that this Synchronizer is fully up to date and ready for all wallet functions.
|
/// Indicates that this Synchronizer is fully up to date and ready for all wallet functions.
|
||||||
/// When set, a UI element may want to turn green.
|
/// When set, a UI element may want to turn green.
|
||||||
case synced
|
case synced
|
||||||
|
@ -390,7 +384,7 @@ enum InternalSyncStatus: Equatable {
|
||||||
|
|
||||||
public var isSyncing: Bool {
|
public var isSyncing: Bool {
|
||||||
switch self {
|
switch self {
|
||||||
case .syncing, .enhancing, .fetching:
|
case .syncing:
|
||||||
return true
|
return true
|
||||||
default:
|
default:
|
||||||
return false
|
return false
|
||||||
|
@ -416,8 +410,6 @@ enum InternalSyncStatus: Equatable {
|
||||||
switch self {
|
switch self {
|
||||||
case .unprepared: return "unprepared"
|
case .unprepared: return "unprepared"
|
||||||
case .syncing: return "syncing"
|
case .syncing: return "syncing"
|
||||||
case .enhancing: return "enhancing"
|
|
||||||
case .fetching: return "fetching"
|
|
||||||
case .synced: return "synced"
|
case .synced: return "synced"
|
||||||
case .stopped: return "stopped"
|
case .stopped: return "stopped"
|
||||||
case .disconnected: return "disconnected"
|
case .disconnected: return "disconnected"
|
||||||
|
@ -449,8 +441,6 @@ extension InternalSyncStatus {
|
||||||
switch (lhs, rhs) {
|
switch (lhs, rhs) {
|
||||||
case (.unprepared, .unprepared): return true
|
case (.unprepared, .unprepared): return true
|
||||||
case let (.syncing(lhsProgress), .syncing(rhsProgress)): return lhsProgress == rhsProgress
|
case let (.syncing(lhsProgress), .syncing(rhsProgress)): return lhsProgress == rhsProgress
|
||||||
case let (.enhancing(lhsProgress), .enhancing(rhsProgress)): return lhsProgress == rhsProgress
|
|
||||||
case (.fetching, .fetching): return true
|
|
||||||
case (.synced, .synced): return true
|
case (.synced, .synced): return true
|
||||||
case (.stopped, .stopped): return true
|
case (.stopped, .stopped): return true
|
||||||
case (.disconnected, .disconnected): return true
|
case (.disconnected, .disconnected): return true
|
||||||
|
@ -461,15 +451,8 @@ extension InternalSyncStatus {
|
||||||
}
|
}
|
||||||
|
|
||||||
extension InternalSyncStatus {
|
extension InternalSyncStatus {
|
||||||
init(_ blockProcessorProgress: CompactBlockProgress) {
|
init(_ blockProcessorProgress: Float) {
|
||||||
switch blockProcessorProgress {
|
self = .syncing(blockProcessorProgress)
|
||||||
case .syncing(let progressReport):
|
|
||||||
self = .syncing(progressReport)
|
|
||||||
case .enhance(let enhancingReport):
|
|
||||||
self = .enhancing(enhancingReport)
|
|
||||||
case .fetch(let fetchingProgress):
|
|
||||||
self = .fetching(fetchingProgress)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -479,11 +462,7 @@ extension InternalSyncStatus {
|
||||||
case .unprepared:
|
case .unprepared:
|
||||||
return .unprepared
|
return .unprepared
|
||||||
case .syncing(let progress):
|
case .syncing(let progress):
|
||||||
return .syncing(0.9 * progress.progress)
|
return .syncing(progress)
|
||||||
case .enhancing(let progress):
|
|
||||||
return .syncing(0.9 + 0.08 * progress.progress)
|
|
||||||
case .fetching(let progress):
|
|
||||||
return .syncing(0.98 + 0.02 * progress)
|
|
||||||
case .synced:
|
case .synced:
|
||||||
return .upToDate
|
return .upToDate
|
||||||
case .stopped:
|
case .stopped:
|
||||||
|
|
|
@ -157,14 +157,14 @@ public class SDKSynchronizer: Synchronizer {
|
||||||
case .unprepared:
|
case .unprepared:
|
||||||
throw ZcashError.synchronizerNotPrepared
|
throw ZcashError.synchronizerNotPrepared
|
||||||
|
|
||||||
case .syncing, .enhancing, .fetching:
|
case .syncing:
|
||||||
logger.warn("warning: Synchronizer started when already running. Next sync process will be started when the current one stops.")
|
logger.warn("warning: Synchronizer started when already running. Next sync process will be started when the current one stops.")
|
||||||
/// This may look strange but `CompactBlockProcessor` has mechanisms which can handle this situation. So we are fine with calling
|
/// This may look strange but `CompactBlockProcessor` has mechanisms which can handle this situation. So we are fine with calling
|
||||||
/// it's start here.
|
/// it's start here.
|
||||||
await blockProcessor.start(retry: retry)
|
await blockProcessor.start(retry: retry)
|
||||||
|
|
||||||
case .stopped, .synced, .disconnected, .error:
|
case .stopped, .synced, .disconnected, .error:
|
||||||
await updateStatus(.syncing(.nullProgress))
|
await updateStatus(.syncing(0))
|
||||||
syncStartDate = Date()
|
syncStartDate = Date()
|
||||||
await blockProcessor.start(retry: retry)
|
await blockProcessor.start(retry: retry)
|
||||||
}
|
}
|
||||||
|
@ -197,7 +197,6 @@ public class SDKSynchronizer: Synchronizer {
|
||||||
|
|
||||||
// MARK: Handle CompactBlockProcessor.Flow
|
// MARK: Handle CompactBlockProcessor.Flow
|
||||||
|
|
||||||
// swiftlint:disable:next cyclomatic_complexity
|
|
||||||
private func subscribeToProcessorEvents(_ processor: CompactBlockProcessor) async {
|
private func subscribeToProcessorEvents(_ processor: CompactBlockProcessor) async {
|
||||||
let eventClosure: CompactBlockProcessor.EventClosure = { [weak self] event in
|
let eventClosure: CompactBlockProcessor.EventClosure = { [weak self] event in
|
||||||
switch event {
|
switch event {
|
||||||
|
@ -217,17 +216,14 @@ public class SDKSynchronizer: Synchronizer {
|
||||||
case let .progressUpdated(progress):
|
case let .progressUpdated(progress):
|
||||||
await self?.progressUpdated(progress: progress)
|
await self?.progressUpdated(progress: progress)
|
||||||
|
|
||||||
|
case .progressPartialUpdate:
|
||||||
|
break
|
||||||
|
|
||||||
case let .storedUTXOs(utxos):
|
case let .storedUTXOs(utxos):
|
||||||
self?.storedUTXOs(utxos: utxos)
|
self?.storedUTXOs(utxos: utxos)
|
||||||
|
|
||||||
case .startedEnhancing:
|
case .startedEnhancing, .startedFetching, .startedSyncing:
|
||||||
await self?.updateStatus(.enhancing(.zero))
|
break
|
||||||
|
|
||||||
case .startedFetching:
|
|
||||||
await self?.updateStatus(.fetching(0))
|
|
||||||
|
|
||||||
case .startedSyncing:
|
|
||||||
await self?.updateStatus(.syncing(.nullProgress))
|
|
||||||
|
|
||||||
case .stopped:
|
case .stopped:
|
||||||
await self?.updateStatus(.stopped)
|
await self?.updateStatus(.stopped)
|
||||||
|
@ -263,7 +259,7 @@ public class SDKSynchronizer: Synchronizer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func progressUpdated(progress: CompactBlockProgress) async {
|
private func progressUpdated(progress: Float) async {
|
||||||
let newStatus = InternalSyncStatus(progress)
|
let newStatus = InternalSyncStatus(progress)
|
||||||
await updateStatus(newStatus)
|
await updateStatus(newStatus)
|
||||||
}
|
}
|
||||||
|
@ -626,8 +622,6 @@ extension InternalSyncStatus {
|
||||||
switch (self, otherStatus) {
|
switch (self, otherStatus) {
|
||||||
case (.unprepared, .unprepared): return false
|
case (.unprepared, .unprepared): return false
|
||||||
case (.syncing, .syncing): return false
|
case (.syncing, .syncing): return false
|
||||||
case (.enhancing, .enhancing): return false
|
|
||||||
case (.fetching, .fetching): return false
|
|
||||||
case (.synced, .synced): return false
|
case (.synced, .synced): return false
|
||||||
case (.stopped, .stopped): return false
|
case (.stopped, .stopped): return false
|
||||||
case (.disconnected, .disconnected): return false
|
case (.disconnected, .disconnected): return false
|
||||||
|
|
Loading…
Reference in New Issue