[#1061] Add failure methods for state machine CBP

Closes #1061
This commit is contained in:
Michal Fousek 2023-05-10 12:31:49 +02:00
parent ea54da2c56
commit 4d9d4a48ef
1 changed files with 69 additions and 17 deletions

View File

@ -18,18 +18,22 @@ class CompactBlockProcessorNG {
private let actions: [CBPState: Action]
private var context: ActionContext
private let metrics: SDKMetrics
private(set) var config: Configuration
private let accountRepository: AccountRepository
private let blockDownloaderService: BlockDownloaderService
private let internalSyncProgress: InternalSyncProgress
private let latestBlocksDataProvider: LatestBlocksDataProvider
private let logger: Logger
private let metrics: SDKMetrics
private let rustBackend: ZcashRustBackendWelding
private let service: LightWalletService
private let storage: CompactBlockRepository
private let transactionRepository: TransactionRepository
private let accountRepository: AccountRepository
private let rustBackend: ZcashRustBackendWelding
private let logger: Logger
private let internalSyncProgress: InternalSyncProgress
private(set) var config: Configuration
private var retryAttempts: Int = 0
private var backoffTimer: Timer?
private var consecutiveChainValidationErrors: Int = 0
/// Compact Block Processor configuration
///
@ -156,7 +160,7 @@ class CompactBlockProcessorNG {
)
}
internal init(container: DIContainer, config: Configuration, accountRepository: AccountRepository) {
init(container: DIContainer, config: Configuration, accountRepository: AccountRepository) {
// Dependencies.setupCompactBlockProcessor(
// in: container,
// config: config,
@ -355,17 +359,25 @@ extension CompactBlockProcessorNG {
await self?.update(progress: progress)
}
} catch {
logger.error("Sync failed with error: \(error)")
if Task.isCancelled {
logger.info("Processing cancelled.")
logger.info("Sync cancelled.")
await syncStopped()
break
} else {
if case let ZcashError.rustValidateCombinedChainInvalidChain(height) = error {
await validationFailed(at: BlockHeight(height))
logger.error("Sync failed because of validation error: \(error)")
do {
try await validationFailed(at: BlockHeight(height))
// Start sync all over again
await context.update(state: .validateServer)
} catch {
await failure(error)
break
}
} else {
logger.error("processing failed with error: \(error)")
await fail(error)
logger.error("Sync failed with error: \(error)")
await failure(error)
break
}
}
}
@ -392,12 +404,43 @@ extension CompactBlockProcessorNG {
// await handleAfterSyncHooks()
}
func validationFailed(at height: BlockHeight) async {
// handle validation failure
func validationFailed(at height: BlockHeight) async throws {
// cancel all Tasks
syncTask?.cancel()
// await blockDownloader.stopDownload()
// rewind
let rewindHeight = determineLowerBound(
errorHeight: height,
consecutiveErrors: consecutiveChainValidationErrors,
walletBirthday: config.walletBirthday
)
consecutiveChainValidationErrors += 1
try await rustBackend.rewindToHeight(height: Int32(rewindHeight))
try await blockDownloaderService.rewind(to: rewindHeight)
await internalSyncProgress.rewind(to: rewindHeight)
await send(event: .handledReorg(height, rewindHeight))
}
func fail(_ error: Error) async {
// handle failure
func failure(_ error: Error) async {
await context.update(state: .failed)
logger.error("Fail with error: \(error)")
syncTask?.cancel()
backoffTimer?.invalidate()
backoffTimer = nil
// await blockDownloader.stopDownload()
self.retryAttempts += 1
await send(event: .failed(error))
// don't set a new timer if there are no more attempts.
if hasRetryAttempt() {
// await self.setTimer()
}
}
}
@ -413,4 +456,13 @@ extension CompactBlockProcessorNG {
return false
}
}
func hasRetryAttempt() -> Bool {
retryAttempts < config.retries
}
func determineLowerBound(errorHeight: Int, consecutiveErrors: Int, walletBirthday: BlockHeight) -> BlockHeight {
let offset = min(ZcashSDK.maxReorgSize, ZcashSDK.defaultRewindDistance * (consecutiveErrors + 1))
return max(errorHeight - offset, walletBirthday - ZcashSDK.maxReorgSize)
}
}