[#700] Implement cache clearing when some actions fail
This commit is contained in:
parent
75a9e3622e
commit
d699d935b4
|
@ -43,6 +43,9 @@ enum CBPState: CaseIterable {
|
|||
}
|
||||
|
||||
protocol Action {
|
||||
/// If this is true and action fails with error then blocks cache is cleared.
|
||||
var removeBlocksCacheWhenFailed: Bool { get }
|
||||
|
||||
// When any action is created it can get `DIContainer` and resolve any depedencies it requires.
|
||||
|
||||
// Every action uses `context` to get some informartion like download range.
|
||||
|
|
|
@ -12,6 +12,8 @@ class ChecksBeforeSyncAction {
|
|||
}
|
||||
|
||||
extension ChecksBeforeSyncAction: Action {
|
||||
var removeBlocksCacheWhenFailed: Bool { false }
|
||||
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
// clear any present cached state if needed.
|
||||
// this checks if there was a sync in progress that was
|
||||
|
|
|
@ -12,6 +12,8 @@ class ClearAlreadyScannedBlocksAction {
|
|||
}
|
||||
|
||||
extension ClearAlreadyScannedBlocksAction: Action {
|
||||
var removeBlocksCacheWhenFailed: Bool { false }
|
||||
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
// clear storage but delete only blocks that were already scanned, when doing parallel download all blocks can't be deleted
|
||||
|
||||
|
|
|
@ -12,6 +12,8 @@ class ClearCacheAction {
|
|||
}
|
||||
|
||||
extension ClearCacheAction: Action {
|
||||
var removeBlocksCacheWhenFailed: Bool { false }
|
||||
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
// clear storage
|
||||
await context.update(state: .finished)
|
||||
|
|
|
@ -39,6 +39,8 @@ class ComputeSyncRangesAction {
|
|||
}
|
||||
|
||||
extension ComputeSyncRangesAction: Action {
|
||||
var removeBlocksCacheWhenFailed: Bool { false }
|
||||
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
// call internalSyncProgress and compute sync ranges and store them in context
|
||||
// if there is nothing sync just switch to finished state
|
||||
|
|
|
@ -24,6 +24,8 @@ class DownloadAction {
|
|||
}
|
||||
|
||||
extension DownloadAction: Action {
|
||||
var removeBlocksCacheWhenFailed: Bool { true }
|
||||
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
guard let downloadRange = await context.syncRanges.downloadAndScanRange else {
|
||||
return await update(context: context)
|
||||
|
|
|
@ -12,6 +12,8 @@ class EnhanceAction {
|
|||
}
|
||||
|
||||
extension EnhanceAction: Action {
|
||||
var removeBlocksCacheWhenFailed: Bool { false }
|
||||
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
// Use `BlockEnhancer` to enhance blocks.
|
||||
// This action is executed on each downloaded and scanned batch (typically each 100 blocks). But we want to run enhancement each 1000 blocks.
|
||||
|
|
|
@ -15,11 +15,14 @@ class FetchUTXOsAction {
|
|||
}
|
||||
|
||||
extension FetchUTXOsAction: Action {
|
||||
var removeBlocksCacheWhenFailed: Bool { false }
|
||||
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
if let range = await context.syncRanges.fetchUTXORange {
|
||||
let result = try await utxoFetcher.fetch(at: range)
|
||||
await didUpdate(.storedUTXOs(result))
|
||||
}
|
||||
|
||||
await context.update(state: .handleSaplingParams)
|
||||
return context
|
||||
}
|
||||
|
|
|
@ -27,6 +27,8 @@ class MigrateLegacyCacheDBAction {
|
|||
}
|
||||
|
||||
extension MigrateLegacyCacheDBAction: Action {
|
||||
var removeBlocksCacheWhenFailed: Bool { false }
|
||||
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
guard let legacyCacheDbURL = config.cacheDbURL else {
|
||||
return await updateState(context)
|
||||
|
|
|
@ -12,6 +12,8 @@ class SaplingParamsAction {
|
|||
}
|
||||
|
||||
extension SaplingParamsAction: Action {
|
||||
var removeBlocksCacheWhenFailed: Bool { false }
|
||||
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
// Download files with sapling params.
|
||||
|
||||
|
|
|
@ -12,6 +12,8 @@ class ScanAction {
|
|||
}
|
||||
|
||||
extension ScanAction: Action {
|
||||
var removeBlocksCacheWhenFailed: Bool { true }
|
||||
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
// Scan in range latestScannedHeight...latestScannedHeight+batchSize.
|
||||
|
||||
|
|
|
@ -12,6 +12,8 @@ class ScanDownloadedButUnscannedAction {
|
|||
}
|
||||
|
||||
extension ScanDownloadedButUnscannedAction: Action {
|
||||
var removeBlocksCacheWhenFailed: Bool { false }
|
||||
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
// if let range = ranges.downloadedButUnscannedRange {
|
||||
// logger.debug("Starting scan with downloaded but not scanned blocks with range: \(range.lowerBound)...\(range.upperBound)")
|
||||
|
|
|
@ -15,6 +15,8 @@ class ValidateAction {
|
|||
}
|
||||
|
||||
extension ValidateAction: Action {
|
||||
var removeBlocksCacheWhenFailed: Bool { true }
|
||||
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
try await validator.validate()
|
||||
await context.update(state: .scan)
|
||||
|
|
|
@ -20,6 +20,8 @@ class ValidateServerAction {
|
|||
}
|
||||
|
||||
extension ValidateServerAction: Action {
|
||||
var removeBlocksCacheWhenFailed: Bool { false }
|
||||
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
let info = try await service.getInfo()
|
||||
let localNetwork = config.network
|
||||
|
|
|
@ -327,19 +327,19 @@ extension CompactBlockProcessorNG {
|
|||
}
|
||||
}
|
||||
|
||||
// Try to find action for state.
|
||||
guard let action = actions[await context.state] else {
|
||||
if await syncFinished() {
|
||||
resetContext()
|
||||
continue
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
do {
|
||||
try Task.checkCancellation()
|
||||
|
||||
// Try to find action for state.
|
||||
guard let action = actions[await context.state] else {
|
||||
if try await syncFinished() {
|
||||
resetContext()
|
||||
continue
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Execute action.
|
||||
context = try await action.run(with: context) { [weak self] event in
|
||||
await self?.send(event: event)
|
||||
|
@ -348,28 +348,19 @@ extension CompactBlockProcessorNG {
|
|||
await didFinishAction()
|
||||
} catch {
|
||||
if Task.isCancelled {
|
||||
logger.info("Sync cancelled.")
|
||||
await syncStopped()
|
||||
if await handleAfterSyncHooks() {
|
||||
if await syncTaskWasCancelled() {
|
||||
// Start sync all over again
|
||||
resetContext()
|
||||
} else {
|
||||
// end the sync loop
|
||||
break
|
||||
}
|
||||
} else {
|
||||
if case let ZcashError.rustValidateCombinedChainInvalidChain(height) = error {
|
||||
logger.error("Sync failed because of validation error: \(error)")
|
||||
do {
|
||||
try await validationFailed(at: BlockHeight(height))
|
||||
// Start sync all over again
|
||||
resetContext()
|
||||
} catch {
|
||||
await failure(error)
|
||||
break
|
||||
}
|
||||
if await handleSyncFailure(action: action, error: error) {
|
||||
// Start sync all over again
|
||||
resetContext()
|
||||
} else {
|
||||
logger.error("Sync failed with error: \(error)")
|
||||
await failure(error)
|
||||
// end the sync loop
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -377,6 +368,36 @@ extension CompactBlockProcessorNG {
|
|||
}
|
||||
}
|
||||
|
||||
private func syncTaskWasCancelled() async -> Bool {
|
||||
logger.info("Sync cancelled.")
|
||||
syncTask = nil
|
||||
await context.update(state: .stopped)
|
||||
await send(event: .stopped)
|
||||
return await handleAfterSyncHooks()
|
||||
}
|
||||
|
||||
private func handleSyncFailure(action: Action, error: Error) async -> Bool {
|
||||
if action.removeBlocksCacheWhenFailed {
|
||||
await ifTaskIsNotCanceledClearCompactBlockCache()
|
||||
}
|
||||
|
||||
if case let ZcashError.rustValidateCombinedChainInvalidChain(height) = error {
|
||||
logger.error("Sync failed because of validation error: \(error)")
|
||||
do {
|
||||
try await validationFailed(at: BlockHeight(height))
|
||||
// Start sync all over again
|
||||
return true
|
||||
} catch {
|
||||
await failure(error)
|
||||
return false
|
||||
}
|
||||
} else {
|
||||
logger.error("Sync failed with error: \(error)")
|
||||
await failure(error)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// swiftlint:disable:next cyclomatic_complexity
|
||||
private func didFinishAction() async {
|
||||
// This is evalution of the state setup by previous action.
|
||||
|
@ -427,13 +448,13 @@ extension CompactBlockProcessorNG {
|
|||
await send(event: .startedSyncing)
|
||||
}
|
||||
|
||||
private func syncFinished() async throws -> Bool {
|
||||
private func syncFinished() async -> Bool {
|
||||
let newerBlocksWereMinedDuringSync = await context.syncRanges.latestBlockHeight < latestBlocksDataProvider.latestBlockHeight
|
||||
|
||||
retryAttempts = 0
|
||||
consecutiveChainValidationErrors = 0
|
||||
|
||||
let lastScannedHeight = try await transactionRepository.lastScannedHeight()
|
||||
let lastScannedHeight = await latestBlocksDataProvider.latestScannedHeight
|
||||
await send(event: .finished(lastScannedHeight))
|
||||
await context.update(state: .finished)
|
||||
|
||||
|
@ -450,12 +471,6 @@ extension CompactBlockProcessorNG {
|
|||
await send(event: .progressUpdated(progress))
|
||||
}
|
||||
|
||||
private func syncStopped() async {
|
||||
syncTask = nil
|
||||
await context.update(state: .stopped)
|
||||
await send(event: .stopped)
|
||||
}
|
||||
|
||||
private func validationFailed(at height: BlockHeight) async throws {
|
||||
// cancel all Tasks
|
||||
await rawStop()
|
||||
|
@ -692,4 +707,32 @@ extension CompactBlockProcessorNG {
|
|||
await action.stop()
|
||||
}
|
||||
}
|
||||
|
||||
private func ifTaskIsNotCanceledClearCompactBlockCache() async {
|
||||
guard !Task.isCancelled else { return }
|
||||
let lastScannedHeight = await latestBlocksDataProvider.latestScannedHeight
|
||||
do {
|
||||
// Blocks download work in parallel with scanning. So imagine this scenario:
|
||||
//
|
||||
// Scanning is done until height 10300. Blocks are downloaded until height 10400.
|
||||
// And now validation fails and this method is called. And `.latestDownloadedBlockHeight` in `internalSyncProgress` is set to 10400. And
|
||||
// all the downloaded blocks are removed here.
|
||||
//
|
||||
// If this line doesn't happen then when sync starts next time it thinks that all the blocks are downloaded until 10400. But all were
|
||||
// removed. So blocks between 10300 and 10400 wouldn't ever be scanned.
|
||||
//
|
||||
// Scanning is done until 10300 so the SDK can be sure that blocks with height below 10300 are not required. So it makes sense to set
|
||||
// `.latestDownloadedBlockHeight` to `lastScannedHeight`. And sync will work fine in next run.
|
||||
await internalSyncProgress.set(lastScannedHeight, .latestDownloadedBlockHeight)
|
||||
try await clearCompactBlockCache()
|
||||
} catch {
|
||||
logger.error("`clearCompactBlockCache` failed after error: \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
private func clearCompactBlockCache() async throws {
|
||||
await stopAllActions()
|
||||
try await storage.clear()
|
||||
logger.info("Cache removed")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue