[#700] Make CBP state machine work

- new proposal for the progress computation
- OverallProgress value is passed to the Synchronizer as a Float
- OverallProgress is a result of fetch, scan and enhance operations
- Order of actions no longer influences the computation
- Actions report partial updates, CompactBlockProgress actor holds current state and computes the OverallProgress

[#700] Make CBP state machine work

- code cleanup

[#700] Make CBP state machine work

- internal

[#700] Make CBP state machine work

- removed unused computed property
This commit is contained in:
Lukas Korba 2023-05-15 16:20:09 +02:00
parent f338d3b377
commit 9f2a97b415
8 changed files with 87 additions and 100 deletions

View File

@ -64,7 +64,6 @@ extension EnhanceAction: Action {
let transactions = try await blockEnhancer.enhance(
at: enhanceRange,
didEnhance: { progress in
await didUpdate(.progressUpdated(.enhance(progress)))
if let foundTx = progress.lastFoundTransaction, progress.newlyMined {
await didUpdate(.minedTransaction(foundTx))
}

View File

@ -24,7 +24,7 @@ extension FetchUTXOsAction: Action {
if let range = await context.syncRanges.fetchUTXORange {
logger.debug("Fetching UTXO with range: \(range.lowerBound)...\(range.upperBound)")
let result = try await utxoFetcher.fetch(at: range) { fetchProgress in
await didUpdate(.progressUpdated(.fetch(fetchProgress)))
await didUpdate(.progressPartialUpdate(.fetch(fetchProgress)))
}
await didUpdate(.storedUTXOs(result))
}

View File

@ -55,7 +55,7 @@ extension ScanAction: Action {
progressHeight: lastScannedHeight
)
self?.logger.debug("progress: \(progress)")
await didUpdate(.progressUpdated(.syncing(progress)))
await didUpdate(.progressPartialUpdate(.syncing(progress)))
}
return await update(context: context)

View File

@ -24,14 +24,7 @@ extension ScanDownloadedButUnscannedAction: Action {
if let range = await context.syncRanges.downloadedButUnscannedRange {
logger.debug("Starting scan with downloaded but not scanned blocks with range: \(range.lowerBound)...\(range.upperBound)")
let totalProgressRange = await context.totalProgressRange
try await blockScanner.scanBlocks(at: range, totalProgressRange: totalProgressRange) { lastScannedHeight in
let progress = BlockProgress(
startHeight: totalProgressRange.lowerBound,
targetHeight: totalProgressRange.upperBound,
progressHeight: lastScannedHeight
)
await didUpdate(.progressUpdated(.syncing(progress)))
}
try await blockScanner.scanBlocks(at: range, totalProgressRange: totalProgressRange) { _ in }
}
await context.update(state: .download)
return context

View File

@ -41,7 +41,9 @@ actor CompactBlockProcessor {
private var retryAttempts: Int = 0
private var backoffTimer: Timer?
private var consecutiveChainValidationErrors: Int = 0
private var compactBlockProgress: CompactBlockProgress = .zero
/// Compact Block Processor configuration
///
/// - parameter fsBlockCacheRoot: absolute root path where the filesystem block cache will be stored.
@ -400,8 +402,11 @@ extension CompactBlockProcessor {
/// `rewindHeight` is the height that the processor backed to in order to solve the Reorg.
case handledReorg(_ reorgHeight: BlockHeight, _ rewindHeight: BlockHeight)
/// Event sent when progress of some specific action happened.
case progressPartialUpdate(CompactBlockProgressUpdate)
/// Event sent when progress of the sync process changes.
case progressUpdated(CompactBlockProgress)
case progressUpdated(Float)
/// Event sent when the CompactBlockProcessor fetched utxos from lightwalletd attempted to store them.
case storedUTXOs((inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]))
@ -438,7 +443,7 @@ extension CompactBlockProcessor {
// of sync process without any side effects.
private func run() async {
logger.debug("Starting run")
resetContext()
await resetContext()
while true {
// Sync is starting when the state is `idle`.
@ -457,7 +462,7 @@ extension CompactBlockProcessor {
// Try to find action for state.
guard let action = actions[state] else {
if await syncFinished() {
resetContext()
await resetContext()
continue
} else {
break
@ -470,6 +475,11 @@ extension CompactBlockProcessor {
// Execute action.
context = try await action.run(with: context) { [weak self] event in
await self?.send(event: event)
if let progressChanged = await self?.compactBlockProgress.event(event), progressChanged {
if let progress = await self?.compactBlockProgress.progress {
await self?.send(event: .progressUpdated(progress))
}
}
}
await didFinishAction()
@ -477,7 +487,7 @@ extension CompactBlockProcessor {
if Task.isCancelled {
if await syncTaskWasCancelled() {
// Start sync all over again
resetContext()
await resetContext()
} else {
// end the sync loop
break
@ -485,7 +495,7 @@ extension CompactBlockProcessor {
} else {
if await handleSyncFailure(action: action, error: error) {
// Start sync all over again
resetContext()
await resetContext()
} else {
// end the sync loop
break
@ -568,8 +578,9 @@ extension CompactBlockProcessor {
}
}
private func resetContext() {
private func resetContext() async {
context = ActionContext(state: .idle)
await compactBlockProgress.reset()
}
private func syncStarted() async {

View File

@ -7,47 +7,58 @@
import Foundation
public enum CompactBlockProgress {
final actor CompactBlockProgress {
static let zero = CompactBlockProgress()
enum Action: Equatable {
case enhance
case fetch
case scan
func weight() -> Float {
switch self {
case .enhance: return 0.08
case .fetch: return 0.02
case .scan: return 0.9
}
}
}
var actionProgresses: [Action: Float] = [:]
var progress: Float {
var overallProgress = Float(0)
actionProgresses.forEach { key, value in
overallProgress += value * key.weight()
}
return overallProgress
}
func event(_ event: CompactBlockProcessor.Event) -> Bool {
guard case .progressPartialUpdate(let update) = event else {
return false
}
switch update {
case .syncing(let progress):
actionProgresses[.scan] = progress.progress
case .enhance(let progress):
actionProgresses[.enhance] = progress.progress
case .fetch(let progress):
actionProgresses[.fetch] = progress
}
return true
}
func reset() {
actionProgresses.removeAll()
}
}
enum CompactBlockProgressUpdate: Equatable {
case syncing(_ progress: BlockProgress)
case enhance(_ progress: EnhancementProgress)
case fetch(_ progress: Float)
public var progress: Float {
switch self {
case .syncing(let blockProgress):
return blockProgress.progress
case .enhance(let enhancementProgress):
return enhancementProgress.progress
case .fetch(let fetchingProgress):
return fetchingProgress
}
}
public var progressHeight: BlockHeight? {
switch self {
case .syncing(let blockProgress):
return blockProgress.progressHeight
case .enhance(let enhancementProgress):
return enhancementProgress.lastFoundTransaction?.minedHeight
default:
return 0
}
}
public var blockDate: Date? {
if case .enhance(let enhancementProgress) = self, let time = enhancementProgress.lastFoundTransaction?.blockTime {
return Date(timeIntervalSince1970: time)
}
return nil
}
public var targetHeight: BlockHeight? {
switch self {
case .syncing(let blockProgress):
return blockProgress.targetHeight
default:
return nil
}
}
}

View File

@ -366,15 +366,9 @@ enum InternalSyncStatus: Equatable {
/// taking other maintenance steps that need to occur after an upgrade.
case unprepared
case syncing(_ progress: BlockProgress)
/// Indicates that this Synchronizer is actively enhancing newly scanned blocks
/// with additional transaction details, fetched from the server.
case enhancing(_ progress: EnhancementProgress)
/// fetches the transparent balance and stores it locally
case fetching(_ progress: Float)
/// Indicates that this Synchronizer is actively processing new blocks (consists of fetch, scan and enhance operations)
case syncing(Float)
/// Indicates that this Synchronizer is fully up to date and ready for all wallet functions.
/// When set, a UI element may want to turn green.
case synced
@ -390,7 +384,7 @@ enum InternalSyncStatus: Equatable {
public var isSyncing: Bool {
switch self {
case .syncing, .enhancing, .fetching:
case .syncing:
return true
default:
return false
@ -416,8 +410,6 @@ enum InternalSyncStatus: Equatable {
switch self {
case .unprepared: return "unprepared"
case .syncing: return "syncing"
case .enhancing: return "enhancing"
case .fetching: return "fetching"
case .synced: return "synced"
case .stopped: return "stopped"
case .disconnected: return "disconnected"
@ -449,8 +441,6 @@ extension InternalSyncStatus {
switch (lhs, rhs) {
case (.unprepared, .unprepared): return true
case let (.syncing(lhsProgress), .syncing(rhsProgress)): return lhsProgress == rhsProgress
case let (.enhancing(lhsProgress), .enhancing(rhsProgress)): return lhsProgress == rhsProgress
case (.fetching, .fetching): return true
case (.synced, .synced): return true
case (.stopped, .stopped): return true
case (.disconnected, .disconnected): return true
@ -461,15 +451,8 @@ extension InternalSyncStatus {
}
extension InternalSyncStatus {
init(_ blockProcessorProgress: CompactBlockProgress) {
switch blockProcessorProgress {
case .syncing(let progressReport):
self = .syncing(progressReport)
case .enhance(let enhancingReport):
self = .enhancing(enhancingReport)
case .fetch(let fetchingProgress):
self = .fetching(fetchingProgress)
}
init(_ blockProcessorProgress: Float) {
self = .syncing(blockProcessorProgress)
}
}
@ -479,11 +462,7 @@ extension InternalSyncStatus {
case .unprepared:
return .unprepared
case .syncing(let progress):
return .syncing(0.9 * progress.progress)
case .enhancing(let progress):
return .syncing(0.9 + 0.08 * progress.progress)
case .fetching(let progress):
return .syncing(0.98 + 0.02 * progress)
return .syncing(progress)
case .synced:
return .upToDate
case .stopped:

View File

@ -157,14 +157,14 @@ public class SDKSynchronizer: Synchronizer {
case .unprepared:
throw ZcashError.synchronizerNotPrepared
case .syncing, .enhancing, .fetching:
case .syncing:
logger.warn("warning: Synchronizer started when already running. Next sync process will be started when the current one stops.")
/// This may look strange but `CompactBlockProcessor` has mechanisms which can handle this situation. So we are fine with calling
/// it's start here.
await blockProcessor.start(retry: retry)
case .stopped, .synced, .disconnected, .error:
await updateStatus(.syncing(.nullProgress))
await updateStatus(.syncing(0))
syncStartDate = Date()
await blockProcessor.start(retry: retry)
}
@ -197,7 +197,6 @@ public class SDKSynchronizer: Synchronizer {
// MARK: Handle CompactBlockProcessor.Flow
// swiftlint:disable:next cyclomatic_complexity
private func subscribeToProcessorEvents(_ processor: CompactBlockProcessor) async {
let eventClosure: CompactBlockProcessor.EventClosure = { [weak self] event in
switch event {
@ -217,17 +216,14 @@ public class SDKSynchronizer: Synchronizer {
case let .progressUpdated(progress):
await self?.progressUpdated(progress: progress)
case .progressPartialUpdate:
break
case let .storedUTXOs(utxos):
self?.storedUTXOs(utxos: utxos)
case .startedEnhancing:
await self?.updateStatus(.enhancing(.zero))
case .startedFetching:
await self?.updateStatus(.fetching(0))
case .startedSyncing:
await self?.updateStatus(.syncing(.nullProgress))
case .startedEnhancing, .startedFetching, .startedSyncing:
break
case .stopped:
await self?.updateStatus(.stopped)
@ -263,7 +259,7 @@ public class SDKSynchronizer: Synchronizer {
}
}
private func progressUpdated(progress: CompactBlockProgress) async {
private func progressUpdated(progress: Float) async {
let newStatus = InternalSyncStatus(progress)
await updateStatus(newStatus)
}
@ -626,8 +622,6 @@ extension InternalSyncStatus {
switch (self, otherStatus) {
case (.unprepared, .unprepared): return false
case (.syncing, .syncing): return false
case (.enhancing, .enhancing): return false
case (.fetching, .fetching): return false
case (.synced, .synced): return false
case (.stopped, .stopped): return false
case (.disconnected, .disconnected): return false