[#1043] Implement DownloadAction

Closes #1043
This commit is contained in:
Michal Fousek 2023-05-10 14:42:54 +02:00
parent 776deeb002
commit d8f189a799
4 changed files with 40 additions and 30 deletions

View File

@ -8,28 +8,41 @@
import Foundation import Foundation
class DownloadAction { class DownloadAction {
init(container: DIContainer) { } let config: CompactBlockProcessorNG.Configuration
let downloader: BlockDownloader
let transactionRepository: TransactionRepository
init(container: DIContainer, config: CompactBlockProcessorNG.Configuration) {
self.config = config
downloader = container.resolve(BlockDownloader.self)
transactionRepository = container.resolve(TransactionRepository.self)
}
private func update(context: ActionContext) async -> ActionContext {
await context.update(state: .validate)
return context
}
} }
extension DownloadAction: Action { extension DownloadAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext { func run(with context: ActionContext, didUpdate: @escaping (ActionProgress) async -> Void) async throws -> ActionContext {
// Use `BlockDownloader` to set download limit to latestScannedHeight + (2*batchSize) (after parallel is merged). guard let downloadRange = await context.syncRanges.downloadAndScanRange else {
// And start download. return await update(context: context)
// Compute batch sync range (range used by one loop in `downloadAndScanBlocks` method) and wait until blocks in this range are downloaded. }
// do { let lastScannedHeight = try await transactionRepository.lastScannedHeight()
// await blockDownloader.setDownloadLimit(processingRange.upperBound + (2 * batchSize)) let downloadLimit = lastScannedHeight + (2 * config.batchSize)
// await blockDownloader.startDownload(maxBlockBufferSize: config.downloadBufferSize) let batchRange = lastScannedHeight...lastScannedHeight + config.batchSize
//
// try await blockDownloader.waitUntilRequestedBlocksAreDownloaded(in: processingRange) try await downloader.setSyncRange(downloadRange)
// } catch { await downloader.setDownloadLimit(downloadLimit)
// await ifTaskIsNotCanceledClearCompactBlockCache(lastScannedHeight: lastScannedHeight)
// throw error try await downloader.waitUntilRequestedBlocksAreDownloaded(in: batchRange)
// }
await context.update(state: .validate) await context.update(state: .validate)
return context return context
} }
func stop() async { } func stop() async {
await downloader.stopDownload()
}
} }

View File

@ -35,8 +35,7 @@ class CompactBlockProcessorNG {
let dataDb: URL let dataDb: URL
let spendParamsURL: URL let spendParamsURL: URL
let outputParamsURL: URL let outputParamsURL: URL
let downloadBatchSize: Int let batchSize: Int
let scanningBatchSize: Int
let retries: Int let retries: Int
let maxBackoffInterval: TimeInterval let maxBackoffInterval: TimeInterval
let maxReorgSize = ZcashSDK.maxReorgSize let maxReorgSize = ZcashSDK.maxReorgSize
@ -59,11 +58,10 @@ class CompactBlockProcessorNG {
spendParamsURL: URL, spendParamsURL: URL,
outputParamsURL: URL, outputParamsURL: URL,
saplingParamsSourceURL: SaplingParamsSourceURL, saplingParamsSourceURL: SaplingParamsSourceURL,
downloadBatchSize: Int = ZcashSDK.DefaultDownloadBatch, batchSize: Int = ZcashSDK.DefaultSyncBatch,
retries: Int = ZcashSDK.defaultRetries, retries: Int = ZcashSDK.defaultRetries,
maxBackoffInterval: TimeInterval = ZcashSDK.defaultMaxBackOffInterval, maxBackoffInterval: TimeInterval = ZcashSDK.defaultMaxBackOffInterval,
rewindDistance: Int = ZcashSDK.defaultRewindDistance, rewindDistance: Int = ZcashSDK.defaultRewindDistance,
scanningBatchSize: Int = ZcashSDK.DefaultScanningBatch,
walletBirthdayProvider: @escaping () -> BlockHeight, walletBirthdayProvider: @escaping () -> BlockHeight,
saplingActivation: BlockHeight, saplingActivation: BlockHeight,
network: ZcashNetwork network: ZcashNetwork
@ -75,15 +73,13 @@ class CompactBlockProcessorNG {
self.outputParamsURL = outputParamsURL self.outputParamsURL = outputParamsURL
self.saplingParamsSourceURL = saplingParamsSourceURL self.saplingParamsSourceURL = saplingParamsSourceURL
self.network = network self.network = network
self.downloadBatchSize = downloadBatchSize self.batchSize = batchSize
self.retries = retries self.retries = retries
self.maxBackoffInterval = maxBackoffInterval self.maxBackoffInterval = maxBackoffInterval
self.rewindDistance = rewindDistance self.rewindDistance = rewindDistance
self.scanningBatchSize = scanningBatchSize
self.walletBirthdayProvider = walletBirthdayProvider self.walletBirthdayProvider = walletBirthdayProvider
self.saplingActivation = saplingActivation self.saplingActivation = saplingActivation
self.cacheDbURL = cacheDbURL self.cacheDbURL = cacheDbURL
assert(downloadBatchSize >= scanningBatchSize)
} }
init( init(
@ -93,11 +89,10 @@ class CompactBlockProcessorNG {
spendParamsURL: URL, spendParamsURL: URL,
outputParamsURL: URL, outputParamsURL: URL,
saplingParamsSourceURL: SaplingParamsSourceURL, saplingParamsSourceURL: SaplingParamsSourceURL,
downloadBatchSize: Int = ZcashSDK.DefaultDownloadBatch, batchSize: Int = ZcashSDK.DefaultSyncBatch,
retries: Int = ZcashSDK.defaultRetries, retries: Int = ZcashSDK.defaultRetries,
maxBackoffInterval: TimeInterval = ZcashSDK.defaultMaxBackOffInterval, maxBackoffInterval: TimeInterval = ZcashSDK.defaultMaxBackOffInterval,
rewindDistance: Int = ZcashSDK.defaultRewindDistance, rewindDistance: Int = ZcashSDK.defaultRewindDistance,
scanningBatchSize: Int = ZcashSDK.DefaultScanningBatch,
walletBirthdayProvider: @escaping () -> BlockHeight, walletBirthdayProvider: @escaping () -> BlockHeight,
network: ZcashNetwork network: ZcashNetwork
) { ) {
@ -111,25 +106,22 @@ class CompactBlockProcessorNG {
self.saplingActivation = network.constants.saplingActivationHeight self.saplingActivation = network.constants.saplingActivationHeight
self.network = network self.network = network
self.cacheDbURL = nil self.cacheDbURL = nil
self.downloadBatchSize = downloadBatchSize self.batchSize = batchSize
self.retries = retries self.retries = retries
self.maxBackoffInterval = maxBackoffInterval self.maxBackoffInterval = maxBackoffInterval
self.rewindDistance = rewindDistance self.rewindDistance = rewindDistance
self.scanningBatchSize = scanningBatchSize
assert(downloadBatchSize >= scanningBatchSize)
} }
} }
init(container: DIContainer, config: Configuration) { init(container: DIContainer, config: Configuration) {
context = ActionContext(state: .validateServer) context = ActionContext(state: .validateServer)
actions = Self.makeActions(container: container) actions = Self.makeActions(container: container, config: config)
self.logger = container.resolve(Logger.self) self.logger = container.resolve(Logger.self)
self.config = config self.config = config
} }
// swiftlint:disable:next cyclomatic_complexity // swiftlint:disable:next cyclomatic_complexity
static func makeActions(container: DIContainer) -> [CBPState: Action] { static func makeActions(container: DIContainer, config: Configuration) -> [CBPState: Action] {
let actionsDefinition = CBPState.allCases.compactMap { state -> (CBPState, Action)? in let actionsDefinition = CBPState.allCases.compactMap { state -> (CBPState, Action)? in
let action: Action let action: Action
switch state { switch state {
@ -142,7 +134,7 @@ class CompactBlockProcessorNG {
case .scanDownloaded: case .scanDownloaded:
action = ScanDownloadedButUnscannedAction(container: container) action = ScanDownloadedButUnscannedAction(container: container)
case .download: case .download:
action = DownloadAction(container: container) action = DownloadAction(container: container, config: config)
case .validate: case .validate:
action = ValidateAction(container: container) action = ValidateAction(container: container)
case .scan: case .scan:

View File

@ -224,6 +224,7 @@ extension BlockDownloaderImpl: BlockDownloader {
} }
func setSyncRange(_ range: CompactBlockRange) async throws { func setSyncRange(_ range: CompactBlockRange) async throws {
guard range != syncRange else { return }
downloadStream = try await compactBlocksDownloadStream(startHeight: range.lowerBound, targetHeight: range.upperBound) downloadStream = try await compactBlocksDownloadStream(startHeight: range.lowerBound, targetHeight: range.upperBound)
syncRange = range syncRange = range
} }

View File

@ -100,6 +100,10 @@ public enum ZcashSDK {
/// Default batch size for scanning blocks for the compact block processor /// Default batch size for scanning blocks for the compact block processor
public static let DefaultScanningBatch = 100 public static let DefaultScanningBatch = 100
/// Default batch size for downloading and scanning blocks for the compact block processor. Be careful with this number. This amount of blocks
/// times three is held in memory at some point of the sync process.
public static let DefaultSyncBatch = 100
/// Default amount of time, in in seconds, to poll for new blocks. Typically, this should be about half the average /// Default amount of time, in in seconds, to poll for new blocks. Typically, this should be about half the average
/// block time. /// block time.
public static let defaultPollInterval: TimeInterval = 20 public static let defaultPollInterval: TimeInterval = 20