parent
15680e7b64
commit
f8ea5bb859
|
@ -7,7 +7,7 @@
|
|||
|
||||
import Foundation
|
||||
|
||||
class CompactBlockProcessorNG {
|
||||
actor CompactBlockProcessorNG {
|
||||
// It would be better to use Combine here but Combine doesn't work great with async. When this runs regularly only one closure is stored here
|
||||
// and that is one provided by `SDKSynchronizer`. But while running tests more "subscribers" is required here. Therefore it's required to handle
|
||||
// more closures here.
|
||||
|
@ -31,6 +31,8 @@ class CompactBlockProcessorNG {
|
|||
private let storage: CompactBlockRepository
|
||||
private let transactionRepository: TransactionRepository
|
||||
|
||||
private(set) var config: Configuration
|
||||
|
||||
private var retryAttempts: Int = 0
|
||||
private var backoffTimer: Timer?
|
||||
private var consecutiveChainValidationErrors: Int = 0
|
||||
|
@ -345,6 +347,10 @@ extension CompactBlockProcessorNG {
|
|||
await context.update(state: .validateServer)
|
||||
await syncStarted()
|
||||
|
||||
if backoffTimer == nil {
|
||||
await setTimer()
|
||||
}
|
||||
|
||||
// Try to find action for state.
|
||||
while true {
|
||||
guard let action = actions[await context.state] else {
|
||||
|
@ -439,7 +445,7 @@ extension CompactBlockProcessorNG {
|
|||
|
||||
// don't set a new timer if there are no more attempts.
|
||||
if hasRetryAttempt() {
|
||||
// await self.setTimer()
|
||||
await self.setTimer()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -447,6 +453,42 @@ extension CompactBlockProcessorNG {
|
|||
// MARK: - Utils
|
||||
|
||||
extension CompactBlockProcessorNG {
|
||||
private func setTimer() async {
|
||||
let interval = config.blockPollInterval
|
||||
self.backoffTimer?.invalidate()
|
||||
let timer = Timer(
|
||||
timeInterval: interval,
|
||||
repeats: true,
|
||||
block: { [weak self] _ in
|
||||
Task { [weak self] in
|
||||
guard let self else { return }
|
||||
switch await context.state {
|
||||
case .stopped, .failed, .finished, .validateServer:
|
||||
if await self.canStartSync() {
|
||||
self.logger.debug(
|
||||
"""
|
||||
Timer triggered: Starting compact Block processor!.
|
||||
Processor State: \(await self.context.state)
|
||||
latestHeight: \(try await self.transactionRepository.lastScannedHeight())
|
||||
attempts: \(await self.retryAttempts)
|
||||
"""
|
||||
)
|
||||
await self.start()
|
||||
} else if await hasRetryAttempt() {
|
||||
await self.failure(ZcashError.compactBlockProcessorMaxAttemptsReached(self.config.retries))
|
||||
}
|
||||
case .computeSyncRanges, .checksBeforeSync, .download, .validate, .scan, .enhance, .fetchUTXO, .handleSaplingParams, .clearCache,
|
||||
.scanDownloaded, .clearAlreadyScannedBlocks:
|
||||
await self.latestBlocksDataProvider.updateBlockData()
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
RunLoop.main.add(timer, forMode: .default)
|
||||
|
||||
self.backoffTimer = timer
|
||||
}
|
||||
|
||||
func canStartSync() async -> Bool {
|
||||
switch await context.state {
|
||||
case .stopped, .failed, .finished, .validateServer:
|
||||
|
|
Loading…
Reference in New Issue