[#764] Stop using Notifications inside the SDK

Closes #764

- All notification that were previously sent from CompactBlockProcessor are now gone.
- `CompactBlockProcessor` now provides `eventStream` to communicate with `SDKSynchronizer`.
- Added `synchronizerStoredUTXOs` notification.
- Tests are updated to not use notifications anymore. Some tests there
  weren't working before are now fixed.
- Added `CompactBlockProcessorEventHandler` utility class. Previously
  expectations were subscribing to notifications. This class replaces
  this functionality. It subscribes to events from
  `CompactBlockProcessor` and fullfill expectations.
This commit is contained in:
Michal Fousek 2023-02-07 14:22:28 +01:00
parent e83bec6db7
commit 1ec12269ae
20 changed files with 448 additions and 565 deletions

View File

@ -27,6 +27,7 @@ disabled_rules:
- nesting # allow for types to be nested, common pattern in Swift
- multiple_closures_with_trailing_closure
- generic_type_name # allow for arbitrarily long generic type names
- redundant_void_return
opt_in_rules:
- mark

View File

@ -1,10 +1,18 @@
# Unreleased
- [#764] Refactor communication between components inside th SDK
This is mostly an internal change. A consequence of this change is that all the notifications
delivered via `NotificationCenter` with the prefix `blockProcessor` are now gone. If affected
notifications were used in your code use notifications with the prefix `synchronizer` now.
These notifications are defined in `SDKSynchronizer.swift`.
- [#759] Remove Jazz-generated HTML docs
We remove these documents since they are outdated and we rely on the docs in the
We remove these documents since they are outdated and we rely on the docs in the
code itself.
- [#726] Modularize GRPC layer
This is mostly internal change. `LightWalletService` is no longer public. If it
is used in your code replace it by using `SDKSynchronizer` API.

View File

@ -72,7 +72,7 @@ class SyncBlocksViewController: UIViewController {
.store(in: &notificationCancellables)
}
NotificationCenter.default.publisher(for: .blockProcessorStartedEnhancing, object: nil)
NotificationCenter.default.publisher(for: .synchronizerEnhancing, object: nil)
.receive(on: DispatchQueue.main)
.sink { [weak self] _ in
self?.accumulateMetrics()
@ -82,7 +82,7 @@ class SyncBlocksViewController: UIViewController {
}
.store(in: &notificationCancellables)
NotificationCenter.default.publisher(for: .blockProcessorUpdated, object: nil)
NotificationCenter.default.publisher(for: .synchronizerProgressUpdated, object: nil)
.throttle(for: 5, scheduler: DispatchQueue.main, latest: true)
.receive(on: DispatchQueue.main)
.map { [weak self] _ -> SDKMetrics.BlockMetricReport? in
@ -94,7 +94,7 @@ class SyncBlocksViewController: UIViewController {
}
.store(in: &notificationCancellables)
NotificationCenter.default.publisher(for: .blockProcessorFinished, object: nil)
NotificationCenter.default.publisher(for: .synchronizerSynced, object: nil)
.receive(on: DispatchQueue.main)
.delay(for: 0.5, scheduler: DispatchQueue.main)
.sink { [weak self] _ in

View File

@ -8,6 +8,7 @@
// swiftlint:disable file_length type_body_length
import Foundation
import Combine
public typealias RefreshedUTXOs = (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity])
@ -34,28 +35,6 @@ public enum CompactBlockProcessorError: Error {
case wipeAttemptWhileProcessing
}
/**
CompactBlockProcessor notification userInfo object keys.
check Notification.Name extensions for more details.
*/
public enum CompactBlockProcessorNotificationKey {
public static let progress = "CompactBlockProcessorNotificationKey.progress"
public static let progressBlockTime = "CompactBlockProcessorNotificationKey.progressBlockTime"
public static let reorgHeight = "CompactBlockProcessorNotificationKey.reorgHeight"
public static let latestScannedBlockHeight = "CompactBlockProcessorNotificationKey.latestScannedBlockHeight"
public static let rewindHeight = "CompactBlockProcessorNotificationKey.rewindHeight"
public static let foundTransactions = "CompactBlockProcessorNotificationKey.foundTransactions"
public static let foundBlocks = "CompactBlockProcessorNotificationKey.foundBlocks"
public static let foundTransactionsRange = "CompactBlockProcessorNotificationKey.foundTransactionsRange"
public static let error = "error"
public static let refreshedUTXOs = "CompactBlockProcessorNotificationKey.refreshedUTXOs"
public static let enhancementProgress = "CompactBlockProcessorNotificationKey.enhancementProgress"
public static let previousStatus = "CompactBlockProcessorNotificationKey.previousStatus"
public static let newStatus = "CompactBlockProcessorNotificationKey.newStatus"
public static let currentConnectivityStatus = "CompactBlockProcessorNotificationKey.currentConnectivityStatus"
public static let previousConnectivityStatus = "CompactBlockProcessorNotificationKey.previousConnectivityStatus"
}
public enum CompactBlockProgress {
case syncing(_ progress: BlockProgress)
case enhance(_ progress: EnhancementStreamProgress)
@ -101,10 +80,6 @@ public enum CompactBlockProgress {
}
}
protocol EnhancementStreamDelegate: AnyObject {
func transactionEnhancementProgressUpdated(_ progress: EnhancementProgress) async
}
public protocol EnhancementProgress {
var totalTransactions: Int { get }
var enhancedTransactions: Int { get }
@ -123,87 +98,43 @@ public struct EnhancementStreamProgress: EnhancementProgress {
}
}
public extension Notification.Name {
/**
Processing progress update
Query the userInfo object for the key CompactBlockProcessorNotificationKey.progress for a CompactBlockProgress struct
*/
static let blockProcessorUpdated = Notification.Name(rawValue: "CompactBlockProcessorUpdated")
/**
notification sent when processor status changed
*/
static let blockProcessorStatusChanged = Notification.Name(rawValue: "CompactBlockProcessorStatusChanged")
/**
Notification sent when a compact block processor starts syncing
*/
static let blockProcessorStartedSyncing = Notification.Name(rawValue: "CompactBlockProcessorStartedSyncing")
/**
Notification sent when the compact block processor stop() method is called
*/
static let blockProcessorStopped = Notification.Name(rawValue: "CompactBlockProcessorStopped")
/**
Notification sent when the compact block processor presented an error.
Query userInfo object on the key CompactBlockProcessorNotificationKey.error
*/
static let blockProcessorFailed = Notification.Name(rawValue: "CompactBlockProcessorFailed")
/**
Notification sent when the compact block processor has finished syncing the blockchain to latest height
*/
static let blockProcessorFinished = Notification.Name(rawValue: "CompactBlockProcessorFinished")
/**
Notification sent when the compact block processor is doing nothing
*/
static let blockProcessorIdle = Notification.Name(rawValue: "CompactBlockProcessorIdle")
/**
Notification sent when something odd happened. probably going from a state to another state that shouldn't be the next state.
*/
static let blockProcessorUnknownTransition = Notification.Name(rawValue: "CompactBlockProcessorTransitionUnknown")
/**
Notification sent when the compact block processor handled a ReOrg.
Query the userInfo object on the key CompactBlockProcessorNotificationKey.reorgHeight for the height on which the reorg was detected. CompactBlockProcessorNotificationKey.rewindHeight for the height that the processor backed to in order to solve the Reorg
*/
static let blockProcessorHandledReOrg = Notification.Name(rawValue: "CompactBlockProcessorHandledReOrg")
/**
Notification sent when the compact block processor enhanced a bunch of transactions
Query the user info object for CompactBlockProcessorNotificationKey.foundTransactions which will contain an [ConfirmedTransactionEntity] Array with the found transactions and CompactBlockProcessorNotificationKey.foundTransactionsrange
*/
static let blockProcessorFoundTransactions = Notification.Name(rawValue: "CompactBlockProcessorFoundTransactions")
/**
Notification sent when the compact block processor fetched utxos from lightwalletd attempted to store them
Query the user info object for CompactBlockProcessorNotificationKey.blockProcessorStoredUTXOs which will contain a RefreshedUTXOs tuple with the collection of UTXOs stored or skipped
*/
static let blockProcessorStoredUTXOs = Notification.Name(rawValue: "CompactBlockProcessorStoredUTXOs")
static let blockProcessorStartedEnhancing = Notification.Name(rawValue: "CompactBlockProcessorStartedEnhancing")
static let blockProcessorEnhancementProgress = Notification.Name("CompactBlockProcessorEnhancementProgress")
static let blockProcessorStartedFetching = Notification.Name(rawValue: "CompactBlockProcessorStartedFetching")
static let blockProcessorHandlingSaplingFiles = Notification.Name(rawValue: "blockProcessorHandlingSaplingFiles")
/**
Notification sent when the grpc service connection detects a change. Query the user info object for status change details `currentConnectivityStatus` for current and previous with `previousConnectivityStatus`
*/
static let blockProcessorConnectivityStateChanged = Notification.Name("CompactBlockProcessorConnectivityStateChanged")
}
/// The compact block processor is in charge of orchestrating the download and caching of compact blocks from a LightWalletEndpoint
/// when started the processor downloads does a download - validate - scan cycle until it reaches latest height on the blockchain.
actor CompactBlockProcessor {
enum Event {
/// Event sent when the CompactBlockProcessor presented an error.
case failed (CompactBlockProcessorError)
/// Event sent when the CompactBlockProcessor has finished syncing the blockchain to latest height
case finished (_ lastScannedHeight: BlockHeight, _ foundBlocks: Bool)
/// Event sent when the CompactBlockProcessor enhanced a bunch of transactions in some range.
case foundTransactions ([ZcashTransaction.Overview], CompactBlockRange)
/// Event sent when the CompactBlockProcessor handled a ReOrg.
/// `reorgHeight` is the height on which the reorg was detected.
/// `rewindHeight` is the height that the processor backed to in order to solve the Reorg.
case handledReorg (_ reorgHeight: BlockHeight, _ rewindHeight: BlockHeight)
/// Event sent when progress of the sync process changes.
case progressUpdated (CompactBlockProgress)
/// Event sent when the CompactBlockProcessor fetched utxos from lightwalletd attempted to store them.
case storedUTXOs ((inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]))
/// Event sent when the CompactBlockProcessor starts enhancing of the transactions.
case startedEnhancing
/// Event sent when the CompactBlockProcessor starts fetching of the UTXOs.
case startedFetching
/// Event sent when the CompactBlockProcessor starts syncing.
case startedSyncing
/// Event sent when the CompactBlockProcessor stops syncing.
case stopped
}
/// Compact Block Processor configuration
///
/// - parameter fsBlockCacheRoot: absolute root path where the filesystem block cache will be stored.
@ -344,6 +275,9 @@ actor CompactBlockProcessor {
}
}
var eventStream: AnyPublisher<Event, Never> { eventPublisher.eraseToAnyPublisher() }
private let eventPublisher = PassthroughSubject<Event, Never>()
let blockDownloaderService: BlockDownloaderService
let blockDownloader: BlockDownloader
let blockValidator: BlockValidator
@ -720,7 +654,8 @@ actor CompactBlockProcessor {
anyActionExecuted = true
LoggerProxy.debug("Fetching UTXO with range: \(range.lowerBound)...\(range.upperBound)")
state = .fetching
try await utxoFetcher.fetch(at: range)
let result = try await utxoFetcher.fetch(at: range)
eventPublisher.send(.storedUTXOs(result))
}
state = .handlingSaplingFiles
@ -858,27 +793,12 @@ actor CompactBlockProcessor {
}
func notifyProgress(_ progress: CompactBlockProgress) {
var userInfo: [AnyHashable: Any] = [:]
userInfo[CompactBlockProcessorNotificationKey.progress] = progress
LoggerProxy.debug("progress: \(progress)")
NotificationSender.default.post(
name: Notification.Name.blockProcessorUpdated,
object: self,
userInfo: userInfo
)
eventPublisher.send(.progressUpdated(progress))
}
func notifyTransactions(_ txs: [ZcashTransaction.Overview], in range: CompactBlockRange) {
NotificationSender.default.post(
name: .blockProcessorFoundTransactions,
object: self,
userInfo: [
CompactBlockProcessorNotificationKey.foundTransactions: txs,
CompactBlockProcessorNotificationKey.foundTransactionsRange: range
]
)
eventPublisher.send(.foundTransactions(txs, range))
}
func determineLowerBound(
@ -991,16 +911,9 @@ actor CompactBlockProcessor {
do {
try blockDownloaderService.rewind(to: rewindHeight)
await internalSyncProgress.rewind(to: rewindHeight)
// notify reorg
NotificationSender.default.post(
name: Notification.Name.blockProcessorHandledReOrg,
object: self,
userInfo: [
CompactBlockProcessorNotificationKey.reorgHeight: height, CompactBlockProcessorNotificationKey.rewindHeight: rewindHeight
]
)
eventPublisher.send(.handledReorg(height, rewindHeight))
// process next batch
await self.nextBatch()
} catch {
@ -1020,21 +933,9 @@ actor CompactBlockProcessor {
}
private func processingFinished(height: BlockHeight) async {
NotificationSender.default.post(
name: Notification.Name.blockProcessorFinished,
object: self,
userInfo: [
CompactBlockProcessorNotificationKey.latestScannedBlockHeight: height,
CompactBlockProcessorNotificationKey.foundBlocks: self.foundBlocks
]
)
eventPublisher.send(.finished(height, foundBlocks))
state = .synced
await setTimer()
NotificationSender.default.post(
name: Notification.Name.blockProcessorIdle,
object: self,
userInfo: nil
)
}
private func clearCompactBlockCache() async throws {
@ -1077,40 +978,28 @@ actor CompactBlockProcessor {
return
}
NotificationSender.default.post(
name: .blockProcessorStatusChanged,
object: self,
userInfo: [
CompactBlockProcessorNotificationKey.previousStatus: oldValue,
CompactBlockProcessorNotificationKey.newStatus: newValue
]
)
switch newValue {
case .error(let err):
notifyError(err)
case .stopped:
NotificationSender.default.post(name: Notification.Name.blockProcessorStopped, object: self)
eventPublisher.send(.stopped)
case .enhancing:
NotificationSender.default.post(name: Notification.Name.blockProcessorStartedEnhancing, object: self)
eventPublisher.send(.startedEnhancing)
case .fetching:
NotificationSender.default.post(name: Notification.Name.blockProcessorStartedFetching, object: self)
eventPublisher.send(.startedFetching)
case .handlingSaplingFiles:
NotificationSender.default.post(name: Notification.Name.blockProcessorHandlingSaplingFiles, object: self)
// We don't report this to outside world as separate phase for now.
break
case .synced:
// transition to this state is handled by `processingFinished(height: BlockHeight)`
break
case .syncing:
NotificationSender.default.post(name: Notification.Name.blockProcessorStartedSyncing, object: self)
eventPublisher.send(.startedSyncing)
}
}
private func notifyError(_ err: Error) {
NotificationSender.default.post(
name: Notification.Name.blockProcessorFailed,
object: self,
userInfo: [CompactBlockProcessorNotificationKey.error: mapError(err)]
)
eventPublisher.send(.failed(mapError(err)))
}
// TODO: [#713] encapsulate service errors better, https://github.com/zcash/ZcashLightClientKit/issues/713
}
@ -1320,16 +1209,6 @@ extension CompactBlockProcessorError: LocalizedError {
}
}
extension CompactBlockProcessor: EnhancementStreamDelegate {
func transactionEnhancementProgressUpdated(_ progress: EnhancementProgress) {
NotificationSender.default.post(
name: .blockProcessorEnhancementProgress,
object: self,
userInfo: [CompactBlockProcessorNotificationKey.enhancementProgress: progress]
)
}
}
extension CompactBlockProcessor {
enum NextState: Equatable {
case finishProcessing(height: BlockHeight)

View File

@ -19,7 +19,7 @@ struct UTXOFetcherConfig {
}
protocol UTXOFetcher {
func fetch(at range: CompactBlockRange) async throws
func fetch(at range: CompactBlockRange) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity])
}
struct UTXOFetcherImpl {
@ -31,7 +31,7 @@ struct UTXOFetcherImpl {
}
extension UTXOFetcherImpl: UTXOFetcher {
func fetch(at range: CompactBlockRange) async throws {
func fetch(at range: CompactBlockRange) async throws -> (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity]) {
try Task.checkCancellation()
let tAddresses = try accountRepository.getAll()
@ -96,16 +96,12 @@ extension UTXOFetcherImpl: UTXOFetcher {
let result = (inserted: refreshed, skipped: skipped)
NotificationSender.default.post(
name: .blockProcessorStoredUTXOs,
object: self,
userInfo: [CompactBlockProcessorNotificationKey.refreshedUTXOs: result]
)
await internalSyncProgress.set(range.upperBound, .latestUTXOFetchedHeight)
if Task.isCancelled {
LoggerProxy.debug("Warning: fetchUnspentTxOutputs on range \(range) cancelled")
}
return result
}
}

View File

@ -79,7 +79,7 @@ extension FSCompactBlockRepository: CompactBlockRepository {
do {
try self.fileWriter.writeToURL(block.data, blockURL)
} catch {
LoggerProxy.error("Failed to write block: \(block.height) to path: \(blockURL.path).")
LoggerProxy.error("Failed to write block: \(block.height) to path: \(blockURL.path) with error: \(error)")
throw CompactBlockRepositoryError.failedToWriteBlock(block)
}

View File

@ -141,7 +141,7 @@ class SentNotesSQLDAO: SentNotesRepository {
try row.decode()
}) else { return [] }
return rows.compactMap { (sentNote) -> TransactionRecipient? in
return rows.compactMap { sentNote -> TransactionRecipient? in
if sentNote.toAccount == nil {
guard
let toAddress = sentNote.toAddress,

View File

@ -259,11 +259,11 @@ public class Initializer {
endpoint: endpoint,
connectionStateChange: { oldState, newState in
NotificationSender.default.post(
name: .blockProcessorConnectivityStateChanged,
object: nil,
name: .synchronizerConnectionStateChanged,
object: self,
userInfo: [
CompactBlockProcessorNotificationKey.currentConnectivityStatus: newState,
CompactBlockProcessorNotificationKey.previousConnectivityStatus: oldState
SDKSynchronizer.NotificationKeys.previousConnectionState: oldState,
SDKSynchronizer.NotificationKeys.currentConnectionState: newState
]
)
}

View File

@ -347,4 +347,3 @@ extension SyncStatus {
}
}
}

View File

@ -49,6 +49,11 @@ public extension Notification.Name {
/// the `[ConfirmedTransactionEntity]`. This notification could arrive in a background thread.
static let synchronizerFoundTransactions = Notification.Name("synchronizerFoundTransactions")
/// Notification sent when the synchronizer fetched utxos from lightwalletd attempted to store them
/// Query the user info object for CompactBlockProcessorNotificationKey.blockProcessorStoredUTXOs which will contain a RefreshedUTXOs tuple with
/// the collection of UTXOs stored or skipped
static let synchronizerStoredUTXOs = Notification.Name(rawValue: "synchronizerStoredUTXOs")
/// Posted when the synchronizer presents an error
/// - Note: query userInfo on NotificationKeys.error for an error
static let synchronizerFailed = Notification.Name("SDKSynchronizerFailed")
@ -78,6 +83,7 @@ public class SDKSynchronizer: Synchronizer {
public static let currentConnectionState = "SDKSynchronizer.currentConnectionState"
public static let previousConnectionState = "SDKSynchronizer.previousConnectionState"
public static let synchronizerState = "SDKSynchronizer.synchronizerState"
public static let refreshedUTXOs = "SDKSynchronizer.refreshedUTXOs"
}
private var underlyingStatus: SyncStatus
@ -95,8 +101,11 @@ public class SDKSynchronizer: Synchronizer {
notify(status: status)
}
}
public private(set) var progress: Float = 0.0
let blockProcessor: CompactBlockProcessor
let blockProcessorEventProcessingQueue = DispatchQueue(label: "blockProcessorEventProcessingQueue")
public private(set) var progress: Float = 0.0
public private(set) var initializer: Initializer
public private(set) var latestScannedHeight: BlockHeight
public private(set) var connectionState: ConnectionState
@ -111,6 +120,8 @@ public class SDKSynchronizer: Synchronizer {
private let statusUpdateLock = NSRecursiveLock()
private var syncStartDate: Date?
private var longLivingCancelables: [AnyCancellable] = []
/// Creates an SDKSynchronizer instance
/// - Parameter initializer: a wallet Initializer object
@ -152,7 +163,9 @@ public class SDKSynchronizer: Synchronizer {
)
)
self.subscribeToProcessorNotifications(blockProcessor)
subscribeToProcessorNotifications(blockProcessor)
Task(priority: .high) { [weak self] in await self?.subscribeToProcessorEvents(blockProcessor) }
}
deinit {
@ -222,139 +235,105 @@ public class SDKSynchronizer: Synchronizer {
private func subscribeToProcessorNotifications(_ processor: CompactBlockProcessor) {
let center = NotificationCenter.default
center.addObserver(
self,
selector: #selector(processorUpdated(_:)),
name: Notification.Name.blockProcessorUpdated,
object: processor
)
center.addObserver(
self,
selector: #selector(processorStartedSyncing(_:)),
name: Notification.Name.blockProcessorStartedSyncing,
object: processor
)
center.addObserver(
self,
selector: #selector(processorStartedEnhancing(_:)),
name: Notification.Name.blockProcessorStartedEnhancing,
object: processor
)
center.addObserver(
self,
selector: #selector(processorStartedFetching(_:)),
name: Notification.Name.blockProcessorStartedFetching,
object: processor
)
center.addObserver(
self,
selector: #selector(processorStopped(_:)),
name: Notification.Name.blockProcessorStopped,
object: processor
)
center.addObserver(
self,
selector: #selector(processorFailed(_:)),
name: Notification.Name.blockProcessorFailed,
object: processor
)
center.addObserver(
self,
selector: #selector(processorFinished(_:)),
name: Notification.Name.blockProcessorFinished,
object: processor
)
center.addObserver(
self,
selector: #selector(processorTransitionUnknown(_:)),
name: Notification.Name.blockProcessorUnknownTransition,
object: processor
)
center.addObserver(
self,
selector: #selector(reorgDetected(_:)),
name: Notification.Name.blockProcessorHandledReOrg,
object: processor
)
center.addObserver(
self,
selector: #selector(transactionsFound(_:)),
name: Notification.Name.blockProcessorFoundTransactions,
object: processor
)
center.addObserver(
self,
selector: #selector(connectivityStateChanged(_:)),
name: Notification.Name.blockProcessorConnectivityStateChanged,
name: Notification.Name.synchronizerConnectionStateChanged,
object: nil
)
}
// MARK: Block Processor notifications
// MARK: Connectivity State
@objc func connectivityStateChanged(_ notification: Notification) {
guard
let userInfo = notification.userInfo,
let previous = userInfo[CompactBlockProcessorNotificationKey.previousConnectivityStatus] as? ConnectionState,
let current = userInfo[CompactBlockProcessorNotificationKey.currentConnectivityStatus] as? ConnectionState
let current = userInfo[NotificationKeys.currentConnectionState] as? ConnectionState
else {
LoggerProxy.error(
"Found \(Notification.Name.blockProcessorConnectivityStateChanged) but lacks dictionary information." +
"Found \(notification.name) but lacks dictionary information." +
"This is probably a programming error"
)
return
}
NotificationSender.default.post(
name: .synchronizerConnectionStateChanged,
object: self,
userInfo: [
NotificationKeys.previousConnectionState: previous,
NotificationKeys.currentConnectionState: current
]
)
connectionState = current
}
@objc func transactionsFound(_ notification: Notification) {
guard
let userInfo = notification.userInfo,
let foundTransactions = userInfo[CompactBlockProcessorNotificationKey.foundTransactions] as? [ZcashTransaction.Overview]
else {
return
}
// MARK: Handle CompactBlockProcessor.Flow
private func subscribeToProcessorEvents(_ processor: CompactBlockProcessor) async {
let stream = await processor.eventStream
stream
.receive(on: blockProcessorEventProcessingQueue)
.sink { [weak self] event in
switch event {
case let .failed(error):
self?.failed(error: error)
case let .finished(height, foundBlocks):
self?.finished(lastScannedHeight: height, foundBlocks: foundBlocks)
case let .foundTransactions(transactions, range):
self?.foundTransactions(transactions: transactions, in: range)
case let .handledReorg(reorgHeight, rewindHeight):
self?.handledReorg(reorgHeight: reorgHeight, rewindHeight: rewindHeight)
case let .progressUpdated(progress):
self?.progressUpdated(progress: progress)
case let .storedUTXOs(utxos):
self?.storedUTXOs(utxos: utxos)
case .startedEnhancing:
self?.startedEnhancing()
case .startedFetching:
self?.startedFetching()
case .startedSyncing:
self?.startedSyncing()
case .stopped:
self?.stopped()
}
}
.store(in: &longLivingCancelables)
}
private func failed(error: CompactBlockProcessorError) {
self.notifyFailure(error)
self.status = .error(self.mapError(error))
}
private func finished(lastScannedHeight: BlockHeight, foundBlocks: Bool) {
// FIX: Pending transaction updates fail if done from another thread. Improvement needed: explicitly define queues for sql repositories see: https://github.com/zcash/ZcashLightClientKit/issues/450
self.latestScannedHeight = lastScannedHeight
self.refreshPendingTransactions()
self.status = .synced
if let syncStartDate {
SDKMetrics.shared.pushSyncReport(
start: syncStartDate,
end: Date()
)
}
}
private func foundTransactions(transactions: [ZcashTransaction.Overview], in range: CompactBlockRange) {
NotificationSender.default.post(
name: .synchronizerFoundTransactions,
object: self,
userInfo: [
NotificationKeys.foundTransactions: foundTransactions
NotificationKeys.foundTransactions: transactions
]
)
}
@objc func reorgDetected(_ notification: Notification) {
guard
let userInfo = notification.userInfo,
let progress = userInfo[CompactBlockProcessorNotificationKey.reorgHeight] as? BlockHeight,
let rewindHeight = userInfo[CompactBlockProcessorNotificationKey.rewindHeight] as? BlockHeight
else {
LoggerProxy.debug("error processing reorg notification")
return
}
LoggerProxy.debug("handling reorg at: \(progress) with rewind height: \(rewindHeight)")
private func handledReorg(reorgHeight: BlockHeight, rewindHeight: BlockHeight) {
LoggerProxy.debug("handling reorg at: \(reorgHeight) with rewind height: \(rewindHeight)")
do {
try transactionManager.handleReorg(at: rewindHeight)
@ -364,26 +343,19 @@ public class SDKSynchronizer: Synchronizer {
}
}
@objc func processorUpdated(_ notification: Notification) {
guard
let userInfo = notification.userInfo,
let progress = userInfo[CompactBlockProcessorNotificationKey.progress] as? CompactBlockProgress
else {
return
}
private func progressUpdated(progress: CompactBlockProgress) {
self.notify(progress: progress)
}
@objc func processorStartedSyncing(_ notification: Notification) {
statusUpdateLock.lock()
defer { statusUpdateLock.unlock() }
guard status != .syncing(.nullProgress) else { return }
status = .syncing(.nullProgress)
private func storedUTXOs(utxos: (inserted: [UnspentTransactionOutputEntity], skipped: [UnspentTransactionOutputEntity])) {
NotificationSender.default.post(
name: .synchronizerStoredUTXOs,
object: self,
userInfo: [NotificationKeys.refreshedUTXOs: utxos]
)
}
@objc func processorStartedEnhancing(_ notification: Notification) {
private func startedEnhancing() {
statusUpdateLock.lock()
defer { statusUpdateLock.unlock() }
@ -391,7 +363,7 @@ public class SDKSynchronizer: Synchronizer {
status = .enhancing(NullEnhancementProgress())
}
@objc func processorStartedFetching(_ notification: Notification) {
private func startedFetching() {
statusUpdateLock.lock()
defer { statusUpdateLock.unlock() }
@ -399,7 +371,15 @@ public class SDKSynchronizer: Synchronizer {
status = .fetching
}
@objc func processorStopped(_ notification: Notification) {
private func startedSyncing() {
statusUpdateLock.lock()
defer { statusUpdateLock.unlock() }
guard status != .syncing(.nullProgress) else { return }
status = .syncing(.nullProgress)
}
private func stopped() {
statusUpdateLock.lock()
defer { statusUpdateLock.unlock() }
@ -407,40 +387,6 @@ public class SDKSynchronizer: Synchronizer {
status = .stopped
}
@objc func processorFailed(_ notification: Notification) {
if let error = notification.userInfo?[CompactBlockProcessorNotificationKey.error] as? Error {
self.notifyFailure(error)
self.status = .error(self.mapError(error))
} else {
self.notifyFailure(
CompactBlockProcessorError.generalError(
message: "This is strange. processorFailed Call received no error message"
)
)
self.status = .error(SynchronizerError.generalError(message: "This is strange. processorFailed Call received no error message"))
}
}
@objc func processorFinished(_ notification: Notification) {
// FIX: Pending transaction updates fail if done from another thread. Improvement needed: explicitly define queues for sql repositories see: https://github.com/zcash/ZcashLightClientKit/issues/450
if let blockHeight = notification.userInfo?[CompactBlockProcessorNotificationKey.latestScannedBlockHeight] as? BlockHeight {
self.latestScannedHeight = blockHeight
}
self.refreshPendingTransactions()
self.status = .synced
if let syncStartDate {
SDKMetrics.shared.pushSyncReport(
start: syncStartDate,
end: Date()
)
}
}
@objc func processorTransitionUnknown(_ notification: Notification) {
self.status = .disconnected
}
// MARK: Synchronizer methods
public func sendToAddress(

View File

@ -5,6 +5,7 @@
// Created by Francisco Gindre on 5/14/20.
//
import Combine
import XCTest
@testable import TestUtils
@testable import ZcashLightClientKit
@ -30,6 +31,7 @@ class AdvancedReOrgTests: XCTestCase {
let branchID = "2bb40e60"
let chainName = "main"
let network = DarksideWalletDNetwork()
var cancellables: [AnyCancellable] = []
override func setUpWithError() throws {
try super.setUpWithError()
@ -50,14 +52,8 @@ class AdvancedReOrgTests: XCTestCase {
try? FileManager.default.removeItem(at: coordinator.databases.pendingDB)
}
@objc func handleReorg(_ notification: Notification) {
guard
let reorgHeight = notification.userInfo?[CompactBlockProcessorNotificationKey.reorgHeight] as? BlockHeight,
let rewindHeight = notification.userInfo?[CompactBlockProcessorNotificationKey.rewindHeight] as? BlockHeight
else {
XCTFail("empty reorg notification")
return
}
func handleReorg(event: CompactBlockProcessor.Event) {
guard case let .handledReorg(reorgHeight, rewindHeight) = event else { return XCTFail("empty reorg event") }
logger!.debug("--- REORG DETECTED \(reorgHeight)--- RewindHeight: \(rewindHeight)", file: #file, function: #function, line: #line)
@ -80,7 +76,7 @@ class AdvancedReOrgTests: XCTestCase {
/// 10. sync up to received_Tx_height + 3
/// 11. verify that balance equals initial balance + tx amount
func testReOrgChangesInboundTxMinedHeight() async throws {
hookToReOrgNotification()
await hookToReOrgNotification()
try FakeChainBuilder.buildChain(darksideWallet: coordinator.service, branchID: branchID, chainName: chainName)
var shouldContinue = false
let receivedTxHeight: BlockHeight = 663188
@ -462,8 +458,8 @@ class AdvancedReOrgTests: XCTestCase {
XCTAssertEqual(resultingBalance, coordinator.synchronizer.initializer.getVerifiedBalance())
}
func testIncomingTransactionIndexChange() throws {
hookToReOrgNotification()
@MainActor func testIncomingTransactionIndexChange() async throws {
await hookToReOrgNotification()
self.expectedReorgHeight = 663196
self.expectedRewindHeight = 663175
try coordinator.reset(saplingActivation: birthday, branchID: "2bb40e60", chainName: "main")
@ -725,7 +721,7 @@ class AdvancedReOrgTests: XCTestCase {
/// 14. sync to latest height
/// 15. verify that there's no pending transaction and that the tx is displayed on the sentTransactions collection
func testReOrgChangesOutboundTxMinedHeight() async throws {
hookToReOrgNotification()
await hookToReOrgNotification()
/*
1. create fake chain
@ -1037,8 +1033,8 @@ class AdvancedReOrgTests: XCTestCase {
/// 4. sync to latest height
/// 5. verify that reorg Happened at reorgHeight
/// 6. verify that balances match initial balances
func testReOrgRemovesIncomingTxForever() throws {
hookToReOrgNotification()
@MainActor func testReOrgRemovesIncomingTxForever() async throws {
await hookToReOrgNotification()
try coordinator.reset(saplingActivation: 663150, branchID: branchID, chainName: chainName)
try coordinator.resetBlocks(dataset: .predefined(dataset: .txReOrgRemovesInboundTxBefore))
@ -1111,7 +1107,7 @@ class AdvancedReOrgTests: XCTestCase {
/// 8. sync to latest height
/// 9. verify that there's an expired transaction as a pending transaction
func testReOrgRemovesOutboundTxAndIsNeverMined() async throws {
hookToReOrgNotification()
await hookToReOrgNotification()
/*
1. create fake chain
@ -1264,7 +1260,7 @@ class AdvancedReOrgTests: XCTestCase {
}
func testLongSync() async throws {
hookToReOrgNotification()
await hookToReOrgNotification()
/*
1. create fake chain
@ -1318,7 +1314,14 @@ class AdvancedReOrgTests: XCTestCase {
XCTFail("Failed with error: \(testError)")
}
func hookToReOrgNotification() {
NotificationCenter.default.addObserver(self, selector: #selector(handleReorg(_:)), name: .blockProcessorHandledReOrg, object: nil)
func hookToReOrgNotification() async {
await coordinator.synchronizer.blockProcessor.eventStream
.sink { [weak self] event in
switch event {
case .handledReorg: self?.handleReorg(event: event)
default: break
}
}
.store(in: &cancellables)
}
}

View File

@ -8,6 +8,7 @@
import XCTest
@testable import TestUtils
@testable import ZcashLightClientKit
final class InternalStateConsistencyTests: XCTestCase {
// TODO: [#715] Parameterize this from environment, https://github.com/zcash/ZcashLightClientKit/issues/715
// swiftlint:disable:next line_length
@ -27,7 +28,6 @@ final class InternalStateConsistencyTests: XCTestCase {
let branchID = "2bb40e60"
let chainName = "main"
let network = DarksideWalletDNetwork()
var firstSyncContinuation: CheckedContinuation<(), Error>?
override func setUpWithError() throws {
try super.setUpWithError()
self.coordinator = try TestCoordinator(
@ -47,8 +47,8 @@ final class InternalStateConsistencyTests: XCTestCase {
try? FileManager.default.removeItem(at: coordinator.databases.pendingDB)
}
func testInternalStateIsConsistentWhenMigrating() async throws {
NotificationCenter.default.addObserver(self, selector: #selector(self.processorStopped(_:)), name: .blockProcessorStopped, object: nil)
@MainActor func testInternalStateIsConsistentWhenMigrating() async throws {
NotificationCenter.default.addObserver(self, selector: #selector(self.synchronizerStopped(_:)), name: .synchronizerStopped, object: nil)
let fullSyncLength = 1000
try FakeChainBuilder.buildChain(darksideWallet: coordinator.service, branchID: branchID, chainName: chainName, length: fullSyncLength)
@ -60,29 +60,15 @@ final class InternalStateConsistencyTests: XCTestCase {
sleep(1)
try await withCheckedThrowingContinuation { continuation in
do {
try coordinator.sync(
completion: { _ in
XCTFail("shouldn't have completed")
continuation.resume()
}, error: { error in
guard let error else {
XCTFail("there was an unknown error")
continuation.resume()
return
}
continuation.resume(throwing: error)
}
)
try coordinator.sync(
completion: { _ in
XCTFail("shouldn't have completed")
},
error: handleError
)
self.firstSyncContinuation = continuation
DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
self.coordinator.synchronizer.stop()
}
} catch {
continuation.resume(throwing: error)
}
DispatchQueue.global().asyncAfter(deadline: .now() + 1) { [weak self] in
self?.coordinator.synchronizer.stop()
}
wait(for: [firstSyncExpectation], timeout: 2)
@ -147,8 +133,15 @@ final class InternalStateConsistencyTests: XCTestCase {
wait(for: [secondSyncAttemptExpectation], timeout: 10)
}
@objc func processorStopped(_ notification: Notification) {
firstSyncContinuation?.resume()
@objc func synchronizerStopped(_ notification: Notification) {
self.firstSyncExpectation.fulfill()
}
func handleError(_ error: Error?) {
guard let testError = error else {
XCTFail("failed with nil error")
return
}
XCTFail("Failed with error: \(testError)")
}
}

View File

@ -25,7 +25,6 @@ class PendingTransactionUpdatesTest: XCTestCase {
var sentTransactionExpectation = XCTestExpectation(description: "sent")
var expectedReorgHeight: BlockHeight = 665188
var expectedRewindHeight: BlockHeight = 665188
var reorgExpectation = XCTestExpectation(description: "reorg")
let branchID = "2bb40e60"
let chainName = "main"
let network = DarksideWalletDNetwork()
@ -49,18 +48,6 @@ class PendingTransactionUpdatesTest: XCTestCase {
try? FileManager.default.removeItem(at: coordinator.databases.pendingDB)
}
@objc func handleReorg(_ notification: Notification) {
guard
let reorgHeight = notification.userInfo?[CompactBlockProcessorNotificationKey.reorgHeight] as? BlockHeight
else {
XCTFail("empty reorg notification")
return
}
XCTAssertEqual(reorgHeight, expectedReorgHeight)
reorgExpectation.fulfill()
}
func testPendingTransactionMinedHeightUpdated() async throws {
/*
1. create fake chain
@ -240,13 +227,4 @@ class PendingTransactionUpdatesTest: XCTestCase {
}
XCTFail("Failed with error: \(testError)")
}
func hookToReOrgNotification() {
NotificationCenter.default.addObserver(
self,
selector: #selector(handleReorg(_:)),
name: .blockProcessorHandledReOrg,
object: nil
)
}
}

View File

@ -5,6 +5,7 @@
// Created by Francisco Gindre on 3/23/20.
//
import Combine
import XCTest
@testable import TestUtils
@testable import ZcashLightClientKit
@ -44,15 +45,10 @@ class ReOrgTests: XCTestCase {
var sentTransactionExpectation = XCTestExpectation(description: "sent")
var expectedReorgHeight: BlockHeight = 665188
var expectedRewindHeight: BlockHeight = 665188
var cancellables: [AnyCancellable] = []
override func setUpWithError() throws {
try super.setUpWithError()
NotificationCenter.default.addObserver(
self,
selector: #selector(self.handleReOrgNotification(_:)),
name: Notification.Name.blockProcessorHandledReOrg,
object: nil
)
self.coordinator = try TestCoordinator(
seed: self.seedPhrase,
@ -63,6 +59,17 @@ class ReOrgTests: XCTestCase {
try self.coordinator.reset(saplingActivation: self.birthday, branchID: self.branchID, chainName: self.chainName)
try self.coordinator.resetBlocks(dataset: .default)
var stream: AnyPublisher<CompactBlockProcessor.Event, Never>!
XCTestCase.wait { await stream = self.coordinator.synchronizer.blockProcessor.eventStream }
stream
.sink { [weak self] event in
switch event {
case .handledReorg: self?.handleReOrgNotification(event: event)
default: break
}
}
.store(in: &cancellables)
}
override func tearDownWithError() throws {
@ -72,13 +79,11 @@ class ReOrgTests: XCTestCase {
try? FileManager.default.removeItem(at: coordinator.databases.pendingDB)
}
@objc func handleReOrgNotification(_ notification: Notification) {
func handleReOrgNotification(event: CompactBlockProcessor.Event) {
reorgExpectation.fulfill()
guard let reorgHeight = notification.userInfo?[CompactBlockProcessorNotificationKey.reorgHeight] as? BlockHeight,
let rewindHeight = notification.userInfo?[CompactBlockProcessorNotificationKey.rewindHeight] as? BlockHeight else {
XCTFail("malformed reorg userInfo")
return
}
guard case let .handledReorg(reorgHeight, rewindHeight) = event else { return XCTFail("malformed reorg userInfo") }
print("reorgHeight: \(reorgHeight)")
print("rewindHeight: \(rewindHeight)")
@ -142,10 +147,13 @@ class ReOrgTests: XCTestCase {
download and sync blocks from walletBirthday to firstLatestHeight
*/
var synchronizer: SDKSynchronizer?
try coordinator.sync(completion: { synchro in
synchronizer = synchro
firstSyncExpectation.fulfill()
}, error: self.handleError)
try coordinator.sync(
completion: { synchro in
synchronizer = synchro
firstSyncExpectation.fulfill()
},
error: self.handleError
)
wait(for: [firstSyncExpectation], timeout: 5)
@ -193,17 +201,6 @@ class ReOrgTests: XCTestCase {
XCTAssertEqual(latestDownloadedHeight, targetHeight)
}
@objc func processorHandledReorg(_ notification: Notification) {
XCTAssertNotNil(notification.userInfo)
if let reorg = notification.userInfo?[CompactBlockProcessorNotificationKey.reorgHeight] as? BlockHeight,
let rewind = notification.userInfo?[CompactBlockProcessorNotificationKey.rewindHeight] as? BlockHeight {
XCTAssertTrue( rewind <= reorg )
reorgExpectation.fulfill()
} else {
XCTFail("CompactBlockProcessor reorg notification is malformed")
}
}
func handleError(_ error: Error?) {
guard let testError = error else {
XCTFail("failed with nil error")

View File

@ -5,6 +5,7 @@
// Created by Francisco Gindre on 9/16/22.
//
import Combine
import XCTest
@testable import TestUtils
@testable import ZcashLightClientKit
@ -30,6 +31,8 @@ final class SynchronizerTests: XCTestCase {
let branchID = "2bb40e60"
let chainName = "main"
let network = DarksideWalletDNetwork()
var cancellables: [AnyCancellable] = []
let processorEventHandler = CompactBlockProcessorEventHandler()
override func setUpWithError() throws {
try super.setUpWithError()
@ -40,6 +43,17 @@ final class SynchronizerTests: XCTestCase {
)
try coordinator.reset(saplingActivation: 663150, branchID: self.branchID, chainName: self.chainName)
var stream: AnyPublisher<CompactBlockProcessor.Event, Never>!
XCTestCase.wait { await stream = self.coordinator.synchronizer.blockProcessor.eventStream }
stream
.sink { [weak self] event in
switch event {
case .handledReorg: self?.handleReorg(event: event)
default: break
}
}
.store(in: &cancellables)
}
override func tearDownWithError() throws {
@ -51,14 +65,8 @@ final class SynchronizerTests: XCTestCase {
try? FileManager.default.removeItem(at: coordinator.databases.pendingDB)
}
@objc func handleReorg(_ notification: Notification) {
guard
let reorgHeight = notification.userInfo?[CompactBlockProcessorNotificationKey.reorgHeight] as? BlockHeight,
let rewindHeight = notification.userInfo?[CompactBlockProcessorNotificationKey.rewindHeight] as? BlockHeight
else {
XCTFail("empty reorg notification")
return
}
func handleReorg(event: CompactBlockProcessor.Event) {
guard case let .handledReorg(reorgHeight, rewindHeight) = event else { return XCTFail("empty reorg notification") }
logger!.debug("--- REORG DETECTED \(reorgHeight)--- RewindHeight: \(rewindHeight)", file: #file, function: #function, line: #line)
@ -67,8 +75,6 @@ final class SynchronizerTests: XCTestCase {
}
func testSynchronizerStops() async throws {
hookToReOrgNotification()
/*
1. create fake chain
*/
@ -84,7 +90,10 @@ final class SynchronizerTests: XCTestCase {
syncStoppedExpectation.subscribe(to: .synchronizerStopped, object: nil)
let processorStoppedExpectation = XCTestExpectation(description: "ProcessorStopped Expectation")
processorStoppedExpectation.subscribe(to: .blockProcessorStopped, object: nil)
processorEventHandler.subscribe(
to: await coordinator.synchronizer.blockProcessor.eventStream,
expectations: [.stopped: processorStoppedExpectation]
)
/*
sync to latest height
@ -119,8 +128,4 @@ final class SynchronizerTests: XCTestCase {
}
XCTFail("Failed with error: \(testError)")
}
func hookToReOrgNotification() {
NotificationCenter.default.addObserver(self, selector: #selector(handleReorg(_:)), name: .blockProcessorHandledReOrg, object: nil)
}
}

View File

@ -5,12 +5,15 @@
// Created by Francisco Gindre on 4/15/20.
//
import Combine
import XCTest
@testable import TestUtils
@testable import ZcashLightClientKit
// swiftlint:disable implicitly_unwrapped_optional force_try
class TransactionEnhancementTests: XCTestCase {
var cancellables: [AnyCancellable] = []
let processorEventHandler = CompactBlockProcessorEventHandler()
let mockLatestHeight = BlockHeight(663250)
let targetLatestHeight = BlockHeight(663251)
let walletBirthday = BlockHeight(663150)
@ -32,7 +35,7 @@ class TransactionEnhancementTests: XCTestCase {
var syncStartedExpect: XCTestExpectation!
var updatedNotificationExpectation: XCTestExpectation!
var stopNotificationExpectation: XCTestExpectation!
var idleNotificationExpectation: XCTestExpectation!
var finishedNotificationExpectation: XCTestExpectation!
var reorgNotificationExpectation: XCTestExpectation!
var afterReorgIdleNotification: XCTestExpectation!
var txFoundNotificationExpectation: XCTestExpectation!
@ -48,19 +51,26 @@ class TransactionEnhancementTests: XCTestCase {
syncStartedExpect = XCTestExpectation(description: "\(self.description) syncStartedExpect")
stopNotificationExpectation = XCTestExpectation(description: "\(self.description) stopNotificationExpectation")
updatedNotificationExpectation = XCTestExpectation(description: "\(self.description) updatedNotificationExpectation")
idleNotificationExpectation = XCTestExpectation(description: "\(self.description) idleNotificationExpectation")
finishedNotificationExpectation = XCTestExpectation(description: "\(self.description) finishedNotificationExpectation")
afterReorgIdleNotification = XCTestExpectation(description: "\(self.description) afterReorgIdleNotification")
reorgNotificationExpectation = XCTestExpectation(description: "\(self.description) reorgNotificationExpectation")
txFoundNotificationExpectation = XCTestExpectation(description: "\(self.description) txFoundNotificationExpectation")
waitExpectation = XCTestExpectation(description: "\(self.description) waitExpectation")
let birthday = Checkpoint.birthday(with: walletBirthday, network: network)
let config = CompactBlockProcessor.Configuration.standard(for: self.network, walletBirthday: birthday.height)
let rustBackend = ZcashRustBackend.self
processorConfig = config
let birthday = Checkpoint.birthday(with: walletBirthday, network: network)
let pathProvider = DefaultResourceProvider(network: network)
processorConfig = CompactBlockProcessor.Configuration(
fsBlockCacheRoot: testTempDirectory,
dataDb: pathProvider.dataDbURL,
spendParamsURL: pathProvider.spendParamsURL,
outputParamsURL: pathProvider.outputParamsURL,
walletBirthday: birthday.height,
network: network
)
try? FileManager.default.removeItem(at: processorConfig.fsBlockCacheRoot)
try? FileManager.default.removeItem(at: processorConfig.dataDb)
@ -120,38 +130,38 @@ class TransactionEnhancementTests: XCTestCase {
backend: rustBackend,
config: processorConfig
)
NotificationCenter.default.addObserver(
self,
selector: #selector(processorFailed(_:)),
name: Notification.Name.blockProcessorFailed,
object: processor
)
var stream: AnyPublisher<CompactBlockProcessor.Event, Never>!
XCTestCase.wait { await stream = self.processor.eventStream }
stream
.sink { [weak self] event in
switch event {
case .failed: self?.processorFailed(event: event)
default: break
}
}
.store(in: &cancellables)
}
override func tearDownWithError() throws {
try super.tearDownWithError()
try? FileManager.default.removeItem(at: processorConfig.fsBlockCacheRoot)
try? FileManager.default.removeItem(at: processorConfig.dataDb)
syncStartedExpect.unsubscribeFromNotifications()
stopNotificationExpectation.unsubscribeFromNotifications()
updatedNotificationExpectation.unsubscribeFromNotifications()
idleNotificationExpectation.unsubscribeFromNotifications()
reorgNotificationExpectation.unsubscribeFromNotifications()
afterReorgIdleNotification.unsubscribeFromNotifications()
NotificationCenter.default.removeObserver(self)
}
private func startProcessing() async throws {
XCTAssertNotNil(processor)
// Subscribe to notifications
syncStartedExpect.subscribe(to: Notification.Name.blockProcessorStartedSyncing, object: processor)
stopNotificationExpectation.subscribe(to: Notification.Name.blockProcessorStopped, object: processor)
updatedNotificationExpectation.subscribe(to: Notification.Name.blockProcessorUpdated, object: processor)
txFoundNotificationExpectation.subscribe(to: .blockProcessorFoundTransactions, object: processor)
idleNotificationExpectation.subscribe(to: .blockProcessorIdle, object: processor)
let expectations: [CompactBlockProcessorEventHandler.EventIdentifier: XCTestExpectation] = [
.startedSyncing: syncStartedExpect,
.stopped: stopNotificationExpectation,
.progressUpdated: updatedNotificationExpectation,
.foundTransactions: txFoundNotificationExpectation,
.finished: finishedNotificationExpectation
]
processorEventHandler.subscribe(to: await processor.eventStream, expectations: expectations)
await processor.start()
}
@ -193,16 +203,14 @@ class TransactionEnhancementTests: XCTestCase {
for: [
syncStartedExpect,
txFoundNotificationExpectation,
idleNotificationExpectation
finishedNotificationExpectation
],
timeout: 30
)
idleNotificationExpectation.unsubscribeFromNotifications()
}
@objc func processorFailed(_ notification: Notification) {
XCTAssertNotNil(notification.userInfo)
if let error = notification.userInfo?["error"] {
func processorFailed(event: CompactBlockProcessor.Event) {
if case let .failed(error) = event {
XCTFail("CompactBlockProcessor failed with Error: \(error)")
} else {
XCTFail("CompactBlockProcessor failed")

View File

@ -6,6 +6,7 @@
// Copyright © 2019 Electric Coin Company. All rights reserved.
//
import Combine
import XCTest
import SQLite
@testable import TestUtils
@ -13,6 +14,8 @@ import SQLite
// swiftlint:disable implicitly_unwrapped_optional force_try print_function_usage
class BlockScanTests: XCTestCase {
var cancelables: [AnyCancellable] = []
let rustWelding = ZcashRustBackend.self
var dataDbURL: URL!
@ -113,15 +116,15 @@ class BlockScanTests: XCTestCase {
latestScannedheight = repository.lastScannedBlockHeight()
XCTAssertEqual(latestScannedheight, range.upperBound)
}
@objc func observeBenchmark(_ notification: Notification) {
func observeBenchmark() {
let reports = SDKMetrics.shared.popAllBlockReports(flush: true)
reports.forEach {
print("observed benchmark: \($0)")
}
}
func testScanValidateDownload() async throws {
let seed = "testreferencealicetestreferencealice"
@ -129,13 +132,6 @@ class BlockScanTests: XCTestCase {
SDKMetrics.shared.enableMetrics()
NotificationCenter.default.addObserver(
self,
selector: #selector(observeBenchmark(_:)),
name: .blockProcessorUpdated,
object: nil
)
guard try self.rustWelding.initDataDb(dbData: dataDbURL, seed: nil, networkType: network.networkType) == .success else {
XCTFail("Seed should not be required for this test")
return
@ -198,7 +194,16 @@ class BlockScanTests: XCTestCase {
backend: rustWelding,
config: processorConfig
)
await compactBlockProcessor.eventStream
.sink { [weak self] event in
switch event {
case .progressUpdated: self?.observeBenchmark()
default: break
}
}
.store(in: &cancelables)
let range = CompactBlockRange(
uncheckedBounds: (walletBirthDay.height, walletBirthDay.height + 10000)
)

View File

@ -6,6 +6,7 @@
// Copyright © 2019 Electric Coin Company. All rights reserved.
//
import Combine
import XCTest
@testable import TestUtils
@testable import ZcashLightClientKit
@ -24,23 +25,26 @@ class CompactBlockProcessorTests: XCTestCase {
)
}()
var cancellables: [AnyCancellable] = []
let processorEventHandler = CompactBlockProcessorEventHandler()
var processor: CompactBlockProcessor!
var syncStartedExpect: XCTestExpectation!
var updatedNotificationExpectation: XCTestExpectation!
var stopNotificationExpectation: XCTestExpectation!
var idleNotificationExpectation: XCTestExpectation!
var finishedNotificationExpectation: XCTestExpectation!
let network = ZcashNetworkBuilder.network(for: .testnet)
let mockLatestHeight = ZcashNetworkBuilder.network(for: .testnet).constants.saplingActivationHeight + 2000
let testFileManager = FileManager()
let testTempDirectory = URL(fileURLWithPath: NSString(
string: NSTemporaryDirectory()
)
.appendingPathComponent("tmp-\(Int.random(in: 0 ... .max))"))
let testFileManager = FileManager()
override func setUpWithError() throws {
try super.setUpWithError()
try self.testFileManager.createDirectory(at: self.testTempDirectory, withIntermediateDirectories: false)
logger = OSLogger(logLevel: .debug)
try self.testFileManager.createDirectory(at: self.testTempDirectory, withIntermediateDirectories: false)
XCTestCase.wait { await InternalSyncProgress(storage: UserDefaults.standard).rewind(to: 0) }
@ -81,6 +85,7 @@ class CompactBlockProcessorTests: XCTestCase {
backend: realRustBackend,
config: processorConfig
)
let dbInit = try realRustBackend.initDataDb(dbData: processorConfig.dataDb, seed: nil, networkType: .testnet)
guard case .success = dbInit else {
@ -91,29 +96,29 @@ class CompactBlockProcessorTests: XCTestCase {
syncStartedExpect = XCTestExpectation(description: "\(self.description) syncStartedExpect")
stopNotificationExpectation = XCTestExpectation(description: "\(self.description) stopNotificationExpectation")
updatedNotificationExpectation = XCTestExpectation(description: "\(self.description) updatedNotificationExpectation")
idleNotificationExpectation = XCTestExpectation(description: "\(self.description) idleNotificationExpectation")
NotificationCenter.default.addObserver(
self,
selector: #selector(processorFailed(_:)),
name: Notification.Name.blockProcessorFailed,
object: processor
)
finishedNotificationExpectation = XCTestExpectation(description: "\(self.description) finishedNotificationExpectation")
var stream: AnyPublisher<CompactBlockProcessor.Event, Never>!
XCTestCase.wait { await stream = self.processor.eventStream }
stream
.sink { [weak self] event in
switch event {
case .failed: self?.processorFailed(event: event)
default: break
}
}
.store(in: &cancellables)
}
override func tearDownWithError() throws {
try super.tearDownWithError()
try FileManager.default.removeItem(at: processorConfig.fsBlockCacheRoot)
try? FileManager.default.removeItem(at: processorConfig.dataDb)
syncStartedExpect.unsubscribeFromNotifications()
stopNotificationExpectation.unsubscribeFromNotifications()
updatedNotificationExpectation.unsubscribeFromNotifications()
idleNotificationExpectation.unsubscribeFromNotifications()
NotificationCenter.default.removeObserver(self)
}
@objc func processorFailed(_ notification: Notification) {
XCTAssertNotNil(notification.userInfo)
if let error = notification.userInfo?["error"] {
func processorFailed(event: CompactBlockProcessor.Event) {
if case let .failed(error) = event {
XCTFail("CompactBlockProcessor failed with Error: \(error)")
} else {
XCTFail("CompactBlockProcessor failed")
@ -122,13 +127,15 @@ class CompactBlockProcessorTests: XCTestCase {
private func startProcessing() async {
XCTAssertNotNil(processor)
// Subscribe to notifications
syncStartedExpect.subscribe(to: Notification.Name.blockProcessorStartedSyncing, object: processor)
stopNotificationExpectation.subscribe(to: Notification.Name.blockProcessorStopped, object: processor)
updatedNotificationExpectation.subscribe(to: Notification.Name.blockProcessorUpdated, object: processor)
idleNotificationExpectation.subscribe(to: Notification.Name.blockProcessorIdle, object: processor)
let expectations: [CompactBlockProcessorEventHandler.EventIdentifier: XCTestExpectation] = [
.startedSyncing: syncStartedExpect,
.stopped: stopNotificationExpectation,
.progressUpdated: updatedNotificationExpectation,
.finished: finishedNotificationExpectation
]
processorEventHandler.subscribe(to: await processor.eventStream, expectations: expectations)
await processor.start()
}
@ -138,7 +145,7 @@ class CompactBlockProcessorTests: XCTestCase {
wait(
for: [
syncStartedExpect,
idleNotificationExpectation
finishedNotificationExpectation
],
timeout: 30,
enforceOrder: false
@ -154,7 +161,7 @@ class CompactBlockProcessorTests: XCTestCase {
updatedNotificationExpectation.expectedFulfillmentCount = expectedUpdates
await startProcessing()
wait(for: [updatedNotificationExpectation, idleNotificationExpectation], timeout: 300)
wait(for: [updatedNotificationExpectation, finishedNotificationExpectation], timeout: 300)
}
private func expectedBatches(currentHeight: BlockHeight, targetHeight: BlockHeight, batchSize: Int) -> Int {

View File

@ -6,6 +6,7 @@
//
// Copyright © 2019 Electric Coin Company. All rights reserved.
import Combine
import XCTest
@testable import TestUtils
@testable import ZcashLightClientKit
@ -30,20 +31,22 @@ class CompactBlockReorgTests: XCTestCase {
.appendingPathComponent("tmp-\(Int.random(in: 0 ... .max))"))
let testFileManager = FileManager()
var cancellables: [AnyCancellable] = []
let processorEventHandler = CompactBlockProcessorEventHandler()
var processor: CompactBlockProcessor!
var syncStartedExpect: XCTestExpectation!
var updatedNotificationExpectation: XCTestExpectation!
var stopNotificationExpectation: XCTestExpectation!
var idleNotificationExpectation: XCTestExpectation!
var finishedNotificationExpectation: XCTestExpectation!
var reorgNotificationExpectation: XCTestExpectation!
let network = ZcashNetworkBuilder.network(for: .testnet)
let mockLatestHeight = ZcashNetworkBuilder.network(for: .testnet).constants.saplingActivationHeight + 2000
override func setUpWithError() throws {
try super.setUpWithError()
try self.testFileManager.createDirectory(at: self.testTempDirectory, withIntermediateDirectories: false)
logger = OSLogger(logLevel: .debug)
try self.testFileManager.createDirectory(at: self.testTempDirectory, withIntermediateDirectories: false)
XCTestCase.wait { await InternalSyncProgress(storage: UserDefaults.standard).rewind(to: 0) }
@ -99,40 +102,31 @@ class CompactBlockReorgTests: XCTestCase {
syncStartedExpect = XCTestExpectation(description: "\(self.description) syncStartedExpect")
stopNotificationExpectation = XCTestExpectation(description: "\(self.description) stopNotificationExpectation")
updatedNotificationExpectation = XCTestExpectation(description: "\(self.description) updatedNotificationExpectation")
idleNotificationExpectation = XCTestExpectation(description: "\(self.description) idleNotificationExpectation")
finishedNotificationExpectation = XCTestExpectation(description: "\(self.description) finishedNotificationExpectation")
reorgNotificationExpectation = XCTestExpectation(description: "\(self.description) reorgNotificationExpectation")
NotificationCenter.default.addObserver(
self,
selector: #selector(processorHandledReorg(_:)),
name: Notification.Name.blockProcessorHandledReOrg,
object: processor
)
NotificationCenter.default.addObserver(
self,
selector: #selector(processorFailed(_:)),
name: Notification.Name.blockProcessorFailed,
object: processor
)
var stream: AnyPublisher<CompactBlockProcessor.Event, Never>!
XCTestCase.wait { await stream = self.processor.eventStream }
stream
.sink { [weak self] event in
switch event {
case .failed: self?.processorFailed(event: event)
case .handledReorg: self?.processorHandledReorg(event: event)
default: break
}
}
.store(in: &cancellables)
}
override func tearDown() {
super.tearDown()
try! FileManager.default.removeItem(at: processorConfig.fsBlockCacheRoot)
try? FileManager.default.removeItem(at: processorConfig.dataDb)
syncStartedExpect.unsubscribeFromNotifications()
stopNotificationExpectation.unsubscribeFromNotifications()
updatedNotificationExpectation.unsubscribeFromNotifications()
idleNotificationExpectation.unsubscribeFromNotifications()
reorgNotificationExpectation.unsubscribeFromNotifications()
NotificationCenter.default.removeObserver(self)
}
@objc func processorHandledReorg(_ notification: Notification) {
XCTAssertNotNil(notification.userInfo)
if let reorg = notification.userInfo?[CompactBlockProcessorNotificationKey.reorgHeight] as? BlockHeight,
let rewind = notification.userInfo?[CompactBlockProcessorNotificationKey.rewindHeight] as? BlockHeight {
func processorHandledReorg(event: CompactBlockProcessor.Event) {
if case let .handledReorg(reorg, rewind) = event {
XCTAssertTrue( reorg == 0 || reorg > self.network.constants.saplingActivationHeight)
XCTAssertTrue( rewind == 0 || rewind > self.network.constants.saplingActivationHeight)
XCTAssertTrue( rewind <= reorg )
@ -142,9 +136,8 @@ class CompactBlockReorgTests: XCTestCase {
}
}
@objc func processorFailed(_ notification: Notification) {
XCTAssertNotNil(notification.userInfo)
if let error = notification.userInfo?["error"] {
func processorFailed(event: CompactBlockProcessor.Event) {
if case let .failed(error) = event {
XCTFail("CompactBlockProcessor failed with Error: \(error)")
} else {
XCTFail("CompactBlockProcessor failed")
@ -153,13 +146,15 @@ class CompactBlockReorgTests: XCTestCase {
private func startProcessing() async {
XCTAssertNotNil(processor)
// Subscribe to notifications
syncStartedExpect.subscribe(to: Notification.Name.blockProcessorStartedSyncing, object: processor)
stopNotificationExpectation.subscribe(to: Notification.Name.blockProcessorStopped, object: processor)
updatedNotificationExpectation.subscribe(to: Notification.Name.blockProcessorUpdated, object: processor)
idleNotificationExpectation.subscribe(to: Notification.Name.blockProcessorFinished, object: processor)
reorgNotificationExpectation.subscribe(to: Notification.Name.blockProcessorHandledReOrg, object: processor)
let expectations: [CompactBlockProcessorEventHandler.EventIdentifier: XCTestExpectation] = [
.startedSyncing: syncStartedExpect,
.stopped: stopNotificationExpectation,
.progressUpdated: updatedNotificationExpectation,
.finished: finishedNotificationExpectation,
.handleReorg: reorgNotificationExpectation
]
processorEventHandler.subscribe(to: await processor.eventStream, expectations: expectations)
await processor.start()
}
@ -171,7 +166,7 @@ class CompactBlockReorgTests: XCTestCase {
for: [
syncStartedExpect,
reorgNotificationExpectation,
idleNotificationExpectation
finishedNotificationExpectation
],
timeout: 300,
enforceOrder: true

View File

@ -0,0 +1,63 @@
//
// CompactBlockProcessorEventHandler.swift
//
//
// Created by Michal Fousek on 09.02.2023.
//
import Combine
import Foundation
import XCTest
@testable import ZcashLightClientKit
class CompactBlockProcessorEventHandler {
enum EventIdentifier: String {
case failed
case finished
case foundTransactions
case handleReorg
case progressUpdated
case storedUTXOs
case startedEnhancing
case startedFetching
case startedSyncing
case stopped
}
private let queue = DispatchQueue(label: "CompactBlockProcessorEventHandler")
private var cancelables: [AnyCancellable] = []
func subscribe(to eventStream: AnyPublisher<CompactBlockProcessor.Event, Never>, expectations: [EventIdentifier: XCTestExpectation]) {
eventStream
.receive(on: queue)
.sink { event in expectations[event.identifier]?.fulfill() }
.store(in: &cancelables)
}
}
extension CompactBlockProcessor.Event {
var identifier: CompactBlockProcessorEventHandler.EventIdentifier {
switch self {
case .failed:
return .failed
case .finished:
return .finished
case .foundTransactions:
return .foundTransactions
case .handledReorg:
return .handleReorg
case .progressUpdated:
return .progressUpdated
case .storedUTXOs:
return .storedUTXOs
case .startedEnhancing:
return .startedEnhancing
case .startedFetching:
return .startedFetching
case .startedSyncing:
return .startedSyncing
case .stopped:
return .stopped
}
}
}