[WIP] - [#616] move notification off the main queue into their own queue

Use combine current value subject instead to maintain retrocompatibility
with clients relying on status variable.

Move all the notifications to the dedicated queue.
This commit is contained in:
Francisco Gindre 2022-11-10 21:04:12 -03:00
parent 380425e4e0
commit ee9bf85557
3 changed files with 167 additions and 209 deletions

View File

@ -86,8 +86,8 @@
"kind" : "remoteSourceControl", "kind" : "remoteSourceControl",
"location" : "https://github.com/zcash-hackworks/zcash-light-client-ffi", "location" : "https://github.com/zcash-hackworks/zcash-light-client-ffi",
"state" : { "state" : {
"revision" : "70bf6ae6538943f2b8453f424e607a05f4dc7be6", "revision" : "7febbfd74a7d963ec206e01f8ef6090bb8cee523",
"version" : "0.1.0-beta.2" "version" : "0.1.0-beta.3"
} }
} }
], ],

View File

@ -7,7 +7,7 @@
// //
import Foundation import Foundation
import Combine
/// Represents errors thrown by a Synchronizer /// Represents errors thrown by a Synchronizer
public enum SynchronizerError: Error { public enum SynchronizerError: Error {
@ -74,6 +74,9 @@ public protocol Synchronizer {
/// Value representing the Status of this Synchronizer. As the status changes, it will be also notified /// Value representing the Status of this Synchronizer. As the status changes, it will be also notified
var status: SyncStatus { get } var status: SyncStatus { get }
/// A value subject reporting the sync status
var statusSubject: CurrentValueSubject<SyncStatus, Never> { get }
/// reflects current connection state to LightwalletEndpoint /// reflects current connection state to LightwalletEndpoint
var connectionState: ConnectionState { get } var connectionState: ConnectionState { get }

View File

@ -7,17 +7,16 @@
// //
import Foundation import Foundation
import Combine
public extension Notification.Name { public extension Notification.Name {
/// Notification is posted whenever transactions are updated /// Notification is posted whenever transactions are updated
/// ///
/// - Important: not yet posted /// - Important: not yet posted
static let transactionsUpdated = Notification.Name("SDKSyncronizerTransactionUpdated") static let transactionsUpdated = Notification.Name("SDKSyncronizerTransactionUpdated")
/// Posted when the synchronizer is started. /// Posted when the synchronizer is started.
static let synchronizerStarted = Notification.Name("SDKSyncronizerStarted") static let synchronizerStarted = Notification.Name("SDKSyncronizerStarted")
/// Posted when there are progress updates. /// Posted when there are progress updates.
/// ///
@ -97,15 +96,17 @@ public class SDKSynchronizer: Synchronizer {
public static let previousConnectionState = "SDKSynchronizer.previousConnectionState" public static let previousConnectionState = "SDKSynchronizer.previousConnectionState"
public static let synchronizerState = "SDKSynchronizer.synchronizerState" public static let synchronizerState = "SDKSynchronizer.synchronizerState"
} }
public private(set) var status: SyncStatus {
didSet { // public private(set) var status: SyncStatus {
notify(status: status) // didSet {
} // notify(status: status)
willSet { // }
notifyStatusChange(newValue: newValue, oldValue: status) // willSet {
} // notifyStatusChange(newValue: newValue, oldValue: status)
} // }
// }
public var statusSubject: CurrentValueSubject<SyncStatus, Never>
public private(set) var progress: Float = 0.0 public private(set) var progress: Float = 0.0
public private(set) var blockProcessor: CompactBlockProcessor public private(set) var blockProcessor: CompactBlockProcessor
public private(set) var initializer: Initializer public private(set) var initializer: Initializer
@ -115,6 +116,14 @@ public class SDKSynchronizer: Synchronizer {
private var transactionManager: OutboundTransactionManager private var transactionManager: OutboundTransactionManager
private var transactionRepository: TransactionRepository private var transactionRepository: TransactionRepository
private var utxoRepository: UnspentTransactionOutputRepository private var utxoRepository: UnspentTransactionOutputRepository
private var cancellableBag = [AnyCancellable]()
private var queue = DispatchQueue(
label: "ZcashLightClientKit.SDKSynchronizer.notifications.queue",
qos: .userInteractive
)
public var status: SyncStatus {
statusSubject.value
}
/// Creates an SDKSynchronizer instance /// Creates an SDKSynchronizer instance
/// - Parameter initializer: a wallet Initializer object /// - Parameter initializer: a wallet Initializer object
@ -137,8 +146,8 @@ public class SDKSynchronizer: Synchronizer {
utxoRepository: UnspentTransactionOutputRepository, utxoRepository: UnspentTransactionOutputRepository,
blockProcessor: CompactBlockProcessor blockProcessor: CompactBlockProcessor
) throws { ) throws {
self.statusSubject = CurrentValueSubject(.unprepared)
self.connectionState = .idle self.connectionState = .idle
self.status = status
self.initializer = initializer self.initializer = initializer
self.transactionManager = transactionManager self.transactionManager = transactionManager
self.transactionRepository = transactionRepository self.transactionRepository = transactionRepository
@ -158,11 +167,12 @@ public class SDKSynchronizer: Synchronizer {
public func prepare(with seed: [UInt8]?) async throws -> Initializer.InitializationResult { public func prepare(with seed: [UInt8]?) async throws -> Initializer.InitializationResult {
if case .seedRequired = try self.initializer.initialize(with: seed) { if case .seedRequired = try self.initializer.initialize(with: seed) {
self.statusSubject.send(.unprepared)
return .seedRequired return .seedRequired
} }
try await self.blockProcessor.setStartHeight(initializer.walletBirthday) try await self.blockProcessor.setStartHeight(initializer.walletBirthday)
self.status = .disconnected self.statusSubject.send(.disconnected)
return .success return .success
} }
@ -195,97 +205,129 @@ public class SDKSynchronizer: Synchronizer {
Task(priority: .high) { Task(priority: .high) {
await blockProcessor.stop() await blockProcessor.stop()
self.status = .stopped
} }
} }
private func subscribeToProcessorNotifications(_ processor: CompactBlockProcessor) { private func subscribeToProcessorNotifications(_ processor: CompactBlockProcessor) {
let center = NotificationCenter.default let center = NotificationCenter.default
center.addObserver(
self,
selector: #selector(processorUpdated(_:)),
name: Notification.Name.blockProcessorUpdated,
object: processor
)
center.addObserver(
self,
selector: #selector(processorStartedDownloading(_:)),
name: Notification.Name.blockProcessorStartedDownloading,
object: processor
)
center.addObserver(
self,
selector: #selector(processorStartedValidating(_:)),
name: Notification.Name.blockProcessorStartedValidating,
object: processor
)
center.addObserver(
self,
selector: #selector(processorStartedScanning(_:)),
name: Notification.Name.blockProcessorStartedScanning,
object: processor
)
center.addObserver(
self,
selector: #selector(processorStartedEnhancing(_:)),
name: Notification.Name.blockProcessorStartedEnhancing,
object: processor
)
center.addObserver(
self,
selector: #selector(processorStartedFetching(_:)),
name: Notification.Name.blockProcessorStartedFetching,
object: processor
)
center.addObserver(
self,
selector: #selector(processorStopped(_:)),
name: Notification.Name.blockProcessorStopped,
object: processor
)
center.addObserver(
self,
selector: #selector(processorFailed(_:)),
name: Notification.Name.blockProcessorFailed,
object: processor
)
center.addObserver( center.publisher(for: .blockProcessorUpdated, object: processor)
self, .receive(on: queue)
selector: #selector(processorFinished(_:)), .sink(receiveValue: { [weak self] notification in
name: Notification.Name.blockProcessorFinished, guard
object: processor let userInfo = notification.userInfo,
) let progress = userInfo[CompactBlockProcessorNotificationKey.progress] as? CompactBlockProgress
else {
center.addObserver( return
self, }
selector: #selector(processorTransitionUnknown(_:)),
name: Notification.Name.blockProcessorUnknownTransition, self?.notify(progress: progress)
object: processor })
) .store(in: &self.cancellableBag)
center.addObserver( center.publisher(for: .blockProcessorStartedDownloading, object: processor)
self, .receive(on: queue)
selector: #selector(reorgDetected(_:)), .sink(receiveValue: { [weak self] _ in
name: Notification.Name.blockProcessorHandledReOrg, self?.statusSubject.send(.downloading(.nullProgress))
object: processor })
) .store(in: &cancellableBag)
center.addObserver( center.publisher(for: .blockProcessorStartedValidating, object: processor)
self, .receive(on: queue)
selector: #selector(transactionsFound(_:)), .sink { [weak self] _ in
name: Notification.Name.blockProcessorFoundTransactions, self?.statusSubject.send(.validating)
object: processor }
) .store(in: &cancellableBag)
center.publisher(for: .blockProcessorStartedScanning, object: processor)
.receive(on: queue)
.sink(receiveValue: { [weak self] _ in
self?.statusSubject.send(.scanning(.nullProgress))
})
.store(in: &cancellableBag)
center.publisher(for: .blockProcessorStartedEnhancing, object: processor)
.receive(on: queue)
.sink { [weak self] _ in
self?.statusSubject.send(.enhancing(NullEnhancementProgress()))
}
.store(in: &cancellableBag)
center.publisher(for: .blockProcessorStartedFetching, object: processor)
.receive(on: queue)
.sink { [weak self] _ in
self?.statusSubject.send(.fetching)
}
.store(in: &cancellableBag)
center.publisher(for: .blockProcessorStopped, object: processor)
.receive(on: queue)
.sink { [weak self] _ in
self?.statusSubject.send(.stopped)
}
.store(in: &cancellableBag)
center.publisher(for: .blockProcessorFailed)
.receive(on: queue)
.sink { [weak self] notification in
guard let self = self else { return }
if let error = notification.userInfo?[CompactBlockProcessorNotificationKey.error] as? Error {
let mappedError = self.mapError(error)
self.notifyFailure(mappedError)
self.statusSubject.send(.error(mappedError))
} else {
self.notifyFailure(
CompactBlockProcessorError.generalError(
message: "This is strange. processorFailed Call received no error message"
)
)
self.statusSubject.send(
.error(
SynchronizerError.generalError(message: "This is strange. processorFailed Call received no error message")
)
)
}
}
.store(in: &cancellableBag)
center.publisher(for: .blockProcessorFinished, object: processor)
.receive(on: queue)
.sink { [weak self] notification in
guard let self = self else { return }
// FIX: Pending transaction updates fail if done from another thread. Improvement needed: explicitly define queues for sql repositories see: https://github.com/zcash/ZcashLightClientKit/issues/450
if let blockHeight = notification.userInfo?[CompactBlockProcessorNotificationKey.latestScannedBlockHeight] as? BlockHeight {
self.latestScannedHeight = blockHeight
}
self.refreshPendingTransactions()
self.statusSubject.send(.synced)
}
.store(in: &cancellableBag)
center.publisher(for: .blockProcessorUnknownTransition, object: processor)
.receive(on: queue)
.sink { [weak self] _ in
self?.statusSubject.send(.disconnected)
}
.store(in: &cancellableBag)
center.publisher(for: .blockProcessorHandledReOrg, object: processor)
.receive(on: queue)
.sink { [weak self] notification in
self?.reorgDetected(notification)
}
.store(in: &cancellableBag)
center.publisher(for: .blockProcessorFoundTransactions)
.receive(on: queue)
.sink { [weak self] notification in
self?.transactionsFound(notification)
}
.store(in: &cancellableBag)
center.addObserver( center.addObserver(
self, self,
selector: #selector(connectivityStateChanged(_:)), selector: #selector(connectivityStateChanged(_:)),
@ -324,7 +366,7 @@ public class SDKSynchronizer: Synchronizer {
} }
} }
@objc func transactionsFound(_ notification: Notification) { private func transactionsFound(_ notification: Notification) {
guard guard
let userInfo = notification.userInfo, let userInfo = notification.userInfo,
let foundTransactions = userInfo[CompactBlockProcessorNotificationKey.foundTransactions] as? [ConfirmedTransactionEntity] let foundTransactions = userInfo[CompactBlockProcessorNotificationKey.foundTransactions] as? [ConfirmedTransactionEntity]
@ -332,7 +374,8 @@ public class SDKSynchronizer: Synchronizer {
return return
} }
NotificationCenter.default.mainThreadPost( // Legacy
NotificationCenter.default.post(
name: .synchronizerFoundTransactions, name: .synchronizerFoundTransactions,
object: self, object: self,
userInfo: [ userInfo: [
@ -341,7 +384,7 @@ public class SDKSynchronizer: Synchronizer {
) )
} }
@objc func reorgDetected(_ notification: Notification) { private func reorgDetected(_ notification: Notification) {
guard guard
let userInfo = notification.userInfo, let userInfo = notification.userInfo,
let progress = userInfo[CompactBlockProcessorNotificationKey.reorgHeight] as? BlockHeight, let progress = userInfo[CompactBlockProcessorNotificationKey.reorgHeight] as? BlockHeight,
@ -357,11 +400,11 @@ public class SDKSynchronizer: Synchronizer {
try transactionManager.handleReorg(at: rewindHeight) try transactionManager.handleReorg(at: rewindHeight)
} catch { } catch {
LoggerProxy.debug("error handling reorg: \(error)") LoggerProxy.debug("error handling reorg: \(error)")
notifyFailure(error) notifyFailure(self.mapError(error))
} }
} }
@objc func processorUpdated(_ notification: Notification) { private func processorUpdated(_ notification: Notification) {
guard guard
let userInfo = notification.userInfo, let userInfo = notification.userInfo,
let progress = userInfo[CompactBlockProcessorNotificationKey.progress] as? CompactBlockProgress let progress = userInfo[CompactBlockProcessorNotificationKey.progress] as? CompactBlockProgress
@ -372,77 +415,6 @@ public class SDKSynchronizer: Synchronizer {
self.notify(progress: progress) self.notify(progress: progress)
} }
@objc func processorStartedDownloading(_ notification: Notification) {
DispatchQueue.main.async { [weak self] in
guard let self = self, self.status != .downloading(.nullProgress) else { return }
self.status = .downloading(.nullProgress)
}
}
@objc func processorStartedValidating(_ notification: Notification) {
DispatchQueue.main.async { [weak self] in
guard let self = self, self.status != .validating else { return }
self.status = .validating
}
}
@objc func processorStartedScanning(_ notification: Notification) {
DispatchQueue.main.async { [weak self] in
guard let self = self, self.status != .scanning(.nullProgress) else { return }
self.status = .scanning(.nullProgress)
}
}
@objc func processorStartedEnhancing(_ notification: Notification) {
DispatchQueue.main.async { [weak self] in
guard let self = self, self.status != .enhancing(NullEnhancementProgress()) else { return }
self.status = .enhancing(NullEnhancementProgress())
}
}
@objc func processorStartedFetching(_ notification: Notification) {
DispatchQueue.main.async { [weak self] in
guard let self = self, self.status != .fetching else { return }
self.status = .fetching
}
}
@objc func processorStopped(_ notification: Notification) {
DispatchQueue.main.async { [weak self] in
guard let self = self, self.status != .stopped else { return }
self.status = .stopped
}
}
@objc func processorFailed(_ notification: Notification) {
DispatchQueue.main.async { [weak self] in
guard let self = self else { return }
if let error = notification.userInfo?[CompactBlockProcessorNotificationKey.error] as? Error {
self.notifyFailure(error)
self.status = .error(self.mapError(error))
} else {
self.notifyFailure(
CompactBlockProcessorError.generalError(
message: "This is strange. processorFailed Call received no error message"
)
)
self.status = .error(SynchronizerError.generalError(message: "This is strange. processorFailed Call received no error message"))
}
}
}
@objc func processorFinished(_ notification: Notification) {
// FIX: Pending transaction updates fail if done from another thread. Improvement needed: explicitly define queues for sql repositories see: https://github.com/zcash/ZcashLightClientKit/issues/450
if let blockHeight = notification.userInfo?[CompactBlockProcessorNotificationKey.latestScannedBlockHeight] as? BlockHeight {
self.latestScannedHeight = blockHeight
}
self.refreshPendingTransactions()
self.status = .synced
}
@objc func processorTransitionUnknown(_ notification: Notification) {
self.status = .disconnected
}
// MARK: Synchronizer methods // MARK: Synchronizer methods
public func sendToAddress( public func sendToAddress(
@ -667,20 +639,10 @@ public class SDKSynchronizer: Synchronizer {
userInfo[NotificationKeys.progress] = progress userInfo[NotificationKeys.progress] = progress
userInfo[NotificationKeys.blockHeight] = progress.progressHeight userInfo[NotificationKeys.blockHeight] = progress.progressHeight
self.status = SyncStatus(progress) self.statusSubject.send(SyncStatus(progress))
NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerProgressUpdated, object: self, userInfo: userInfo)
} // legacy
NotificationCenter.default.post(name: Notification.Name.synchronizerProgressUpdated, object: self, userInfo: userInfo)
private func notifyStatusChange(newValue: SyncStatus, oldValue: SyncStatus) {
NotificationCenter.default.mainThreadPost(
name: .synchronizerStatusWillUpdate,
object: self,
userInfo:
[
NotificationKeys.currentStatus: oldValue,
NotificationKeys.nextStatus: newValue
]
)
} }
private func notify(status: SyncStatus) { private func notify(status: SyncStatus) {
@ -721,7 +683,7 @@ public class SDKSynchronizer: Synchronizer {
case .fetching: case .fetching:
NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerFetching, object: self) NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerFetching, object: self)
case .error(let e): case .error(let e):
self.notifyFailure(e) self.notifyFailure(self.mapError(e))
} }
} }
// MARK: book keeping // MARK: book keeping
@ -758,15 +720,11 @@ public class SDKSynchronizer: Synchronizer {
} }
private func notifyMinedTransaction(_ transaction: PendingTransactionEntity) { private func notifyMinedTransaction(_ transaction: PendingTransactionEntity) {
DispatchQueue.main.async { [weak self] in NotificationCenter.default.post(
guard let self = self else { return } name: Notification.Name.synchronizerMinedTransaction,
object: self,
NotificationCenter.default.mainThreadPost( userInfo: [NotificationKeys.minedTransaction: transaction]
name: Notification.Name.synchronizerMinedTransaction, )
object: self,
userInfo: [NotificationKeys.minedTransaction: transaction]
)
}
} }
// swiftlint:disable cyclomatic_complexity // swiftlint:disable cyclomatic_complexity
@ -809,14 +767,11 @@ public class SDKSynchronizer: Synchronizer {
} }
private func notifyFailure(_ error: Error) { private func notifyFailure(_ error: Error) {
DispatchQueue.main.async { [weak self] in NotificationCenter.default.post(
guard let self = self else { return } name: Notification.Name.synchronizerFailed,
NotificationCenter.default.mainThreadPost( object: self,
name: Notification.Name.synchronizerFailed, userInfo: [NotificationKeys.error: self.mapError(error)]
object: self, )
userInfo: [NotificationKeys.error: self.mapError(error)]
)
}
} }
} }