[#700] Make CBP state machine work

This commit is contained in:
Michal Fousek 2023-05-15 12:27:26 +02:00
parent 97f3699436
commit 56d70ee164
25 changed files with 505 additions and 1674 deletions

View File

@ -57,7 +57,7 @@ protocol Action {
// Each action updates context accordingly. It should at least set new state. Reason for this is that action can return different states for
// different conditions. And action is the thing that knows these conditions.
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext
// Should be called on each existing action when processor wants to stop. Some actions may do it's own background work.
func stop() async

View File

@ -44,7 +44,7 @@ class ChecksBeforeSyncAction {
extension ChecksBeforeSyncAction: Action {
var removeBlocksCacheWhenFailed: Bool { false }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
// clear any present cached state if needed.
// this checks if there was a sync in progress that was
// interrupted abruptly and cache was not able to be cleared

View File

@ -19,7 +19,7 @@ class ClearAlreadyScannedBlocksAction {
extension ClearAlreadyScannedBlocksAction: Action {
var removeBlocksCacheWhenFailed: Bool { false }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
let lastScannedHeight = try await transactionRepository.lastScannedHeight()
try await storage.clear(upTo: lastScannedHeight)

View File

@ -17,7 +17,7 @@ class ClearCacheAction {
extension ClearCacheAction: Action {
var removeBlocksCacheWhenFailed: Bool { false }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
try await storage.clear()
await context.update(state: .finished)
return context

View File

@ -8,13 +8,13 @@
import Foundation
class ComputeSyncRangesAction {
let config: CompactBlockProcessorNG.Configuration
let config: CompactBlockProcessor.Configuration
let downloaderService: BlockDownloaderService
let internalSyncProgress: InternalSyncProgress
let latestBlocksDataProvider: LatestBlocksDataProvider
let logger: Logger
init(container: DIContainer, config: CompactBlockProcessorNG.Configuration) {
init(container: DIContainer, config: CompactBlockProcessor.Configuration) {
self.config = config
downloaderService = container.resolve(BlockDownloaderService.self)
internalSyncProgress = container.resolve(InternalSyncProgress.self)
@ -42,7 +42,7 @@ class ComputeSyncRangesAction {
extension ComputeSyncRangesAction: Action {
var removeBlocksCacheWhenFailed: Bool { false }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
// call internalSyncProgress and compute sync ranges and store them in context
// if there is nothing sync just switch to finished state

View File

@ -8,12 +8,12 @@
import Foundation
class DownloadAction {
let config: CompactBlockProcessorNG.Configuration
let config: CompactBlockProcessor.Configuration
let downloader: BlockDownloader
let transactionRepository: TransactionRepository
let logger: Logger
init(container: DIContainer, config: CompactBlockProcessorNG.Configuration) {
init(container: DIContainer, config: CompactBlockProcessor.Configuration) {
self.config = config
downloader = container.resolve(BlockDownloader.self)
transactionRepository = container.resolve(TransactionRepository.self)
@ -29,7 +29,7 @@ class DownloadAction {
extension DownloadAction: Action {
var removeBlocksCacheWhenFailed: Bool { true }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
guard let downloadRange = await context.syncRanges.downloadAndScanRange else {
return await update(context: context)
}
@ -38,17 +38,23 @@ extension DownloadAction: Action {
// This action is executed for each batch (batch size is 100 blocks by default) until all the blocks in whole `downloadRange` are downloaded.
// So the right range for this batch must be computed.
let batchRangeStart = max(downloadRange.lowerBound, lastScannedHeight)
let batchRange = batchRangeStart...batchRangeStart + config.batchSize
let batchRangeEnd = min(downloadRange.upperBound, batchRangeStart + config.batchSize)
guard batchRangeStart <= batchRangeEnd else {
return await update(context: context)
}
let batchRange = batchRangeStart...batchRangeEnd
let downloadLimit = batchRange.upperBound + (2 * config.batchSize)
logger.debug("Starting download with range: \(batchRange.lowerBound)...\(batchRange.upperBound)")
try await downloader.setSyncRange(downloadRange)
await downloader.setDownloadLimit(downloadLimit)
await downloader.startDownload(maxBlockBufferSize: config.downloadBufferSize)
try await downloader.waitUntilRequestedBlocksAreDownloaded(in: batchRange)
await context.update(state: .validate)
return context
return await update(context: context)
}
func stop() async {

View File

@ -9,11 +9,11 @@ import Foundation
class EnhanceAction {
let blockEnhancer: BlockEnhancer
let config: CompactBlockProcessorNG.Configuration
let config: CompactBlockProcessor.Configuration
let internalSyncProgress: InternalSyncProgress
let logger: Logger
let transactionRepository: TransactionRepository
init(container: DIContainer, config: CompactBlockProcessorNG.Configuration) {
init(container: DIContainer, config: CompactBlockProcessor.Configuration) {
blockEnhancer = container.resolve(BlockEnhancer.self)
self.config = config
internalSyncProgress = container.resolve(InternalSyncProgress.self)
@ -40,7 +40,7 @@ class EnhanceAction {
extension EnhanceAction: Action {
var removeBlocksCacheWhenFailed: Bool { false }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
// Use `BlockEnhancer` to enhance blocks.
// This action is executed on each downloaded and scanned batch (typically each 100 blocks). But we want to run enhancement each 1000 blocks.
// This action can use `InternalSyncProgress` and last scanned height to compute when it should do work.
@ -59,13 +59,22 @@ extension EnhanceAction: Action {
let enhanceRangeStart = max(range.lowerBound, lastEnhancedHeight)
let enhanceRangeEnd = min(range.upperBound, lastScannedHeight)
if enhanceRangeStart <= enhanceRangeEnd && lastEnhancedHeight - lastScannedHeight >= config.enhanceBatchSize {
if enhanceRangeStart <= enhanceRangeEnd && lastScannedHeight - lastEnhancedHeight >= config.enhanceBatchSize {
let enhanceRange = enhanceRangeStart...enhanceRangeEnd
let transactions = try await blockEnhancer.enhance(at: enhanceRange) { progress in
let transactions = try await blockEnhancer.enhance(
at: enhanceRange,
didEnhance: { progress in
await didUpdate(.progressUpdated(.enhance(progress)))
if let foundTx = progress.lastFoundTransaction, progress.newlyMined {
await didUpdate(.minedTransaction(foundTx))
}
}
)
if let transactions {
await didUpdate(.foundTransactions(transactions, enhanceRange))
}
}
return await decideWhatToDoNext(context: context, lastScannedHeight: lastScannedHeight)
}

View File

@ -20,10 +20,12 @@ class FetchUTXOsAction {
extension FetchUTXOsAction: Action {
var removeBlocksCacheWhenFailed: Bool { false }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
if let range = await context.syncRanges.fetchUTXORange {
logger.debug("Fetching UTXO with range: \(range.lowerBound)...\(range.upperBound)")
let result = try await utxoFetcher.fetch(at: range)
let result = try await utxoFetcher.fetch(at: range) { fetchProgress in
await didUpdate(.progressUpdated(.fetch(fetchProgress)))
}
await didUpdate(.storedUTXOs(result))
}

View File

@ -8,12 +8,12 @@
import Foundation
class MigrateLegacyCacheDBAction {
private let config: CompactBlockProcessorNG.Configuration
private let config: CompactBlockProcessor.Configuration
private let internalSyncProgress: InternalSyncProgress
private let storage: CompactBlockRepository
private let transactionRepository: TransactionRepository
init(container: DIContainer, config: CompactBlockProcessorNG.Configuration) {
init(container: DIContainer, config: CompactBlockProcessor.Configuration) {
self.config = config
internalSyncProgress = container.resolve(InternalSyncProgress.self)
storage = container.resolve(CompactBlockRepository.self)
@ -29,7 +29,7 @@ class MigrateLegacyCacheDBAction {
extension MigrateLegacyCacheDBAction: Action {
var removeBlocksCacheWhenFailed: Bool { false }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
guard let legacyCacheDbURL = config.cacheDbURL else {
return await updateState(context)
}

View File

@ -20,7 +20,7 @@ class SaplingParamsAction {
extension SaplingParamsAction: Action {
var removeBlocksCacheWhenFailed: Bool { false }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
logger.debug("Fetching sapling parameters")
try await saplingParametersHandler.handleIfNeeded()
await context.update(state: .scanDownloaded)

View File

@ -8,12 +8,12 @@
import Foundation
class ScanAction {
let config: CompactBlockProcessorNG.Configuration
let config: CompactBlockProcessor.Configuration
let blockScanner: BlockScanner
let logger: Logger
let transactionRepository: TransactionRepository
init(container: DIContainer, config: CompactBlockProcessorNG.Configuration) {
init(container: DIContainer, config: CompactBlockProcessor.Configuration) {
self.config = config
blockScanner = container.resolve(BlockScanner.self)
transactionRepository = container.resolve(TransactionRepository.self)
@ -29,7 +29,7 @@ class ScanAction {
extension ScanAction: Action {
var removeBlocksCacheWhenFailed: Bool { true }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
guard let scanRange = await context.syncRanges.downloadAndScanRange else {
return await update(context: context)
}
@ -38,6 +38,12 @@ extension ScanAction: Action {
// This action is executed for each batch (batch size is 100 blocks by default) until all the blocks in whole `scanRange` are scanned.
// So the right range for this batch must be computed.
let batchRangeStart = max(scanRange.lowerBound, lastScannedHeight)
let batchRangeEnd = min(scanRange.upperBound, batchRangeStart + config.batchSize)
guard batchRangeStart <= batchRangeEnd else {
return await update(context: context)
}
let batchRange = batchRangeStart...batchRangeStart + config.batchSize
logger.debug("Starting scan blocks with range: \(batchRange.lowerBound)...\(batchRange.upperBound)")
@ -51,8 +57,8 @@ extension ScanAction: Action {
self?.logger.debug("progress: \(progress)")
await didUpdate(.progressUpdated(.syncing(progress)))
}
await context.update(state: .clearAlreadyScannedBlocks)
return context
return await update(context: context)
}
func stop() async { }

View File

@ -20,7 +20,7 @@ class ScanDownloadedButUnscannedAction {
extension ScanDownloadedButUnscannedAction: Action {
var removeBlocksCacheWhenFailed: Bool { false }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
if let range = await context.syncRanges.downloadedButUnscannedRange {
logger.debug("Starting scan with downloaded but not scanned blocks with range: \(range.lowerBound)...\(range.upperBound)")
let totalProgressRange = await context.totalProgressRange

View File

@ -18,7 +18,7 @@ class ValidateAction {
extension ValidateAction: Action {
var removeBlocksCacheWhenFailed: Bool { true }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
try await validator.validate()
await context.update(state: .scan)
return context

View File

@ -8,11 +8,11 @@
import Foundation
class ValidateServerAction {
let config: CompactBlockProcessorNG.Configuration
let config: CompactBlockProcessor.Configuration
let rustBackend: ZcashRustBackendWelding
let service: LightWalletService
init(container: DIContainer, config: CompactBlockProcessorNG.Configuration) {
init(container: DIContainer, config: CompactBlockProcessor.Configuration) {
self.config = config
rustBackend = container.resolve(ZcashRustBackendWelding.self)
service = container.resolve(LightWalletService.self)
@ -22,7 +22,7 @@ class ValidateServerAction {
extension ValidateServerAction: Action {
var removeBlocksCacheWhenFailed: Bool { false }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessorNG.Event) async -> Void) async throws -> ActionContext {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
let info = try await service.getInfo()
let localNetwork = config.network
let saplingActivation = config.saplingActivation

File diff suppressed because it is too large Load Diff

View File

@ -1,743 +0,0 @@
//
// CompactBlockProcessor.swift
//
//
// Created by Michal Fousek on 05.05.2023.
//
import Foundation
actor CompactBlockProcessorNG {
// It would be better to use Combine here but Combine doesn't work great with async. When this runs regularly only one closure is stored here
// and that is one provided by `SDKSynchronizer`. But while running tests more "subscribers" is required here. Therefore it's required to handle
// more closures here.
private var eventClosures: [String: EventClosure] = [:]
private var syncTask: Task<Void, Error>?
private let actions: [CBPState: Action]
private var context: ActionContext
private(set) var config: Configuration
private var afterSyncHooksManager = AfterSyncHooksManager()
private let accountRepository: AccountRepository
private let blockDownloaderService: BlockDownloaderService
private let internalSyncProgress: InternalSyncProgress
private let latestBlocksDataProvider: LatestBlocksDataProvider
private let logger: Logger
private let metrics: SDKMetrics
private let rustBackend: ZcashRustBackendWelding
private let service: LightWalletService
private let storage: CompactBlockRepository
private let transactionRepository: TransactionRepository
private var retryAttempts: Int = 0
private var backoffTimer: Timer?
private var consecutiveChainValidationErrors: Int = 0
/// Compact Block Processor configuration
///
/// - parameter fsBlockCacheRoot: absolute root path where the filesystem block cache will be stored.
/// - parameter dataDb: absolute file path of the DB where all information derived from the cache DB is stored.
/// - parameter spendParamsURL: absolute file path of the sapling-spend.params file
/// - parameter outputParamsURL: absolute file path of the sapling-output.params file
struct Configuration {
let alias: ZcashSynchronizerAlias
let saplingParamsSourceURL: SaplingParamsSourceURL
let fsBlockCacheRoot: URL
let dataDb: URL
let spendParamsURL: URL
let outputParamsURL: URL
let enhanceBatchSize: Int
let batchSize: Int
let retries: Int
let maxBackoffInterval: TimeInterval
let maxReorgSize = ZcashSDK.maxReorgSize
let rewindDistance: Int
let walletBirthdayProvider: () -> BlockHeight
var walletBirthday: BlockHeight { walletBirthdayProvider() }
let downloadBufferSize: Int = 10
let network: ZcashNetwork
let saplingActivation: BlockHeight
let cacheDbURL: URL?
var blockPollInterval: TimeInterval {
TimeInterval.random(in: ZcashSDK.defaultPollInterval / 2 ... ZcashSDK.defaultPollInterval * 1.5)
}
init(
alias: ZcashSynchronizerAlias,
cacheDbURL: URL? = nil,
fsBlockCacheRoot: URL,
dataDb: URL,
spendParamsURL: URL,
outputParamsURL: URL,
saplingParamsSourceURL: SaplingParamsSourceURL,
enhanceBatchSize: Int = ZcashSDK.DefaultEnhanceBatch,
batchSize: Int = ZcashSDK.DefaultSyncBatch,
retries: Int = ZcashSDK.defaultRetries,
maxBackoffInterval: TimeInterval = ZcashSDK.defaultMaxBackOffInterval,
rewindDistance: Int = ZcashSDK.defaultRewindDistance,
walletBirthdayProvider: @escaping () -> BlockHeight,
saplingActivation: BlockHeight,
network: ZcashNetwork
) {
self.alias = alias
self.fsBlockCacheRoot = fsBlockCacheRoot
self.dataDb = dataDb
self.spendParamsURL = spendParamsURL
self.outputParamsURL = outputParamsURL
self.saplingParamsSourceURL = saplingParamsSourceURL
self.network = network
self.enhanceBatchSize = enhanceBatchSize
self.batchSize = batchSize
self.retries = retries
self.maxBackoffInterval = maxBackoffInterval
self.rewindDistance = rewindDistance
self.walletBirthdayProvider = walletBirthdayProvider
self.saplingActivation = saplingActivation
self.cacheDbURL = cacheDbURL
}
init(
alias: ZcashSynchronizerAlias,
fsBlockCacheRoot: URL,
dataDb: URL,
spendParamsURL: URL,
outputParamsURL: URL,
saplingParamsSourceURL: SaplingParamsSourceURL,
enhanceBatchSize: Int = ZcashSDK.DefaultEnhanceBatch,
batchSize: Int = ZcashSDK.DefaultSyncBatch,
retries: Int = ZcashSDK.defaultRetries,
maxBackoffInterval: TimeInterval = ZcashSDK.defaultMaxBackOffInterval,
rewindDistance: Int = ZcashSDK.defaultRewindDistance,
walletBirthdayProvider: @escaping () -> BlockHeight,
network: ZcashNetwork
) {
self.alias = alias
self.fsBlockCacheRoot = fsBlockCacheRoot
self.dataDb = dataDb
self.spendParamsURL = spendParamsURL
self.outputParamsURL = outputParamsURL
self.saplingParamsSourceURL = saplingParamsSourceURL
self.walletBirthdayProvider = walletBirthdayProvider
self.saplingActivation = network.constants.saplingActivationHeight
self.network = network
self.cacheDbURL = nil
self.enhanceBatchSize = enhanceBatchSize
self.batchSize = batchSize
self.retries = retries
self.maxBackoffInterval = maxBackoffInterval
self.rewindDistance = rewindDistance
}
}
/// Initializes a CompactBlockProcessor instance
/// - Parameters:
/// - service: concrete implementation of `LightWalletService` protocol
/// - storage: concrete implementation of `CompactBlockRepository` protocol
/// - backend: a class that complies to `ZcashRustBackendWelding`
/// - config: `Configuration` struct for this processor
init(container: DIContainer, config: Configuration) {
self.init(
container: container,
config: config,
accountRepository: AccountRepositoryBuilder.build(dataDbURL: config.dataDb, readOnly: true, logger: container.resolve(Logger.self))
)
}
/// Initializes a CompactBlockProcessor instance from an Initialized object
/// - Parameters:
/// - initializer: an instance that complies to CompactBlockDownloading protocol
init(initializer: Initializer, walletBirthdayProvider: @escaping () -> BlockHeight) {
self.init(
container: initializer.container,
config: Configuration(
alias: initializer.alias,
fsBlockCacheRoot: initializer.fsBlockDbRoot,
dataDb: initializer.dataDbURL,
spendParamsURL: initializer.spendParamsURL,
outputParamsURL: initializer.outputParamsURL,
saplingParamsSourceURL: initializer.saplingParamsSourceURL,
walletBirthdayProvider: walletBirthdayProvider,
network: initializer.network
),
accountRepository: initializer.accountRepository
)
}
init(container: DIContainer, config: Configuration, accountRepository: AccountRepository) {
// Dependencies.setupCompactBlockProcessor(
// in: container,
// config: config,
// accountRepository: accountRepository
// )
context = ActionContext(state: .validateServer)
actions = Self.makeActions(container: container, config: config)
self.metrics = container.resolve(SDKMetrics.self)
self.logger = container.resolve(Logger.self)
self.latestBlocksDataProvider = container.resolve(LatestBlocksDataProvider.self)
self.internalSyncProgress = container.resolve(InternalSyncProgress.self)
self.blockDownloaderService = container.resolve(BlockDownloaderService.self)
self.service = container.resolve(LightWalletService.self)
self.rustBackend = container.resolve(ZcashRustBackendWelding.self)
self.storage = container.resolve(CompactBlockRepository.self)
self.config = config
self.transactionRepository = container.resolve(TransactionRepository.self)
self.accountRepository = accountRepository
}
// swiftlint:disable:next cyclomatic_complexity
private static func makeActions(container: DIContainer, config: Configuration) -> [CBPState: Action] {
let actionsDefinition = CBPState.allCases.compactMap { state -> (CBPState, Action)? in
let action: Action
switch state {
case .migrateLegacyCacheDB:
action = MigrateLegacyCacheDBAction(container: container, config: config)
case .validateServer:
action = ValidateServerAction(container: container, config: config)
case .computeSyncRanges:
action = ComputeSyncRangesAction(container: container, config: config)
case .checksBeforeSync:
action = ChecksBeforeSyncAction(container: container)
case .scanDownloaded:
action = ScanDownloadedButUnscannedAction(container: container)
case .download:
action = DownloadAction(container: container, config: config)
case .validate:
action = ValidateAction(container: container)
case .scan:
action = ScanAction(container: container, config: config)
case .clearAlreadyScannedBlocks:
action = ClearAlreadyScannedBlocksAction(container: container)
case .enhance:
action = EnhanceAction(container: container, config: config)
case .fetchUTXO:
action = FetchUTXOsAction(container: container)
case .handleSaplingParams:
action = SaplingParamsAction(container: container)
case .clearCache:
action = ClearCacheAction(container: container)
case .finished, .failed, .stopped, .idle:
return nil
}
return (state, action)
}
return Dictionary(uniqueKeysWithValues: actionsDefinition)
}
}
// MARK: - "Public" API
extension CompactBlockProcessorNG {
func start(retry: Bool = false) async {
if retry {
self.retryAttempts = 0
self.backoffTimer?.invalidate()
self.backoffTimer = nil
}
guard await canStartSync() else {
if await isIdle() {
logger.warn("max retry attempts reached on \(await context.state) state")
await send(event: .failed(ZcashError.compactBlockProcessorMaxAttemptsReached(config.retries)))
} else {
logger.debug("Warning: compact block processor was started while busy!!!!")
afterSyncHooksManager.insert(hook: .anotherSync)
}
return
}
syncTask = Task(priority: .userInitiated) {
await run()
}
}
func stop() async {
await rawStop()
retryAttempts = 0
}
}
// MARK: - Events
extension CompactBlockProcessorNG {
typealias EventClosure = (Event) async -> Void
enum Event {
/// Event sent when the CompactBlockProcessor presented an error.
case failed (Error)
/// Event sent when the CompactBlockProcessor has finished syncing the blockchain to latest height
case finished (_ lastScannedHeight: BlockHeight)
/// Event sent when the CompactBlockProcessor enhanced a bunch of transactions in some range.
case foundTransactions ([ZcashTransaction.Overview], CompactBlockRange)
/// Event sent when the CompactBlockProcessor handled a ReOrg.
/// `reorgHeight` is the height on which the reorg was detected.
/// `rewindHeight` is the height that the processor backed to in order to solve the Reorg.
case handledReorg (_ reorgHeight: BlockHeight, _ rewindHeight: BlockHeight)
/// Event sent when progress of the sync process changes.
case progressUpdated (CompactBlockProgress)
/// Event sent when the CompactBlockProcessor fetched utxos from lightwalletd attempted to store them.
case storedUTXOs ((inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]))
/// Event sent when the CompactBlockProcessor starts enhancing of the transactions.
case startedEnhancing
/// Event sent when the CompactBlockProcessor starts fetching of the UTXOs.
case startedFetching
/// Event sent when the CompactBlockProcessor starts syncing.
case startedSyncing
/// Event sent when the CompactBlockProcessor stops syncing.
case stopped
}
func updateEventClosure(identifier: String, closure: @escaping (Event) async -> Void) async {
eventClosures[identifier] = closure
}
private func send(event: Event) async {
for item in eventClosures {
await item.value(event)
}
}
}
// MARK: - Main loop
extension CompactBlockProcessorNG {
// This is main loop of the sync process. It simply takes state and try to find action which handles it. If action is found it executes the
// action. If action is not found then loop finishes. Thanks to this it's super easy to identify start point of sync process and end points
// of sync process without any side effects.
private func run() async {
resetContext()
while true {
// Sync is starting when the state is `idle`.
if await context.state == .idle {
await syncStarted()
if backoffTimer == nil {
await setTimer()
}
}
// Try to find action for state.
guard let action = actions[await context.state] else {
if await syncFinished() {
resetContext()
continue
} else {
break
}
}
do {
try Task.checkCancellation()
// Execute action.
context = try await action.run(with: context) { [weak self] event in
await self?.send(event: event)
}
await didFinishAction()
} catch {
if Task.isCancelled {
if await syncTaskWasCancelled() {
// Start sync all over again
resetContext()
} else {
// end the sync loop
break
}
} else {
if await handleSyncFailure(action: action, error: error) {
// Start sync all over again
resetContext()
} else {
// end the sync loop
break
}
}
}
}
}
private func syncTaskWasCancelled() async -> Bool {
logger.info("Sync cancelled.")
syncTask = nil
await context.update(state: .stopped)
await send(event: .stopped)
return await handleAfterSyncHooks()
}
private func handleSyncFailure(action: Action, error: Error) async -> Bool {
if action.removeBlocksCacheWhenFailed {
await ifTaskIsNotCanceledClearCompactBlockCache()
}
if case let ZcashError.rustValidateCombinedChainInvalidChain(height) = error {
logger.error("Sync failed because of validation error: \(error)")
do {
try await validationFailed(at: BlockHeight(height))
// Start sync all over again
return true
} catch {
await failure(error)
return false
}
} else {
logger.error("Sync failed with error: \(error)")
await failure(error)
return false
}
}
// swiftlint:disable:next cyclomatic_complexity
private func didFinishAction() async {
// This is evalution of the state setup by previous action.
switch await context.state {
case .idle:
break
case .migrateLegacyCacheDB:
break
case .validateServer:
break
case .computeSyncRanges:
break
case .checksBeforeSync:
break
case .scanDownloaded:
break
case .download:
break
case .validate:
break
case .scan:
break
case .clearAlreadyScannedBlocks:
break
case .enhance:
await send(event: .startedEnhancing)
case .fetchUTXO:
await send(event: .startedFetching)
case .handleSaplingParams:
break
case .clearCache:
break
case .finished:
break
case .failed:
break
case .stopped:
break
}
}
private func resetContext() {
context = ActionContext(state: .idle)
}
private func syncStarted() async {
// handle start of the sync process
await send(event: .startedSyncing)
}
private func syncFinished() async -> Bool {
let newerBlocksWereMinedDuringSync = await context.syncRanges.latestBlockHeight < latestBlocksDataProvider.latestBlockHeight
retryAttempts = 0
consecutiveChainValidationErrors = 0
let lastScannedHeight = await latestBlocksDataProvider.latestScannedHeight
await send(event: .finished(lastScannedHeight))
await context.update(state: .finished)
// If new blocks were mined during previous sync run the sync process again
if newerBlocksWereMinedDuringSync {
return true
} else {
await setTimer()
return false
}
}
private func update(progress: CompactBlockProgress) async {
await send(event: .progressUpdated(progress))
}
private func validationFailed(at height: BlockHeight) async throws {
// cancel all Tasks
await rawStop()
// rewind
let rewindHeight = determineLowerBound(
errorHeight: height,
consecutiveErrors: consecutiveChainValidationErrors,
walletBirthday: config.walletBirthday
)
consecutiveChainValidationErrors += 1
try await rustBackend.rewindToHeight(height: Int32(rewindHeight))
try await blockDownloaderService.rewind(to: rewindHeight)
await internalSyncProgress.rewind(to: rewindHeight)
await send(event: .handledReorg(height, rewindHeight))
}
private func failure(_ error: Error) async {
await context.update(state: .failed)
logger.error("Fail with error: \(error)")
await rawStop()
self.retryAttempts += 1
await send(event: .failed(error))
// don't set a new timer if there are no more attempts.
if hasRetryAttempt() {
await self.setTimer()
}
}
private func handleAfterSyncHooks() async -> Bool {
let afterSyncHooksManager = self.afterSyncHooksManager
self.afterSyncHooksManager = AfterSyncHooksManager()
if let wipeContext = afterSyncHooksManager.shouldExecuteWipeHook() {
await doWipe(context: wipeContext)
return false
} else if let rewindContext = afterSyncHooksManager.shouldExecuteRewindHook() {
await doRewind(context: rewindContext)
return false
} else if afterSyncHooksManager.shouldExecuteAnotherSyncHook() {
logger.debug("Starting new sync.")
return true
} else {
return false
}
}
}
// MARK: - Rewind
extension CompactBlockProcessorNG {
/// Rewinds to provided height.
/// - Parameter height: height to rewind to. If nil is provided, it will rescan to nearest height (quick rescan)
///
/// - Note: If this is called while sync is in progress then the sync process is stopped first and then rewind is executed.
func rewind(context: AfterSyncHooksManager.RewindContext) async {
logger.debug("Starting rewind")
if await isIdle() {
logger.debug("Sync doesn't run. Executing rewind.")
await doRewind(context: context)
} else {
logger.debug("Stopping sync because of rewind")
afterSyncHooksManager.insert(hook: .rewind(context))
await stop()
}
}
private func doRewind(context: AfterSyncHooksManager.RewindContext) async {
logger.debug("Executing rewind.")
let lastDownloaded = await internalSyncProgress.latestDownloadedBlockHeight
let height = Int32(context.height ?? lastDownloaded)
let nearestHeight: Int32
do {
nearestHeight = try await rustBackend.getNearestRewindHeight(height: height)
} catch {
await failure(error)
return await context.completion(.failure(error))
}
// FIXME: [#719] this should be done on the rust layer, https://github.com/zcash/ZcashLightClientKit/issues/719
let rewindHeight = max(Int32(nearestHeight - 1), Int32(config.walletBirthday))
do {
try await rustBackend.rewindToHeight(height: rewindHeight)
} catch {
await failure(error)
return await context.completion(.failure(error))
}
// clear cache
let rewindBlockHeight = BlockHeight(rewindHeight)
do {
try await blockDownloaderService.rewind(to: rewindBlockHeight)
} catch {
return await context.completion(.failure(error))
}
await internalSyncProgress.rewind(to: rewindBlockHeight)
await context.completion(.success(rewindBlockHeight))
}
}
// MARK: - Wipe
extension CompactBlockProcessorNG {
func wipe(context: AfterSyncHooksManager.WipeContext) async {
logger.debug("Starting wipe")
if await isIdle() {
logger.debug("Sync doesn't run. Executing wipe.")
await doWipe(context: context)
} else {
logger.debug("Stopping sync because of wipe")
afterSyncHooksManager.insert(hook: .wipe(context))
await stop()
}
}
private func doWipe(context: AfterSyncHooksManager.WipeContext) async {
logger.debug("Executing wipe.")
context.prewipe()
do {
try await self.storage.clear()
await internalSyncProgress.rewind(to: 0)
wipeLegacyCacheDbIfNeeded()
let fileManager = FileManager.default
if fileManager.fileExists(atPath: config.dataDb.path) {
try fileManager.removeItem(at: config.dataDb)
}
await context.completion(nil)
} catch {
await context.completion(error)
}
}
private func wipeLegacyCacheDbIfNeeded() {
guard let cacheDbURL = config.cacheDbURL else { return }
guard FileManager.default.isDeletableFile(atPath: cacheDbURL.pathExtension) else { return }
try? FileManager.default.removeItem(at: cacheDbURL)
}
}
// MARK: - Utils
extension CompactBlockProcessorNG {
private func setTimer() async {
let interval = config.blockPollInterval
self.backoffTimer?.invalidate()
let timer = Timer(
timeInterval: interval,
repeats: true,
block: { [weak self] _ in
Task { [weak self] in
guard let self else { return }
if await self.isIdle() {
if await self.canStartSync() {
self.logger.debug(
"""
Timer triggered: Starting compact Block processor!.
Processor State: \(await self.context.state)
latestHeight: \(try await self.transactionRepository.lastScannedHeight())
attempts: \(await self.retryAttempts)
"""
)
await self.start()
} else if await hasRetryAttempt() {
await self.failure(ZcashError.compactBlockProcessorMaxAttemptsReached(self.config.retries))
}
} else {
await self.latestBlocksDataProvider.updateBlockData()
}
}
}
)
RunLoop.main.add(timer, forMode: .default)
self.backoffTimer = timer
}
private func isIdle() async -> Bool {
switch await context.state {
case .stopped, .failed, .finished, .idle:
return true
case .computeSyncRanges,
.checksBeforeSync,
.download,
.validate,
.scan,
.enhance,
.fetchUTXO,
.handleSaplingParams,
.clearCache,
.scanDownloaded,
.clearAlreadyScannedBlocks,
.validateServer,
.migrateLegacyCacheDB:
return false
}
}
private func canStartSync() async -> Bool {
return await isIdle() && hasRetryAttempt()
}
private func hasRetryAttempt() -> Bool {
retryAttempts < config.retries
}
private func determineLowerBound(errorHeight: Int, consecutiveErrors: Int, walletBirthday: BlockHeight) -> BlockHeight {
let offset = min(ZcashSDK.maxReorgSize, ZcashSDK.defaultRewindDistance * (consecutiveErrors + 1))
return max(errorHeight - offset, walletBirthday - ZcashSDK.maxReorgSize)
}
private func rawStop() async {
syncTask?.cancel()
self.backoffTimer?.invalidate()
self.backoffTimer = nil
await stopAllActions()
}
private func stopAllActions() async {
for action in actions.values {
await action.stop()
}
}
private func ifTaskIsNotCanceledClearCompactBlockCache() async {
guard !Task.isCancelled else { return }
let lastScannedHeight = await latestBlocksDataProvider.latestScannedHeight
do {
// Blocks download work in parallel with scanning. So imagine this scenario:
//
// Scanning is done until height 10300. Blocks are downloaded until height 10400.
// And now validation fails and this method is called. And `.latestDownloadedBlockHeight` in `internalSyncProgress` is set to 10400. And
// all the downloaded blocks are removed here.
//
// If this line doesn't happen then when sync starts next time it thinks that all the blocks are downloaded until 10400. But all were
// removed. So blocks between 10300 and 10400 wouldn't ever be scanned.
//
// Scanning is done until 10300 so the SDK can be sure that blocks with height below 10300 are not required. So it makes sense to set
// `.latestDownloadedBlockHeight` to `lastScannedHeight`. And sync will work fine in next run.
await internalSyncProgress.set(lastScannedHeight, .latestDownloadedBlockHeight)
try await clearCompactBlockCache()
} catch {
logger.error("`clearCompactBlockCache` failed after error: \(error)")
}
}
private func clearCompactBlockCache() async throws {
await stopAllActions()
try await storage.clear()
logger.info("Cache removed")
}
}

View File

@ -107,7 +107,6 @@ extension BlockEnhancerImpl: BlockEnhancer {
let newlyMinedRange = newlyMinedLowerBound...chainTipHeight
for index in 0 ..< transactions.count {
let transaction = transactions[index]
var retry = true

View File

@ -17,7 +17,10 @@ struct UTXOFetcherConfig {
}
protocol UTXOFetcher {
func fetch(at range: CompactBlockRange, didFetch: (Float) async -> ()) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity])
func fetch(
at range: CompactBlockRange,
didFetch: (Float) async -> Void
) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity])
}
struct UTXOFetcherImpl {
@ -31,7 +34,10 @@ struct UTXOFetcherImpl {
}
extension UTXOFetcherImpl: UTXOFetcher {
func fetch(at range: CompactBlockRange, didFetch: (Float) async -> ()) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]) {
func fetch(
at range: CompactBlockRange,
didFetch: (Float) async -> Void
) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]) {
try Task.checkCancellation()
let accounts = try accountRepository.getAll()

View File

@ -37,6 +37,12 @@ struct SyncRanges: Equatable {
}
}
enum NextState: Equatable {
case finishProcessing(height: BlockHeight)
case processNewBlocks(ranges: SyncRanges)
case wait(latestHeight: BlockHeight, latestDownloadHeight: BlockHeight)
}
protocol InternalSyncProgressStorage {
func bool(forKey defaultName: String) -> Bool
func integer(forKey defaultName: String) -> Int
@ -127,7 +133,7 @@ actor InternalSyncProgress {
latestBlockHeight: BlockHeight,
latestScannedHeight: BlockHeight,
walletBirthday: BlockHeight
) -> CompactBlockProcessor.NextState {
) -> NextState {
logger.debug("""
Init numbers:
latestBlockHeight: \(latestBlockHeight)

View File

@ -88,22 +88,8 @@ public enum ZcashSDK {
// MARK: Defaults
/// Default size of batches of blocks to request from the compact block service. Which was used both for scanning and downloading.
/// consider basing your code assumptions on `DefaultDownloadBatch` and `DefaultScanningBatch` instead.
@available(*, deprecated, message: "this value is being deprecated in favor of `DefaultDownloadBatch` and `DefaultScanningBatch`")
public static let DefaultBatchSize = 100
/// Default batch size for downloading blocks for the compact block processor. Be careful with this number. This amount of blocks is held in
/// memory at some point of the sync process.
/// This values can't be smaller than `DefaultScanningBatch`. Otherwise bad things will happen.
public static let DefaultDownloadBatch = 100
/// Default batch size for scanning blocks for the compact block processor
public static let DefaultScanningBatch = 100
/// Default batch size for downloading and scanning blocks for the compact block processor. Be careful with this number. This amount of blocks
/// times three is held in memory at some point of the sync process.
public static let DefaultSyncBatch = 100
/// Default batch size for enhancing transactions for the compact block processor
public static let DefaultEnhanceBatch = 1000

View File

@ -137,7 +137,7 @@ enum Dependencies {
let blockScannerConfig = BlockScannerConfig(
networkType: config.network.networkType,
scanningBatchSize: config.scanningBatchSize
scanningBatchSize: config.batchSize
)
return BlockScannerImpl(

View File

@ -197,14 +197,15 @@ public class SDKSynchronizer: Synchronizer {
// MARK: Handle CompactBlockProcessor.Flow
// swiftlint:disable:next cyclomatic_complexity
private func subscribeToProcessorEvents(_ processor: CompactBlockProcessor) async {
let eventClosure: CompactBlockProcessor.EventClosure = { [weak self] event in
switch event {
case let .failed(error):
await self?.failed(error: error)
case let .finished(height, foundBlocks):
await self?.finished(lastScannedHeight: height, foundBlocks: foundBlocks)
case let .finished(height):
await self?.finished(lastScannedHeight: height)
case let .foundTransactions(transactions, range):
self?.foundTransactions(transactions: transactions, in: range)
@ -243,7 +244,7 @@ public class SDKSynchronizer: Synchronizer {
await updateStatus(.error(error))
}
private func finished(lastScannedHeight: BlockHeight, foundBlocks: Bool) async {
private func finished(lastScannedHeight: BlockHeight) async {
await latestBlocksDataProvider.updateScannedData()
await updateStatus(.synced)
@ -403,7 +404,7 @@ public class SDKSynchronizer: Synchronizer {
}
public func latestHeight() async throws -> BlockHeight {
try await blockProcessor.blockDownloaderService.latestBlockHeight()
try await blockProcessor.latestHeight()
}
public func latestUTXOs(address: String) async throws -> [UnspentTransactionOutputEntity] {

View File

@ -905,7 +905,6 @@ class BalanceTests: ZcashTestCase {
let memo = try Memo(string: "shielding is fun!")
var pendingTx: ZcashTransaction.Overview?
let transaction = try await coordinator.synchronizer.sendToAddress(
spendingKey: spendingKey,
zatoshi: sendAmount,

View File

@ -213,7 +213,9 @@ class SynchronizerDarksideTests: ZcashTestCase {
syncSessionID: uuids[0],
shieldedBalance: WalletBalance(verified: Zatoshi(100000), total: Zatoshi(200000)),
transparentBalance: .zero,
internalSyncStatus: .enhancing(EnhancementProgress(totalTransactions: 0, enhancedTransactions: 0, lastFoundTransaction: nil, range: 0...0, newlyMined: false)),
internalSyncStatus: .enhancing(
EnhancementProgress(totalTransactions: 0, enhancedTransactions: 0, lastFoundTransaction: nil, range: 0...0, newlyMined: false)
),
latestScannedHeight: 663189,
latestBlockHeight: 663189,
latestScannedTime: 1

View File

@ -152,7 +152,6 @@ extension ZcashRustBackend {
}
}
extension Zatoshi: CustomDebugStringConvertible {
public var debugDescription: String {
"Zatoshi(\(self.amount))"