[#700] Implement sending of some events from CompactBlockProcessorNG

This commit is contained in:
Michal Fousek 2023-05-11 10:53:12 +02:00
parent bc9fbe7e7d
commit 85643e598c
1 changed files with 72 additions and 12 deletions

View File

@ -268,7 +268,7 @@ extension CompactBlockProcessorNG {
case failed (Error)
/// Event sent when the CompactBlockProcessor has finished syncing the blockchain to latest height
case finished (_ lastScannedHeight: BlockHeight, _ foundBlocks: Bool)
case finished (_ lastScannedHeight: BlockHeight)
/// Event sent when the CompactBlockProcessor enhanced a bunch of transactions in some range.
case foundTransactions ([ZcashTransaction.Overview], CompactBlockRange)
@ -327,18 +327,25 @@ extension CompactBlockProcessorNG {
}
}
// Try to find action for state.
guard let action = actions[await context.state] else {
await syncFinished()
break
}
do {
try Task.checkCancellation()
// Try to find action for state.
guard let action = actions[await context.state] else {
if try await syncFinished() {
resetContext()
continue
} else {
break
}
}
// Execute action.
context = try await action.run(with: context) { [weak self] progress in
await self?.update(progress: progress)
}
await didFinishAction()
} catch {
if Task.isCancelled {
logger.info("Sync cancelled.")
@ -370,6 +377,47 @@ extension CompactBlockProcessorNG {
}
}
// swiftlint:disable:next cyclomatic_complexity
private func didFinishAction() async {
// This is evalution of the state setup by previous action.
switch await context.state {
case .idle:
break
case .migrateLegacyCacheDB:
break
case .validateServer:
break
case .computeSyncRanges:
break
case .checksBeforeSync:
break
case .scanDownloaded:
break
case .download:
break
case .validate:
break
case .scan:
break
case .clearAlreadyScannedBlocks:
break
case .enhance:
await send(event: .startedEnhancing)
case .fetchUTXO:
await send(event: .startedFetching)
case .handleSaplingParams:
break
case .clearCache:
break
case .finished:
break
case .failed:
break
case .stopped:
break
}
}
private func resetContext() {
context = ActionContext(state: .idle)
}
@ -379,10 +427,23 @@ extension CompactBlockProcessorNG {
await send(event: .startedSyncing)
}
private func syncFinished() async {
syncTask = nil
// handle finish of the sync
// await send(event: .finished(<#T##lastScannedHeight: BlockHeight##BlockHeight#>, <#T##foundBlocks: Bool##Bool#>))
private func syncFinished() async throws -> Bool {
let newerBlocksWereMinedDuringSync = await context.syncRanges.latestBlockHeight < latestBlocksDataProvider.latestBlockHeight
retryAttempts = 0
consecutiveChainValidationErrors = 0
let lastScannedHeight = try await transactionRepository.lastScannedHeight()
await send(event: .finished(lastScannedHeight))
await context.update(state: .finished)
// If new blocks were mined during previous sync run the sync process again
if newerBlocksWereMinedDuringSync {
return true
} else {
await setTimer()
return false
}
}
private func update(progress: ActionProgress) async {
@ -393,7 +454,6 @@ extension CompactBlockProcessorNG {
syncTask = nil
await context.update(state: .stopped)
await send(event: .stopped)
// await handleAfterSyncHooks()
}
private func validationFailed(at height: BlockHeight) async throws {