diff --git a/Package.resolved b/Package.resolved index 33336565..3b45ffd6 100644 --- a/Package.resolved +++ b/Package.resolved @@ -86,8 +86,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/zcash-hackworks/zcash-light-client-ffi", "state" : { - "revision" : "70bf6ae6538943f2b8453f424e607a05f4dc7be6", - "version" : "0.1.0-beta.2" + "revision" : "7febbfd74a7d963ec206e01f8ef6090bb8cee523", + "version" : "0.1.0-beta.3" } } ], diff --git a/Sources/ZcashLightClientKit/Synchronizer.swift b/Sources/ZcashLightClientKit/Synchronizer.swift index e88c4b69..a8122eda 100644 --- a/Sources/ZcashLightClientKit/Synchronizer.swift +++ b/Sources/ZcashLightClientKit/Synchronizer.swift @@ -7,7 +7,7 @@ // import Foundation - +import Combine /// Represents errors thrown by a Synchronizer public enum SynchronizerError: Error { @@ -74,6 +74,9 @@ public protocol Synchronizer { /// Value representing the Status of this Synchronizer. As the status changes, it will be also notified var status: SyncStatus { get } + /// A value subject reporting the sync status + var statusSubject: CurrentValueSubject { get } + /// reflects current connection state to LightwalletEndpoint var connectionState: ConnectionState { get } diff --git a/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift b/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift index e24a4207..41c35309 100644 --- a/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift +++ b/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift @@ -7,17 +7,16 @@ // import Foundation +import Combine public extension Notification.Name { /// Notification is posted whenever transactions are updated /// /// - Important: not yet posted static let transactionsUpdated = Notification.Name("SDKSyncronizerTransactionUpdated") - /// Posted when the synchronizer is started. static let synchronizerStarted = Notification.Name("SDKSyncronizerStarted") - /// Posted when there are progress updates. /// @@ -97,15 +96,17 @@ public class SDKSynchronizer: Synchronizer { public static let previousConnectionState = "SDKSynchronizer.previousConnectionState" public static let synchronizerState = "SDKSynchronizer.synchronizerState" } - - public private(set) var status: SyncStatus { - didSet { - notify(status: status) - } - willSet { - notifyStatusChange(newValue: newValue, oldValue: status) - } - } + + +// public private(set) var status: SyncStatus { +// didSet { +// notify(status: status) +// } +// willSet { +// notifyStatusChange(newValue: newValue, oldValue: status) +// } +// } + public var statusSubject: CurrentValueSubject public private(set) var progress: Float = 0.0 public private(set) var blockProcessor: CompactBlockProcessor public private(set) var initializer: Initializer @@ -115,6 +116,14 @@ public class SDKSynchronizer: Synchronizer { private var transactionManager: OutboundTransactionManager private var transactionRepository: TransactionRepository private var utxoRepository: UnspentTransactionOutputRepository + private var cancellableBag = [AnyCancellable]() + private var queue = DispatchQueue( + label: "ZcashLightClientKit.SDKSynchronizer.notifications.queue", + qos: .userInteractive + ) + public var status: SyncStatus { + statusSubject.value + } /// Creates an SDKSynchronizer instance /// - Parameter initializer: a wallet Initializer object @@ -137,8 +146,8 @@ public class SDKSynchronizer: Synchronizer { utxoRepository: UnspentTransactionOutputRepository, blockProcessor: CompactBlockProcessor ) throws { + self.statusSubject = CurrentValueSubject(.unprepared) self.connectionState = .idle - self.status = status self.initializer = initializer self.transactionManager = transactionManager self.transactionRepository = transactionRepository @@ -158,11 +167,12 @@ public class SDKSynchronizer: Synchronizer { public func prepare(with seed: [UInt8]?) async throws -> Initializer.InitializationResult { if case .seedRequired = try self.initializer.initialize(with: seed) { + self.statusSubject.send(.unprepared) return .seedRequired } try await self.blockProcessor.setStartHeight(initializer.walletBirthday) - self.status = .disconnected + self.statusSubject.send(.disconnected) return .success } @@ -195,97 +205,129 @@ public class SDKSynchronizer: Synchronizer { Task(priority: .high) { await blockProcessor.stop() - self.status = .stopped } } private func subscribeToProcessorNotifications(_ processor: CompactBlockProcessor) { let center = NotificationCenter.default - - center.addObserver( - self, - selector: #selector(processorUpdated(_:)), - name: Notification.Name.blockProcessorUpdated, - object: processor - ) - - center.addObserver( - self, - selector: #selector(processorStartedDownloading(_:)), - name: Notification.Name.blockProcessorStartedDownloading, - object: processor - ) - - center.addObserver( - self, - selector: #selector(processorStartedValidating(_:)), - name: Notification.Name.blockProcessorStartedValidating, - object: processor - ) - - center.addObserver( - self, - selector: #selector(processorStartedScanning(_:)), - name: Notification.Name.blockProcessorStartedScanning, - object: processor - ) - - center.addObserver( - self, - selector: #selector(processorStartedEnhancing(_:)), - name: Notification.Name.blockProcessorStartedEnhancing, - object: processor - ) - - center.addObserver( - self, - selector: #selector(processorStartedFetching(_:)), - name: Notification.Name.blockProcessorStartedFetching, - object: processor - ) - - center.addObserver( - self, - selector: #selector(processorStopped(_:)), - name: Notification.Name.blockProcessorStopped, - object: processor - ) - - center.addObserver( - self, - selector: #selector(processorFailed(_:)), - name: Notification.Name.blockProcessorFailed, - object: processor - ) - center.addObserver( - self, - selector: #selector(processorFinished(_:)), - name: Notification.Name.blockProcessorFinished, - object: processor - ) - - center.addObserver( - self, - selector: #selector(processorTransitionUnknown(_:)), - name: Notification.Name.blockProcessorUnknownTransition, - object: processor - ) - - center.addObserver( - self, - selector: #selector(reorgDetected(_:)), - name: Notification.Name.blockProcessorHandledReOrg, - object: processor - ) - - center.addObserver( - self, - selector: #selector(transactionsFound(_:)), - name: Notification.Name.blockProcessorFoundTransactions, - object: processor - ) - + center.publisher(for: .blockProcessorUpdated, object: processor) + .receive(on: queue) + .sink(receiveValue: { [weak self] notification in + guard + let userInfo = notification.userInfo, + let progress = userInfo[CompactBlockProcessorNotificationKey.progress] as? CompactBlockProgress + else { + return + } + + self?.notify(progress: progress) + }) + .store(in: &self.cancellableBag) + + center.publisher(for: .blockProcessorStartedDownloading, object: processor) + .receive(on: queue) + .sink(receiveValue: { [weak self] _ in + self?.statusSubject.send(.downloading(.nullProgress)) + }) + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorStartedValidating, object: processor) + .receive(on: queue) + .sink { [weak self] _ in + self?.statusSubject.send(.validating) + } + .store(in: &cancellableBag) + + + center.publisher(for: .blockProcessorStartedScanning, object: processor) + .receive(on: queue) + .sink(receiveValue: { [weak self] _ in + self?.statusSubject.send(.scanning(.nullProgress)) + }) + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorStartedEnhancing, object: processor) + .receive(on: queue) + .sink { [weak self] _ in + self?.statusSubject.send(.enhancing(NullEnhancementProgress())) + } + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorStartedFetching, object: processor) + .receive(on: queue) + .sink { [weak self] _ in + self?.statusSubject.send(.fetching) + } + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorStopped, object: processor) + .receive(on: queue) + .sink { [weak self] _ in + self?.statusSubject.send(.stopped) + } + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorFailed) + .receive(on: queue) + .sink { [weak self] notification in + guard let self = self else { return } + + if let error = notification.userInfo?[CompactBlockProcessorNotificationKey.error] as? Error { + let mappedError = self.mapError(error) + self.notifyFailure(mappedError) + self.statusSubject.send(.error(mappedError)) + } else { + self.notifyFailure( + CompactBlockProcessorError.generalError( + message: "This is strange. processorFailed Call received no error message" + ) + ) + + self.statusSubject.send( + .error( + SynchronizerError.generalError(message: "This is strange. processorFailed Call received no error message") + ) + ) + } + } + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorFinished, object: processor) + .receive(on: queue) + .sink { [weak self] notification in + guard let self = self else { return } + // FIX: Pending transaction updates fail if done from another thread. Improvement needed: explicitly define queues for sql repositories see: https://github.com/zcash/ZcashLightClientKit/issues/450 + if let blockHeight = notification.userInfo?[CompactBlockProcessorNotificationKey.latestScannedBlockHeight] as? BlockHeight { + self.latestScannedHeight = blockHeight + } + + self.refreshPendingTransactions() + self.statusSubject.send(.synced) + } + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorUnknownTransition, object: processor) + .receive(on: queue) + .sink { [weak self] _ in + self?.statusSubject.send(.disconnected) + } + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorHandledReOrg, object: processor) + .receive(on: queue) + .sink { [weak self] notification in + self?.reorgDetected(notification) + } + .store(in: &cancellableBag) + + center.publisher(for: .blockProcessorFoundTransactions) + .receive(on: queue) + .sink { [weak self] notification in + self?.transactionsFound(notification) + } + .store(in: &cancellableBag) + center.addObserver( self, selector: #selector(connectivityStateChanged(_:)), @@ -324,7 +366,7 @@ public class SDKSynchronizer: Synchronizer { } } - @objc func transactionsFound(_ notification: Notification) { + private func transactionsFound(_ notification: Notification) { guard let userInfo = notification.userInfo, let foundTransactions = userInfo[CompactBlockProcessorNotificationKey.foundTransactions] as? [ConfirmedTransactionEntity] @@ -332,7 +374,8 @@ public class SDKSynchronizer: Synchronizer { return } - NotificationCenter.default.mainThreadPost( + // Legacy + NotificationCenter.default.post( name: .synchronizerFoundTransactions, object: self, userInfo: [ @@ -341,7 +384,7 @@ public class SDKSynchronizer: Synchronizer { ) } - @objc func reorgDetected(_ notification: Notification) { + private func reorgDetected(_ notification: Notification) { guard let userInfo = notification.userInfo, let progress = userInfo[CompactBlockProcessorNotificationKey.reorgHeight] as? BlockHeight, @@ -357,11 +400,11 @@ public class SDKSynchronizer: Synchronizer { try transactionManager.handleReorg(at: rewindHeight) } catch { LoggerProxy.debug("error handling reorg: \(error)") - notifyFailure(error) + notifyFailure(self.mapError(error)) } } - @objc func processorUpdated(_ notification: Notification) { + private func processorUpdated(_ notification: Notification) { guard let userInfo = notification.userInfo, let progress = userInfo[CompactBlockProcessorNotificationKey.progress] as? CompactBlockProgress @@ -372,77 +415,6 @@ public class SDKSynchronizer: Synchronizer { self.notify(progress: progress) } - @objc func processorStartedDownloading(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self = self, self.status != .downloading(.nullProgress) else { return } - self.status = .downloading(.nullProgress) - } - } - - @objc func processorStartedValidating(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self = self, self.status != .validating else { return } - self.status = .validating - } - } - - @objc func processorStartedScanning(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self = self, self.status != .scanning(.nullProgress) else { return } - self.status = .scanning(.nullProgress) - } - } - @objc func processorStartedEnhancing(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self = self, self.status != .enhancing(NullEnhancementProgress()) else { return } - self.status = .enhancing(NullEnhancementProgress()) - } - } - - @objc func processorStartedFetching(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self = self, self.status != .fetching else { return } - self.status = .fetching - } - } - - @objc func processorStopped(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self = self, self.status != .stopped else { return } - self.status = .stopped - } - } - - @objc func processorFailed(_ notification: Notification) { - DispatchQueue.main.async { [weak self] in - guard let self = self else { return } - if let error = notification.userInfo?[CompactBlockProcessorNotificationKey.error] as? Error { - self.notifyFailure(error) - self.status = .error(self.mapError(error)) - } else { - self.notifyFailure( - CompactBlockProcessorError.generalError( - message: "This is strange. processorFailed Call received no error message" - ) - ) - self.status = .error(SynchronizerError.generalError(message: "This is strange. processorFailed Call received no error message")) - } - } - } - - @objc func processorFinished(_ notification: Notification) { - // FIX: Pending transaction updates fail if done from another thread. Improvement needed: explicitly define queues for sql repositories see: https://github.com/zcash/ZcashLightClientKit/issues/450 - if let blockHeight = notification.userInfo?[CompactBlockProcessorNotificationKey.latestScannedBlockHeight] as? BlockHeight { - self.latestScannedHeight = blockHeight - } - self.refreshPendingTransactions() - self.status = .synced - } - - @objc func processorTransitionUnknown(_ notification: Notification) { - self.status = .disconnected - } - // MARK: Synchronizer methods public func sendToAddress( @@ -667,20 +639,10 @@ public class SDKSynchronizer: Synchronizer { userInfo[NotificationKeys.progress] = progress userInfo[NotificationKeys.blockHeight] = progress.progressHeight - self.status = SyncStatus(progress) - NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerProgressUpdated, object: self, userInfo: userInfo) - } - - private func notifyStatusChange(newValue: SyncStatus, oldValue: SyncStatus) { - NotificationCenter.default.mainThreadPost( - name: .synchronizerStatusWillUpdate, - object: self, - userInfo: - [ - NotificationKeys.currentStatus: oldValue, - NotificationKeys.nextStatus: newValue - ] - ) + self.statusSubject.send(SyncStatus(progress)) + + // legacy + NotificationCenter.default.post(name: Notification.Name.synchronizerProgressUpdated, object: self, userInfo: userInfo) } private func notify(status: SyncStatus) { @@ -721,7 +683,7 @@ public class SDKSynchronizer: Synchronizer { case .fetching: NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerFetching, object: self) case .error(let e): - self.notifyFailure(e) + self.notifyFailure(self.mapError(e)) } } // MARK: book keeping @@ -758,15 +720,11 @@ public class SDKSynchronizer: Synchronizer { } private func notifyMinedTransaction(_ transaction: PendingTransactionEntity) { - DispatchQueue.main.async { [weak self] in - guard let self = self else { return } - - NotificationCenter.default.mainThreadPost( - name: Notification.Name.synchronizerMinedTransaction, - object: self, - userInfo: [NotificationKeys.minedTransaction: transaction] - ) - } + NotificationCenter.default.post( + name: Notification.Name.synchronizerMinedTransaction, + object: self, + userInfo: [NotificationKeys.minedTransaction: transaction] + ) } // swiftlint:disable cyclomatic_complexity @@ -809,14 +767,11 @@ public class SDKSynchronizer: Synchronizer { } private func notifyFailure(_ error: Error) { - DispatchQueue.main.async { [weak self] in - guard let self = self else { return } - NotificationCenter.default.mainThreadPost( - name: Notification.Name.synchronizerFailed, - object: self, - userInfo: [NotificationKeys.error: self.mapError(error)] - ) - } + NotificationCenter.default.post( + name: Notification.Name.synchronizerFailed, + object: self, + userInfo: [NotificationKeys.error: self.mapError(error)] + ) } }