[#1058] Implement stop for state machine CBP

Closes #1058
This commit is contained in:
Michal Fousek 2023-05-10 13:03:45 +02:00
parent 1d029ff90f
commit 4efab5a3fb
1 changed files with 24 additions and 22 deletions

View File

@ -31,8 +31,6 @@ actor CompactBlockProcessorNG {
private let storage: CompactBlockRepository private let storage: CompactBlockRepository
private let transactionRepository: TransactionRepository private let transactionRepository: TransactionRepository
private(set) var config: Configuration
private var retryAttempts: Int = 0 private var retryAttempts: Int = 0
private var backoffTimer: Timer? private var backoffTimer: Timer?
private var consecutiveChainValidationErrors: Int = 0 private var consecutiveChainValidationErrors: Int = 0
@ -268,14 +266,9 @@ extension CompactBlockProcessorNG {
} }
} }
func stop() { func stop() async {
syncTask?.cancel() await rawStop()
// self.backoffTimer?.invalidate() self.retryAttempts = 0
// self.backoffTimer = nil
//
// cancelableTask?.cancel()
//
// self.retryAttempts = 0
} }
} }
@ -408,8 +401,7 @@ extension CompactBlockProcessorNG {
func validationFailed(at height: BlockHeight) async throws { func validationFailed(at height: BlockHeight) async throws {
// cancel all Tasks // cancel all Tasks
syncTask?.cancel() await rawStop()
// await blockDownloader.stopDownload()
// rewind // rewind
let rewindHeight = determineLowerBound( let rewindHeight = determineLowerBound(
@ -432,11 +424,8 @@ extension CompactBlockProcessorNG {
await context.update(state: .failed) await context.update(state: .failed)
logger.error("Fail with error: \(error)") logger.error("Fail with error: \(error)")
syncTask?.cancel() await rawStop()
syncTask = nil
backoffTimer?.invalidate()
backoffTimer = nil
// await blockDownloader.stopDownload()
self.retryAttempts += 1 self.retryAttempts += 1
await send(event: .failed(error)) await send(event: .failed(error))
@ -459,7 +448,7 @@ extension CompactBlockProcessorNG {
block: { [weak self] _ in block: { [weak self] _ in
Task { [weak self] in Task { [weak self] in
guard let self else { return } guard let self else { return }
switch await context.state { switch await self.context.state {
case .stopped, .failed, .finished, .validateServer: case .stopped, .failed, .finished, .validateServer:
if await self.canStartSync() { if await self.canStartSync() {
self.logger.debug( self.logger.debug(
@ -475,7 +464,7 @@ extension CompactBlockProcessorNG {
await self.failure(ZcashError.compactBlockProcessorMaxAttemptsReached(self.config.retries)) await self.failure(ZcashError.compactBlockProcessorMaxAttemptsReached(self.config.retries))
} }
case .computeSyncRanges, .checksBeforeSync, .download, .validate, .scan, .enhance, .fetchUTXO, .handleSaplingParams, .clearCache, case .computeSyncRanges, .checksBeforeSync, .download, .validate, .scan, .enhance, .fetchUTXO, .handleSaplingParams, .clearCache,
.scanDownloaded, .clearAlreadyScannedBlocks: .scanDownloaded, .clearAlreadyScannedBlocks, .migrateLegacyCacheDB:
await self.latestBlocksDataProvider.updateBlockData() await self.latestBlocksDataProvider.updateBlockData()
} }
} }
@ -486,7 +475,7 @@ extension CompactBlockProcessorNG {
self.backoffTimer = timer self.backoffTimer = timer
} }
func canStartSync() async -> Bool { private func canStartSync() async -> Bool {
switch await context.state { switch await context.state {
case .stopped, .failed, .finished, .migrateLegacyCacheDB: case .stopped, .failed, .finished, .migrateLegacyCacheDB:
return hasRetryAttempt() return hasRetryAttempt()
@ -496,12 +485,25 @@ extension CompactBlockProcessorNG {
} }
} }
func hasRetryAttempt() -> Bool { private func hasRetryAttempt() -> Bool {
retryAttempts < config.retries retryAttempts < config.retries
} }
func determineLowerBound(errorHeight: Int, consecutiveErrors: Int, walletBirthday: BlockHeight) -> BlockHeight { private func determineLowerBound(errorHeight: Int, consecutiveErrors: Int, walletBirthday: BlockHeight) -> BlockHeight {
let offset = min(ZcashSDK.maxReorgSize, ZcashSDK.defaultRewindDistance * (consecutiveErrors + 1)) let offset = min(ZcashSDK.maxReorgSize, ZcashSDK.defaultRewindDistance * (consecutiveErrors + 1))
return max(errorHeight - offset, walletBirthday - ZcashSDK.maxReorgSize) return max(errorHeight - offset, walletBirthday - ZcashSDK.maxReorgSize)
} }
private func rawStop() async {
syncTask?.cancel()
self.backoffTimer?.invalidate()
self.backoffTimer = nil
await stopAllActions()
}
private func stopAllActions() async {
for action in actions.values {
await action.stop()
}
}
} }