From a38e8134b8291ab2052a4e08de75c370b46c9031 Mon Sep 17 00:00:00 2001 From: Michal Fousek Date: Wed, 15 Mar 2023 12:17:43 +0100 Subject: [PATCH] [#724] Switch from NotificationCenter to Combine Closes #724 - All the notifications are gone. Only `synchronizerConnectionStateChanged` stayed because it's used internally. Let's deal with this one in another task. - Synchronizer has now two new publishers (`stateStream` and (`eventStream`) which are used to notify the client app about what is going on. These publishers replace notifications. - There is also new property `latestState` which can be used to get the SDK state in synchronous manner. - `SDKSynchronizer.status` is no longer public. It is used internally to refresh `latestState` and emit new values from `stateStream. - When `SDKSynchronizer.status` is update `notify()` function is triggered. And this function is now responsible for generating new snapshot of `SynchronizerState` and updating `latestState` and `stateStream`. --- CHANGELOG.md | 6 + .../ZcashLightClientSample/AppDelegate.swift | 24 +- .../Send/SendViewController.swift | 82 +---- .../SyncBlocksViewController.swift | 137 ++++---- MIGRATING.md | 30 +- .../Block/CompactBlockProcessor.swift | 36 +- .../Block/Enhance/BlockEnhancer.swift | 6 +- .../ZcashLightClientKit/Synchronizer.swift | 103 +++--- .../Synchronizer/SDKSynchronizer.swift | 308 +++++------------- Tests/DarksideTests/BalanceTests.swift | 36 +- .../InternalStateConsistencyTests.swift | 13 +- .../SychronizerDarksideTests.swift | 147 ++++++--- Tests/DarksideTests/SynchronizerTests.swift | 26 +- Tests/DarksideTests/Z2TReceiveTests.swift | 26 +- .../PerformanceTests/SynchronizerTests.swift | 12 +- .../SDKSynchronizerSyncStatusHandler.swift | 50 +++ Tests/TestUtils/TestCoordinator.swift | 37 ++- Tests/TestUtils/Tests+Utils.swift | 15 +- 18 files changed, 540 insertions(+), 554 deletions(-) create mode 100644 Tests/TestUtils/SDKSynchronizerSyncStatusHandler.swift diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f2a54dc..7c66b5e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,10 @@ # unreleased +- [#724] Switch from event based notifications to state based notifications + + The `SDKSynchronizer` no longer uses `NotificationCenter` to send notifications. +Notifications are replaced with `Combine` publishers. Check the migrating document and +documentation in the code to get more information. + - [#826] Change how the SDK is initialized - `viewingKeys` and `walletBirthday` are removed from `Initializer` constuctor. These parameters diff --git a/Example/ZcashLightClientSample/ZcashLightClientSample/AppDelegate.swift b/Example/ZcashLightClientSample/ZcashLightClientSample/AppDelegate.swift index 55e04264..68b27532 100644 --- a/Example/ZcashLightClientSample/ZcashLightClientSample/AppDelegate.swift +++ b/Example/ZcashLightClientSample/ZcashLightClientSample/AppDelegate.swift @@ -59,20 +59,20 @@ class AppDelegate: UIResponder, UIApplicationDelegate { } func subscribeToMinedTxNotifications() { - NotificationCenter.default.addObserver( - self, - selector: #selector(txMinedNotification(_:)), - name: Notification.Name.synchronizerMinedTransaction, - object: nil - ) + sharedSynchronizer.eventStream + .map { event in + guard case let .minedTransaction(transaction) = event else { return nil } + return transaction + } + .compactMap { $0 } + .receive(on: DispatchQueue.main) + .sink( + receiveValue: { [weak self] transaction in self?.txMined(transaction) } + ) + .store(in: &cancellables) } - @objc func txMinedNotification(_ notification: Notification) { - guard let transaction = notification.userInfo?[SDKSynchronizer.NotificationKeys.minedTransaction] as? PendingTransactionEntity else { - loggerProxy.error("no tx information on notification") - return - } - + func txMined(_ transaction: PendingTransactionEntity) { NotificationBubble.display( in: window!.rootViewController!.view, options: NotificationBubble.sucessOptions( diff --git a/Example/ZcashLightClientSample/ZcashLightClientSample/Send/SendViewController.swift b/Example/ZcashLightClientSample/ZcashLightClientSample/Send/SendViewController.swift index a1f8125e..a1c1ca29 100644 --- a/Example/ZcashLightClientSample/ZcashLightClientSample/Send/SendViewController.swift +++ b/Example/ZcashLightClientSample/ZcashLightClientSample/Send/SendViewController.swift @@ -6,6 +6,7 @@ // Copyright © 2019 Electric Coin Company. All rights reserved. // +import Combine import UIKit import ZcashLightClientKit import KRProgressHUD @@ -29,6 +30,8 @@ class SendViewController: UIViewController { // swiftlint:disable:next implicitly_unwrapped_optional var synchronizer: Synchronizer! + + var cancellables: [AnyCancellable] = [] override func viewDidLoad() { super.viewDidLoad() @@ -50,9 +53,9 @@ class SendViewController: UIViewController { super.viewDidAppear(animated) do { try synchronizer.start(retry: false) - self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: synchronizer.status) + self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: synchronizer.latestState.syncStatus) } catch { - self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: synchronizer.status) + self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: synchronizer.latestState.syncStatus) fail(error) } } @@ -78,35 +81,15 @@ class SendViewController: UIViewController { memoField.layer.borderWidth = 1 memoField.layer.cornerRadius = 5 charactersLeftLabel.text = textForCharacterCount(0) - let center = NotificationCenter.default - - center.addObserver( - self, - selector: #selector(synchronizerStarted(_:)), - name: Notification.Name.synchronizerStarted, - object: synchronizer - ) - center.addObserver( - self, - selector: #selector(synchronizerSynced(_:)), - name: Notification.Name.synchronizerSynced, - object: synchronizer - ) - - center.addObserver( - self, - selector: #selector(synchronizerStopped(_:)), - name: Notification.Name.synchronizerStopped, - object: synchronizer - ) - - center.addObserver( - self, - selector: #selector(synchronizerUpdated(_:)), - name: Notification.Name.synchronizerProgressUpdated, - object: synchronizer - ) + synchronizer.stateStream + .throttle(for: .seconds(0.2), scheduler: DispatchQueue.main, latest: true) + .sink( + receiveValue: { [weak self] state in + self?.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: state.syncStatus) + } + ) + .store(in: &cancellables) } func format(balance: Zatoshi = Zatoshi()) -> String { @@ -129,7 +112,7 @@ class SendViewController: UIViewController { } func isFormValid() -> Bool { - switch synchronizer.status { + switch synchronizer.latestState.syncStatus { case .synced: return isBalanceValid() && isAmountValid() && isRecipientValid() default: @@ -263,43 +246,6 @@ class SendViewController: UIViewController { func cancel() {} - // MARK: synchronizer notifications - @objc func synchronizerUpdated(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self else { - return - } - self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: self.synchronizer.status) - } - } - - @objc func synchronizerStarted(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self else { - return - } - self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: self.synchronizer.status) - } - } - - @objc func synchronizerStopped(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self else { - return - } - self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: self.synchronizer.status) - } - } - - @objc func synchronizerSynced(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self else { - return - } - self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: self.synchronizer.status) - } - } - func textForCharacterCount(_ count: Int) -> String { "\(count) of \(characterLimit) bytes left" } diff --git a/Example/ZcashLightClientSample/ZcashLightClientSample/Sync Blocks/SyncBlocksViewController.swift b/Example/ZcashLightClientSample/ZcashLightClientSample/Sync Blocks/SyncBlocksViewController.swift index 69f45349..60de18fa 100644 --- a/Example/ZcashLightClientSample/ZcashLightClientSample/Sync Blocks/SyncBlocksViewController.swift +++ b/Example/ZcashLightClientSample/ZcashLightClientSample/Sync Blocks/SyncBlocksViewController.swift @@ -19,8 +19,9 @@ class SyncBlocksViewController: UIViewController { @IBOutlet weak var startPause: UIButton! @IBOutlet weak var metricLabel: UILabel! @IBOutlet weak var summaryLabel: UILabel! - + private var queue = DispatchQueue(label: "metrics.queue", qos: .default) + private var enhancingStarted = false private var accumulatedMetrics: ProcessorMetrics = .initial private var currentMetric: SDKMetrics.Operation? private var currentMetricName: String { @@ -34,12 +35,12 @@ class SyncBlocksViewController: UIViewController { } } + var cancellables: [AnyCancellable] = [] + let synchronizer = AppDelegate.shared.sharedSynchronizer - var notificationCancellables: [AnyCancellable] = [] - deinit { - notificationCancellables.forEach { $0.cancel() } + cancellables.forEach { $0.cancel() } } override func viewDidLoad() { @@ -51,83 +52,62 @@ class SyncBlocksViewController: UIViewController { action: #selector(wipe(_:)) ) - statusLabel.text = textFor(state: synchronizer.status) + statusLabel.text = textFor(state: synchronizer.latestState.syncStatus) progressBar.progress = 0 - let center = NotificationCenter.default - let subscribeToNotifications: [Notification.Name] = [ - .synchronizerStarted, - .synchronizerProgressUpdated, - .synchronizerStatusWillUpdate, - .synchronizerSynced, - .synchronizerStopped, - .synchronizerDisconnected, - .synchronizerSyncing, - .synchronizerEnhancing, - .synchronizerFetching, - .synchronizerFailed - ] - for notificationName in subscribeToNotifications { - center.publisher(for: notificationName) - .receive(on: DispatchQueue.main) - .sink { [weak self] notification in - DispatchQueue.main.async { - self?.processorNotification(notification) - } - } - .store(in: ¬ificationCancellables) - } - - NotificationCenter.default.publisher(for: .synchronizerEnhancing, object: nil) - .receive(on: DispatchQueue.main) - .sink { [weak self] _ in - self?.accumulateMetrics() - self?.summaryLabel.text = "scan: \((self?.accumulatedMetrics.debugDescription ?? "No summary"))" - self?.accumulatedMetrics = .initial - self?.currentMetric = .enhancement - } - .store(in: ¬ificationCancellables) - - NotificationCenter.default.publisher(for: .synchronizerProgressUpdated, object: nil) - .throttle(for: 5, scheduler: DispatchQueue.main, latest: true) - .receive(on: DispatchQueue.main) - .map { [weak self] _ -> SDKMetrics.BlockMetricReport? in - guard let currentMetric = self?.currentMetric else { return nil } - return SDKMetrics.shared.popBlock(operation: currentMetric)?.last - } - .sink { [weak self] report in - self?.metricLabel.text = (self?.currentMetricName ?? "") + report.debugDescription - } - .store(in: ¬ificationCancellables) - - NotificationCenter.default.publisher(for: .synchronizerSynced, object: nil) - .receive(on: DispatchQueue.main) - .delay(for: 0.5, scheduler: DispatchQueue.main) - .sink { [weak self] _ in - self?.accumulateMetrics() - self?.summaryLabel.text = "enhancement: \((self?.accumulatedMetrics.debugDescription ?? "No summary"))" - self?.overallSummary() - } - .store(in: ¬ificationCancellables) + synchronizer.stateStream + .throttle(for: .seconds(0.2), scheduler: DispatchQueue.main, latest: true) + .sink(receiveValue: { [weak self] state in self?.synchronizerStateUpdated(state) }) + .store(in: &cancellables) } override func viewWillDisappear(_ animated: Bool) { super.viewWillDisappear(animated) - - notificationCancellables.forEach { $0.cancel() } + cancellables.forEach { $0.cancel() } synchronizer.stop() } - @objc func processorNotification(_ notification: Notification) { + private func synchronizerStateUpdated(_ state: SynchronizerState) { self.updateUI() - switch notification.name { - case let not where not == Notification.Name.synchronizerProgressUpdated: - guard let progress = notification.userInfo?[SDKSynchronizer.NotificationKeys.progress] as? CompactBlockProgress else { return } - self.progressBar.progress = progress.progress - self.progressLabel.text = "\(floor(progress.progress * 1000) / 10)%" - default: - return + switch state.syncStatus { + case .unprepared: + break + + case let .syncing(progress): + enhancingStarted = false + + progressBar.progress = progress.progress + progressLabel.text = "\(floor(progress.progress * 1000) / 10)%" + + if let currentMetric { + let report = SDKMetrics.shared.popBlock(operation: currentMetric)?.last + metricLabel.text = currentMetricName + report.debugDescription + } + + case .enhancing: + guard !enhancingStarted else { return } + enhancingStarted = true + + accumulateMetrics() + summaryLabel.text = "scan: \(accumulatedMetrics.debugDescription)" + accumulatedMetrics = .initial + currentMetric = .enhancement + + case .fetching: + break + + case .synced: + accumulateMetrics() + summaryLabel.text = "enhancement: \(accumulatedMetrics.debugDescription)" + overallSummary() + + case .stopped: + break + case .disconnected: + break + case .error: + break } } @@ -169,10 +149,11 @@ class SyncBlocksViewController: UIViewController { } func doStartStop() async { - switch synchronizer.status { + let syncStatus = synchronizer.latestState.syncStatus + switch syncStatus { case .stopped, .unprepared: do { - if synchronizer.status == .unprepared { + if syncStatus == .unprepared { _ = try synchronizer.prepare( with: DemoAppConfig.seed, viewingKeys: [AppDelegate.shared.sharedViewingKey], @@ -214,11 +195,11 @@ class SyncBlocksViewController: UIViewController { } func updateUI() { - let state = synchronizer.status + let syncStatus = synchronizer.latestState.syncStatus - statusLabel.text = textFor(state: state) - startPause.setTitle(buttonText(for: state), for: .normal) - if case SyncStatus.synced = state { + statusLabel.text = textFor(state: syncStatus) + startPause.setTitle(buttonText(for: syncStatus), for: .normal) + if case SyncStatus.synced = syncStatus { startPause.isEnabled = false } else { startPause.isEnabled = true @@ -229,9 +210,9 @@ class SyncBlocksViewController: UIViewController { switch state { case .syncing: return "Pause" - case .stopped: + case .stopped, .unprepared: return "Start" - case .error, .unprepared, .disconnected: + case .error, .disconnected: return "Retry" case .synced: return "Chill!" diff --git a/MIGRATING.md b/MIGRATING.md index 9a569ef9..576ec136 100644 --- a/MIGRATING.md +++ b/MIGRATING.md @@ -1,4 +1,32 @@ -# Migrating from previous versions to +# Migrating from previous versions to +The `SDKSynchronizer` no longer uses `NotificationCenter` to send notifications. +Notifications are replaced with `Combine` publishers. + +`stateStream` publisher replaces notifications related to `SyncStatus` changes. +These notifications are replaced by `stateStream`: +- .synchronizerStarted +- .synchronizerProgressUpdated +- .synchronizerStatusWillUpdate +- .synchronizerSynced +- .synchronizerStopped +- .synchronizerDisconnected +- .synchronizerSyncing +- .synchronizerEnhancing +- .synchronizerFetching +- .synchronizerFailed + +`eventStream` publisher replaces notifications related to transactions and other stuff. +These notifications are replaced by `eventStream`: +- .synchronizerMinedTransaction +- .synchronizerFoundTransactions +- .synchronizerStoredUTXOs +- .synchronizerConnectionStateChanged + +`latestState` is also new property that can be used to get the latest SDK state in a synchronous way. +`SDKSynchronizer.status` is no longer public. To get `SyncStatus` either subscribe to `stateStream` +or use `latestState`. + +# Migrating from previous versions to 0.18.x Compact block cache no longer uses a sqlite database. The existing database should be deleted. `Initializer` now takes an `fsBlockDbRootURL` which is a URL pointing to a RW directory in the filesystem that will be used to store diff --git a/Sources/ZcashLightClientKit/Block/CompactBlockProcessor.swift b/Sources/ZcashLightClientKit/Block/CompactBlockProcessor.swift index 8fcdcada..3e107912 100644 --- a/Sources/ZcashLightClientKit/Block/CompactBlockProcessor.swift +++ b/Sources/ZcashLightClientKit/Block/CompactBlockProcessor.swift @@ -35,7 +35,7 @@ public enum CompactBlockProcessorError: Error { public enum CompactBlockProgress { case syncing(_ progress: BlockProgress) - case enhance(_ progress: EnhancementStreamProgress) + case enhance(_ progress: EnhancementProgress) case fetch public var progress: Float { @@ -78,22 +78,34 @@ public enum CompactBlockProgress { } } -public protocol EnhancementProgress { - var totalTransactions: Int { get } - var enhancedTransactions: Int { get } - var lastFoundTransaction: ZcashTransaction.Overview? { get } - var range: CompactBlockRange { get } -} - -public struct EnhancementStreamProgress: EnhancementProgress { +public struct EnhancementProgress: Equatable { public var totalTransactions: Int public var enhancedTransactions: Int public var lastFoundTransaction: ZcashTransaction.Overview? public var range: CompactBlockRange + + public init(totalTransactions: Int, enhancedTransactions: Int, lastFoundTransaction: ZcashTransaction.Overview?, range: CompactBlockRange) { + self.totalTransactions = totalTransactions + self.enhancedTransactions = enhancedTransactions + self.lastFoundTransaction = lastFoundTransaction + self.range = range + } public var progress: Float { totalTransactions > 0 ? Float(enhancedTransactions) / Float(totalTransactions) : 0 } + + public static var zero: EnhancementProgress { + EnhancementProgress(totalTransactions: 0, enhancedTransactions: 0, lastFoundTransaction: nil, range: 0...0) + } + + public static func == (lhs: EnhancementProgress, rhs: EnhancementProgress) -> Bool { + return + lhs.totalTransactions == rhs.totalTransactions && + lhs.enhancedTransactions == rhs.enhancedTransactions && + lhs.lastFoundTransaction?.id == rhs.lastFoundTransaction?.id && + lhs.range == rhs.range + } } /// The compact block processor is in charge of orchestrating the download and caching of compact blocks from a LightWalletEndpoint @@ -621,11 +633,11 @@ actor CompactBlockProcessor { let fileManager = FileManager.default if fileManager.fileExists(atPath: config.dataDb.path) { - try FileManager.default.removeItem(at: config.dataDb) + try fileManager.removeItem(at: config.dataDb) } if fileManager.fileExists(atPath: context.pendingDbURL.path) { - try FileManager.default.removeItem(at: context.pendingDbURL) + try fileManager.removeItem(at: context.pendingDbURL) } context.completion(nil) @@ -979,7 +991,7 @@ actor CompactBlockProcessor { self.consecutiveChainValidationErrors += 1 - guard rustBackend.rewindToHeight(dbData: config.dataDb, height: Int32(rewindHeight), networkType: self.config.network.networkType) else { + guard rustBackend.rewindToHeight(dbData: config.dataDb, height: Int32(rewindHeight), networkType: self.config.network.networkType) else { await fail(rustBackend.lastError() ?? RustWeldingError.genericError(message: "unknown error rewinding to height \(height)")) return } diff --git a/Sources/ZcashLightClientKit/Block/Enhance/BlockEnhancer.swift b/Sources/ZcashLightClientKit/Block/Enhance/BlockEnhancer.swift index f1419ebb..77796fff 100644 --- a/Sources/ZcashLightClientKit/Block/Enhance/BlockEnhancer.swift +++ b/Sources/ZcashLightClientKit/Block/Enhance/BlockEnhancer.swift @@ -20,7 +20,7 @@ struct BlockEnhancerConfig { } protocol BlockEnhancer { - func enhance(at range: CompactBlockRange, didEnhance: (EnhancementStreamProgress) async -> Void) async throws -> [ZcashTransaction.Overview] + func enhance(at range: CompactBlockRange, didEnhance: (EnhancementProgress) async -> Void) async throws -> [ZcashTransaction.Overview] } struct BlockEnhancerImpl { @@ -75,7 +75,7 @@ extension BlockEnhancerImpl: BlockEnhancer { case txIdNotFound(txId: Data) } - func enhance(at range: CompactBlockRange, didEnhance: (EnhancementStreamProgress) async -> Void) async throws -> [ZcashTransaction.Overview] { + func enhance(at range: CompactBlockRange, didEnhance: (EnhancementProgress) async -> Void) async throws -> [ZcashTransaction.Overview] { try Task.checkCancellation() LoggerProxy.debug("Started Enhancing range: \(range)") @@ -103,7 +103,7 @@ extension BlockEnhancerImpl: BlockEnhancer { do { let confirmedTx = try await enhance(transaction: transaction) retry = false - let progress = EnhancementStreamProgress( + let progress = EnhancementProgress( totalTransactions: transactions.count, enhancedTransactions: index + 1, lastFoundTransaction: confirmedTx, diff --git a/Sources/ZcashLightClientKit/Synchronizer.swift b/Sources/ZcashLightClientKit/Synchronizer.swift index 305136f1..fe35f89a 100644 --- a/Sources/ZcashLightClientKit/Synchronizer.swift +++ b/Sources/ZcashLightClientKit/Synchronizer.swift @@ -66,11 +66,48 @@ public enum ConnectionState { case shutdown } +public struct SynchronizerState: Equatable { + public var shieldedBalance: WalletBalance + public var transparentBalance: WalletBalance + public var syncStatus: SyncStatus + public var latestScannedHeight: BlockHeight + + public static var zero: SynchronizerState { + SynchronizerState( + shieldedBalance: .zero, + transparentBalance: .zero, + syncStatus: .unprepared, + latestScannedHeight: .zero + ) + } +} + +public enum SynchronizerEvent { + // Sent when the synchronizer finds a pendingTransaction that hast been newly mined. + case minedTransaction(PendingTransactionEntity) + // Sent when the synchronizer finds a mined transaction + case foundTransactions(_ transactions: [ZcashTransaction.Overview], _ inRange: CompactBlockRange) + // Sent when the synchronizer fetched utxos from lightwalletd attempted to store them. + case storedUTXOs(_ inserted: [UnspentTransactionOutputEntity], _ skipped: [UnspentTransactionOutputEntity]) + // Connection state to LightwalletEndpoint changed. + case connectionStateChanged +} + /// Primary interface for interacting with the SDK. Defines the contract that specific /// implementations like SdkSynchronizer fulfill. public protocol Synchronizer { - /// Value representing the Status of this Synchronizer. As the status changes, it will be also notified - var status: SyncStatus { get } + /// This stream is backed by `CurrentValueSubject`. This is primary source of information about what is the SDK doing. New values are emitted when + /// `SyncStatus` is changed inside the SDK. + /// + /// Synchronization progress is part of the `SyncStatus` so this stream emits lot of values. `throttle` can be used to control amout of values + /// delivered. Values are delivered on random background thread. + var stateStream: AnyPublisher { get } + + /// Latest state of the SDK which can be get in synchronous manner. + var latestState: SynchronizerState { get } + + /// This stream is backed by `PassthroughSubject`. Check `SynchronizerEvent` to see which events may be emitted. + var eventStream: AnyPublisher { get } /// reflects current connection state to LightwalletEndpoint var connectionState: ConnectionState { get } @@ -279,7 +316,7 @@ public enum SyncStatus: Equatable { /// When set, a UI element may want to turn red. case disconnected - case error(_ error: Error) + case error(_ error: SynchronizerError) public var isSyncing: Bool { switch self { @@ -317,57 +354,17 @@ public enum RewindPolicy { } extension SyncStatus { - // swiftlint:disable cyclomatic_complexity public static func == (lhs: SyncStatus, rhs: SyncStatus) -> Bool { - switch lhs { - case .unprepared: - if case .unprepared = rhs { - return true - } else { - return false - } - case .disconnected: - if case .disconnected = rhs { - return true - } else { - return false - } - case .syncing: - if case .syncing = rhs { - return true - } else { - return false - } - case .enhancing: - if case .enhancing = rhs { - return true - } else { - return false - } - case .fetching: - if case .fetching = rhs { - return true - } else { - return false - } - case .synced: - if case .synced = rhs { - return true - } else { - return false - } - case .stopped: - if case .stopped = rhs { - return true - } else { - return false - } - case .error: - if case .error = rhs { - return true - } else { - return false - } + switch (lhs, rhs) { + case (.unprepared, .unprepared): return true + case let (.syncing(lhsProgress), .syncing(rhsProgress)): return lhsProgress == rhsProgress + case let (.enhancing(lhsProgress), .enhancing(rhsProgress)): return lhsProgress == rhsProgress + case (.fetching, .fetching): return true + case (.synced, .synced): return true + case (.stopped, .stopped): return true + case (.disconnected, .disconnected): return true + case (.error, .error): return true + default: return false } } } diff --git a/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift b/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift index fe3611b4..8cc21914 100644 --- a/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift +++ b/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift @@ -9,126 +9,55 @@ import Foundation import Combine -public extension Notification.Name { - /// Posted when the synchronizer is started. - /// - note: Query userInfo object for `NotificationKeys.synchronizerState` - static let synchronizerStarted = Notification.Name("SDKSyncronizerStarted") - - /// Posted when there are progress updates. - /// - /// - Note: Query userInfo object for NotificationKeys.progress for Float - /// progress percentage and NotificationKeys.blockHeight /// for the current progress height - static let synchronizerProgressUpdated = Notification.Name("SDKSyncronizerProgressUpdated") - - static let synchronizerStatusWillUpdate = Notification.Name("SDKSynchronizerStatusWillUpdate") - - /// Posted when the synchronizer is synced to latest height - static let synchronizerSynced = Notification.Name("SDKSyncronizerSynced") - - /// Posted when the synchronizer is stopped - static let synchronizerStopped = Notification.Name("SDKSyncronizerStopped") - - /// Posted when the synchronizer loses connection - static let synchronizerDisconnected = Notification.Name("SDKSyncronizerDisconnected") - - /// Posted when the synchronizer starts syncing - static let synchronizerSyncing = Notification.Name("SDKSynchronizerSyncing") - - /// Posted when the synchronizer starts Enhancing - static let synchronizerEnhancing = Notification.Name("SDKSyncronizerEnhancing") - - /// Posted when the synchronizer starts fetching UTXOs - static let synchronizerFetching = Notification.Name("SDKSyncronizerFetching") - - /// Posted when the synchronizer finds a pendingTransaction that hast been newly mined - /// - Note: query userInfo on NotificationKeys.minedTransaction for the transaction - static let synchronizerMinedTransaction = Notification.Name("synchronizerMinedTransaction") - - /// Posted when the synchronizer finds a mined transaction - /// - Note: query userInfo on NotificationKeys.foundTransactions for - /// the `[ConfirmedTransactionEntity]`. This notification could arrive in a background thread. - static let synchronizerFoundTransactions = Notification.Name("synchronizerFoundTransactions") - - /// Notification sent when the synchronizer fetched utxos from lightwalletd attempted to store them - /// Query the user info object for CompactBlockProcessorNotificationKey.blockProcessorStoredUTXOs which will contain a RefreshedUTXOs tuple with - /// the collection of UTXOs stored or skipped - static let synchronizerStoredUTXOs = Notification.Name(rawValue: "synchronizerStoredUTXOs") - - /// Posted when the synchronizer presents an error - /// - Note: query userInfo on NotificationKeys.error for an error - static let synchronizerFailed = Notification.Name("SDKSynchronizerFailed") - +extension Notification.Name { static let synchronizerConnectionStateChanged = Notification.Name("SynchronizerConnectionStateChanged") } /// Synchronizer implementation for UIKit and iOS 13+ // swiftlint:disable type_body_length public class SDKSynchronizer: Synchronizer { - public struct SynchronizerState: Equatable { - public var shieldedBalance: WalletBalance - public var transparentBalance: WalletBalance - public var syncStatus: SyncStatus - public var latestScannedHeight: BlockHeight - - public static var zero: SynchronizerState { - SynchronizerState( - shieldedBalance: .zero, - transparentBalance: .zero, - syncStatus: .unprepared, - latestScannedHeight: .zero - ) - } - } - public enum NotificationKeys { - public static let progress = "SDKSynchronizer.progress" - public static let blockHeight = "SDKSynchronizer.blockHeight" - public static let blockDate = "SDKSynchronizer.blockDate" - public static let minedTransaction = "SDKSynchronizer.minedTransaction" - public static let foundTransactions = "SDKSynchronizer.foundTransactions" - public static let error = "SDKSynchronizer.error" - public static let currentStatus = "SDKSynchronizer.currentStatus" - public static let nextStatus = "SDKSynchronizer.nextStatus" public static let currentConnectionState = "SDKSynchronizer.currentConnectionState" public static let previousConnectionState = "SDKSynchronizer.previousConnectionState" - public static let synchronizerState = "SDKSynchronizer.synchronizerState" - public static let refreshedUTXOs = "SDKSynchronizer.refreshedUTXOs" } + private let streamsUpdateQueue = DispatchQueue(label: "streamsUpdateQueue") + private let stateSubject = CurrentValueSubject(.zero) + public var stateStream: AnyPublisher { stateSubject.eraseToAnyPublisher() } + public private(set) var latestState: SynchronizerState = .zero + + private let eventSubject = PassthroughSubject() + public var eventStream: AnyPublisher { eventSubject.eraseToAnyPublisher() } + + private let statusUpdateLock = NSRecursiveLock() private var underlyingStatus: SyncStatus - public private(set) var status: SyncStatus { + var status: SyncStatus { get { statusUpdateLock.lock() defer { statusUpdateLock.unlock() } return underlyingStatus } set { - notifyStatusChange(newValue: newValue, oldValue: underlyingStatus) statusUpdateLock.lock() + let oldValue = underlyingStatus underlyingStatus = newValue + notify(oldStatus: oldValue, newStatus: newValue) statusUpdateLock.unlock() - notify(status: status) } } let blockProcessor: CompactBlockProcessor let blockProcessorEventProcessingQueue = DispatchQueue(label: "blockProcessorEventProcessingQueue") - public private(set) var progress: Float = 0.0 public private(set) var initializer: Initializer // Valid value is stored here after `prepare` is called. public private(set) var latestScannedHeight: BlockHeight = .zero public private(set) var connectionState: ConnectionState public private(set) var network: ZcashNetwork - public var lastState: AnyPublisher { lastStateSubject.eraseToAnyPublisher() } - - private var lastStateSubject: CurrentValueSubject private var transactionManager: OutboundTransactionManager private var transactionRepository: TransactionRepository private var utxoRepository: UnspentTransactionOutputRepository - private let statusUpdateLock = NSRecursiveLock() - private var syncStartDate: Date? private var longLivingCancelables: [AnyCancellable] = [] @@ -165,7 +94,6 @@ public class SDKSynchronizer: Synchronizer { self.utxoRepository = utxoRepository self.blockProcessor = blockProcessor self.network = initializer.network - self.lastStateSubject = CurrentValueSubject(.zero) subscribeToProcessorNotifications(blockProcessor) @@ -192,10 +120,10 @@ public class SDKSynchronizer: Synchronizer { return .seedRequired } - self.status = .disconnected - latestScannedHeight = (try? transactionRepository.lastScannedHeight()) ?? initializer.walletBirthday + self.status = .disconnected + return .success } @@ -216,17 +144,7 @@ public class SDKSynchronizer: Synchronizer { case .stopped, .synced, .disconnected, .error: Task { - let state = await snapshotState() - lastStateSubject.send(state) - - NotificationSender.default.post( - name: .synchronizerStarted, - object: self, - userInfo: [ - NotificationKeys.synchronizerState: state - ] - ) - + status = .syncing(.nullProgress) syncStartDate = Date() await blockProcessor.start(retry: retry) } @@ -271,6 +189,9 @@ public class SDKSynchronizer: Synchronizer { } connectionState = current + streamsUpdateQueue.async { [weak self] in + self?.eventSubject.send(.connectionStateChanged) + } } // MARK: Handle CompactBlockProcessor.Flow @@ -301,24 +222,23 @@ public class SDKSynchronizer: Synchronizer { self?.storedUTXOs(utxos: utxos) case .startedEnhancing: - self?.startedEnhancing() + self?.status = .enhancing(.zero) case .startedFetching: - self?.startedFetching() + self?.status = .fetching case .startedSyncing: - self?.startedSyncing() + self?.status = .syncing(.nullProgress) case .stopped: - self?.stopped() + self?.status = .stopped } } .store(in: &longLivingCancelables) } private func failed(error: CompactBlockProcessorError) { - self.notifyFailure(error) - self.status = .error(self.mapError(error)) + status = .error(self.mapError(error)) } private func finished(lastScannedHeight: BlockHeight, foundBlocks: Bool) { @@ -336,13 +256,9 @@ public class SDKSynchronizer: Synchronizer { } private func foundTransactions(transactions: [ZcashTransaction.Overview], in range: CompactBlockRange) { - NotificationSender.default.post( - name: .synchronizerFoundTransactions, - object: self, - userInfo: [ - NotificationKeys.foundTransactions: transactions - ] - ) + streamsUpdateQueue.async { [weak self] in + self?.eventSubject.send(.foundTransactions(transactions, range)) + } } private func handledReorg(reorgHeight: BlockHeight, rewindHeight: BlockHeight) { @@ -352,52 +268,24 @@ public class SDKSynchronizer: Synchronizer { try transactionManager.handleReorg(at: rewindHeight) } catch { LoggerProxy.debug("error handling reorg: \(error)") - notifyFailure(error) } } private func progressUpdated(progress: CompactBlockProgress) { - self.notify(progress: progress) + switch progress { + case let .syncing(progress): + status = .syncing(progress) + case let .enhance(progress): + status = .enhancing(progress) + case .fetch: + status = .fetching + } } private func storedUTXOs(utxos: (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity])) { - NotificationSender.default.post( - name: .synchronizerStoredUTXOs, - object: self, - userInfo: [NotificationKeys.refreshedUTXOs: utxos] - ) - } - - private func startedEnhancing() { - statusUpdateLock.lock() - defer { statusUpdateLock.unlock() } - - guard status != .enhancing(NullEnhancementProgress()) else { return } - status = .enhancing(NullEnhancementProgress()) - } - - private func startedFetching() { - statusUpdateLock.lock() - defer { statusUpdateLock.unlock() } - - guard status != .fetching else { return } - status = .fetching - } - - private func startedSyncing() { - statusUpdateLock.lock() - defer { statusUpdateLock.unlock() } - - guard status != .syncing(.nullProgress) else { return } - status = .syncing(.nullProgress) - } - - private func stopped() { - statusUpdateLock.lock() - defer { statusUpdateLock.unlock() } - - guard status != .stopped else { return } - status = .stopped + streamsUpdateQueue.async { [weak self] in + self?.eventSubject.send(.storedUTXOs(utxos.inserted, utxos.skipped)) + } } // MARK: Synchronizer methods @@ -689,28 +577,8 @@ public class SDKSynchronizer: Synchronizer { } // MARK: notify state - private func notify(progress: CompactBlockProgress) { - var userInfo: [AnyHashable: Any] = .init() - userInfo[NotificationKeys.progress] = progress - userInfo[NotificationKeys.blockHeight] = progress.progressHeight - self.status = SyncStatus(progress) - NotificationSender.default.post(name: Notification.Name.synchronizerProgressUpdated, object: self, userInfo: userInfo) - } - - private func notifyStatusChange(newValue: SyncStatus, oldValue: SyncStatus) { - NotificationSender.default.post( - name: .synchronizerStatusWillUpdate, - object: self, - userInfo: - [ - NotificationKeys.currentStatus: oldValue, - NotificationKeys.nextStatus: newValue - ] - ) - } - - private func snapshotState() async -> SDKSynchronizer.SynchronizerState { + private func snapshotState(status: SyncStatus) async -> SynchronizerState { SynchronizerState( shieldedBalance: WalletBalance( verified: initializer.getVerifiedBalance(), @@ -722,38 +590,55 @@ public class SDKSynchronizer: Synchronizer { ) } - private func notify(status: SyncStatus) { - switch status { - case .disconnected: - NotificationSender.default.post(name: Notification.Name.synchronizerDisconnected, object: self) - case .stopped: - NotificationSender.default.post(name: Notification.Name.synchronizerStopped, object: self) - case .synced: - Task { - let state = await self.snapshotState() - self.lastStateSubject.send(state) + private func notify(oldStatus: SyncStatus, newStatus: SyncStatus) { + guard oldStatus != newStatus else { return } - NotificationSender.default.post( - name: Notification.Name.synchronizerSynced, - object: self, - userInfo: [ - SDKSynchronizer.NotificationKeys.blockHeight: self.latestScannedHeight, - SDKSynchronizer.NotificationKeys.synchronizerState: state - ] + // When the wipe happens status is switched to `unprepared`. And we expect that everything is deleted. All the databases including data DB. + // When new snapshot is created balance is checked. And when balance is checked and data DB doesn't exist then rust initialise new database. + // So it's necessary to not create new snapshot after status is switched to `unprepared` otherwise data DB exists after wipe + if newStatus == .unprepared { + latestState = SynchronizerState.zero + updateStateStream(with: latestState) + } else { + let didStatusChange = areTwoStatusesDifferent(firstStatus: oldStatus, secondStatus: newStatus) + + if didStatusChange { + Task { + latestState = await snapshotState(status: newStatus) + updateStateStream(with: latestState) + } + } else { + latestState = SynchronizerState( + shieldedBalance: latestState.shieldedBalance, + transparentBalance: latestState.transparentBalance, + syncStatus: newStatus, + latestScannedHeight: latestState.latestScannedHeight ) + updateStateStream(with: latestState) } - case .unprepared: - break - case .syncing: - NotificationSender.default.post(name: Notification.Name.synchronizerSyncing, object: self) - case .enhancing: - NotificationSender.default.post(name: Notification.Name.synchronizerEnhancing, object: self) - case .fetching: - NotificationSender.default.post(name: Notification.Name.synchronizerFetching, object: self) - case .error(let error): - self.notifyFailure(error) } } + + private func areTwoStatusesDifferent(firstStatus: SyncStatus, secondStatus: SyncStatus) -> Bool { + switch (firstStatus, secondStatus) { + case (.unprepared, .unprepared): return false + case (.syncing, .syncing): return false + case (.enhancing, .enhancing): return false + case (.fetching, .fetching): return false + case (.synced, .synced): return false + case (.stopped, .stopped): return false + case (.disconnected, .disconnected): return false + case (.error, .error): return false + default: return true + } + } + + private func updateStateStream(with newState: SynchronizerState) { + streamsUpdateQueue.async { [weak self] in + self?.stateSubject.send(newState) + } + } + // MARK: book keeping private func updateMinedTransactions() throws { @@ -788,19 +673,13 @@ public class SDKSynchronizer: Synchronizer { } private func notifyMinedTransaction(_ transaction: PendingTransactionEntity) { - DispatchQueue.main.async { [weak self] in - guard let self else { return } - - NotificationSender.default.post( - name: Notification.Name.synchronizerMinedTransaction, - object: self, - userInfo: [NotificationKeys.minedTransaction: transaction] - ) + streamsUpdateQueue.async { [weak self] in + self?.eventSubject.send(.minedTransaction(transaction)) } } // swiftlint:disable cyclomatic_complexity - private func mapError(_ error: Error) -> Error { + private func mapError(_ error: Error) -> SynchronizerError { if let compactBlockProcessorError = error as? CompactBlockProcessorError { switch compactBlockProcessorError { case .dataDbInitFailed(let path): @@ -838,14 +717,6 @@ public class SDKSynchronizer: Synchronizer { return SynchronizerError.uncategorized(underlyingError: error) } - - private func notifyFailure(_ error: Error) { - NotificationSender.default.post( - name: Notification.Name.synchronizerFailed, - object: self, - userInfo: [NotificationKeys.error: self.mapError(error)] - ) - } } extension SDKSynchronizer { @@ -879,10 +750,3 @@ extension SDKSynchronizer { self.getUnifiedAddress(accountIndex: accountIndex)?.transparentReceiver() } } - -private struct NullEnhancementProgress: EnhancementProgress { - var totalTransactions: Int { 0 } - var enhancedTransactions: Int { 0 } - var lastFoundTransaction: ZcashTransaction.Overview? { nil } - var range: CompactBlockRange { 0 ... 0 } -} diff --git a/Tests/DarksideTests/BalanceTests.swift b/Tests/DarksideTests/BalanceTests.swift index 6987124e..abf05d73 100644 --- a/Tests/DarksideTests/BalanceTests.swift +++ b/Tests/DarksideTests/BalanceTests.swift @@ -5,6 +5,7 @@ // Created by Francisco Gindre on 4/28/20. // +import Combine import XCTest @testable import TestUtils @testable import ZcashLightClientKit @@ -20,6 +21,7 @@ class BalanceTests: XCTestCase { var sentTransactionExpectation = XCTestExpectation(description: "sent") var syncedExpectation = XCTestExpectation(description: "synced") var coordinator: TestCoordinator! + var cancellables: [AnyCancellable] = [] override func setUpWithError() throws { try super.setUpWithError() @@ -39,6 +41,7 @@ class BalanceTests: XCTestCase { try? FileManager.default.removeItem(at: coordinator.databases.dataDB) try? FileManager.default.removeItem(at: coordinator.databases.pendingDB) coordinator = nil + cancellables = [] } /** @@ -1188,34 +1191,39 @@ class BalanceTests: XCTestCase { class SDKSynchonizerListener { var transactionsFound: (([ZcashTransaction.Overview]) -> Void)? var synchronizerMinedTransaction: ((PendingTransactionEntity) -> Void)? + var cancellables: [AnyCancellable] = [] func subscribeToSynchronizer(_ synchronizer: SDKSynchronizer) { - NotificationCenter.default.addObserver(self, selector: #selector(txFound(_:)), name: .synchronizerFoundTransactions, object: synchronizer) - NotificationCenter.default.addObserver(self, selector: #selector(txMined(_:)), name: .synchronizerMinedTransaction, object: synchronizer) + synchronizer.eventStream + .sink( + receiveValue: { [weak self] event in + switch event { + case let .minedTransaction(transaction): + self?.txMined(transaction) + + case let .foundTransactions(transactions, _): + self?.txFound(transactions) + + case .storedUTXOs, .connectionStateChanged: + break + } + } + ) + .store(in: &cancellables) } func unsubscribe() { NotificationCenter.default.removeObserver(self) } - @objc func txFound(_ notification: Notification) { + func txFound(_ txs: [ZcashTransaction.Overview]) { DispatchQueue.main.async { [weak self] in - guard let txs = notification.userInfo?[SDKSynchronizer.NotificationKeys.foundTransactions] as? [ZcashTransaction.Overview] else { - XCTFail("expected [ConfirmedTransactionEntity] array") - return - } - self?.transactionsFound?(txs) } } - @objc func txMined(_ notification: Notification) { + func txMined(_ transaction: PendingTransactionEntity) { DispatchQueue.main.async { [weak self] in - guard let transaction = notification.userInfo?[SDKSynchronizer.NotificationKeys.minedTransaction] as? PendingTransactionEntity else { - XCTFail("expected transaction") - return - } - self?.synchronizerMinedTransaction?(transaction) } } diff --git a/Tests/DarksideTests/InternalStateConsistencyTests.swift b/Tests/DarksideTests/InternalStateConsistencyTests.swift index ba458e6c..39cfb806 100644 --- a/Tests/DarksideTests/InternalStateConsistencyTests.swift +++ b/Tests/DarksideTests/InternalStateConsistencyTests.swift @@ -4,6 +4,8 @@ // // Created by Francisco Gindre on 1/26/23. // + +import Combine import XCTest @testable import TestUtils @testable import ZcashLightClientKit @@ -20,6 +22,7 @@ final class InternalStateConsistencyTests: XCTestCase { let branchID = "2bb40e60" let chainName = "main" let network = DarksideWalletDNetwork() + var sdkSynchronizerSyncStatusHandler: SDKSynchronizerSyncStatusHandler! = SDKSynchronizerSyncStatusHandler() override func setUpWithError() throws { try super.setUpWithError() @@ -38,10 +41,14 @@ final class InternalStateConsistencyTests: XCTestCase { try? FileManager.default.removeItem(at: coordinator.databases.dataDB) try? FileManager.default.removeItem(at: coordinator.databases.pendingDB) coordinator = nil + sdkSynchronizerSyncStatusHandler = nil } @MainActor func testInternalStateIsConsistentWhenMigrating() async throws { - NotificationCenter.default.addObserver(self, selector: #selector(self.synchronizerStopped(_:)), name: .synchronizerStopped, object: nil) + sdkSynchronizerSyncStatusHandler.subscribe( + to: coordinator.synchronizer.stateStream, + expectations: [.stopped: firstSyncExpectation] + ) let fullSyncLength = 1000 try FakeChainBuilder.buildChain(darksideWallet: coordinator.service, branchID: branchID, chainName: chainName, length: fullSyncLength) @@ -126,10 +133,6 @@ final class InternalStateConsistencyTests: XCTestCase { wait(for: [secondSyncAttemptExpectation], timeout: 10) } - @objc func synchronizerStopped(_ notification: Notification) { - self.firstSyncExpectation.fulfill() - } - func handleError(_ error: Error?) { guard let testError = error else { XCTFail("failed with nil error") diff --git a/Tests/DarksideTests/SychronizerDarksideTests.swift b/Tests/DarksideTests/SychronizerDarksideTests.swift index a177119b..11297052 100644 --- a/Tests/DarksideTests/SychronizerDarksideTests.swift +++ b/Tests/DarksideTests/SychronizerDarksideTests.swift @@ -50,13 +50,15 @@ class SychronizerDarksideTests: XCTestCase { } func testFoundTransactions() throws { - NotificationCenter.default.addObserver( - self, - selector: #selector(handleFoundTransactions(_:)), - name: Notification.Name.synchronizerFoundTransactions, - object: nil - ) - + coordinator.synchronizer.eventStream + .map { event in + guard case let .foundTransactions(transactions, _) = event else { return nil } + return transactions + } + .compactMap { $0 } + .sink(receiveValue: { [weak self] transactions in self?.handleFoundTransactions(transactions: transactions) }) + .store(in: &cancellables) + try FakeChainBuilder.buildChain(darksideWallet: self.coordinator.service, branchID: branchID, chainName: chainName) let receivedTxHeight: BlockHeight = 663188 @@ -75,12 +77,14 @@ class SychronizerDarksideTests: XCTestCase { } func testFoundManyTransactions() throws { - NotificationCenter.default.addObserver( - self, - selector: #selector(handleFoundTransactions(_:)), - name: Notification.Name.synchronizerFoundTransactions, - object: nil - ) + coordinator.synchronizer.eventStream + .map { event in + guard case let .foundTransactions(transactions, _) = event else { return nil } + return transactions + } + .compactMap { $0 } + .sink(receiveValue: { [weak self] transactions in self?.handleFoundTransactions(transactions: transactions) }) + .store(in: &cancellables) try FakeChainBuilder.buildChain(darksideWallet: self.coordinator.service, branchID: branchID, chainName: chainName, length: 1000) let receivedTxHeight: BlockHeight = 663229 @@ -128,9 +132,9 @@ class SychronizerDarksideTests: XCTestCase { } func testLastStates() throws { - var disposeBag: [AnyCancellable] = [] + var cancellables: [AnyCancellable] = [] - var states: [SDKSynchronizer.SynchronizerState] = [] + var states: [SynchronizerState] = [] try FakeChainBuilder.buildChain(darksideWallet: self.coordinator.service, branchID: branchID, chainName: chainName) let receivedTxHeight: BlockHeight = 663188 @@ -140,12 +144,11 @@ class SychronizerDarksideTests: XCTestCase { sleep(2) let preTxExpectation = XCTestExpectation(description: "pre receive") - coordinator.synchronizer.lastState - .receive(on: DispatchQueue.main) + coordinator.synchronizer.stateStream .sink { state in states.append(state) } - .store(in: &disposeBag) + .store(in: &cancellables) try coordinator.sync(completion: { _ in preTxExpectation.fulfill() @@ -153,26 +156,102 @@ class SychronizerDarksideTests: XCTestCase { wait(for: [preTxExpectation], timeout: 5) - XCTAssertEqual(states, [ - SDKSynchronizer.SynchronizerState( + let expectedStates: [SynchronizerState] = [ + SynchronizerState( shieldedBalance: .zero, transparentBalance: .zero, - syncStatus: .unprepared, - latestScannedHeight: .zero - ), - SDKSynchronizer.SynchronizerState( - shieldedBalance: WalletBalance(verified: Zatoshi(0), total: Zatoshi(0)), - transparentBalance: WalletBalance(verified: Zatoshi(0), total: Zatoshi(0)), - syncStatus: SyncStatus.disconnected, + syncStatus: .disconnected, latestScannedHeight: 663150 ), - SDKSynchronizer.SynchronizerState( + SynchronizerState( + shieldedBalance: .zero, + transparentBalance: .zero, + syncStatus: .syncing(BlockProgress(startHeight: 0, targetHeight: 0, progressHeight: 0)), + latestScannedHeight: 663150 + ), + SynchronizerState( + shieldedBalance: .zero, + transparentBalance: .zero, + syncStatus: .syncing(BlockProgress(startHeight: 663150, targetHeight: 663189, progressHeight: 663189)), + latestScannedHeight: 663150 + ), + SynchronizerState( shieldedBalance: WalletBalance(verified: Zatoshi(100000), total: Zatoshi(200000)), transparentBalance: WalletBalance(verified: Zatoshi(0), total: Zatoshi(0)), - syncStatus: SyncStatus.synced, + syncStatus: .enhancing(EnhancementProgress(totalTransactions: 0, enhancedTransactions: 0, lastFoundTransaction: nil, range: 0...0)), + latestScannedHeight: 663150 + ), + SynchronizerState( + shieldedBalance: WalletBalance(verified: Zatoshi(100000), total: Zatoshi(200000)), + transparentBalance: WalletBalance(verified: Zatoshi(0), total: Zatoshi(0)), + syncStatus: .enhancing( + EnhancementProgress( + totalTransactions: 2, + enhancedTransactions: 1, + lastFoundTransaction: ZcashTransaction.Overview( + blockTime: 1.0, + expiryHeight: 663206, + fee: Zatoshi(0), + id: 2, + index: 1, + isWalletInternal: true, + hasChange: false, + memoCount: 1, + minedHeight: 663188, + raw: Data(), + rawID: Data(), + receivedNoteCount: 1, + sentNoteCount: 0, + value: Zatoshi(100000) + ), + range: 663150...663189 + ) + ), + latestScannedHeight: 663150 + ), + SynchronizerState( + shieldedBalance: WalletBalance(verified: Zatoshi(100000), total: Zatoshi(200000)), + transparentBalance: WalletBalance(verified: Zatoshi(0), total: Zatoshi(0)), + syncStatus: .enhancing( + EnhancementProgress( + totalTransactions: 2, + enhancedTransactions: 2, + lastFoundTransaction: ZcashTransaction.Overview( + blockTime: 1.0, + expiryHeight: 663192, + fee: Zatoshi(0), + id: 1, + index: 1, + isWalletInternal: true, + hasChange: false, + memoCount: 1, + minedHeight: 663174, + raw: Data(), + rawID: Data(), + receivedNoteCount: 1, + sentNoteCount: 0, + value: Zatoshi(100000) + ), + range: 663150...663189 + ) + ), + latestScannedHeight: 663150 + ), + SynchronizerState( + shieldedBalance: WalletBalance(verified: Zatoshi(100000), total: Zatoshi(200000)), + transparentBalance: WalletBalance(verified: Zatoshi(0), total: Zatoshi(0)), + syncStatus: .fetching, + latestScannedHeight: 663150 + ), + SynchronizerState( + shieldedBalance: WalletBalance(verified: Zatoshi(100000), total: Zatoshi(200000)), + transparentBalance: WalletBalance(verified: Zatoshi(0), total: Zatoshi(0)), + syncStatus: .synced, latestScannedHeight: 663189 ) - ]) + ] + + XCTAssertEqual(states, expectedStates) } @MainActor func testSyncAfterWipeWorks() async throws { @@ -229,13 +308,7 @@ class SychronizerDarksideTests: XCTestCase { wait(for: [secondSyncExpectation], timeout: 10) } - @objc func handleFoundTransactions(_ notification: Notification) { - guard - let userInfo = notification.userInfo, - let transactions = userInfo[SDKSynchronizer.NotificationKeys.foundTransactions] as? [ZcashTransaction.Overview] - else { - return - } + func handleFoundTransactions(transactions: [ZcashTransaction.Overview]) { self.foundTransactions.append(contentsOf: transactions) } diff --git a/Tests/DarksideTests/SynchronizerTests.swift b/Tests/DarksideTests/SynchronizerTests.swift index 7d459293..2d7fe4bb 100644 --- a/Tests/DarksideTests/SynchronizerTests.swift +++ b/Tests/DarksideTests/SynchronizerTests.swift @@ -22,6 +22,7 @@ final class SynchronizerTests: XCTestCase { let chainName = "main" let network = DarksideWalletDNetwork() var cancellables: [AnyCancellable] = [] + var sdkSynchronizerSyncStatusHandler: SDKSynchronizerSyncStatusHandler! = SDKSynchronizerSyncStatusHandler() override func setUpWithError() throws { try super.setUpWithError() @@ -52,6 +53,7 @@ final class SynchronizerTests: XCTestCase { try? FileManager.default.removeItem(at: coordinator.databases.dataDB) try? FileManager.default.removeItem(at: coordinator.databases.pendingDB) coordinator = nil + sdkSynchronizerSyncStatusHandler = nil cancellables = [] } @@ -77,7 +79,10 @@ final class SynchronizerTests: XCTestCase { sleep(10) let syncStoppedExpectation = XCTestExpectation(description: "SynchronizerStopped Expectation") - syncStoppedExpectation.subscribe(to: .synchronizerStopped, object: nil) + sdkSynchronizerSyncStatusHandler.subscribe( + to: coordinator.synchronizer.stateStream, + expectations: [.stopped: syncStoppedExpectation] + ) /* sync to latest height @@ -227,10 +232,11 @@ final class SynchronizerTests: XCTestCase { private func checkThatWipeWorked() async { let storage = await self.coordinator.synchronizer.blockProcessor.storage as! FSCompactBlockRepository let fm = FileManager.default - XCTAssertFalse(fm.fileExists(atPath: coordinator.synchronizer.initializer.dataDbURL.path)) - XCTAssertFalse(fm.fileExists(atPath: coordinator.synchronizer.initializer.pendingDbURL.path)) - XCTAssertTrue(fm.fileExists(atPath: storage.blocksDirectory.path)) - XCTAssertEqual(try fm.contentsOfDirectory(atPath: storage.blocksDirectory.path), []) + print(coordinator.synchronizer.initializer.dataDbURL.path) + XCTAssertFalse(fm.fileExists(atPath: coordinator.synchronizer.initializer.pendingDbURL.path), "Pending DB should be deleted") + XCTAssertFalse(fm.fileExists(atPath: coordinator.synchronizer.initializer.dataDbURL.path), "Data DB should be deleted.") + XCTAssertTrue(fm.fileExists(atPath: storage.blocksDirectory.path), "FS Cache directory should exist") + XCTAssertEqual(try fm.contentsOfDirectory(atPath: storage.blocksDirectory.path), [], "FS Cache directory should be empty") let internalSyncProgress = InternalSyncProgress(storage: UserDefaults.standard) @@ -238,14 +244,14 @@ final class SynchronizerTests: XCTestCase { let latestEnhancedHeight = await internalSyncProgress.load(.latestEnhancedHeight) let latestUTXOFetchedHeight = await internalSyncProgress.load(.latestUTXOFetchedHeight) - XCTAssertEqual(latestDownloadedBlockHeight, 0) - XCTAssertEqual(latestEnhancedHeight, 0) - XCTAssertEqual(latestUTXOFetchedHeight, 0) + XCTAssertEqual(latestDownloadedBlockHeight, 0, "internalSyncProgress latestDownloadedBlockHeight should be 0") + XCTAssertEqual(latestEnhancedHeight, 0, "internalSyncProgress latestEnhancedHeight should be 0") + XCTAssertEqual(latestUTXOFetchedHeight, 0, "internalSyncProgress latestUTXOFetchedHeight should be 0") let blockProcessorState = await coordinator.synchronizer.blockProcessor.state - XCTAssertEqual(blockProcessorState, .stopped) + XCTAssertEqual(blockProcessorState, .stopped, "CompactBlockProcessor state should be stopped") - XCTAssertEqual(coordinator.synchronizer.status, .unprepared) + XCTAssertEqual(coordinator.synchronizer.status, .unprepared, "SDKSynchronizer state should be unprepared") } func handleError(_ error: Error?) { diff --git a/Tests/DarksideTests/Z2TReceiveTests.swift b/Tests/DarksideTests/Z2TReceiveTests.swift index c36cf284..097983dc 100644 --- a/Tests/DarksideTests/Z2TReceiveTests.swift +++ b/Tests/DarksideTests/Z2TReceiveTests.swift @@ -5,6 +5,7 @@ // Created by Francisco Gindre on 8/4/21. // +import Combine import XCTest @testable import TestUtils @testable import ZcashLightClientKit @@ -20,6 +21,7 @@ class Z2TReceiveTests: XCTestCase { var foundTransactionsExpectation = XCTestExpectation(description: "found transactions") let branchID = "2bb40e60" let chainName = "main" + var cancellables: [AnyCancellable] = [] let network = DarksideWalletDNetwork() @@ -41,25 +43,19 @@ class Z2TReceiveTests: XCTestCase { try? FileManager.default.removeItem(at: coordinator.databases.dataDB) try? FileManager.default.removeItem(at: coordinator.databases.pendingDB) coordinator = nil + cancellables = [] } func subscribeToFoundTransactions() { - NotificationCenter.default.addObserver( - self, - selector: #selector(foundTransactions(_:)), - name: .synchronizerFoundTransactions, - object: nil - ) + coordinator.synchronizer.eventStream + .filter { event in + guard case .foundTransactions = event else { return false } + return true + } + .sink(receiveValue: { [weak self] _ in self?.self.foundTransactionsExpectation.fulfill() }) + .store(in: &cancellables) } - - @objc func foundTransactions(_ notification: Notification) { - guard notification.userInfo?[SDKSynchronizer.NotificationKeys.foundTransactions] != nil else { - XCTFail("found transactions notification is empty") - return - } - self.foundTransactionsExpectation.fulfill() - } - + func testSendingZ2TWithMemoFails() async throws { subscribeToFoundTransactions() try FakeChainBuilder.buildChain(darksideWallet: self.coordinator.service, branchID: branchID, chainName: chainName) diff --git a/Tests/PerformanceTests/SynchronizerTests.swift b/Tests/PerformanceTests/SynchronizerTests.swift index ca8dbb7a..a72bf066 100644 --- a/Tests/PerformanceTests/SynchronizerTests.swift +++ b/Tests/PerformanceTests/SynchronizerTests.swift @@ -5,6 +5,7 @@ // Created by Lukáš Korba on 13.12.2022. // +import Combine import XCTest @testable import ZcashLightClientKit @testable import TestUtils @@ -23,6 +24,8 @@ class SynchronizerTests: XCTestCase { } var coordinator: TestCoordinator! + var cancellables: [AnyCancellable] = [] + var sdkSynchronizerSyncStatusHandler: SDKSynchronizerSyncStatusHandler! = SDKSynchronizerSyncStatusHandler() let seedPhrase = """ wish puppy smile loan doll curve hole maze file ginger hair nose key relax knife witness cannon grab despair throw review deal slush frame @@ -30,6 +33,13 @@ class SynchronizerTests: XCTestCase { var birthday: BlockHeight = 1_730_000 + override func tearDown() { + super.tearDown() + coordinator = nil + cancellables = [] + sdkSynchronizerSyncStatusHandler = nil + } + @MainActor func testHundredBlocksSync() async throws { let derivationTool = DerivationTool(networkType: .mainnet) @@ -71,7 +81,7 @@ class SynchronizerTests: XCTestCase { _ = try synchronizer.prepare(with: seedBytes, viewingKeys: [ufvk], walletBirthday: birthday) let syncSyncedExpectation = XCTestExpectation(description: "synchronizerSynced Expectation") - syncSyncedExpectation.subscribe(to: .synchronizerSynced, object: nil) + sdkSynchronizerSyncStatusHandler.subscribe(to: synchronizer.stateStream, expectations: [.synced: syncSyncedExpectation]) let internalSyncProgress = InternalSyncProgress(storage: UserDefaults.standard) await internalSyncProgress.rewind(to: birthday) diff --git a/Tests/TestUtils/SDKSynchronizerSyncStatusHandler.swift b/Tests/TestUtils/SDKSynchronizerSyncStatusHandler.swift new file mode 100644 index 00000000..1288846e --- /dev/null +++ b/Tests/TestUtils/SDKSynchronizerSyncStatusHandler.swift @@ -0,0 +1,50 @@ +// +// SDKSynchronizerStateHandler.swift +// +// +// Created by Michal Fousek on 15.03.2023. +// + +import Combine +import Foundation +import XCTest +@testable import ZcashLightClientKit + +class SDKSynchronizerSyncStatusHandler { + enum StatusIdentifier: String { + case unprepared + case syncing + case enhancing + case fetching + case synced + case stopped + case disconnected + case error + } + + private let queue = DispatchQueue(label: "SDKSynchronizerSyncStatusHandler") + private var cancellables: [AnyCancellable] = [] + + func subscribe(to stateStream: AnyPublisher, expectations: [StatusIdentifier: XCTestExpectation]) { + stateStream + .receive(on: queue) + .map { $0.syncStatus } + .sink { status in expectations[status.identifier]?.fulfill() } + .store(in: &cancellables) + } +} + +extension SyncStatus { + var identifier: SDKSynchronizerSyncStatusHandler.StatusIdentifier { + switch self { + case .unprepared: return .unprepared + case .syncing: return .syncing + case .enhancing: return .enhancing + case .fetching: return .fetching + case .synced: return .synced + case .stopped: return .stopped + case .disconnected: return .disconnected + case .error: return .error + } + } +} diff --git a/Tests/TestUtils/TestCoordinator.swift b/Tests/TestUtils/TestCoordinator.swift index 8eee78a8..7891e927 100644 --- a/Tests/TestUtils/TestCoordinator.swift +++ b/Tests/TestUtils/TestCoordinator.swift @@ -5,6 +5,7 @@ // Created by Francisco Gindre on 4/29/20. // +import Combine import Foundation import XCTest @testable import ZcashLightClientKit @@ -32,7 +33,8 @@ class TestCoordinator { case predefined(dataset: DarksideDataset) case url(urlString: String, startHeigth: BlockHeight) } - + + var cancellables: [AnyCancellable] = [] var completionHandler: ((SDKSynchronizer) throws -> Void)? var errorHandler: ((Error?) -> Void)? var spendingKey: UnifiedSpendingKey @@ -120,12 +122,17 @@ class TestCoordinator { ) self.synchronizer = synchronizer - subscribeToNotifications(synchronizer: self.synchronizer) + subscribeToState(synchronizer: self.synchronizer) if case .seedRequired = try prepare(seed: Environment.seedBytes) { throw TestCoordinator.CoordinatorError.seedRequiredForMigration } } + deinit { + cancellables.forEach { $0.cancel() } + cancellables = [] + } + func prepare(seed: [UInt8]) throws -> Initializer.InitializationResult { return try synchronizer.prepare(with: seed, viewingKeys: [viewingKey], walletBirthday: self.birthday) } @@ -160,17 +167,29 @@ class TestCoordinator { // MARK: notifications - func subscribeToNotifications(synchronizer: Synchronizer) { - NotificationCenter.default.addObserver(self, selector: #selector(synchronizerFailed(_:)), name: .synchronizerFailed, object: synchronizer) - NotificationCenter.default.addObserver(self, selector: #selector(synchronizerSynced(_:)), name: .synchronizerSynced, object: synchronizer) + func subscribeToState(synchronizer: Synchronizer) { + synchronizer.stateStream + .sink( + receiveValue: { [weak self] state in + switch state.syncStatus { + case let .error(error): + self?.synchronizerFailed(error: error) + case .synced: + try! self?.synchronizerSynced() + default: + break + } + } + ) + .store(in: &cancellables) } - @objc func synchronizerFailed(_ notification: Notification) { - self.errorHandler?(notification.userInfo?[SDKSynchronizer.NotificationKeys.error] as? Error) + func synchronizerFailed(error: Error) { + self.errorHandler?(error) } - @objc func synchronizerSynced(_ notification: Notification) throws { - if case .stopped = self.synchronizer.status { + func synchronizerSynced() throws { + if case .stopped = self.synchronizer.latestState.syncStatus { LoggerProxy.debug("WARNING: notification received after synchronizer was stopped") return } diff --git a/Tests/TestUtils/Tests+Utils.swift b/Tests/TestUtils/Tests+Utils.swift index 9baa5b0b..009d8b80 100644 --- a/Tests/TestUtils/Tests+Utils.swift +++ b/Tests/TestUtils/Tests+Utils.swift @@ -6,6 +6,7 @@ // Copyright © 2019 Electric Coin Company. All rights reserved. // +import Combine import Foundation import GRPC import ZcashLightClientKit @@ -79,20 +80,6 @@ enum MockDbInit { } } -extension XCTestExpectation { - func subscribe(to notification: Notification.Name, object: Any?) { - NotificationCenter.default.addObserver(self, selector: #selector(fulfill), name: notification, object: object) - } - - func unsubscribe(from notification: Notification.Name) { - NotificationCenter.default.removeObserver(self, name: notification, object: nil) - } - - func unsubscribeFromNotifications() { - NotificationCenter.default.removeObserver(self) - } -} - func __documentsDirectory() throws -> URL { try FileManager.default.url(for: .documentDirectory, in: .userDomainMask, appropriateFor: nil, create: true) }