[#1045] Implement FetchUTXOsAction
- draft of the fetching [#1045] Implement FetchUTXOsAction - updated the way Actions communicate data back to the CBP - used this mechanism to pass result of utxos fetch so it's passed to the SDKSynchronizer as an Event
This commit is contained in:
parent
98226d8a6d
commit
ce8fcdf3cc
|
@ -53,7 +53,7 @@ protocol Action {
|
|||
// Each action updates context accordingly. It should at least set new state. Reason for this is that action can return different states for
|
||||
// different conditions. And action is the thing that knows these conditions.
|
||||
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext
|
||||
|
||||
// Should be called on each existing action when processor wants to stop. Some actions may do it's own background work.
|
||||
func stop() async
|
||||
|
|
|
@ -12,7 +12,7 @@ class ChecksBeforeSyncAction {
|
|||
}
|
||||
|
||||
extension ChecksBeforeSyncAction: Action {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
// clear any present cached state if needed.
|
||||
// this checks if there was a sync in progress that was
|
||||
// interrupted abruptly and cache was not able to be cleared
|
||||
|
|
|
@ -12,7 +12,7 @@ class ClearAlreadyScannedBlocksAction {
|
|||
}
|
||||
|
||||
extension ClearAlreadyScannedBlocksAction: Action {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
// clear storage but delete only blocks that were already scanned, when doing parallel download all blocks can't be deleted
|
||||
|
||||
await context.update(state: .enhance)
|
||||
|
|
|
@ -12,7 +12,7 @@ class ClearCacheAction {
|
|||
}
|
||||
|
||||
extension ClearCacheAction: Action {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
// clear storage
|
||||
await context.update(state: .finished)
|
||||
return context
|
||||
|
|
|
@ -12,7 +12,7 @@ class ComputeSyncRangesAction {
|
|||
}
|
||||
|
||||
extension ComputeSyncRangesAction: Action {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
// call internalSyncProgress and compute sync ranges and store them in context
|
||||
// if there is nothing sync just switch to finished state
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ class DownloadAction {
|
|||
}
|
||||
|
||||
extension DownloadAction: Action {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
guard let downloadRange = await context.syncRanges.downloadAndScanRange else {
|
||||
return await update(context: context)
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ class EnhanceAction {
|
|||
}
|
||||
|
||||
extension EnhanceAction: Action {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
// Use `BlockEnhancer` to enhance blocks.
|
||||
// This action is executed on each downloaded and scanned batch (typically each 100 blocks). But we want to run enhancement each 1000 blocks.
|
||||
// This action can use `InternalSyncProgress` and last scanned height to compute when it should do work.
|
||||
|
|
|
@ -8,13 +8,18 @@
|
|||
import Foundation
|
||||
|
||||
class FetchUTXOsAction {
|
||||
init(container: DIContainer) { }
|
||||
let utxoFetcher: UTXOFetcher
|
||||
init(container: DIContainer) {
|
||||
utxoFetcher = container.resolve(UTXOFetcher.self)
|
||||
}
|
||||
}
|
||||
|
||||
extension FetchUTXOsAction: Action {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
|
||||
// Use `UTXOFetcher` to fetch UTXOs.
|
||||
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
if let range = await context.syncRanges.fetchUTXORange {
|
||||
let result = try await utxoFetcher.fetch(at: range)
|
||||
await didUpdate(.storedUTXOs(result))
|
||||
}
|
||||
await context.update(state: .handleSaplingParams)
|
||||
return context
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ class MigrateLegacyCacheDBAction {
|
|||
}
|
||||
|
||||
extension MigrateLegacyCacheDBAction: Action {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
guard let legacyCacheDbURL = config.cacheDbURL else {
|
||||
return await updateState(context)
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ class SaplingParamsAction {
|
|||
}
|
||||
|
||||
extension SaplingParamsAction: Action {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
// Download files with sapling params.
|
||||
|
||||
await context.update(state: .scanDownloaded)
|
||||
|
|
|
@ -12,7 +12,7 @@ class ScanAction {
|
|||
}
|
||||
|
||||
extension ScanAction: Action {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
// Scan in range latestScannedHeight...latestScannedHeight+batchSize.
|
||||
|
||||
await context.update(state: .clearAlreadyScannedBlocks)
|
||||
|
|
|
@ -12,7 +12,7 @@ class ScanDownloadedButUnscannedAction {
|
|||
}
|
||||
|
||||
extension ScanDownloadedButUnscannedAction: Action {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
// if let range = ranges.downloadedButUnscannedRange {
|
||||
// logger.debug("Starting scan with downloaded but not scanned blocks with range: \(range.lowerBound)...\(range.upperBound)")
|
||||
// try await blockScanner.scanBlocks(at: range, totalProgressRange: totalProgressRange) { [weak self] lastScannedHeight in
|
||||
|
|
|
@ -15,7 +15,7 @@ class ValidateAction {
|
|||
}
|
||||
|
||||
extension ValidateAction: Action {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
try await validator.validate()
|
||||
await context.update(state: .scan)
|
||||
return context
|
||||
|
|
|
@ -20,7 +20,7 @@ class ValidateServerAction {
|
|||
}
|
||||
|
||||
extension ValidateServerAction: Action {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProgress) async -> Void) async throws -> ActionContext {
|
||||
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
|
||||
let info = try await service.getInfo()
|
||||
let localNetwork = config.network
|
||||
let saplingActivation = config.saplingActivation
|
||||
|
|
|
@ -341,8 +341,8 @@ extension CompactBlockProcessorNG {
|
|||
}
|
||||
|
||||
// Execute action.
|
||||
context = try await action.run(with: context) { [weak self] progress in
|
||||
await self?.update(progress: progress)
|
||||
context = try await action.run(with: context) { [weak self] event in
|
||||
await self?.send(event: event)
|
||||
}
|
||||
|
||||
await didFinishAction()
|
||||
|
|
Loading…
Reference in New Issue