From ee9bf855576467527dfd75fc271032939936fa08 Mon Sep 17 00:00:00 2001 From: Francisco Gindre Date: Thu, 10 Nov 2022 21:04:12 -0300 Subject: [PATCH] [WIP] - [#616] move notification off the main queue into their own queue Use combine current value subject instead to maintain retrocompatibility with clients relying on status variable. Move all the notifications to the dedicated queue. --- Package.resolved | 4 +- .../ZcashLightClientKit/Synchronizer.swift | 5 +- .../Synchronizer/SDKSynchronizer.swift | 367 ++++++++---------- 3 files changed, 167 insertions(+), 209 deletions(-) diff --git a/Package.resolved b/Package.resolved index 33336565..3b45ffd6 100644 --- a/Package.resolved +++ b/Package.resolved @@ -86,8 +86,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/zcash-hackworks/zcash-light-client-ffi", "state" : { - "revision" : "70bf6ae6538943f2b8453f424e607a05f4dc7be6", - "version" : "0.1.0-beta.2" + "revision" : "7febbfd74a7d963ec206e01f8ef6090bb8cee523", + "version" : "0.1.0-beta.3" } } ], diff --git a/Sources/ZcashLightClientKit/Synchronizer.swift b/Sources/ZcashLightClientKit/Synchronizer.swift index e88c4b69..a8122eda 100644 --- a/Sources/ZcashLightClientKit/Synchronizer.swift +++ b/Sources/ZcashLightClientKit/Synchronizer.swift @@ -7,7 +7,7 @@ // import Foundation - +import Combine /// Represents errors thrown by a Synchronizer public enum SynchronizerError: Error { @@ -74,6 +74,9 @@ public protocol Synchronizer { /// Value representing the Status of this Synchronizer. As the status changes, it will be also notified var status: SyncStatus { get } + /// A value subject reporting the sync status + var statusSubject: CurrentValueSubject { get } + /// reflects current connection state to LightwalletEndpoint var connectionState: ConnectionState { get } diff --git a/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift b/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift index e24a4207..41c35309 100644 --- a/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift +++ b/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift @@ -7,17 +7,16 @@ // import Foundation +import Combine public extension Notification.Name { /// Notification is posted whenever transactions are updated /// /// - Important: not yet posted static let transactionsUpdated = Notification.Name("SDKSyncronizerTransactionUpdated") - /// Posted when the synchronizer is started. static let synchronizerStarted = Notification.Name("SDKSyncronizerStarted") - /// Posted when there are progress updates. /// @@ -97,15 +96,17 @@ public class SDKSynchronizer: Synchronizer { public static let previousConnectionState = "SDKSynchronizer.previousConnectionState" public static let synchronizerState = "SDKSynchronizer.synchronizerState" } - - public private(set) var status: SyncStatus { - didSet { - notify(status: status) - } - willSet { - notifyStatusChange(newValue: newValue, oldValue: status) - } - } + + +// public private(set) var status: SyncStatus { +// didSet { +// notify(status: status) +// } +// willSet { +// notifyStatusChange(newValue: newValue, oldValue: status) +// } +// } + public var statusSubject: CurrentValueSubject public private(set) var progress: Float = 0.0 public private(set) var blockProcessor: CompactBlockProcessor public private(set) var initializer: Initializer @@ -115,6 +116,14 @@ public class SDKSynchronizer: Synchronizer { private var transactionManager: OutboundTransactionManager private var transactionRepository: TransactionRepository private var utxoRepository: UnspentTransactionOutputRepository + private var cancellableBag = [AnyCancellable]() + private var queue = DispatchQueue( + label: "ZcashLightClientKit.SDKSynchronizer.notifications.queue", + qos: .userInteractive + ) + public var status: SyncStatus { + statusSubject.value + } /// Creates an SDKSynchronizer instance /// - Parameter initializer: a wallet Initializer object @@ -137,8 +146,8 @@ public class SDKSynchronizer: Synchronizer { utxoRepository: UnspentTransactionOutputRepository, blockProcessor: CompactBlockProcessor ) throws { + self.statusSubject = CurrentValueSubject(.unprepared) self.connectionState = .idle - self.status = status self.initializer = initializer self.transactionManager = transactionManager self.transactionRepository = transactionRepository @@ -158,11 +167,12 @@ public class SDKSynchronizer: Synchronizer { public func prepare(with seed: [UInt8]?) async throws -> Initializer.InitializationResult { if case .seedRequired = try self.initializer.initialize(with: seed) { + self.statusSubject.send(.unprepared) return .seedRequired } try await self.blockProcessor.setStartHeight(initializer.walletBirthday) - self.status = .disconnected + self.statusSubject.send(.disconnected) return .success } @@ -195,97 +205,129 @@ public class SDKSynchronizer: Synchronizer { Task(priority: .high) { await blockProcessor.stop() - self.status = .stopped } } private func subscribeToProcessorNotifications(_ processor: CompactBlockProcessor) { let center = NotificationCenter.default - - center.addObserver( - self, - selector: #selector(processorUpdated(_:)), - name: Notification.Name.blockProcessorUpdated, - object: processor - ) - - center.addObserver( - self, - selector: #selector(processorStartedDownloading(_:)), - name: Notification.Name.blockProcessorStartedDownloading, - object: processor - ) - - center.addObserver( - self, - selector: #selector(processorStartedValidating(_:)), - name: Notification.Name.blockProcessorStartedValidating, - object: processor - ) - - center.addObserver( - self, - selector: #selector(processorStartedScanning(_:)), - name: Notification.Name.blockProcessorStartedScanning, - object: processor - ) - - center.addObserver( - self, - selector: #selector(processorStartedEnhancing(_:)), - name: Notification.Name.blockProcessorStartedEnhancing, - object: processor - ) - - center.addObserver( - self, - selector: #selector(processorStartedFetching(_:)), - name: Notification.Name.blockProcessorStartedFetching, - object: processor - ) - - center.addObserver( - self, - selector: #selector(processorStopped(_:)), - name: Notification.Name.blockProcessorStopped, - object: processor - ) - - center.addObserver( - self, - selector: #selector(processorFailed(_:)), - name: Notification.Name.blockProcessorFailed, - object: processor - ) - center.addObserver( - self, - selector: #selector(processorFinished(_:)), - name: Notification.Name.blockProcessorFinished, - object: processor - ) - - center.addObserver( - self, - selector: #selector(processorTransitionUnknown(_:)), - name: Notification.Name.blockProcessorUnknownTransition, - object: processor - ) - - center.addObserver( - self, - selector: #selector(reorgDetected(_:)), - name: Notification.Name.blockProcessorHandledReOrg, - object: processor - ) - - center.addObserver( - self, - selector: #selector(transactionsFound(_:)), - name: Notification.Name.blockProcessorFoundTransactions, - object: processor - ) - + center.publisher(for: .blockProcessorUpdated, object: processor) + .receive(on: queue) + .sink(receiveValue: { [weak self] notification in + guard + let userInfo = notification.userInfo, + let progress = userInfo[CompactBlockProcessorNotificationKey.progress] as? CompactBlockProgress + else { + return + } + + self?.notify(progress: progress) + }) + .store(in: &self.cancellableBag) + + center.publisher(for: .blockProcessorStartedDownloading, object: processor) + .receive(on: queue) + .sink(receiveValue: { [weak self] _ in + self?.statusSubject.send(.downloading(.nullProgress)) + }) + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorStartedValidating, object: processor) + .receive(on: queue) + .sink { [weak self] _ in + self?.statusSubject.send(.validating) + } + .store(in: &cancellableBag) + + + center.publisher(for: .blockProcessorStartedScanning, object: processor) + .receive(on: queue) + .sink(receiveValue: { [weak self] _ in + self?.statusSubject.send(.scanning(.nullProgress)) + }) + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorStartedEnhancing, object: processor) + .receive(on: queue) + .sink { [weak self] _ in + self?.statusSubject.send(.enhancing(NullEnhancementProgress())) + } + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorStartedFetching, object: processor) + .receive(on: queue) + .sink { [weak self] _ in + self?.statusSubject.send(.fetching) + } + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorStopped, object: processor) + .receive(on: queue) + .sink { [weak self] _ in + self?.statusSubject.send(.stopped) + } + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorFailed) + .receive(on: queue) + .sink { [weak self] notification in + guard let self = self else { return } + + if let error = notification.userInfo?[CompactBlockProcessorNotificationKey.error] as? Error { + let mappedError = self.mapError(error) + self.notifyFailure(mappedError) + self.statusSubject.send(.error(mappedError)) + } else { + self.notifyFailure( + CompactBlockProcessorError.generalError( + message: "This is strange. processorFailed Call received no error message" + ) + ) + + self.statusSubject.send( + .error( + SynchronizerError.generalError(message: "This is strange. processorFailed Call received no error message") + ) + ) + } + } + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorFinished, object: processor) + .receive(on: queue) + .sink { [weak self] notification in + guard let self = self else { return } + // FIX: Pending transaction updates fail if done from another thread. Improvement needed: explicitly define queues for sql repositories see: https://github.com/zcash/ZcashLightClientKit/issues/450 + if let blockHeight = notification.userInfo?[CompactBlockProcessorNotificationKey.latestScannedBlockHeight] as? BlockHeight { + self.latestScannedHeight = blockHeight + } + + self.refreshPendingTransactions() + self.statusSubject.send(.synced) + } + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorUnknownTransition, object: processor) + .receive(on: queue) + .sink { [weak self] _ in + self?.statusSubject.send(.disconnected) + } + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorHandledReOrg, object: processor) + .receive(on: queue) + .sink { [weak self] notification in + self?.reorgDetected(notification) + } + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorFoundTransactions) + .receive(on: queue) + .sink { [weak self] notification in + self?.transactionsFound(notification) + } + .store(in: &cancellableBag) + center.addObserver( self, selector: #selector(connectivityStateChanged(_:)), @@ -324,7 +366,7 @@ public class SDKSynchronizer: Synchronizer { } } - @objc func transactionsFound(_ notification: Notification) { + private func transactionsFound(_ notification: Notification) { guard let userInfo = notification.userInfo, let foundTransactions = userInfo[CompactBlockProcessorNotificationKey.foundTransactions] as? [ConfirmedTransactionEntity] @@ -332,7 +374,8 @@ public class SDKSynchronizer: Synchronizer { return } - NotificationCenter.default.mainThreadPost( + // Legacy + NotificationCenter.default.post( name: .synchronizerFoundTransactions, object: self, userInfo: [ @@ -341,7 +384,7 @@ public class SDKSynchronizer: Synchronizer { ) } - @objc func reorgDetected(_ notification: Notification) { + private func reorgDetected(_ notification: Notification) { guard let userInfo = notification.userInfo, let progress = userInfo[CompactBlockProcessorNotificationKey.reorgHeight] as? BlockHeight, @@ -357,11 +400,11 @@ public class SDKSynchronizer: Synchronizer { try transactionManager.handleReorg(at: rewindHeight) } catch { LoggerProxy.debug("error handling reorg: \(error)") - notifyFailure(error) + notifyFailure(self.mapError(error)) } } - @objc func processorUpdated(_ notification: Notification) { + private func processorUpdated(_ notification: Notification) { guard let userInfo = notification.userInfo, let progress = userInfo[CompactBlockProcessorNotificationKey.progress] as? CompactBlockProgress @@ -372,77 +415,6 @@ public class SDKSynchronizer: Synchronizer { self.notify(progress: progress) } - @objc func processorStartedDownloading(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self = self, self.status != .downloading(.nullProgress) else { return } - self.status = .downloading(.nullProgress) - } - } - - @objc func processorStartedValidating(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self = self, self.status != .validating else { return } - self.status = .validating - } - } - - @objc func processorStartedScanning(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self = self, self.status != .scanning(.nullProgress) else { return } - self.status = .scanning(.nullProgress) - } - } - @objc func processorStartedEnhancing(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self = self, self.status != .enhancing(NullEnhancementProgress()) else { return } - self.status = .enhancing(NullEnhancementProgress()) - } - } - - @objc func processorStartedFetching(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self = self, self.status != .fetching else { return } - self.status = .fetching - } - } - - @objc func processorStopped(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self = self, self.status != .stopped else { return } - self.status = .stopped - } - } - - @objc func processorFailed(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self = self else { return } - if let error = notification.userInfo?[CompactBlockProcessorNotificationKey.error] as? Error { - self.notifyFailure(error) - self.status = .error(self.mapError(error)) - } else { - self.notifyFailure( - CompactBlockProcessorError.generalError( - message: "This is strange. processorFailed Call received no error message" - ) - ) - self.status = .error(SynchronizerError.generalError(message: "This is strange. processorFailed Call received no error message")) - } - } - } - - @objc func processorFinished(_ notification: Notification) { - // FIX: Pending transaction updates fail if done from another thread. Improvement needed: explicitly define queues for sql repositories see: https://github.com/zcash/ZcashLightClientKit/issues/450 - if let blockHeight = notification.userInfo?[CompactBlockProcessorNotificationKey.latestScannedBlockHeight] as? BlockHeight { - self.latestScannedHeight = blockHeight - } - self.refreshPendingTransactions() - self.status = .synced - } - - @objc func processorTransitionUnknown(_ notification: Notification) { - self.status = .disconnected - } - // MARK: Synchronizer methods public func sendToAddress( @@ -667,20 +639,10 @@ public class SDKSynchronizer: Synchronizer { userInfo[NotificationKeys.progress] = progress userInfo[NotificationKeys.blockHeight] = progress.progressHeight - self.status = SyncStatus(progress) - NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerProgressUpdated, object: self, userInfo: userInfo) - } - - private func notifyStatusChange(newValue: SyncStatus, oldValue: SyncStatus) { - NotificationCenter.default.mainThreadPost( - name: .synchronizerStatusWillUpdate, - object: self, - userInfo: - [ - NotificationKeys.currentStatus: oldValue, - NotificationKeys.nextStatus: newValue - ] - ) + self.statusSubject.send(SyncStatus(progress)) + + // legacy + NotificationCenter.default.post(name: Notification.Name.synchronizerProgressUpdated, object: self, userInfo: userInfo) } private func notify(status: SyncStatus) { @@ -721,7 +683,7 @@ public class SDKSynchronizer: Synchronizer { case .fetching: NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerFetching, object: self) case .error(let e): - self.notifyFailure(e) + self.notifyFailure(self.mapError(e)) } } // MARK: book keeping @@ -758,15 +720,11 @@ public class SDKSynchronizer: Synchronizer { } private func notifyMinedTransaction(_ transaction: PendingTransactionEntity) { - DispatchQueue.main.async { [weak self] in - guard let self = self else { return } - - NotificationCenter.default.mainThreadPost( - name: Notification.Name.synchronizerMinedTransaction, - object: self, - userInfo: [NotificationKeys.minedTransaction: transaction] - ) - } + NotificationCenter.default.post( + name: Notification.Name.synchronizerMinedTransaction, + object: self, + userInfo: [NotificationKeys.minedTransaction: transaction] + ) } // swiftlint:disable cyclomatic_complexity @@ -809,14 +767,11 @@ public class SDKSynchronizer: Synchronizer { } private func notifyFailure(_ error: Error) { - DispatchQueue.main.async { [weak self] in - guard let self = self else { return } - NotificationCenter.default.mainThreadPost( - name: Notification.Name.synchronizerFailed, - object: self, - userInfo: [NotificationKeys.error: self.mapError(error)] - ) - } + NotificationCenter.default.post( + name: Notification.Name.synchronizerFailed, + object: self, + userInfo: [NotificationKeys.error: self.mapError(error)] + ) } }