[#1045] Implement FetchUTXOsAction

- draft of the fetching

[#1045] Implement FetchUTXOsAction

- updated the way Actions communicate data back to the CBP
- used this mechanism to pass result of utxos fetch so it's passed to the SDKSynchronizer as an Event
This commit is contained in:
Lukas Korba 2023-05-11 12:48:45 +02:00 committed by Michal Fousek
parent a9dec21f39
commit c3db5a1c2d
15 changed files with 24 additions and 19 deletions

View File

@ -53,7 +53,7 @@ protocol Action {
// Each action updates context accordingly. It should at least set new state. Reason for this is that action can return different states for
// different conditions. And action is the thing that knows these conditions.
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext
// Should be called on each existing action when processor wants to stop. Some actions may do it's own background work.
func stop() async

View File

@ -12,7 +12,7 @@ class ChecksBeforeSyncAction {
}
extension ChecksBeforeSyncAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
// clear any present cached state if needed.
// this checks if there was a sync in progress that was
// interrupted abruptly and cache was not able to be cleared

View File

@ -12,7 +12,7 @@ class ClearAlreadyScannedBlocksAction {
}
extension ClearAlreadyScannedBlocksAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
// clear storage but delete only blocks that were already scanned, when doing parallel download all blocks can't be deleted
await context.update(state: .enhance)

View File

@ -12,7 +12,7 @@ class ClearCacheAction {
}
extension ClearCacheAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
// clear storage
await context.update(state: .finished)
return context

View File

@ -12,7 +12,7 @@ class ComputeSyncRangesAction {
}
extension ComputeSyncRangesAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
// call internalSyncProgress and compute sync ranges and store them in context
// if there is nothing sync just switch to finished state

View File

@ -24,7 +24,7 @@ class DownloadAction {
}
extension DownloadAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
guard let downloadRange = await context.syncRanges.downloadAndScanRange else {
return await update(context: context)
}

View File

@ -12,7 +12,7 @@ class EnhanceAction {
}
extension EnhanceAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
// Use `BlockEnhancer` to enhance blocks.
// This action is executed on each downloaded and scanned batch (typically each 100 blocks). But we want to run enhancement each 1000 blocks.
// This action can use `InternalSyncProgress` and last scanned height to compute when it should do work.

View File

@ -8,13 +8,18 @@
import Foundation
class FetchUTXOsAction {
init(container: DIContainer) { }
let utxoFetcher: UTXOFetcher
init(container: DIContainer) {
utxoFetcher = container.resolve(UTXOFetcher.self)
}
}
extension FetchUTXOsAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
// Use `UTXOFetcher` to fetch UTXOs.
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
if let range = await context.syncRanges.fetchUTXORange {
let result = try await utxoFetcher.fetch(at: range)
await didUpdate(.storedUTXOs(result))
}
await context.update(state: .handleSaplingParams)
return context
}

View File

@ -27,7 +27,7 @@ class MigrateLegacyCacheDBAction {
}
extension MigrateLegacyCacheDBAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
guard let legacyCacheDbURL = config.cacheDbURL else {
return await updateState(context)
}

View File

@ -12,7 +12,7 @@ class SaplingParamsAction {
}
extension SaplingParamsAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
// Download files with sapling params.
await context.update(state: .scanDownloaded)

View File

@ -12,7 +12,7 @@ class ScanAction {
}
extension ScanAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
// Scan in range latestScannedHeight...latestScannedHeight+batchSize.
await context.update(state: .clearAlreadyScannedBlocks)

View File

@ -12,7 +12,7 @@ class ScanDownloadedButUnscannedAction {
}
extension ScanDownloadedButUnscannedAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
// if let range = ranges.downloadedButUnscannedRange {
// logger.debug("Starting scan with downloaded but not scanned blocks with range: \(range.lowerBound)...\(range.upperBound)")
// try await blockScanner.scanBlocks(at: range, totalProgressRange: totalProgressRange) { [weak self] lastScannedHeight in

View File

@ -15,7 +15,7 @@ class ValidateAction {
}
extension ValidateAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
try await validator.validate()
await context.update(state: .scan)
return context

View File

@ -20,7 +20,7 @@ class ValidateServerAction {
}
extension ValidateServerAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
let info = try await service.getInfo()
let localNetwork = config.network
let saplingActivation = config.saplingActivation

View File

@ -341,8 +341,8 @@ extension CompactBlockProcessorNG {
}
// Execute action.
context = try await action.run(with: context) { [weak self] progress in
await self?.update(progress: progress)
context = try await action.run(with: context) { [weak self] event in
await self?.send(event: event)
}
await didFinishAction()