Merge pull request #834 from Chlup/724_change_notifications

[#724] Switch from NotificationCenter to Combine
This commit is contained in:
Michal Fousek 2023-03-17 10:19:09 +01:00 committed by GitHub
commit c88fd684e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 540 additions and 554 deletions

View File

@ -1,4 +1,10 @@
# unreleased
- [#724] Switch from event based notifications to state based notifications
The `SDKSynchronizer` no longer uses `NotificationCenter` to send notifications.
Notifications are replaced with `Combine` publishers. Check the migrating document and
documentation in the code to get more information.
- [#826] Change how the SDK is initialized
- `viewingKeys` and `walletBirthday` are removed from `Initializer` constuctor. These parameters

View File

@ -59,20 +59,20 @@ class AppDelegate: UIResponder, UIApplicationDelegate {
}
func subscribeToMinedTxNotifications() {
NotificationCenter.default.addObserver(
self,
selector: #selector(txMinedNotification(_:)),
name: Notification.Name.synchronizerMinedTransaction,
object: nil
sharedSynchronizer.eventStream
.map { event in
guard case let .minedTransaction(transaction) = event else { return nil }
return transaction
}
.compactMap { $0 }
.receive(on: DispatchQueue.main)
.sink(
receiveValue: { [weak self] transaction in self?.txMined(transaction) }
)
.store(in: &cancellables)
}
@objc func txMinedNotification(_ notification: Notification) {
guard let transaction = notification.userInfo?[SDKSynchronizer.NotificationKeys.minedTransaction] as? PendingTransactionEntity else {
loggerProxy.error("no tx information on notification")
return
}
func txMined(_ transaction: PendingTransactionEntity) {
NotificationBubble.display(
in: window!.rootViewController!.view,
options: NotificationBubble.sucessOptions(

View File

@ -6,6 +6,7 @@
// Copyright © 2019 Electric Coin Company. All rights reserved.
//
import Combine
import UIKit
import ZcashLightClientKit
import KRProgressHUD
@ -30,6 +31,8 @@ class SendViewController: UIViewController {
// swiftlint:disable:next implicitly_unwrapped_optional
var synchronizer: Synchronizer!
var cancellables: [AnyCancellable] = []
override func viewDidLoad() {
super.viewDidLoad()
synchronizer = AppDelegate.shared.sharedSynchronizer
@ -50,9 +53,9 @@ class SendViewController: UIViewController {
super.viewDidAppear(animated)
do {
try synchronizer.start(retry: false)
self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: synchronizer.status)
self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: synchronizer.latestState.syncStatus)
} catch {
self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: synchronizer.status)
self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: synchronizer.latestState.syncStatus)
fail(error)
}
}
@ -78,35 +81,15 @@ class SendViewController: UIViewController {
memoField.layer.borderWidth = 1
memoField.layer.cornerRadius = 5
charactersLeftLabel.text = textForCharacterCount(0)
let center = NotificationCenter.default
center.addObserver(
self,
selector: #selector(synchronizerStarted(_:)),
name: Notification.Name.synchronizerStarted,
object: synchronizer
)
center.addObserver(
self,
selector: #selector(synchronizerSynced(_:)),
name: Notification.Name.synchronizerSynced,
object: synchronizer
)
center.addObserver(
self,
selector: #selector(synchronizerStopped(_:)),
name: Notification.Name.synchronizerStopped,
object: synchronizer
)
center.addObserver(
self,
selector: #selector(synchronizerUpdated(_:)),
name: Notification.Name.synchronizerProgressUpdated,
object: synchronizer
synchronizer.stateStream
.throttle(for: .seconds(0.2), scheduler: DispatchQueue.main, latest: true)
.sink(
receiveValue: { [weak self] state in
self?.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: state.syncStatus)
}
)
.store(in: &cancellables)
}
func format(balance: Zatoshi = Zatoshi()) -> String {
@ -129,7 +112,7 @@ class SendViewController: UIViewController {
}
func isFormValid() -> Bool {
switch synchronizer.status {
switch synchronizer.latestState.syncStatus {
case .synced:
return isBalanceValid() && isAmountValid() && isRecipientValid()
default:
@ -263,43 +246,6 @@ class SendViewController: UIViewController {
func cancel() {}
// MARK: synchronizer notifications
@objc func synchronizerUpdated(_ notification: Notification) {
DispatchQueue.main.async { [weak self] in
guard let self else {
return
}
self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: self.synchronizer.status)
}
}
@objc func synchronizerStarted(_ notification: Notification) {
DispatchQueue.main.async { [weak self] in
guard let self else {
return
}
self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: self.synchronizer.status)
}
}
@objc func synchronizerStopped(_ notification: Notification) {
DispatchQueue.main.async { [weak self] in
guard let self else {
return
}
self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: self.synchronizer.status)
}
}
@objc func synchronizerSynced(_ notification: Notification) {
DispatchQueue.main.async { [weak self] in
guard let self else {
return
}
self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: self.synchronizer.status)
}
}
func textForCharacterCount(_ count: Int) -> String {
"\(count) of \(characterLimit) bytes left"
}

View File

@ -21,6 +21,7 @@ class SyncBlocksViewController: UIViewController {
@IBOutlet weak var summaryLabel: UILabel!
private var queue = DispatchQueue(label: "metrics.queue", qos: .default)
private var enhancingStarted = false
private var accumulatedMetrics: ProcessorMetrics = .initial
private var currentMetric: SDKMetrics.Operation?
private var currentMetricName: String {
@ -34,12 +35,12 @@ class SyncBlocksViewController: UIViewController {
}
}
var cancellables: [AnyCancellable] = []
let synchronizer = AppDelegate.shared.sharedSynchronizer
var notificationCancellables: [AnyCancellable] = []
deinit {
notificationCancellables.forEach { $0.cancel() }
cancellables.forEach { $0.cancel() }
}
override func viewDidLoad() {
@ -51,83 +52,62 @@ class SyncBlocksViewController: UIViewController {
action: #selector(wipe(_:))
)
statusLabel.text = textFor(state: synchronizer.status)
statusLabel.text = textFor(state: synchronizer.latestState.syncStatus)
progressBar.progress = 0
let center = NotificationCenter.default
let subscribeToNotifications: [Notification.Name] = [
.synchronizerStarted,
.synchronizerProgressUpdated,
.synchronizerStatusWillUpdate,
.synchronizerSynced,
.synchronizerStopped,
.synchronizerDisconnected,
.synchronizerSyncing,
.synchronizerEnhancing,
.synchronizerFetching,
.synchronizerFailed
]
for notificationName in subscribeToNotifications {
center.publisher(for: notificationName)
.receive(on: DispatchQueue.main)
.sink { [weak self] notification in
DispatchQueue.main.async {
self?.processorNotification(notification)
}
}
.store(in: &notificationCancellables)
}
NotificationCenter.default.publisher(for: .synchronizerEnhancing, object: nil)
.receive(on: DispatchQueue.main)
.sink { [weak self] _ in
self?.accumulateMetrics()
self?.summaryLabel.text = "scan: \((self?.accumulatedMetrics.debugDescription ?? "No summary"))"
self?.accumulatedMetrics = .initial
self?.currentMetric = .enhancement
}
.store(in: &notificationCancellables)
NotificationCenter.default.publisher(for: .synchronizerProgressUpdated, object: nil)
.throttle(for: 5, scheduler: DispatchQueue.main, latest: true)
.receive(on: DispatchQueue.main)
.map { [weak self] _ -> SDKMetrics.BlockMetricReport? in
guard let currentMetric = self?.currentMetric else { return nil }
return SDKMetrics.shared.popBlock(operation: currentMetric)?.last
}
.sink { [weak self] report in
self?.metricLabel.text = (self?.currentMetricName ?? "") + report.debugDescription
}
.store(in: &notificationCancellables)
NotificationCenter.default.publisher(for: .synchronizerSynced, object: nil)
.receive(on: DispatchQueue.main)
.delay(for: 0.5, scheduler: DispatchQueue.main)
.sink { [weak self] _ in
self?.accumulateMetrics()
self?.summaryLabel.text = "enhancement: \((self?.accumulatedMetrics.debugDescription ?? "No summary"))"
self?.overallSummary()
}
.store(in: &notificationCancellables)
synchronizer.stateStream
.throttle(for: .seconds(0.2), scheduler: DispatchQueue.main, latest: true)
.sink(receiveValue: { [weak self] state in self?.synchronizerStateUpdated(state) })
.store(in: &cancellables)
}
override func viewWillDisappear(_ animated: Bool) {
super.viewWillDisappear(animated)
notificationCancellables.forEach { $0.cancel() }
cancellables.forEach { $0.cancel() }
synchronizer.stop()
}
@objc func processorNotification(_ notification: Notification) {
private func synchronizerStateUpdated(_ state: SynchronizerState) {
self.updateUI()
switch notification.name {
case let not where not == Notification.Name.synchronizerProgressUpdated:
guard let progress = notification.userInfo?[SDKSynchronizer.NotificationKeys.progress] as? CompactBlockProgress else { return }
self.progressBar.progress = progress.progress
self.progressLabel.text = "\(floor(progress.progress * 1000) / 10)%"
default:
return
switch state.syncStatus {
case .unprepared:
break
case let .syncing(progress):
enhancingStarted = false
progressBar.progress = progress.progress
progressLabel.text = "\(floor(progress.progress * 1000) / 10)%"
if let currentMetric {
let report = SDKMetrics.shared.popBlock(operation: currentMetric)?.last
metricLabel.text = currentMetricName + report.debugDescription
}
case .enhancing:
guard !enhancingStarted else { return }
enhancingStarted = true
accumulateMetrics()
summaryLabel.text = "scan: \(accumulatedMetrics.debugDescription)"
accumulatedMetrics = .initial
currentMetric = .enhancement
case .fetching:
break
case .synced:
accumulateMetrics()
summaryLabel.text = "enhancement: \(accumulatedMetrics.debugDescription)"
overallSummary()
case .stopped:
break
case .disconnected:
break
case .error:
break
}
}
@ -169,10 +149,11 @@ class SyncBlocksViewController: UIViewController {
}
func doStartStop() async {
switch synchronizer.status {
let syncStatus = synchronizer.latestState.syncStatus
switch syncStatus {
case .stopped, .unprepared:
do {
if synchronizer.status == .unprepared {
if syncStatus == .unprepared {
_ = try synchronizer.prepare(
with: DemoAppConfig.seed,
viewingKeys: [AppDelegate.shared.sharedViewingKey],
@ -214,11 +195,11 @@ class SyncBlocksViewController: UIViewController {
}
func updateUI() {
let state = synchronizer.status
let syncStatus = synchronizer.latestState.syncStatus
statusLabel.text = textFor(state: state)
startPause.setTitle(buttonText(for: state), for: .normal)
if case SyncStatus.synced = state {
statusLabel.text = textFor(state: syncStatus)
startPause.setTitle(buttonText(for: syncStatus), for: .normal)
if case SyncStatus.synced = syncStatus {
startPause.isEnabled = false
} else {
startPause.isEnabled = true
@ -229,9 +210,9 @@ class SyncBlocksViewController: UIViewController {
switch state {
case .syncing:
return "Pause"
case .stopped:
case .stopped, .unprepared:
return "Start"
case .error, .unprepared, .disconnected:
case .error, .disconnected:
return "Retry"
case .synced:
return "Chill!"

View File

@ -1,4 +1,32 @@
# Migrating from previous versions to <Unreleased>
# Migrating from previous versions to <unreleased>
The `SDKSynchronizer` no longer uses `NotificationCenter` to send notifications.
Notifications are replaced with `Combine` publishers.
`stateStream` publisher replaces notifications related to `SyncStatus` changes.
These notifications are replaced by `stateStream`:
- .synchronizerStarted
- .synchronizerProgressUpdated
- .synchronizerStatusWillUpdate
- .synchronizerSynced
- .synchronizerStopped
- .synchronizerDisconnected
- .synchronizerSyncing
- .synchronizerEnhancing
- .synchronizerFetching
- .synchronizerFailed
`eventStream` publisher replaces notifications related to transactions and other stuff.
These notifications are replaced by `eventStream`:
- .synchronizerMinedTransaction
- .synchronizerFoundTransactions
- .synchronizerStoredUTXOs
- .synchronizerConnectionStateChanged
`latestState` is also new property that can be used to get the latest SDK state in a synchronous way.
`SDKSynchronizer.status` is no longer public. To get `SyncStatus` either subscribe to `stateStream`
or use `latestState`.
# Migrating from previous versions to 0.18.x
Compact block cache no longer uses a sqlite database. The existing database
should be deleted. `Initializer` now takes an `fsBlockDbRootURL` which is a
URL pointing to a RW directory in the filesystem that will be used to store

View File

@ -35,7 +35,7 @@ public enum CompactBlockProcessorError: Error {
public enum CompactBlockProgress {
case syncing(_ progress: BlockProgress)
case enhance(_ progress: EnhancementStreamProgress)
case enhance(_ progress: EnhancementProgress)
case fetch
public var progress: Float {
@ -78,22 +78,34 @@ public enum CompactBlockProgress {
}
}
public protocol EnhancementProgress {
var totalTransactions: Int { get }
var enhancedTransactions: Int { get }
var lastFoundTransaction: ZcashTransaction.Overview? { get }
var range: CompactBlockRange { get }
}
public struct EnhancementStreamProgress: EnhancementProgress {
public struct EnhancementProgress: Equatable {
public var totalTransactions: Int
public var enhancedTransactions: Int
public var lastFoundTransaction: ZcashTransaction.Overview?
public var range: CompactBlockRange
public init(totalTransactions: Int, enhancedTransactions: Int, lastFoundTransaction: ZcashTransaction.Overview?, range: CompactBlockRange) {
self.totalTransactions = totalTransactions
self.enhancedTransactions = enhancedTransactions
self.lastFoundTransaction = lastFoundTransaction
self.range = range
}
public var progress: Float {
totalTransactions > 0 ? Float(enhancedTransactions) / Float(totalTransactions) : 0
}
public static var zero: EnhancementProgress {
EnhancementProgress(totalTransactions: 0, enhancedTransactions: 0, lastFoundTransaction: nil, range: 0...0)
}
public static func == (lhs: EnhancementProgress, rhs: EnhancementProgress) -> Bool {
return
lhs.totalTransactions == rhs.totalTransactions &&
lhs.enhancedTransactions == rhs.enhancedTransactions &&
lhs.lastFoundTransaction?.id == rhs.lastFoundTransaction?.id &&
lhs.range == rhs.range
}
}
/// The compact block processor is in charge of orchestrating the download and caching of compact blocks from a LightWalletEndpoint
@ -621,11 +633,11 @@ actor CompactBlockProcessor {
let fileManager = FileManager.default
if fileManager.fileExists(atPath: config.dataDb.path) {
try FileManager.default.removeItem(at: config.dataDb)
try fileManager.removeItem(at: config.dataDb)
}
if fileManager.fileExists(atPath: context.pendingDbURL.path) {
try FileManager.default.removeItem(at: context.pendingDbURL)
try fileManager.removeItem(at: context.pendingDbURL)
}
context.completion(nil)

View File

@ -20,7 +20,7 @@ struct BlockEnhancerConfig {
}
protocol BlockEnhancer {
func enhance(at range: CompactBlockRange, didEnhance: (EnhancementStreamProgress) async -> Void) async throws -> [ZcashTransaction.Overview]
func enhance(at range: CompactBlockRange, didEnhance: (EnhancementProgress) async -> Void) async throws -> [ZcashTransaction.Overview]
}
struct BlockEnhancerImpl {
@ -75,7 +75,7 @@ extension BlockEnhancerImpl: BlockEnhancer {
case txIdNotFound(txId: Data)
}
func enhance(at range: CompactBlockRange, didEnhance: (EnhancementStreamProgress) async -> Void) async throws -> [ZcashTransaction.Overview] {
func enhance(at range: CompactBlockRange, didEnhance: (EnhancementProgress) async -> Void) async throws -> [ZcashTransaction.Overview] {
try Task.checkCancellation()
LoggerProxy.debug("Started Enhancing range: \(range)")
@ -103,7 +103,7 @@ extension BlockEnhancerImpl: BlockEnhancer {
do {
let confirmedTx = try await enhance(transaction: transaction)
retry = false
let progress = EnhancementStreamProgress(
let progress = EnhancementProgress(
totalTransactions: transactions.count,
enhancedTransactions: index + 1,
lastFoundTransaction: confirmedTx,

View File

@ -66,11 +66,48 @@ public enum ConnectionState {
case shutdown
}
public struct SynchronizerState: Equatable {
public var shieldedBalance: WalletBalance
public var transparentBalance: WalletBalance
public var syncStatus: SyncStatus
public var latestScannedHeight: BlockHeight
public static var zero: SynchronizerState {
SynchronizerState(
shieldedBalance: .zero,
transparentBalance: .zero,
syncStatus: .unprepared,
latestScannedHeight: .zero
)
}
}
public enum SynchronizerEvent {
// Sent when the synchronizer finds a pendingTransaction that hast been newly mined.
case minedTransaction(PendingTransactionEntity)
// Sent when the synchronizer finds a mined transaction
case foundTransactions(_ transactions: [ZcashTransaction.Overview], _ inRange: CompactBlockRange)
// Sent when the synchronizer fetched utxos from lightwalletd attempted to store them.
case storedUTXOs(_ inserted: [UnspentTransactionOutputEntity], _ skipped: [UnspentTransactionOutputEntity])
// Connection state to LightwalletEndpoint changed.
case connectionStateChanged
}
/// Primary interface for interacting with the SDK. Defines the contract that specific
/// implementations like SdkSynchronizer fulfill.
public protocol Synchronizer {
/// Value representing the Status of this Synchronizer. As the status changes, it will be also notified
var status: SyncStatus { get }
/// This stream is backed by `CurrentValueSubject`. This is primary source of information about what is the SDK doing. New values are emitted when
/// `SyncStatus` is changed inside the SDK.
///
/// Synchronization progress is part of the `SyncStatus` so this stream emits lot of values. `throttle` can be used to control amout of values
/// delivered. Values are delivered on random background thread.
var stateStream: AnyPublisher<SynchronizerState, Never> { get }
/// Latest state of the SDK which can be get in synchronous manner.
var latestState: SynchronizerState { get }
/// This stream is backed by `PassthroughSubject`. Check `SynchronizerEvent` to see which events may be emitted.
var eventStream: AnyPublisher<SynchronizerEvent, Never> { get }
/// reflects current connection state to LightwalletEndpoint
var connectionState: ConnectionState { get }
@ -279,7 +316,7 @@ public enum SyncStatus: Equatable {
/// When set, a UI element may want to turn red.
case disconnected
case error(_ error: Error)
case error(_ error: SynchronizerError)
public var isSyncing: Bool {
switch self {
@ -317,57 +354,17 @@ public enum RewindPolicy {
}
extension SyncStatus {
// swiftlint:disable cyclomatic_complexity
public static func == (lhs: SyncStatus, rhs: SyncStatus) -> Bool {
switch lhs {
case .unprepared:
if case .unprepared = rhs {
return true
} else {
return false
}
case .disconnected:
if case .disconnected = rhs {
return true
} else {
return false
}
case .syncing:
if case .syncing = rhs {
return true
} else {
return false
}
case .enhancing:
if case .enhancing = rhs {
return true
} else {
return false
}
case .fetching:
if case .fetching = rhs {
return true
} else {
return false
}
case .synced:
if case .synced = rhs {
return true
} else {
return false
}
case .stopped:
if case .stopped = rhs {
return true
} else {
return false
}
case .error:
if case .error = rhs {
return true
} else {
return false
}
switch (lhs, rhs) {
case (.unprepared, .unprepared): return true
case let (.syncing(lhsProgress), .syncing(rhsProgress)): return lhsProgress == rhsProgress
case let (.enhancing(lhsProgress), .enhancing(rhsProgress)): return lhsProgress == rhsProgress
case (.fetching, .fetching): return true
case (.synced, .synced): return true
case (.stopped, .stopped): return true
case (.disconnected, .disconnected): return true
case (.error, .error): return true
default: return false
}
}
}

View File

@ -9,126 +9,55 @@
import Foundation
import Combine
public extension Notification.Name {
/// Posted when the synchronizer is started.
/// - note: Query userInfo object for `NotificationKeys.synchronizerState`
static let synchronizerStarted = Notification.Name("SDKSyncronizerStarted")
/// Posted when there are progress updates.
///
/// - Note: Query userInfo object for NotificationKeys.progress for Float
/// progress percentage and NotificationKeys.blockHeight /// for the current progress height
static let synchronizerProgressUpdated = Notification.Name("SDKSyncronizerProgressUpdated")
static let synchronizerStatusWillUpdate = Notification.Name("SDKSynchronizerStatusWillUpdate")
/// Posted when the synchronizer is synced to latest height
static let synchronizerSynced = Notification.Name("SDKSyncronizerSynced")
/// Posted when the synchronizer is stopped
static let synchronizerStopped = Notification.Name("SDKSyncronizerStopped")
/// Posted when the synchronizer loses connection
static let synchronizerDisconnected = Notification.Name("SDKSyncronizerDisconnected")
/// Posted when the synchronizer starts syncing
static let synchronizerSyncing = Notification.Name("SDKSynchronizerSyncing")
/// Posted when the synchronizer starts Enhancing
static let synchronizerEnhancing = Notification.Name("SDKSyncronizerEnhancing")
/// Posted when the synchronizer starts fetching UTXOs
static let synchronizerFetching = Notification.Name("SDKSyncronizerFetching")
/// Posted when the synchronizer finds a pendingTransaction that hast been newly mined
/// - Note: query userInfo on NotificationKeys.minedTransaction for the transaction
static let synchronizerMinedTransaction = Notification.Name("synchronizerMinedTransaction")
/// Posted when the synchronizer finds a mined transaction
/// - Note: query userInfo on NotificationKeys.foundTransactions for
/// the `[ConfirmedTransactionEntity]`. This notification could arrive in a background thread.
static let synchronizerFoundTransactions = Notification.Name("synchronizerFoundTransactions")
/// Notification sent when the synchronizer fetched utxos from lightwalletd attempted to store them
/// Query the user info object for CompactBlockProcessorNotificationKey.blockProcessorStoredUTXOs which will contain a RefreshedUTXOs tuple with
/// the collection of UTXOs stored or skipped
static let synchronizerStoredUTXOs = Notification.Name(rawValue: "synchronizerStoredUTXOs")
/// Posted when the synchronizer presents an error
/// - Note: query userInfo on NotificationKeys.error for an error
static let synchronizerFailed = Notification.Name("SDKSynchronizerFailed")
extension Notification.Name {
static let synchronizerConnectionStateChanged = Notification.Name("SynchronizerConnectionStateChanged")
}
/// Synchronizer implementation for UIKit and iOS 13+
// swiftlint:disable type_body_length
public class SDKSynchronizer: Synchronizer {
public struct SynchronizerState: Equatable {
public var shieldedBalance: WalletBalance
public var transparentBalance: WalletBalance
public var syncStatus: SyncStatus
public var latestScannedHeight: BlockHeight
public static var zero: SynchronizerState {
SynchronizerState(
shieldedBalance: .zero,
transparentBalance: .zero,
syncStatus: .unprepared,
latestScannedHeight: .zero
)
}
}
public enum NotificationKeys {
public static let progress = "SDKSynchronizer.progress"
public static let blockHeight = "SDKSynchronizer.blockHeight"
public static let blockDate = "SDKSynchronizer.blockDate"
public static let minedTransaction = "SDKSynchronizer.minedTransaction"
public static let foundTransactions = "SDKSynchronizer.foundTransactions"
public static let error = "SDKSynchronizer.error"
public static let currentStatus = "SDKSynchronizer.currentStatus"
public static let nextStatus = "SDKSynchronizer.nextStatus"
public static let currentConnectionState = "SDKSynchronizer.currentConnectionState"
public static let previousConnectionState = "SDKSynchronizer.previousConnectionState"
public static let synchronizerState = "SDKSynchronizer.synchronizerState"
public static let refreshedUTXOs = "SDKSynchronizer.refreshedUTXOs"
}
private let streamsUpdateQueue = DispatchQueue(label: "streamsUpdateQueue")
private let stateSubject = CurrentValueSubject<SynchronizerState, Never>(.zero)
public var stateStream: AnyPublisher<SynchronizerState, Never> { stateSubject.eraseToAnyPublisher() }
public private(set) var latestState: SynchronizerState = .zero
private let eventSubject = PassthroughSubject<SynchronizerEvent, Never>()
public var eventStream: AnyPublisher<SynchronizerEvent, Never> { eventSubject.eraseToAnyPublisher() }
private let statusUpdateLock = NSRecursiveLock()
private var underlyingStatus: SyncStatus
public private(set) var status: SyncStatus {
var status: SyncStatus {
get {
statusUpdateLock.lock()
defer { statusUpdateLock.unlock() }
return underlyingStatus
}
set {
notifyStatusChange(newValue: newValue, oldValue: underlyingStatus)
statusUpdateLock.lock()
let oldValue = underlyingStatus
underlyingStatus = newValue
notify(oldStatus: oldValue, newStatus: newValue)
statusUpdateLock.unlock()
notify(status: status)
}
}
let blockProcessor: CompactBlockProcessor
let blockProcessorEventProcessingQueue = DispatchQueue(label: "blockProcessorEventProcessingQueue")
public private(set) var progress: Float = 0.0
public private(set) var initializer: Initializer
// Valid value is stored here after `prepare` is called.
public private(set) var latestScannedHeight: BlockHeight = .zero
public private(set) var connectionState: ConnectionState
public private(set) var network: ZcashNetwork
public var lastState: AnyPublisher<SynchronizerState, Never> { lastStateSubject.eraseToAnyPublisher() }
private var lastStateSubject: CurrentValueSubject<SynchronizerState, Never>
private var transactionManager: OutboundTransactionManager
private var transactionRepository: TransactionRepository
private var utxoRepository: UnspentTransactionOutputRepository
private let statusUpdateLock = NSRecursiveLock()
private var syncStartDate: Date?
private var longLivingCancelables: [AnyCancellable] = []
@ -165,7 +94,6 @@ public class SDKSynchronizer: Synchronizer {
self.utxoRepository = utxoRepository
self.blockProcessor = blockProcessor
self.network = initializer.network
self.lastStateSubject = CurrentValueSubject(.zero)
subscribeToProcessorNotifications(blockProcessor)
@ -192,10 +120,10 @@ public class SDKSynchronizer: Synchronizer {
return .seedRequired
}
self.status = .disconnected
latestScannedHeight = (try? transactionRepository.lastScannedHeight()) ?? initializer.walletBirthday
self.status = .disconnected
return .success
}
@ -216,17 +144,7 @@ public class SDKSynchronizer: Synchronizer {
case .stopped, .synced, .disconnected, .error:
Task {
let state = await snapshotState()
lastStateSubject.send(state)
NotificationSender.default.post(
name: .synchronizerStarted,
object: self,
userInfo: [
NotificationKeys.synchronizerState: state
]
)
status = .syncing(.nullProgress)
syncStartDate = Date()
await blockProcessor.start(retry: retry)
}
@ -271,6 +189,9 @@ public class SDKSynchronizer: Synchronizer {
}
connectionState = current
streamsUpdateQueue.async { [weak self] in
self?.eventSubject.send(.connectionStateChanged)
}
}
// MARK: Handle CompactBlockProcessor.Flow
@ -301,24 +222,23 @@ public class SDKSynchronizer: Synchronizer {
self?.storedUTXOs(utxos: utxos)
case .startedEnhancing:
self?.startedEnhancing()
self?.status = .enhancing(.zero)
case .startedFetching:
self?.startedFetching()
self?.status = .fetching
case .startedSyncing:
self?.startedSyncing()
self?.status = .syncing(.nullProgress)
case .stopped:
self?.stopped()
self?.status = .stopped
}
}
.store(in: &longLivingCancelables)
}
private func failed(error: CompactBlockProcessorError) {
self.notifyFailure(error)
self.status = .error(self.mapError(error))
status = .error(self.mapError(error))
}
private func finished(lastScannedHeight: BlockHeight, foundBlocks: Bool) {
@ -336,13 +256,9 @@ public class SDKSynchronizer: Synchronizer {
}
private func foundTransactions(transactions: [ZcashTransaction.Overview], in range: CompactBlockRange) {
NotificationSender.default.post(
name: .synchronizerFoundTransactions,
object: self,
userInfo: [
NotificationKeys.foundTransactions: transactions
]
)
streamsUpdateQueue.async { [weak self] in
self?.eventSubject.send(.foundTransactions(transactions, range))
}
}
private func handledReorg(reorgHeight: BlockHeight, rewindHeight: BlockHeight) {
@ -352,52 +268,24 @@ public class SDKSynchronizer: Synchronizer {
try transactionManager.handleReorg(at: rewindHeight)
} catch {
LoggerProxy.debug("error handling reorg: \(error)")
notifyFailure(error)
}
}
private func progressUpdated(progress: CompactBlockProgress) {
self.notify(progress: progress)
switch progress {
case let .syncing(progress):
status = .syncing(progress)
case let .enhance(progress):
status = .enhancing(progress)
case .fetch:
status = .fetching
}
}
private func storedUTXOs(utxos: (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity])) {
NotificationSender.default.post(
name: .synchronizerStoredUTXOs,
object: self,
userInfo: [NotificationKeys.refreshedUTXOs: utxos]
)
streamsUpdateQueue.async { [weak self] in
self?.eventSubject.send(.storedUTXOs(utxos.inserted, utxos.skipped))
}
private func startedEnhancing() {
statusUpdateLock.lock()
defer { statusUpdateLock.unlock() }
guard status != .enhancing(NullEnhancementProgress()) else { return }
status = .enhancing(NullEnhancementProgress())
}
private func startedFetching() {
statusUpdateLock.lock()
defer { statusUpdateLock.unlock() }
guard status != .fetching else { return }
status = .fetching
}
private func startedSyncing() {
statusUpdateLock.lock()
defer { statusUpdateLock.unlock() }
guard status != .syncing(.nullProgress) else { return }
status = .syncing(.nullProgress)
}
private func stopped() {
statusUpdateLock.lock()
defer { statusUpdateLock.unlock() }
guard status != .stopped else { return }
status = .stopped
}
// MARK: Synchronizer methods
@ -689,28 +577,8 @@ public class SDKSynchronizer: Synchronizer {
}
// MARK: notify state
private func notify(progress: CompactBlockProgress) {
var userInfo: [AnyHashable: Any] = .init()
userInfo[NotificationKeys.progress] = progress
userInfo[NotificationKeys.blockHeight] = progress.progressHeight
self.status = SyncStatus(progress)
NotificationSender.default.post(name: Notification.Name.synchronizerProgressUpdated, object: self, userInfo: userInfo)
}
private func notifyStatusChange(newValue: SyncStatus, oldValue: SyncStatus) {
NotificationSender.default.post(
name: .synchronizerStatusWillUpdate,
object: self,
userInfo:
[
NotificationKeys.currentStatus: oldValue,
NotificationKeys.nextStatus: newValue
]
)
}
private func snapshotState() async -> SDKSynchronizer.SynchronizerState {
private func snapshotState(status: SyncStatus) async -> SynchronizerState {
SynchronizerState(
shieldedBalance: WalletBalance(
verified: initializer.getVerifiedBalance(),
@ -722,38 +590,55 @@ public class SDKSynchronizer: Synchronizer {
)
}
private func notify(status: SyncStatus) {
switch status {
case .disconnected:
NotificationSender.default.post(name: Notification.Name.synchronizerDisconnected, object: self)
case .stopped:
NotificationSender.default.post(name: Notification.Name.synchronizerStopped, object: self)
case .synced:
Task {
let state = await self.snapshotState()
self.lastStateSubject.send(state)
private func notify(oldStatus: SyncStatus, newStatus: SyncStatus) {
guard oldStatus != newStatus else { return }
NotificationSender.default.post(
name: Notification.Name.synchronizerSynced,
object: self,
userInfo: [
SDKSynchronizer.NotificationKeys.blockHeight: self.latestScannedHeight,
SDKSynchronizer.NotificationKeys.synchronizerState: state
]
// When the wipe happens status is switched to `unprepared`. And we expect that everything is deleted. All the databases including data DB.
// When new snapshot is created balance is checked. And when balance is checked and data DB doesn't exist then rust initialise new database.
// So it's necessary to not create new snapshot after status is switched to `unprepared` otherwise data DB exists after wipe
if newStatus == .unprepared {
latestState = SynchronizerState.zero
updateStateStream(with: latestState)
} else {
let didStatusChange = areTwoStatusesDifferent(firstStatus: oldStatus, secondStatus: newStatus)
if didStatusChange {
Task {
latestState = await snapshotState(status: newStatus)
updateStateStream(with: latestState)
}
} else {
latestState = SynchronizerState(
shieldedBalance: latestState.shieldedBalance,
transparentBalance: latestState.transparentBalance,
syncStatus: newStatus,
latestScannedHeight: latestState.latestScannedHeight
)
}
case .unprepared:
break
case .syncing:
NotificationSender.default.post(name: Notification.Name.synchronizerSyncing, object: self)
case .enhancing:
NotificationSender.default.post(name: Notification.Name.synchronizerEnhancing, object: self)
case .fetching:
NotificationSender.default.post(name: Notification.Name.synchronizerFetching, object: self)
case .error(let error):
self.notifyFailure(error)
updateStateStream(with: latestState)
}
}
}
private func areTwoStatusesDifferent(firstStatus: SyncStatus, secondStatus: SyncStatus) -> Bool {
switch (firstStatus, secondStatus) {
case (.unprepared, .unprepared): return false
case (.syncing, .syncing): return false
case (.enhancing, .enhancing): return false
case (.fetching, .fetching): return false
case (.synced, .synced): return false
case (.stopped, .stopped): return false
case (.disconnected, .disconnected): return false
case (.error, .error): return false
default: return true
}
}
private func updateStateStream(with newState: SynchronizerState) {
streamsUpdateQueue.async { [weak self] in
self?.stateSubject.send(newState)
}
}
// MARK: book keeping
private func updateMinedTransactions() throws {
@ -788,19 +673,13 @@ public class SDKSynchronizer: Synchronizer {
}
private func notifyMinedTransaction(_ transaction: PendingTransactionEntity) {
DispatchQueue.main.async { [weak self] in
guard let self else { return }
NotificationSender.default.post(
name: Notification.Name.synchronizerMinedTransaction,
object: self,
userInfo: [NotificationKeys.minedTransaction: transaction]
)
streamsUpdateQueue.async { [weak self] in
self?.eventSubject.send(.minedTransaction(transaction))
}
}
// swiftlint:disable cyclomatic_complexity
private func mapError(_ error: Error) -> Error {
private func mapError(_ error: Error) -> SynchronizerError {
if let compactBlockProcessorError = error as? CompactBlockProcessorError {
switch compactBlockProcessorError {
case .dataDbInitFailed(let path):
@ -838,14 +717,6 @@ public class SDKSynchronizer: Synchronizer {
return SynchronizerError.uncategorized(underlyingError: error)
}
private func notifyFailure(_ error: Error) {
NotificationSender.default.post(
name: Notification.Name.synchronizerFailed,
object: self,
userInfo: [NotificationKeys.error: self.mapError(error)]
)
}
}
extension SDKSynchronizer {
@ -879,10 +750,3 @@ extension SDKSynchronizer {
self.getUnifiedAddress(accountIndex: accountIndex)?.transparentReceiver()
}
}
private struct NullEnhancementProgress: EnhancementProgress {
var totalTransactions: Int { 0 }
var enhancedTransactions: Int { 0 }
var lastFoundTransaction: ZcashTransaction.Overview? { nil }
var range: CompactBlockRange { 0 ... 0 }
}

View File

@ -5,6 +5,7 @@
// Created by Francisco Gindre on 4/28/20.
//
import Combine
import XCTest
@testable import TestUtils
@testable import ZcashLightClientKit
@ -20,6 +21,7 @@ class BalanceTests: XCTestCase {
var sentTransactionExpectation = XCTestExpectation(description: "sent")
var syncedExpectation = XCTestExpectation(description: "synced")
var coordinator: TestCoordinator!
var cancellables: [AnyCancellable] = []
override func setUpWithError() throws {
try super.setUpWithError()
@ -39,6 +41,7 @@ class BalanceTests: XCTestCase {
try? FileManager.default.removeItem(at: coordinator.databases.dataDB)
try? FileManager.default.removeItem(at: coordinator.databases.pendingDB)
coordinator = nil
cancellables = []
}
/**
@ -1188,34 +1191,39 @@ class BalanceTests: XCTestCase {
class SDKSynchonizerListener {
var transactionsFound: (([ZcashTransaction.Overview]) -> Void)?
var synchronizerMinedTransaction: ((PendingTransactionEntity) -> Void)?
var cancellables: [AnyCancellable] = []
func subscribeToSynchronizer(_ synchronizer: SDKSynchronizer) {
NotificationCenter.default.addObserver(self, selector: #selector(txFound(_:)), name: .synchronizerFoundTransactions, object: synchronizer)
NotificationCenter.default.addObserver(self, selector: #selector(txMined(_:)), name: .synchronizerMinedTransaction, object: synchronizer)
synchronizer.eventStream
.sink(
receiveValue: { [weak self] event in
switch event {
case let .minedTransaction(transaction):
self?.txMined(transaction)
case let .foundTransactions(transactions, _):
self?.txFound(transactions)
case .storedUTXOs, .connectionStateChanged:
break
}
}
)
.store(in: &cancellables)
}
func unsubscribe() {
NotificationCenter.default.removeObserver(self)
}
@objc func txFound(_ notification: Notification) {
func txFound(_ txs: [ZcashTransaction.Overview]) {
DispatchQueue.main.async { [weak self] in
guard let txs = notification.userInfo?[SDKSynchronizer.NotificationKeys.foundTransactions] as? [ZcashTransaction.Overview] else {
XCTFail("expected [ConfirmedTransactionEntity] array")
return
}
self?.transactionsFound?(txs)
}
}
@objc func txMined(_ notification: Notification) {
func txMined(_ transaction: PendingTransactionEntity) {
DispatchQueue.main.async { [weak self] in
guard let transaction = notification.userInfo?[SDKSynchronizer.NotificationKeys.minedTransaction] as? PendingTransactionEntity else {
XCTFail("expected transaction")
return
}
self?.synchronizerMinedTransaction?(transaction)
}
}

View File

@ -4,6 +4,8 @@
//
// Created by Francisco Gindre on 1/26/23.
//
import Combine
import XCTest
@testable import TestUtils
@testable import ZcashLightClientKit
@ -20,6 +22,7 @@ final class InternalStateConsistencyTests: XCTestCase {
let branchID = "2bb40e60"
let chainName = "main"
let network = DarksideWalletDNetwork()
var sdkSynchronizerSyncStatusHandler: SDKSynchronizerSyncStatusHandler! = SDKSynchronizerSyncStatusHandler()
override func setUpWithError() throws {
try super.setUpWithError()
@ -38,10 +41,14 @@ final class InternalStateConsistencyTests: XCTestCase {
try? FileManager.default.removeItem(at: coordinator.databases.dataDB)
try? FileManager.default.removeItem(at: coordinator.databases.pendingDB)
coordinator = nil
sdkSynchronizerSyncStatusHandler = nil
}
@MainActor func testInternalStateIsConsistentWhenMigrating() async throws {
NotificationCenter.default.addObserver(self, selector: #selector(self.synchronizerStopped(_:)), name: .synchronizerStopped, object: nil)
sdkSynchronizerSyncStatusHandler.subscribe(
to: coordinator.synchronizer.stateStream,
expectations: [.stopped: firstSyncExpectation]
)
let fullSyncLength = 1000
try FakeChainBuilder.buildChain(darksideWallet: coordinator.service, branchID: branchID, chainName: chainName, length: fullSyncLength)
@ -126,10 +133,6 @@ final class InternalStateConsistencyTests: XCTestCase {
wait(for: [secondSyncAttemptExpectation], timeout: 10)
}
@objc func synchronizerStopped(_ notification: Notification) {
self.firstSyncExpectation.fulfill()
}
func handleError(_ error: Error?) {
guard let testError = error else {
XCTFail("failed with nil error")

View File

@ -50,12 +50,14 @@ class SychronizerDarksideTests: XCTestCase {
}
func testFoundTransactions() throws {
NotificationCenter.default.addObserver(
self,
selector: #selector(handleFoundTransactions(_:)),
name: Notification.Name.synchronizerFoundTransactions,
object: nil
)
coordinator.synchronizer.eventStream
.map { event in
guard case let .foundTransactions(transactions, _) = event else { return nil }
return transactions
}
.compactMap { $0 }
.sink(receiveValue: { [weak self] transactions in self?.handleFoundTransactions(transactions: transactions) })
.store(in: &cancellables)
try FakeChainBuilder.buildChain(darksideWallet: self.coordinator.service, branchID: branchID, chainName: chainName)
let receivedTxHeight: BlockHeight = 663188
@ -75,12 +77,14 @@ class SychronizerDarksideTests: XCTestCase {
}
func testFoundManyTransactions() throws {
NotificationCenter.default.addObserver(
self,
selector: #selector(handleFoundTransactions(_:)),
name: Notification.Name.synchronizerFoundTransactions,
object: nil
)
coordinator.synchronizer.eventStream
.map { event in
guard case let .foundTransactions(transactions, _) = event else { return nil }
return transactions
}
.compactMap { $0 }
.sink(receiveValue: { [weak self] transactions in self?.handleFoundTransactions(transactions: transactions) })
.store(in: &cancellables)
try FakeChainBuilder.buildChain(darksideWallet: self.coordinator.service, branchID: branchID, chainName: chainName, length: 1000)
let receivedTxHeight: BlockHeight = 663229
@ -128,9 +132,9 @@ class SychronizerDarksideTests: XCTestCase {
}
func testLastStates() throws {
var disposeBag: [AnyCancellable] = []
var cancellables: [AnyCancellable] = []
var states: [SDKSynchronizer.SynchronizerState] = []
var states: [SynchronizerState] = []
try FakeChainBuilder.buildChain(darksideWallet: self.coordinator.service, branchID: branchID, chainName: chainName)
let receivedTxHeight: BlockHeight = 663188
@ -140,12 +144,11 @@ class SychronizerDarksideTests: XCTestCase {
sleep(2)
let preTxExpectation = XCTestExpectation(description: "pre receive")
coordinator.synchronizer.lastState
.receive(on: DispatchQueue.main)
coordinator.synchronizer.stateStream
.sink { state in
states.append(state)
}
.store(in: &disposeBag)
.store(in: &cancellables)
try coordinator.sync(completion: { _ in
preTxExpectation.fulfill()
@ -153,26 +156,102 @@ class SychronizerDarksideTests: XCTestCase {
wait(for: [preTxExpectation], timeout: 5)
XCTAssertEqual(states, [
SDKSynchronizer.SynchronizerState(
let expectedStates: [SynchronizerState] = [
SynchronizerState(
shieldedBalance: .zero,
transparentBalance: .zero,
syncStatus: .unprepared,
latestScannedHeight: .zero
),
SDKSynchronizer.SynchronizerState(
shieldedBalance: WalletBalance(verified: Zatoshi(0), total: Zatoshi(0)),
transparentBalance: WalletBalance(verified: Zatoshi(0), total: Zatoshi(0)),
syncStatus: SyncStatus.disconnected,
syncStatus: .disconnected,
latestScannedHeight: 663150
),
SDKSynchronizer.SynchronizerState(
SynchronizerState(
shieldedBalance: .zero,
transparentBalance: .zero,
syncStatus: .syncing(BlockProgress(startHeight: 0, targetHeight: 0, progressHeight: 0)),
latestScannedHeight: 663150
),
SynchronizerState(
shieldedBalance: .zero,
transparentBalance: .zero,
syncStatus: .syncing(BlockProgress(startHeight: 663150, targetHeight: 663189, progressHeight: 663189)),
latestScannedHeight: 663150
),
SynchronizerState(
shieldedBalance: WalletBalance(verified: Zatoshi(100000), total: Zatoshi(200000)),
transparentBalance: WalletBalance(verified: Zatoshi(0), total: Zatoshi(0)),
syncStatus: SyncStatus.synced,
syncStatus: .enhancing(EnhancementProgress(totalTransactions: 0, enhancedTransactions: 0, lastFoundTransaction: nil, range: 0...0)),
latestScannedHeight: 663150
),
SynchronizerState(
shieldedBalance: WalletBalance(verified: Zatoshi(100000), total: Zatoshi(200000)),
transparentBalance: WalletBalance(verified: Zatoshi(0), total: Zatoshi(0)),
syncStatus: .enhancing(
EnhancementProgress(
totalTransactions: 2,
enhancedTransactions: 1,
lastFoundTransaction: ZcashTransaction.Overview(
blockTime: 1.0,
expiryHeight: 663206,
fee: Zatoshi(0),
id: 2,
index: 1,
isWalletInternal: true,
hasChange: false,
memoCount: 1,
minedHeight: 663188,
raw: Data(),
rawID: Data(),
receivedNoteCount: 1,
sentNoteCount: 0,
value: Zatoshi(100000)
),
range: 663150...663189
)
),
latestScannedHeight: 663150
),
SynchronizerState(
shieldedBalance: WalletBalance(verified: Zatoshi(100000), total: Zatoshi(200000)),
transparentBalance: WalletBalance(verified: Zatoshi(0), total: Zatoshi(0)),
syncStatus: .enhancing(
EnhancementProgress(
totalTransactions: 2,
enhancedTransactions: 2,
lastFoundTransaction: ZcashTransaction.Overview(
blockTime: 1.0,
expiryHeight: 663192,
fee: Zatoshi(0),
id: 1,
index: 1,
isWalletInternal: true,
hasChange: false,
memoCount: 1,
minedHeight: 663174,
raw: Data(),
rawID: Data(),
receivedNoteCount: 1,
sentNoteCount: 0,
value: Zatoshi(100000)
),
range: 663150...663189
)
),
latestScannedHeight: 663150
),
SynchronizerState(
shieldedBalance: WalletBalance(verified: Zatoshi(100000), total: Zatoshi(200000)),
transparentBalance: WalletBalance(verified: Zatoshi(0), total: Zatoshi(0)),
syncStatus: .fetching,
latestScannedHeight: 663150
),
SynchronizerState(
shieldedBalance: WalletBalance(verified: Zatoshi(100000), total: Zatoshi(200000)),
transparentBalance: WalletBalance(verified: Zatoshi(0), total: Zatoshi(0)),
syncStatus: .synced,
latestScannedHeight: 663189
)
])
]
XCTAssertEqual(states, expectedStates)
}
@MainActor func testSyncAfterWipeWorks() async throws {
@ -229,13 +308,7 @@ class SychronizerDarksideTests: XCTestCase {
wait(for: [secondSyncExpectation], timeout: 10)
}
@objc func handleFoundTransactions(_ notification: Notification) {
guard
let userInfo = notification.userInfo,
let transactions = userInfo[SDKSynchronizer.NotificationKeys.foundTransactions] as? [ZcashTransaction.Overview]
else {
return
}
func handleFoundTransactions(transactions: [ZcashTransaction.Overview]) {
self.foundTransactions.append(contentsOf: transactions)
}

View File

@ -22,6 +22,7 @@ final class SynchronizerTests: XCTestCase {
let chainName = "main"
let network = DarksideWalletDNetwork()
var cancellables: [AnyCancellable] = []
var sdkSynchronizerSyncStatusHandler: SDKSynchronizerSyncStatusHandler! = SDKSynchronizerSyncStatusHandler()
override func setUpWithError() throws {
try super.setUpWithError()
@ -52,6 +53,7 @@ final class SynchronizerTests: XCTestCase {
try? FileManager.default.removeItem(at: coordinator.databases.dataDB)
try? FileManager.default.removeItem(at: coordinator.databases.pendingDB)
coordinator = nil
sdkSynchronizerSyncStatusHandler = nil
cancellables = []
}
@ -77,7 +79,10 @@ final class SynchronizerTests: XCTestCase {
sleep(10)
let syncStoppedExpectation = XCTestExpectation(description: "SynchronizerStopped Expectation")
syncStoppedExpectation.subscribe(to: .synchronizerStopped, object: nil)
sdkSynchronizerSyncStatusHandler.subscribe(
to: coordinator.synchronizer.stateStream,
expectations: [.stopped: syncStoppedExpectation]
)
/*
sync to latest height
@ -227,10 +232,11 @@ final class SynchronizerTests: XCTestCase {
private func checkThatWipeWorked() async {
let storage = await self.coordinator.synchronizer.blockProcessor.storage as! FSCompactBlockRepository
let fm = FileManager.default
XCTAssertFalse(fm.fileExists(atPath: coordinator.synchronizer.initializer.dataDbURL.path))
XCTAssertFalse(fm.fileExists(atPath: coordinator.synchronizer.initializer.pendingDbURL.path))
XCTAssertTrue(fm.fileExists(atPath: storage.blocksDirectory.path))
XCTAssertEqual(try fm.contentsOfDirectory(atPath: storage.blocksDirectory.path), [])
print(coordinator.synchronizer.initializer.dataDbURL.path)
XCTAssertFalse(fm.fileExists(atPath: coordinator.synchronizer.initializer.pendingDbURL.path), "Pending DB should be deleted")
XCTAssertFalse(fm.fileExists(atPath: coordinator.synchronizer.initializer.dataDbURL.path), "Data DB should be deleted.")
XCTAssertTrue(fm.fileExists(atPath: storage.blocksDirectory.path), "FS Cache directory should exist")
XCTAssertEqual(try fm.contentsOfDirectory(atPath: storage.blocksDirectory.path), [], "FS Cache directory should be empty")
let internalSyncProgress = InternalSyncProgress(storage: UserDefaults.standard)
@ -238,14 +244,14 @@ final class SynchronizerTests: XCTestCase {
let latestEnhancedHeight = await internalSyncProgress.load(.latestEnhancedHeight)
let latestUTXOFetchedHeight = await internalSyncProgress.load(.latestUTXOFetchedHeight)
XCTAssertEqual(latestDownloadedBlockHeight, 0)
XCTAssertEqual(latestEnhancedHeight, 0)
XCTAssertEqual(latestUTXOFetchedHeight, 0)
XCTAssertEqual(latestDownloadedBlockHeight, 0, "internalSyncProgress latestDownloadedBlockHeight should be 0")
XCTAssertEqual(latestEnhancedHeight, 0, "internalSyncProgress latestEnhancedHeight should be 0")
XCTAssertEqual(latestUTXOFetchedHeight, 0, "internalSyncProgress latestUTXOFetchedHeight should be 0")
let blockProcessorState = await coordinator.synchronizer.blockProcessor.state
XCTAssertEqual(blockProcessorState, .stopped)
XCTAssertEqual(blockProcessorState, .stopped, "CompactBlockProcessor state should be stopped")
XCTAssertEqual(coordinator.synchronizer.status, .unprepared)
XCTAssertEqual(coordinator.synchronizer.status, .unprepared, "SDKSynchronizer state should be unprepared")
}
func handleError(_ error: Error?) {

View File

@ -5,6 +5,7 @@
// Created by Francisco Gindre on 8/4/21.
//
import Combine
import XCTest
@testable import TestUtils
@testable import ZcashLightClientKit
@ -20,6 +21,7 @@ class Z2TReceiveTests: XCTestCase {
var foundTransactionsExpectation = XCTestExpectation(description: "found transactions")
let branchID = "2bb40e60"
let chainName = "main"
var cancellables: [AnyCancellable] = []
let network = DarksideWalletDNetwork()
@ -41,23 +43,17 @@ class Z2TReceiveTests: XCTestCase {
try? FileManager.default.removeItem(at: coordinator.databases.dataDB)
try? FileManager.default.removeItem(at: coordinator.databases.pendingDB)
coordinator = nil
cancellables = []
}
func subscribeToFoundTransactions() {
NotificationCenter.default.addObserver(
self,
selector: #selector(foundTransactions(_:)),
name: .synchronizerFoundTransactions,
object: nil
)
coordinator.synchronizer.eventStream
.filter { event in
guard case .foundTransactions = event else { return false }
return true
}
@objc func foundTransactions(_ notification: Notification) {
guard notification.userInfo?[SDKSynchronizer.NotificationKeys.foundTransactions] != nil else {
XCTFail("found transactions notification is empty")
return
}
self.foundTransactionsExpectation.fulfill()
.sink(receiveValue: { [weak self] _ in self?.self.foundTransactionsExpectation.fulfill() })
.store(in: &cancellables)
}
func testSendingZ2TWithMemoFails() async throws {

View File

@ -5,6 +5,7 @@
// Created by Lukáš Korba on 13.12.2022.
//
import Combine
import XCTest
@testable import ZcashLightClientKit
@testable import TestUtils
@ -23,6 +24,8 @@ class SynchronizerTests: XCTestCase {
}
var coordinator: TestCoordinator!
var cancellables: [AnyCancellable] = []
var sdkSynchronizerSyncStatusHandler: SDKSynchronizerSyncStatusHandler! = SDKSynchronizerSyncStatusHandler()
let seedPhrase = """
wish puppy smile loan doll curve hole maze file ginger hair nose key relax knife witness cannon grab despair throw review deal slush frame
@ -30,6 +33,13 @@ class SynchronizerTests: XCTestCase {
var birthday: BlockHeight = 1_730_000
override func tearDown() {
super.tearDown()
coordinator = nil
cancellables = []
sdkSynchronizerSyncStatusHandler = nil
}
@MainActor
func testHundredBlocksSync() async throws {
let derivationTool = DerivationTool(networkType: .mainnet)
@ -71,7 +81,7 @@ class SynchronizerTests: XCTestCase {
_ = try synchronizer.prepare(with: seedBytes, viewingKeys: [ufvk], walletBirthday: birthday)
let syncSyncedExpectation = XCTestExpectation(description: "synchronizerSynced Expectation")
syncSyncedExpectation.subscribe(to: .synchronizerSynced, object: nil)
sdkSynchronizerSyncStatusHandler.subscribe(to: synchronizer.stateStream, expectations: [.synced: syncSyncedExpectation])
let internalSyncProgress = InternalSyncProgress(storage: UserDefaults.standard)
await internalSyncProgress.rewind(to: birthday)

View File

@ -0,0 +1,50 @@
//
// SDKSynchronizerStateHandler.swift
//
//
// Created by Michal Fousek on 15.03.2023.
//
import Combine
import Foundation
import XCTest
@testable import ZcashLightClientKit
class SDKSynchronizerSyncStatusHandler {
enum StatusIdentifier: String {
case unprepared
case syncing
case enhancing
case fetching
case synced
case stopped
case disconnected
case error
}
private let queue = DispatchQueue(label: "SDKSynchronizerSyncStatusHandler")
private var cancellables: [AnyCancellable] = []
func subscribe(to stateStream: AnyPublisher<SynchronizerState, Never>, expectations: [StatusIdentifier: XCTestExpectation]) {
stateStream
.receive(on: queue)
.map { $0.syncStatus }
.sink { status in expectations[status.identifier]?.fulfill() }
.store(in: &cancellables)
}
}
extension SyncStatus {
var identifier: SDKSynchronizerSyncStatusHandler.StatusIdentifier {
switch self {
case .unprepared: return .unprepared
case .syncing: return .syncing
case .enhancing: return .enhancing
case .fetching: return .fetching
case .synced: return .synced
case .stopped: return .stopped
case .disconnected: return .disconnected
case .error: return .error
}
}
}

View File

@ -5,6 +5,7 @@
// Created by Francisco Gindre on 4/29/20.
//
import Combine
import Foundation
import XCTest
@testable import ZcashLightClientKit
@ -33,6 +34,7 @@ class TestCoordinator {
case url(urlString: String, startHeigth: BlockHeight)
}
var cancellables: [AnyCancellable] = []
var completionHandler: ((SDKSynchronizer) throws -> Void)?
var errorHandler: ((Error?) -> Void)?
var spendingKey: UnifiedSpendingKey
@ -120,12 +122,17 @@ class TestCoordinator {
)
self.synchronizer = synchronizer
subscribeToNotifications(synchronizer: self.synchronizer)
subscribeToState(synchronizer: self.synchronizer)
if case .seedRequired = try prepare(seed: Environment.seedBytes) {
throw TestCoordinator.CoordinatorError.seedRequiredForMigration
}
}
deinit {
cancellables.forEach { $0.cancel() }
cancellables = []
}
func prepare(seed: [UInt8]) throws -> Initializer.InitializationResult {
return try synchronizer.prepare(with: seed, viewingKeys: [viewingKey], walletBirthday: self.birthday)
}
@ -160,17 +167,29 @@ class TestCoordinator {
// MARK: notifications
func subscribeToNotifications(synchronizer: Synchronizer) {
NotificationCenter.default.addObserver(self, selector: #selector(synchronizerFailed(_:)), name: .synchronizerFailed, object: synchronizer)
NotificationCenter.default.addObserver(self, selector: #selector(synchronizerSynced(_:)), name: .synchronizerSynced, object: synchronizer)
func subscribeToState(synchronizer: Synchronizer) {
synchronizer.stateStream
.sink(
receiveValue: { [weak self] state in
switch state.syncStatus {
case let .error(error):
self?.synchronizerFailed(error: error)
case .synced:
try! self?.synchronizerSynced()
default:
break
}
}
)
.store(in: &cancellables)
}
@objc func synchronizerFailed(_ notification: Notification) {
self.errorHandler?(notification.userInfo?[SDKSynchronizer.NotificationKeys.error] as? Error)
func synchronizerFailed(error: Error) {
self.errorHandler?(error)
}
@objc func synchronizerSynced(_ notification: Notification) throws {
if case .stopped = self.synchronizer.status {
func synchronizerSynced() throws {
if case .stopped = self.synchronizer.latestState.syncStatus {
LoggerProxy.debug("WARNING: notification received after synchronizer was stopped")
return
}

View File

@ -6,6 +6,7 @@
// Copyright © 2019 Electric Coin Company. All rights reserved.
//
import Combine
import Foundation
import GRPC
import ZcashLightClientKit
@ -79,20 +80,6 @@ enum MockDbInit {
}
}
extension XCTestExpectation {
func subscribe(to notification: Notification.Name, object: Any?) {
NotificationCenter.default.addObserver(self, selector: #selector(fulfill), name: notification, object: object)
}
func unsubscribe(from notification: Notification.Name) {
NotificationCenter.default.removeObserver(self, name: notification, object: nil)
}
func unsubscribeFromNotifications() {
NotificationCenter.default.removeObserver(self)
}
}
func __documentsDirectory() throws -> URL {
try FileManager.default.url(for: .documentDirectory, in: .userDomainMask, appropriateFor: nil, create: true)
}