[#1055] Implement retry timer to state machine CBP

Closes #1055
This commit is contained in:
Michal Fousek 2023-05-10 13:58:57 +02:00
parent 4d9d4a48ef
commit f8030e0a73
1 changed files with 44 additions and 2 deletions

View File

@ -7,7 +7,7 @@
import Foundation
class CompactBlockProcessorNG {
actor CompactBlockProcessorNG {
// It would be better to use Combine here but Combine doesn't work great with async. When this runs regularly only one closure is stored here
// and that is one provided by `SDKSynchronizer`. But while running tests more "subscribers" is required here. Therefore it's required to handle
// more closures here.
@ -31,6 +31,8 @@ class CompactBlockProcessorNG {
private let storage: CompactBlockRepository
private let transactionRepository: TransactionRepository
private(set) var config: Configuration
private var retryAttempts: Int = 0
private var backoffTimer: Timer?
private var consecutiveChainValidationErrors: Int = 0
@ -345,6 +347,10 @@ extension CompactBlockProcessorNG {
await context.update(state: .validateServer)
await syncStarted()
if backoffTimer == nil {
await setTimer()
}
// Try to find action for state.
while true {
guard let action = actions[await context.state] else {
@ -439,7 +445,7 @@ extension CompactBlockProcessorNG {
// don't set a new timer if there are no more attempts.
if hasRetryAttempt() {
// await self.setTimer()
await self.setTimer()
}
}
}
@ -447,6 +453,42 @@ extension CompactBlockProcessorNG {
// MARK: - Utils
extension CompactBlockProcessorNG {
private func setTimer() async {
let interval = config.blockPollInterval
self.backoffTimer?.invalidate()
let timer = Timer(
timeInterval: interval,
repeats: true,
block: { [weak self] _ in
Task { [weak self] in
guard let self else { return }
switch await context.state {
case .stopped, .failed, .finished, .validateServer:
if await self.canStartSync() {
self.logger.debug(
"""
Timer triggered: Starting compact Block processor!.
Processor State: \(await self.context.state)
latestHeight: \(try await self.transactionRepository.lastScannedHeight())
attempts: \(await self.retryAttempts)
"""
)
await self.start()
} else if await hasRetryAttempt() {
await self.failure(ZcashError.compactBlockProcessorMaxAttemptsReached(self.config.retries))
}
case .computeSyncRanges, .checksBeforeSync, .download, .validate, .scan, .enhance, .fetchUTXO, .handleSaplingParams, .clearCache,
.scanDownloaded, .clearAlreadyScannedBlocks:
await self.latestBlocksDataProvider.updateBlockData()
}
}
}
)
RunLoop.main.add(timer, forMode: .default)
self.backoffTimer = timer
}
func canStartSync() async -> Bool {
switch await context.state {
case .stopped, .failed, .finished, .validateServer: