diff --git a/Example/ZcashLightClientSample/ZcashLightClientSample/DemoAppConfig.swift b/Example/ZcashLightClientSample/ZcashLightClientSample/DemoAppConfig.swift index fd589065..1c4a07cb 100644 --- a/Example/ZcashLightClientSample/ZcashLightClientSample/DemoAppConfig.swift +++ b/Example/ZcashLightClientSample/ZcashLightClientSample/DemoAppConfig.swift @@ -16,6 +16,7 @@ enum DemoAppConfig { static var birthdayHeight: BlockHeight = ZcashSDK.isMainnet ? 935000 : 1386000 static var seed = try! Mnemonic.deterministicSeedBytes(from: "live combine flight accident slow soda mind bright absent bid hen shy decade biology amazing mix enlist ensure biology rhythm snap duty soap armor") + static var address: String { "\(host):\(port)" } diff --git a/Example/ZcashLightClientSample/ZcashLightClientSample/Get UTXOs/GetUTXOsViewController.swift b/Example/ZcashLightClientSample/ZcashLightClientSample/Get UTXOs/GetUTXOsViewController.swift index a0ed553c..5db349db 100644 --- a/Example/ZcashLightClientSample/ZcashLightClientSample/Get UTXOs/GetUTXOsViewController.swift +++ b/Example/ZcashLightClientSample/ZcashLightClientSample/Get UTXOs/GetUTXOsViewController.swift @@ -29,11 +29,13 @@ class GetUTXOsViewController: UIViewController { self.transparentAddressLabel.text = tAddress - // swiftlint:disable:next force_try - let balance = try! AppDelegate.shared.sharedSynchronizer.getTransparentBalance(accountIndex: 0) - - self.totalBalanceLabel.text = NumberFormatter.zcashNumberFormatter.string(from: NSNumber(value: balance.total.amount)) - self.verifiedBalanceLabel.text = NumberFormatter.zcashNumberFormatter.string(from: NSNumber(value: balance.verified.amount)) + Task { @MainActor in + // swiftlint:disable:next force_try + let balance = try! await AppDelegate.shared.sharedSynchronizer.getTransparentBalance(accountIndex: 0) + + self.totalBalanceLabel.text = NumberFormatter.zcashNumberFormatter.string(from: NSNumber(value: balance.total.amount)) + self.verifiedBalanceLabel.text = NumberFormatter.zcashNumberFormatter.string(from: NSNumber(value: balance.verified.amount)) + } } @IBAction func shieldFunds(_ sender: Any) { diff --git a/Example/ZcashLightClientSample/ZcashLightClientSample/Send/SendViewController.swift b/Example/ZcashLightClientSample/ZcashLightClientSample/Send/SendViewController.swift index abc6f62e..fa335309 100644 --- a/Example/ZcashLightClientSample/ZcashLightClientSample/Send/SendViewController.swift +++ b/Example/ZcashLightClientSample/ZcashLightClientSample/Send/SendViewController.swift @@ -32,8 +32,10 @@ class SendViewController: UIViewController { override func viewDidLoad() { super.viewDidLoad() synchronizer = AppDelegate.shared.sharedSynchronizer - // swiftlint:disable:next force_try - try! synchronizer.prepare() + Task { @MainActor in + // swiftlint:disable:next force_try + try! await synchronizer.prepare() + } let tapRecognizer = UITapGestureRecognizer(target: self, action: #selector(viewTapped(_:))) self.view.addGestureRecognizer(tapRecognizer) setUp() @@ -41,12 +43,14 @@ class SendViewController: UIViewController { override func viewDidAppear(_ animated: Bool) { super.viewDidAppear(animated) - do { - try synchronizer.start(retry: false) - self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: synchronizer.status) - } catch { - self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: synchronizer.status) - fail(error) + Task { @MainActor in + do { + try await synchronizer.start(retry: false) + self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: synchronizer.status) + } catch { + self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: synchronizer.status) + fail(error) + } } } diff --git a/Example/ZcashLightClientSample/ZcashLightClientSample/Sync Blocks/SyncBlocksViewController.swift b/Example/ZcashLightClientSample/ZcashLightClientSample/Sync Blocks/SyncBlocksViewController.swift index 2e7760c2..8c657acd 100644 --- a/Example/ZcashLightClientSample/ZcashLightClientSample/Sync Blocks/SyncBlocksViewController.swift +++ b/Example/ZcashLightClientSample/ZcashLightClientSample/Sync Blocks/SyncBlocksViewController.swift @@ -31,7 +31,9 @@ class SyncBlocksViewController: UIViewController { // swiftlint:disable:next force_try try! wallet.initialize() processor = CompactBlockProcessor(initializer: wallet) - statusLabel.text = textFor(state: processor?.state.getState() ?? .stopped) + Task { @MainActor in + statusLabel.text = textFor(state: await processor?.state ?? .stopped) + } progressBar.progress = 0 NotificationCenter.default.addObserver( @@ -47,14 +49,16 @@ class SyncBlocksViewController: UIViewController { NotificationCenter.default.removeObserver(self) guard let processor = self.processor else { return } - processor.stop() + Task { + await processor.stop() + } } @objc func processorNotification(_ notification: Notification) { - DispatchQueue.main.async { + Task { @MainActor in guard self.processor != nil else { return } - self.updateUI() + await self.updateUI() switch notification.name { case let not where not == Notification.Name.blockProcessorUpdated: @@ -70,30 +74,28 @@ class SyncBlocksViewController: UIViewController { @IBAction func startStop() { guard let processor = processor else { return } - switch processor.state.getState() { - case .stopped: - startProcessor() - default: - stopProcessor() + Task { @MainActor in + switch await processor.state { + case .stopped: + await startProcessor() + default: + await stopProcessor() + } } } - func startProcessor() { + func startProcessor() async { guard let processor = processor else { return } - do { - try processor.start() - updateUI() - } catch { - fail(error: error) - } + await processor.start() + await updateUI() } - func stopProcessor() { + func stopProcessor() async { guard let processor = processor else { return } - processor.stop() - updateUI() + await processor.stop() + await updateUI() } func fail(error: Error) { @@ -110,11 +112,13 @@ class SyncBlocksViewController: UIViewController { ) self.present(alert, animated: true, completion: nil) - updateUI() + Task { @MainActor in + await updateUI() + } } - func updateUI() { - guard let state = processor?.state.getState() else { return } + func updateUI() async { + guard let state = await processor?.state else { return } statusLabel.text = textFor(state: state) startPause.setTitle(buttonText(for: state), for: .normal) diff --git a/Sources/ZcashLightClientKit/Block/Processor/CompactBlockDownload.swift b/Sources/ZcashLightClientKit/Block/Processor/CompactBlockDownload.swift index 4a78030e..f02b8e07 100644 --- a/Sources/ZcashLightClientKit/Block/Processor/CompactBlockDownload.swift +++ b/Sources/ZcashLightClientKit/Block/Processor/CompactBlockDownload.swift @@ -16,7 +16,7 @@ extension CompactBlockProcessor { ) async throws { try Task.checkCancellation() - setState(.downloading) + state = .downloading var buffer: [ZcashCompactBlock] = [] var targetHeightInternal: BlockHeight? diff --git a/Sources/ZcashLightClientKit/Block/Processor/CompactBlockEnhancement.swift b/Sources/ZcashLightClientKit/Block/Processor/CompactBlockEnhancement.swift index cb284414..98a22d32 100644 --- a/Sources/ZcashLightClientKit/Block/Processor/CompactBlockEnhancement.swift +++ b/Sources/ZcashLightClientKit/Block/Processor/CompactBlockEnhancement.swift @@ -56,8 +56,8 @@ extension CompactBlockProcessor { try Task.checkCancellation() LoggerProxy.debug("Started Enhancing range: \(range)") - setState(.enhancing) - + state = .enhancing + let blockRange = range.blockRange() var retries = 0 let maxRetries = 5 diff --git a/Sources/ZcashLightClientKit/Block/Processor/CompactBlockProcessor.swift b/Sources/ZcashLightClientKit/Block/Processor/CompactBlockProcessor.swift index fe30bf6d..bd0c538c 100644 --- a/Sources/ZcashLightClientKit/Block/Processor/CompactBlockProcessor.swift +++ b/Sources/ZcashLightClientKit/Block/Processor/CompactBlockProcessor.swift @@ -103,7 +103,7 @@ public enum CompactBlockProgress { } protocol EnhancementStreamDelegate: AnyObject { - func transactionEnhancementProgressUpdated(_ progress: EnhancementProgress) + func transactionEnhancementProgressUpdated(_ progress: EnhancementProgress) async } public protocol EnhancementProgress { @@ -213,7 +213,7 @@ public extension Notification.Name { /// The compact block processor is in charge of orchestrating the download and caching of compact blocks from a LightWalletEndpoint /// when started the processor downloads does a download - validate - scan cycle until it reaches latest height on the blockchain. -public class CompactBlockProcessor { +public actor CompactBlockProcessor { /// Compact Block Processor configuration /// @@ -312,31 +312,12 @@ public class CompactBlockProcessor { case synced } - // TODO: this isn't an Actor even though it looks like a good candidate, the reason: - // `state` lives in both sync and async environments. An Actor is demanding async context only - // so we can't take the advantage unless we encapsulate all `state` reads/writes to async context. - // Therefore solution with class + lock works for us butr eventually will be replaced. - // The future of CompactBlockProcessor is an actor (we won't need to encapsulate the state separately), issue 523, - // https://github.com/zcash/ZcashLightClientKit/issues/523 - public class ThreadSafeState { - private var state: State = .stopped - let lock = NSLock() - - func setState(_ newState: State) { - lock.lock() - defer { lock.unlock() } - state = newState - } - - public func getState() -> State { - lock.lock() - defer { lock.unlock() } - return state + public internal(set) var state: State = .stopped { + didSet { + transitionState(from: oldValue, to: self.state) } } - public internal(set) var state = ThreadSafeState() - var config: Configuration { willSet { self.stop() @@ -348,7 +329,7 @@ public class CompactBlockProcessor { } var shouldStart: Bool { - switch self.state.getState() { + switch self.state { case .stopped, .synced, .error: return !maxAttemptsReached default: @@ -386,7 +367,7 @@ public class CompactBlockProcessor { /// - storage: concrete implementation of `CompactBlockStorage` protocol /// - backend: a class that complies to `ZcashRustBackendWelding` /// - config: `Configuration` struct for this processor - convenience init( + init( service: LightWalletService, storage: CompactBlockStorage, backend: ZcashRustBackendWelding.Type, @@ -407,7 +388,7 @@ public class CompactBlockProcessor { /// Initializes a CompactBlockProcessor instance from an Initialized object /// - Parameters: /// - initializer: an instance that complies to CompactBlockDownloading protocol - public convenience init(initializer: Initializer) { + public init(initializer: Initializer) { self.init( service: initializer.lightWalletService, storage: initializer.storage, @@ -448,12 +429,6 @@ public class CompactBlockProcessor { cancelableTask?.cancel() } - func setState(_ newState: State) { - let oldValue = state.getState() - state.setState(newState) - transitionState(from: oldValue, to: newState) - } - static func validateServerInfo( _ info: LightWalletdInfo, saplingActivation: BlockHeight, @@ -500,7 +475,7 @@ public class CompactBlockProcessor { /// triggers the blockProcessorStartedDownloading notification /// /// - Important: subscribe to the notifications before calling this method - public func start(retry: Bool = false) throws { + public func start(retry: Bool = false) async { if retry { self.retryAttempts = 0 self.processingError = nil @@ -509,12 +484,12 @@ public class CompactBlockProcessor { } guard shouldStart else { - switch self.state.getState() { + switch self.state { case .error(let e): // max attempts have been reached LoggerProxy.info("max retry attempts reached with error: \(e)") notifyError(CompactBlockProcessorError.maxAttemptsReached(attempts: self.maxAttempts)) - setState(.stopped) + state = .stopped case .stopped: // max attempts have been reached LoggerProxy.info("max retry attempts reached") @@ -529,7 +504,7 @@ public class CompactBlockProcessor { return } - self.nextBatch() + await self.nextBatch() } /** @@ -545,14 +520,13 @@ public class CompactBlockProcessor { cancelableTask?.cancel() self.retryAttempts = 0 - setState(.stopped) } /** Rewinds to provided height. If nil is provided, it will rescan to nearest height (quick rescan) */ - public func rewindTo(_ height: BlockHeight?) throws -> BlockHeight { + public func rewindTo(_ height: BlockHeight?) async throws -> BlockHeight { self.stop() let lastDownloaded = try downloader.lastDownloadedBlockHeight() @@ -563,7 +537,7 @@ public class CompactBlockProcessor { let error = rustBackend.lastError() ?? RustWeldingError.genericError( message: "unknown error getting nearest rewind height for height: \(height)" ) - fail(error) + await fail(error) throw error } @@ -571,7 +545,7 @@ public class CompactBlockProcessor { let rewindHeight = max(Int32(nearestHeight - 1), Int32(config.walletBirthday)) guard rustBackend.rewindToHeight(dbData: config.dataDb, height: rewindHeight, networkType: self.config.network.networkType) else { let error = rustBackend.lastError() ?? RustWeldingError.genericError(message: "unknown error rewinding to height \(height)") - fail(error) + await fail(error) throw error } @@ -588,7 +562,7 @@ public class CompactBlockProcessor { - Throws CompactBlockProcessorError.invalidConfiguration if block height is invalid or if processor is already started */ func setStartHeight(_ startHeight: BlockHeight) throws { - guard self.state.getState() == .stopped, startHeight >= config.network.constants.saplingActivationHeight else { + guard self.state == .stopped, startHeight >= config.network.constants.saplingActivationHeight else { throw CompactBlockProcessorError.invalidConfiguration } @@ -597,27 +571,24 @@ public class CompactBlockProcessor { self.config = config } - func validateServer(completionBlock: @escaping (() -> Void)) { - Task { @MainActor in - do { - let info = try await self.service.getInfo() - try Self.validateServerInfo( - info, - saplingActivation: self.config.saplingActivation, - localNetwork: self.config.network, - rustBackend: self.rustBackend - ) - completionBlock() - } catch let error as LightWalletServiceError { - self.severeFailure(error.mapToProcessorError()) - } catch { - self.severeFailure(error) - } + func validateServer() async { + do { + let info = try await self.service.getInfo() + try Self.validateServerInfo( + info, + saplingActivation: self.config.saplingActivation, + localNetwork: self.config.network, + rustBackend: self.rustBackend + ) + } catch let error as LightWalletServiceError { + self.severeFailure(error.mapToProcessorError()) + } catch { + self.severeFailure(error) } } /// Processes new blocks on the given range based on the configuration set for this instance - func processNewBlocks(range: CompactBlockRange) { + func processNewBlocks(range: CompactBlockRange) async { self.foundBlocks = true self.backoffTimer?.invalidate() self.backoffTimer = nil @@ -633,13 +604,12 @@ public class CompactBlockProcessor { try await compactBlockBatchScanning(range: range) try await compactBlockEnhancement(range: range) try await fetchUnspentTxOutputs(range: range) + //state = .stopped } catch { - if error is CancellationError { - - } - if !(Task.isCancelled) { - fail(error) + await fail(error) + } else { + state = .stopped } } } @@ -658,7 +628,7 @@ public class CompactBlockProcessor { LoggerProxy.debug("progress: \(progress)") - NotificationCenter.default.post( + NotificationCenter.default.mainThreadPost( name: Notification.Name.blockProcessorUpdated, object: self, userInfo: userInfo @@ -666,7 +636,7 @@ public class CompactBlockProcessor { } func notifyTransactions(_ txs: [ConfirmedTransactionEntity], in range: BlockRange) { - NotificationCenter.default.post( + NotificationCenter.default.mainThreadPost( name: .blockProcessorFoundTransactions, object: self, userInfo: [ @@ -691,29 +661,29 @@ public class CompactBlockProcessor { self.backoffTimer?.invalidate() self.retryAttempts = config.retries self.processingError = error - setState(.error(error)) + state = .error(error) self.notifyError(error) } - func fail(_ error: Error) { + func fail(_ error: Error) async { // todo specify: failure LoggerProxy.error("\(error)") cancelableTask?.cancel() self.retryAttempts += 1 self.processingError = error - switch self.state.getState() { + switch self.state { case .error: notifyError(error) default: break } - setState(.error(error)) + state = .error(error) guard self.maxAttemptsReached else { return } // don't set a new timer if there are no more attempts. - self.setTimer() + await self.setTimer() } - func retryProcessing(range: CompactBlockRange) { + func retryProcessing(range: CompactBlockRange) async { cancelableTask?.cancel() // update retries self.retryAttempts += 1 @@ -728,10 +698,9 @@ public class CompactBlockProcessor { try downloader.rewind(to: max(range.lowerBound, self.config.walletBirthday)) // process next batch - // processNewBlocks(range: Self.nextBatchBlockRange(latestHeight: latestBlockHeight, latestDownloadedHeight: try downloader.lastDownloadedBlockHeight(), walletBirthday: config.walletBirthday)) - nextBatch() + await nextBatch() } catch { - self.fail(error) + await self.fail(error) } } @@ -766,41 +735,39 @@ public class CompactBlockProcessor { } } - private func nextBatch() { - setState(.downloading) - Task { @MainActor [self] in - do { - let nextState = try await NextStateHelper.nextStateAsync( - service: self.service, - downloader: self.downloader, - config: self.config, - rustBackend: self.rustBackend + private func nextBatch() async { + state = .downloading + do { + let nextState = try await NextStateHelper.nextStateAsync( + service: self.service, + downloader: self.downloader, + config: self.config, + rustBackend: self.rustBackend + ) + switch nextState { + case .finishProcessing(let height): + self.latestBlockHeight = height + await self.processingFinished(height: height) + case .processNewBlocks(let range): + self.latestBlockHeight = range.upperBound + self.lowerBoundHeight = range.lowerBound + await self.processNewBlocks(range: range) + case let .wait(latestHeight, latestDownloadHeight): + // Lightwalletd might be syncing + self.lowerBoundHeight = latestDownloadHeight + self.latestBlockHeight = latestHeight + LoggerProxy.info( + "Lightwalletd might be syncing: latest downloaded block height is: \(latestDownloadHeight)" + + "while latest blockheight is reported at: \(latestHeight)" ) - switch nextState { - case .finishProcessing(let height): - self.latestBlockHeight = height - self.processingFinished(height: height) - case .processNewBlocks(let range): - self.latestBlockHeight = range.upperBound - self.lowerBoundHeight = range.lowerBound - self.processNewBlocks(range: range) - case let .wait(latestHeight, latestDownloadHeight): - // Lightwalletd might be syncing - self.lowerBoundHeight = latestDownloadHeight - self.latestBlockHeight = latestHeight - LoggerProxy.info( - "Lightwalletd might be syncing: latest downloaded block height is: \(latestDownloadHeight)" + - "while latest blockheight is reported at: \(latestHeight)" - ) - self.processingFinished(height: latestDownloadHeight) - } - } catch { - self.severeFailure(error) + await self.processingFinished(height: latestDownloadHeight) } + } catch { + self.severeFailure(error) } } - internal func validationFailed(at height: BlockHeight) { + internal func validationFailed(at height: BlockHeight) async { // cancel all Tasks cancelableTask?.cancel() @@ -816,7 +783,7 @@ public class CompactBlockProcessor { ) guard rustBackend.rewindToHeight(dbData: config.dataDb, height: Int32(rewindHeight), networkType: self.config.network.networkType) else { - fail(rustBackend.lastError() ?? RustWeldingError.genericError(message: "unknown error rewinding to height \(height)")) + await fail(rustBackend.lastError() ?? RustWeldingError.genericError(message: "unknown error rewinding to height \(height)")) return } @@ -824,7 +791,7 @@ public class CompactBlockProcessor { try downloader.rewind(to: rewindHeight) // notify reorg - NotificationCenter.default.post( + NotificationCenter.default.mainThreadPost( name: Notification.Name.blockProcessorHandledReOrg, object: self, userInfo: [ @@ -833,15 +800,15 @@ public class CompactBlockProcessor { ) // process next batch - self.nextBatch() + await self.nextBatch() } catch { - self.fail(error) + await self.fail(error) } } - internal func processBatchFinished(range: CompactBlockRange) { + internal func processBatchFinished(range: CompactBlockRange) async { guard processingError == nil else { - retryProcessing(range: range) + await retryProcessing(range: range) return } @@ -849,15 +816,15 @@ public class CompactBlockProcessor { consecutiveChainValidationErrors = 0 guard !range.isEmpty else { - processingFinished(height: range.upperBound) + await processingFinished(height: range.upperBound) return } - nextBatch() + await nextBatch() } - private func processingFinished(height: BlockHeight) { - NotificationCenter.default.post( + private func processingFinished(height: BlockHeight) async { + NotificationCenter.default.mainThreadPost( name: Notification.Name.blockProcessorFinished, object: self, userInfo: [ @@ -865,35 +832,33 @@ public class CompactBlockProcessor { CompactBlockProcessorNotificationKey.foundBlocks: self.foundBlocks ] ) - setState(.synced) - setTimer() + state = .synced + await setTimer() } - private func setTimer() { + private func setTimer() async { let interval = self.config.blockPollInterval self.backoffTimer?.invalidate() let timer = Timer( timeInterval: interval, repeats: true, block: { [weak self] _ in - guard let self = self else { return } - do { - if self.shouldStart { + Task { [self] in + guard let self = self else { return } + if await self.shouldStart { LoggerProxy.debug( - """ - Timer triggered: Starting compact Block processor!. - Processor State: \(self.state) - latestHeight: \(self.latestBlockHeight) - attempts: \(self.retryAttempts) - lowerbound: \(String(describing: self.lowerBoundHeight)) - """ + """ + Timer triggered: Starting compact Block processor!. + Processor State: \(await self.state) + latestHeight: \(await self.latestBlockHeight) + attempts: \(await self.retryAttempts) + lowerbound: \(String(describing: await self.lowerBoundHeight)) + """ ) - try self.start() - } else if self.maxAttemptsReached { - self.fail(CompactBlockProcessorError.maxAttemptsReached(attempts: self.config.retries)) + await self.start() + } else if await self.maxAttemptsReached { + await self.fail(CompactBlockProcessorError.maxAttemptsReached(attempts: self.config.retries)) } - } catch { - self.fail(error) } } ) @@ -907,7 +872,7 @@ public class CompactBlockProcessor { return } - NotificationCenter.default.post( + NotificationCenter.default.mainThreadPost( name: .blockProcessorStatusChanged, object: self, userInfo: [ @@ -918,27 +883,27 @@ public class CompactBlockProcessor { switch newValue { case .downloading: - NotificationCenter.default.post(name: Notification.Name.blockProcessorStartedDownloading, object: self) + NotificationCenter.default.mainThreadPost(name: Notification.Name.blockProcessorStartedDownloading, object: self) case .synced: // transition to this state is handled by `processingFinished(height: BlockHeight)` break case .error(let err): notifyError(err) case .scanning: - NotificationCenter.default.post(name: Notification.Name.blockProcessorStartedScanning, object: self) + NotificationCenter.default.mainThreadPost(name: Notification.Name.blockProcessorStartedScanning, object: self) case .stopped: - NotificationCenter.default.post(name: Notification.Name.blockProcessorStopped, object: self) + NotificationCenter.default.mainThreadPost(name: Notification.Name.blockProcessorStopped, object: self) case .validating: - NotificationCenter.default.post(name: Notification.Name.blockProcessorStartedValidating, object: self) + NotificationCenter.default.mainThreadPost(name: Notification.Name.blockProcessorStartedValidating, object: self) case .enhancing: - NotificationCenter.default.post(name: Notification.Name.blockProcessorStartedEnhancing, object: self) + NotificationCenter.default.mainThreadPost(name: Notification.Name.blockProcessorStartedEnhancing, object: self) case .fetching: - NotificationCenter.default.post(name: Notification.Name.blockProcessorStartedFetching, object: self) + NotificationCenter.default.mainThreadPost(name: Notification.Name.blockProcessorStartedFetching, object: self) } } private func notifyError(_ err: Error) { - NotificationCenter.default.post( + NotificationCenter.default.mainThreadPost( name: Notification.Name.blockProcessorFailed, object: self, userInfo: [CompactBlockProcessorNotificationKey.error: mapError(err)] @@ -1157,7 +1122,7 @@ extension CompactBlockProcessorError: LocalizedError { extension CompactBlockProcessor: EnhancementStreamDelegate { func transactionEnhancementProgressUpdated(_ progress: EnhancementProgress) { - NotificationCenter.default.post( + NotificationCenter.default.mainThreadPost( name: .blockProcessorEnhancementProgress, object: self, userInfo: [CompactBlockProcessorNotificationKey.enhancementProgress: progress] diff --git a/Sources/ZcashLightClientKit/Block/Processor/CompactBlockScanning.swift b/Sources/ZcashLightClientKit/Block/Processor/CompactBlockScanning.swift index f85f1b8d..45535ca5 100644 --- a/Sources/ZcashLightClientKit/Block/Processor/CompactBlockScanning.swift +++ b/Sources/ZcashLightClientKit/Block/Processor/CompactBlockScanning.swift @@ -12,7 +12,7 @@ extension CompactBlockProcessor { func compactBlockBatchScanning(range: CompactBlockRange) async throws { try Task.checkCancellation() - setState(.scanning) + state = .scanning let batchSize = UInt32(config.scanningBatchSize) do { @@ -24,7 +24,7 @@ extension CompactBlockProcessor { throw error } let scanFinishTime = Date() - NotificationCenter.default.post( + NotificationCenter.default.mainThreadPostNotification( SDKMetrics.progressReportNotification( progress: BlockProgress( startHeight: range.lowerBound, @@ -68,7 +68,7 @@ extension CompactBlockProcessor { if scannedNewBlocks { let progress = BlockProgress(startHeight: scanStartHeight, targetHeight: targetScanHeight, progressHeight: lastScannedHeight) notifyProgress(.scan(progress)) - NotificationCenter.default.post( + NotificationCenter.default.mainThreadPostNotification( SDKMetrics.progressReportNotification( progress: progress, start: scanStartTime, @@ -81,9 +81,11 @@ extension CompactBlockProcessor { let seconds = scanFinishTime.timeIntervalSinceReferenceDate - scanStartTime.timeIntervalSinceReferenceDate LoggerProxy.debug("Scanned \(heightCount) blocks in \(seconds) seconds") } + + await Task.yield() } while !Task.isCancelled && scannedNewBlocks && lastScannedHeight < targetScanHeight if Task.isCancelled { - setState(.stopped) + state = .stopped LoggerProxy.debug("Warning: compactBlockBatchScanning cancelled") } } diff --git a/Sources/ZcashLightClientKit/Block/Processor/CompactBlockValidationInformation.swift b/Sources/ZcashLightClientKit/Block/Processor/CompactBlockValidationInformation.swift index fccf1fec..f7bf6853 100644 --- a/Sources/ZcashLightClientKit/Block/Processor/CompactBlockValidationInformation.swift +++ b/Sources/ZcashLightClientKit/Block/Processor/CompactBlockValidationInformation.swift @@ -17,7 +17,7 @@ extension CompactBlockProcessor { func compactBlockValidation() async throws { try Task.checkCancellation() - setState(.validating) + state = .validating let result = rustBackend.validateCombinedChain(dbCache: config.cacheDb, dbData: config.dataDb, networkType: config.network.networkType) @@ -30,7 +30,7 @@ extension CompactBlockProcessor { case ZcashRustBackendWeldingConstants.validChain: if Task.isCancelled { - setState(.stopped) + state = .stopped LoggerProxy.debug("Warning: compactBlockValidation cancelled") } LoggerProxy.debug("validateChainFinished") @@ -50,11 +50,11 @@ extension CompactBlockProcessor { switch validationError { case .validationFailed(let height): LoggerProxy.debug("chain validation at height: \(height)") - validationFailed(at: height) + await validationFailed(at: height) case .failedWithError(let err): guard let validationFailure = err else { LoggerProxy.error("validation failed without a specific error") - self.fail(CompactBlockProcessorError.generalError(message: "validation failed without a specific error")) + await self.fail(CompactBlockProcessorError.generalError(message: "validation failed without a specific error")) return } diff --git a/Sources/ZcashLightClientKit/Block/Processor/FetchUnspentTxOutputs.swift b/Sources/ZcashLightClientKit/Block/Processor/FetchUnspentTxOutputs.swift index 2e5e5265..1a497841 100644 --- a/Sources/ZcashLightClientKit/Block/Processor/FetchUnspentTxOutputs.swift +++ b/Sources/ZcashLightClientKit/Block/Processor/FetchUnspentTxOutputs.swift @@ -16,8 +16,8 @@ extension CompactBlockProcessor { func fetchUnspentTxOutputs(range: CompactBlockRange) async throws { try Task.checkCancellation() - setState(.fetching) - + state = .fetching + do { let tAddresses = try accountRepository.getAll().map({ $0.transparentAddress }) do { @@ -64,7 +64,7 @@ extension CompactBlockProcessor { let result = (inserted: refreshed, skipped: skipped) - NotificationCenter.default.post( + NotificationCenter.default.mainThreadPost( name: .blockProcessorStoredUTXOs, object: self, userInfo: [CompactBlockProcessorNotificationKey.refreshedUTXOs: result] @@ -73,7 +73,7 @@ extension CompactBlockProcessor { if Task.isCancelled { LoggerProxy.debug("Warning: fetchUnspentTxOutputs on range \(range) cancelled") } else { - processBatchFinished(range: range) + await processBatchFinished(range: range) } } catch { throw error diff --git a/Sources/ZcashLightClientKit/Block/Utils/NotificationCenter+Post.swift b/Sources/ZcashLightClientKit/Block/Utils/NotificationCenter+Post.swift new file mode 100644 index 00000000..5193640b --- /dev/null +++ b/Sources/ZcashLightClientKit/Block/Utils/NotificationCenter+Post.swift @@ -0,0 +1,30 @@ +// +// NotificationCenter+Post.swift +// +// +// Created by Lukáš Korba on 12.10.2022. +// + +import Foundation + +extension NotificationCenter { + func mainThreadPost( + name aName: NSNotification.Name, + object anObject: Any?, + userInfo aUserInfo: [AnyHashable : Any]? = nil + ) { + DispatchQueue.main.async { + NotificationCenter.default.post( + name: aName, + object: anObject, + userInfo: aUserInfo + ) + } + } + + func mainThreadPostNotification(_ notification: Notification) { + DispatchQueue.main.async { + NotificationCenter.default.post(notification) + } + } +} diff --git a/Sources/ZcashLightClientKit/Service/LightWalletGRPCService.swift b/Sources/ZcashLightClientKit/Service/LightWalletGRPCService.swift index d3f8828d..d78ab118 100644 --- a/Sources/ZcashLightClientKit/Service/LightWalletGRPCService.swift +++ b/Sources/ZcashLightClientKit/Service/LightWalletGRPCService.swift @@ -417,7 +417,7 @@ extension LightWalletServiceError { class ConnectionStatusManager: ConnectivityStateDelegate { func connectivityStateDidChange(from oldState: ConnectivityState, to newState: ConnectivityState) { LoggerProxy.event("Connection Changed from \(oldState) to \(newState)") - NotificationCenter.default.post( + NotificationCenter.default.mainThreadPost( name: .blockProcessorConnectivityStateChanged, object: self, userInfo: [ diff --git a/Sources/ZcashLightClientKit/Synchronizer.swift b/Sources/ZcashLightClientKit/Synchronizer.swift index f48248d9..a3ac5e82 100644 --- a/Sources/ZcashLightClientKit/Synchronizer.swift +++ b/Sources/ZcashLightClientKit/Synchronizer.swift @@ -80,7 +80,7 @@ public protocol Synchronizer { /// prepares this initializer to operate. Initializes the internal state with the given /// Extended Viewing Keys and a wallet birthday found in the initializer object - func prepare() throws + func prepare() async throws ///Starts this synchronizer within the given scope. /// @@ -95,18 +95,18 @@ public protocol Synchronizer { /// Gets the sapling shielded address for the given account. /// - Parameter accountIndex: the optional accountId whose address is of interest. By default, the first account is used. /// - Returns the address or nil if account index is incorrect - func getShieldedAddress(accountIndex: Int) -> SaplingShieldedAddress? + func getShieldedAddress(accountIndex: Int) async -> SaplingShieldedAddress? /// Gets the unified address for the given account. /// - Parameter accountIndex: the optional accountId whose address is of interest. By default, the first account is used. /// - Returns the address or nil if account index is incorrect - func getUnifiedAddress(accountIndex: Int) -> UnifiedAddress? + func getUnifiedAddress(accountIndex: Int) async -> UnifiedAddress? /// Gets the transparent address for the given account. /// - Parameter accountIndex: the optional accountId whose address is of interest. By default, the first account is used. /// - Returns the address or nil if account index is incorrect - func getTransparentAddress(accountIndex: Int) -> TransparentAddress? + func getTransparentAddress(accountIndex: Int) async -> TransparentAddress? /// Sends zatoshi. /// - Parameter spendingKey: the key that allows spends to occur. @@ -167,7 +167,7 @@ public protocol Synchronizer { func allConfirmedTransactions(from transaction: ConfirmedTransactionEntity?, limit: Int) throws -> [ConfirmedTransactionEntity]? /// Returns the latest downloaded height from the compact block cache - func latestDownloadedHeight() throws -> BlockHeight + func latestDownloadedHeight() async throws -> BlockHeight /// Returns the latest block height from the provided Lightwallet endpoint @@ -176,14 +176,14 @@ public protocol Synchronizer { /// Returns the latest block height from the provided Lightwallet endpoint /// Blocking - func latestHeight() throws -> BlockHeight + func latestHeight() async throws -> BlockHeight /// Returns the latests UTXOs for the given address from the specified height on func refreshUTXOs(address: String, from height: BlockHeight) async throws -> RefreshedUTXOs /// Returns the last stored unshielded balance - func getTransparentBalance(accountIndex: Int) throws -> WalletBalance + func getTransparentBalance(accountIndex: Int) async throws -> WalletBalance /// Returns the shielded total balance (includes verified and unverified balance) @@ -207,7 +207,7 @@ public protocol Synchronizer { /// - Throws rewindErrorUnknownArchorHeight when the rewind points to an invalid height /// - Throws rewindError for other errors /// - Note rewind does not trigger notifications as a reorg would. You need to restart the synchronizer afterwards - func rewind(_ policy: RewindPolicy) throws + func rewind(_ policy: RewindPolicy) async throws } public enum SyncStatus: Equatable { diff --git a/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift b/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift index 2512441b..b84df400 100644 --- a/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift +++ b/Sources/ZcashLightClientKit/Synchronizer/SDKSynchronizer.swift @@ -151,17 +151,19 @@ public class SDKSynchronizer: Synchronizer { deinit { NotificationCenter.default.removeObserver(self) - self.blockProcessor.stop() + Task { [blockProcessor] in + await blockProcessor.stop() + } } - public func initialize() throws { + public func initialize() async throws { try self.initializer.initialize() - try self.blockProcessor.setStartHeight(initializer.walletBirthday) + try await self.blockProcessor.setStartHeight(initializer.walletBirthday) } - public func prepare() throws { + public func prepare() async throws { try self.initializer.initialize() - try self.blockProcessor.setStartHeight(initializer.walletBirthday) + try await self.blockProcessor.setStartHeight(initializer.walletBirthday) self.status = .disconnected } @@ -177,10 +179,8 @@ public class SDKSynchronizer: Synchronizer { return case .stopped, .synced, .disconnected, .error: - do { - try blockProcessor.start(retry: retry) - } catch { - throw mapError(error) + Task { + await blockProcessor.start(retry: retry) } } } @@ -192,8 +192,10 @@ public class SDKSynchronizer: Synchronizer { return } - blockProcessor.stop() - self.status = .stopped + Task(priority: .high) { + await blockProcessor.stop() + self.status = .stopped + } } private func subscribeToProcessorNotifications(_ processor: CompactBlockProcessor) { @@ -314,7 +316,7 @@ public class SDKSynchronizer: Synchronizer { } let currentState = ConnectionState(current) - NotificationCenter.default.post( + NotificationCenter.default.mainThreadPost( name: .synchronizerConnectionStateChanged, object: self, userInfo: [ @@ -336,7 +338,7 @@ public class SDKSynchronizer: Synchronizer { return } - NotificationCenter.default.post( + NotificationCenter.default.mainThreadPost( name: .synchronizerFoundTransactions, object: self, userInfo: [ @@ -489,7 +491,7 @@ public class SDKSynchronizer: Synchronizer { do { let tAddr = try derivationTool.deriveTransparentAddressFromPrivateKey(transparentSecretKey) - let tBalance = try utxoRepository.balance(address: tAddr, latestHeight: self.latestDownloadedHeight()) + let tBalance = try await utxoRepository.balance(address: tAddr, latestHeight: self.latestDownloadedHeight()) // Verify that at least there are funds for the fee. Ideally this logic will be improved by the shielding wallet. guard tBalance.verified >= self.network.constants.defaultFee(for: self.latestScannedHeight) else { @@ -566,8 +568,8 @@ public class SDKSynchronizer: Synchronizer { PagedTransactionRepositoryBuilder.build(initializer: initializer, kind: .all) } - public func latestDownloadedHeight() throws -> BlockHeight { - try blockProcessor.downloader.lastDownloadedBlockHeight() + public func latestDownloadedHeight() async throws -> BlockHeight { + try await blockProcessor.downloader.lastDownloadedBlockHeight() } public func latestHeight(result: @escaping (Result) -> Void) { @@ -581,8 +583,8 @@ public class SDKSynchronizer: Synchronizer { } } - public func latestHeight() throws -> BlockHeight { - try blockProcessor.downloader.latestBlockHeight() + public func latestHeight() async throws -> BlockHeight { + try await blockProcessor.downloader.latestBlockHeight() } public func latestUTXOs(address: String) async throws -> [UnspentTransactionOutputEntity] { @@ -626,34 +628,34 @@ public class SDKSynchronizer: Synchronizer { initializer.getVerifiedBalance(account: accountIndex) } - public func getShieldedAddress(accountIndex: Int) -> SaplingShieldedAddress? { - blockProcessor.getShieldedAddress(accountIndex: accountIndex) + public func getShieldedAddress(accountIndex: Int) async -> SaplingShieldedAddress? { + await blockProcessor.getShieldedAddress(accountIndex: accountIndex) } - public func getUnifiedAddress(accountIndex: Int) -> UnifiedAddress? { - blockProcessor.getUnifiedAddres(accountIndex: accountIndex) + public func getUnifiedAddress(accountIndex: Int) async -> UnifiedAddress? { + await blockProcessor.getUnifiedAddres(accountIndex: accountIndex) } - public func getTransparentAddress(accountIndex: Int) -> TransparentAddress? { - blockProcessor.getTransparentAddress(accountIndex: accountIndex) + public func getTransparentAddress(accountIndex: Int) async -> TransparentAddress? { + await blockProcessor.getTransparentAddress(accountIndex: accountIndex) } - public func getTransparentBalance(accountIndex: Int) throws -> WalletBalance { - try blockProcessor.getTransparentBalance(accountIndex: accountIndex) + public func getTransparentBalance(accountIndex: Int) async throws -> WalletBalance { + try await blockProcessor.getTransparentBalance(accountIndex: accountIndex) } /** Returns the last stored unshielded balance */ - public func getTransparentBalance(address: String) throws -> WalletBalance { + public func getTransparentBalance(address: String) async throws -> WalletBalance { do { - return try self.blockProcessor.utxoCacheBalance(tAddress: address) + return try await self.blockProcessor.utxoCacheBalance(tAddress: address) } catch { throw SynchronizerError.uncategorized(underlyingError: error) } } - public func rewind(_ policy: RewindPolicy) throws { + public func rewind(_ policy: RewindPolicy) async throws { self.stop() var height: BlockHeight? @@ -663,7 +665,7 @@ public class SDKSynchronizer: Synchronizer { break case .birthday: - let birthday = self.blockProcessor.config.walletBirthday + let birthday = await self.blockProcessor.config.walletBirthday height = birthday case .height(let rewindHeight): @@ -677,7 +679,7 @@ public class SDKSynchronizer: Synchronizer { } do { - let rewindHeight = try self.blockProcessor.rewindTo(height) + let rewindHeight = try await self.blockProcessor.rewindTo(height) try self.transactionManager.handleReorg(at: rewindHeight) } catch { throw SynchronizerError.rewindError(underlyingError: error) @@ -691,11 +693,11 @@ public class SDKSynchronizer: Synchronizer { userInfo[NotificationKeys.blockHeight] = progress.progressHeight self.status = SyncStatus(progress) - NotificationCenter.default.post(name: Notification.Name.synchronizerProgressUpdated, object: self, userInfo: userInfo) + NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerProgressUpdated, object: self, userInfo: userInfo) } private func notifyStatusChange(newValue: SyncStatus, oldValue: SyncStatus) { - NotificationCenter.default.post( + NotificationCenter.default.mainThreadPost( name: .synchronizerStatusWillUpdate, object: self, userInfo: @@ -709,38 +711,40 @@ public class SDKSynchronizer: Synchronizer { private func notify(status: SyncStatus) { switch status { case .disconnected: - NotificationCenter.default.post(name: Notification.Name.synchronizerDisconnected, object: self) + NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerDisconnected, object: self) case .stopped: - NotificationCenter.default.post(name: Notification.Name.synchronizerStopped, object: self) + NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerStopped, object: self) case .synced: - NotificationCenter.default.post( - name: Notification.Name.synchronizerSynced, - object: self, - userInfo: [ - SDKSynchronizer.NotificationKeys.blockHeight: self.latestScannedHeight, - SDKSynchronizer.NotificationKeys.synchronizerState: SynchronizerState( - shieldedBalance: WalletBalance( - verified: initializer.getVerifiedBalance(), - total: initializer.getBalance() - ), - transparentBalance: (try? self.getTransparentBalance(accountIndex: 0)) ?? WalletBalance.zero, - syncStatus: status, - latestScannedHeight: self.latestScannedHeight - ) - ] - ) + Task { + NotificationCenter.default.mainThreadPost( + name: Notification.Name.synchronizerSynced, + object: self, + userInfo: [ + SDKSynchronizer.NotificationKeys.blockHeight: self.latestScannedHeight, + SDKSynchronizer.NotificationKeys.synchronizerState: SynchronizerState( + shieldedBalance: WalletBalance( + verified: initializer.getVerifiedBalance(), + total: initializer.getBalance() + ), + transparentBalance: (try? await self.getTransparentBalance(accountIndex: 0)) ?? WalletBalance.zero, + syncStatus: status, + latestScannedHeight: self.latestScannedHeight + ) + ] + ) + } case .unprepared: break case .downloading: - NotificationCenter.default.post(name: Notification.Name.synchronizerDownloading, object: self) + NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerDownloading, object: self) case .validating: - NotificationCenter.default.post(name: Notification.Name.synchronizerValidating, object: self) + NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerValidating, object: self) case .scanning: - NotificationCenter.default.post(name: Notification.Name.synchronizerScanning, object: self) + NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerScanning, object: self) case .enhancing: - NotificationCenter.default.post(name: Notification.Name.synchronizerEnhancing, object: self) + NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerEnhancing, object: self) case .fetching: - NotificationCenter.default.post(name: Notification.Name.synchronizerFetching, object: self) + NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerFetching, object: self) case .error(let e): self.notifyFailure(e) } @@ -782,7 +786,7 @@ public class SDKSynchronizer: Synchronizer { DispatchQueue.main.async { [weak self] in guard let self = self else { return } - NotificationCenter.default.post( + NotificationCenter.default.mainThreadPost( name: Notification.Name.synchronizerMinedTransaction, object: self, userInfo: [NotificationKeys.minedTransaction: transaction] @@ -832,7 +836,7 @@ public class SDKSynchronizer: Synchronizer { private func notifyFailure(_ error: Error) { DispatchQueue.main.async { [weak self] in guard let self = self else { return } - NotificationCenter.default.post( + NotificationCenter.default.mainThreadPost( name: Notification.Name.synchronizerFailed, object: self, userInfo: [NotificationKeys.error: self.mapError(error)] diff --git a/Tests/DarksideTests/AdvancedReOrgTests.swift b/Tests/DarksideTests/AdvancedReOrgTests.swift index 8d02d393..d8891ebd 100644 --- a/Tests/DarksideTests/AdvancedReOrgTests.swift +++ b/Tests/DarksideTests/AdvancedReOrgTests.swift @@ -83,7 +83,7 @@ class AdvancedReOrgTests: XCTestCase { 10. sync up to received_Tx_height + 3 11. verify that balance equals initial balance + tx amount */ - func testReOrgChangesInboundTxMinedHeight() throws { + func testReOrgChangesInboundTxMinedHeight() async throws { hookToReOrgNotification() try FakeChainBuilder.buildChain(darksideWallet: coordinator.service, branchID: branchID, chainName: chainName) var shouldContinue = false @@ -101,16 +101,23 @@ class AdvancedReOrgTests: XCTestCase { var synchronizer: SDKSynchronizer? - try coordinator.sync( - completion: { synchro in - synchronizer = synchro - initialVerifiedBalance = synchro.initializer.getVerifiedBalance() - initialTotalBalance = synchro.initializer.getBalance() - preTxExpectation.fulfill() - shouldContinue = true - }, - error: self.handleError - ) + try await withCheckedThrowingContinuation { continuation in + do { + try coordinator.sync( + completion: { synchro in + synchronizer = synchro + initialVerifiedBalance = synchro.initializer.getVerifiedBalance() + initialTotalBalance = synchro.initializer.getBalance() + preTxExpectation.fulfill() + shouldContinue = true + continuation.resume() + }, + error: self.handleError + ) + } catch { + continuation.resume(with: .failure(error)) + } + } wait(for: [preTxExpectation], timeout: 10) @@ -132,12 +139,20 @@ class AdvancedReOrgTests: XCTestCase { var receivedTxTotalBalance = Zatoshi(-1) var receivedTxVerifiedBalance = Zatoshi(-1) - try coordinator.sync(completion: { synchro in - synchronizer = synchro - receivedTxVerifiedBalance = synchro.initializer.getVerifiedBalance() - receivedTxTotalBalance = synchro.initializer.getBalance() - receivedTxExpectation.fulfill() - }, error: self.handleError) + try await withCheckedThrowingContinuation { continuation in + do { + try coordinator.sync(completion: { synchro in + synchronizer = synchro + receivedTxVerifiedBalance = synchro.initializer.getVerifiedBalance() + receivedTxTotalBalance = synchro.initializer.getBalance() + receivedTxExpectation.fulfill() + continuation.resume() + }, error: self.handleError) + } catch { + continuation.resume(with: .failure(error)) + } + } + sleep(2) wait(for: [receivedTxExpectation], timeout: 10) @@ -196,11 +211,21 @@ class AdvancedReOrgTests: XCTestCase { var afterReorgTxTotalBalance = Zatoshi(-1) var afterReorgTxVerifiedBalance = Zatoshi(-1) - try coordinator.sync(completion: { synchronizer in - afterReorgTxTotalBalance = synchronizer.initializer.getBalance() - afterReorgTxVerifiedBalance = synchronizer.initializer.getVerifiedBalance() - reorgSyncexpectation.fulfill() - }, error: self.handleError(_:)) + try await withCheckedThrowingContinuation { continuation in + do { + try coordinator.sync( + completion: { synchronizer in + afterReorgTxTotalBalance = synchronizer.initializer.getBalance() + afterReorgTxVerifiedBalance = synchronizer.initializer.getVerifiedBalance() + reorgSyncexpectation.fulfill() + continuation.resume() + }, + error: self.handleError + ) + } catch { + continuation.resume(with: .failure(error)) + } + } /* 8. assert that reorg happened at received_Tx_height @@ -224,11 +249,22 @@ class AdvancedReOrgTests: XCTestCase { try coordinator.applyStaged(blockheight: reorgedTxheight + 1) sleep(3) - try coordinator.sync(completion: { synchronizer in - finalReorgTxTotalBalance = synchronizer.initializer.getBalance() - finalReorgTxVerifiedBalance = synchronizer.initializer.getVerifiedBalance() - finalsyncExpectation.fulfill() - }, error: self.handleError(_:)) + + try await withCheckedThrowingContinuation { continuation in + do { + try coordinator.sync( + completion: { synchronizer in + finalReorgTxTotalBalance = synchronizer.initializer.getBalance() + finalReorgTxVerifiedBalance = synchronizer.initializer.getVerifiedBalance() + finalsyncExpectation.fulfill() + continuation.resume() + }, + error: self.handleError + ) + } catch { + continuation.resume(with: .failure(error)) + } + } wait(for: [finalsyncExpectation], timeout: 5) sleep(3) @@ -1239,7 +1275,7 @@ class AdvancedReOrgTests: XCTestCase { XCTAssertEqual(coordinator.synchronizer.initializer.getBalance(), initialTotalBalance) } - func testLongSync() throws { + func testLongSync() async throws { hookToReOrgNotification() /* @@ -1258,21 +1294,31 @@ class AdvancedReOrgTests: XCTestCase { /* sync to latest height */ - try coordinator.sync(completion: { _ in - firstSyncExpectation.fulfill() - }, error: { error in - _ = try? self.coordinator.stop() - firstSyncExpectation.fulfill() - guard let testError = error else { - XCTFail("failed with nil error") - return + try await withCheckedThrowingContinuation { continuation in + do { + try coordinator.sync( + completion: { _ in + firstSyncExpectation.fulfill() + continuation.resume() + }, error: { error in + _ = try? self.coordinator.stop() + firstSyncExpectation.fulfill() + guard let testError = error else { + XCTFail("failed with nil error") + return + } + XCTFail("Failed with error: \(testError)") + } + ) + } catch { + continuation.resume(throwing: error) } - XCTFail("Failed with error: \(testError)") - }) + } wait(for: [firstSyncExpectation], timeout: 500) - XCTAssertEqual(try coordinator.synchronizer.latestDownloadedHeight(), birthday + fullSyncLength) + let latestDownloadedHeight = try await coordinator.synchronizer.latestDownloadedHeight() + XCTAssertEqual(latestDownloadedHeight, birthday + fullSyncLength) } func handleError(_ error: Error?) { diff --git a/Tests/DarksideTests/RewindRescanTests.swift b/Tests/DarksideTests/RewindRescanTests.swift index ce677576..9331fc28 100644 --- a/Tests/DarksideTests/RewindRescanTests.swift +++ b/Tests/DarksideTests/RewindRescanTests.swift @@ -64,7 +64,7 @@ class RewindRescanTests: XCTestCase { XCTFail("Failed with error: \(testError)") } - func testBirthdayRescan() throws { + func testBirthdayRescan() async throws { // 1 sync and get spendable funds try FakeChainBuilder.buildChain(darksideWallet: coordinator.service, branchID: branchID, chainName: chainName) @@ -86,7 +86,7 @@ class RewindRescanTests: XCTestCase { XCTAssertEqual(verifiedBalance, totalBalance) // rewind to birthday - try coordinator.synchronizer.rewind(.birthday) + try await coordinator.synchronizer.rewind(.birthday) // assert that after the new height is XCTAssertEqual(try coordinator.synchronizer.initializer.transactionRepository.lastScannedHeight(), self.birthday) @@ -145,7 +145,7 @@ class RewindRescanTests: XCTestCase { height: Int32(targetHeight), networkType: network.networkType ) - try coordinator.synchronizer.rewind(.height(blockheight: targetHeight)) + try await coordinator.synchronizer.rewind(.height(blockheight: targetHeight)) guard rewindHeight > 0 else { XCTFail("get nearest height failed error: \(ZcashRustBackend.getLastError() ?? "null")") @@ -190,7 +190,7 @@ class RewindRescanTests: XCTestCase { wait(for: [sendExpectation], timeout: 15) } - func testRescanToTransaction() throws { + func testRescanToTransaction() async throws { // 1 sync and get spendable funds try FakeChainBuilder.buildChain(darksideWallet: coordinator.service, branchID: branchID, chainName: chainName) @@ -216,7 +216,7 @@ class RewindRescanTests: XCTestCase { return } - try coordinator.synchronizer.rewind(.transaction(transaction.transactionEntity)) + try await coordinator.synchronizer.rewind(.transaction(transaction.transactionEntity)) // assert that after the new height is XCTAssertEqual( @@ -363,7 +363,7 @@ class RewindRescanTests: XCTestCase { // rewind 5 blocks prior to sending - try coordinator.synchronizer.rewind(.height(blockheight: sentTxHeight - 5)) + try await coordinator.synchronizer.rewind(.height(blockheight: sentTxHeight - 5)) guard let pendingEntity = try coordinator.synchronizer.allPendingTransactions() diff --git a/Tests/DarksideTests/ShieldFundsTests.swift b/Tests/DarksideTests/ShieldFundsTests.swift index c0a4463b..9d4790f5 100644 --- a/Tests/DarksideTests/ShieldFundsTests.swift +++ b/Tests/DarksideTests/ShieldFundsTests.swift @@ -93,7 +93,7 @@ class ShieldFundsTests: XCTestCase { var shouldContinue = false var initialTotalBalance = Zatoshi(-1) var initialVerifiedBalance = Zatoshi(-1) - var initialTransparentBalance: WalletBalance = try coordinator.synchronizer.getTransparentBalance(accountIndex: 0) + var initialTransparentBalance: WalletBalance = try await coordinator.synchronizer.getTransparentBalance(accountIndex: 0) let utxo = try GetAddressUtxosReply(jsonString: """ { @@ -135,7 +135,7 @@ class ShieldFundsTests: XCTestCase { // at this point the balance should be all zeroes for transparent and shielded funds XCTAssertEqual(initialTotalBalance, Zatoshi.zero) XCTAssertEqual(initialVerifiedBalance, Zatoshi.zero) - initialTransparentBalance = try coordinator.synchronizer.getTransparentBalance(accountIndex: 0) + initialTransparentBalance = try await coordinator.synchronizer.getTransparentBalance(accountIndex: 0) XCTAssertEqual(initialTransparentBalance.total, .zero) XCTAssertEqual(initialTransparentBalance.verified, .zero) @@ -169,7 +169,7 @@ class ShieldFundsTests: XCTestCase { // at this point the balance should be zero for shielded, then zero verified transparent funds // and 10000 zatoshi of total (not verified) transparent funds. - let tFundsDetectedBalance = try coordinator.synchronizer.getTransparentBalance(accountIndex: 0) + let tFundsDetectedBalance = try await coordinator.synchronizer.getTransparentBalance(accountIndex: 0) XCTAssertEqual(tFundsDetectedBalance.total, Zatoshi(10000)) XCTAssertEqual(tFundsDetectedBalance.verified, Zatoshi(10000)) //FIXME: this should be zero @@ -199,7 +199,7 @@ class ShieldFundsTests: XCTestCase { wait(for: [tFundsConfirmationSyncExpectation], timeout: 5) // the transparent funds should be 10000 zatoshis both total and verified - let confirmedTFundsBalance = try coordinator.synchronizer.getTransparentBalance(accountIndex: 0) + let confirmedTFundsBalance = try await coordinator.synchronizer.getTransparentBalance(accountIndex: 0) XCTAssertEqual(confirmedTFundsBalance.total, Zatoshi(10000)) XCTAssertEqual(confirmedTFundsBalance.verified, Zatoshi(10000)) @@ -239,7 +239,7 @@ class ShieldFundsTests: XCTestCase { guard shouldContinue else { return } - let postShieldingBalance = try coordinator.synchronizer.getTransparentBalance(accountIndex: 0) + let postShieldingBalance = try await coordinator.synchronizer.getTransparentBalance(accountIndex: 0) // when funds are shielded the UTXOs should be marked as spend and not shown on the balance. // now balance should be zero shielded, zero transaparent. // verify that the balance has been marked as spent regardless of confirmation @@ -290,7 +290,7 @@ class ShieldFundsTests: XCTestCase { // Now it should verify that the balance has been shielded. The resulting balance should be zero // transparent funds and `10000 - fee` total shielded funds, zero verified shielded funds. // Fees at the time of writing the tests are 1000 zatoshi as defined on ZIP-313 - let postShieldingShieldedBalance = try coordinator.synchronizer.getTransparentBalance(accountIndex: 0) + let postShieldingShieldedBalance = try await coordinator.synchronizer.getTransparentBalance(accountIndex: 0) XCTAssertEqual(postShieldingShieldedBalance.total, Zatoshi(10000)) //FIXME: this should be zero XCTAssertEqual(postShieldingShieldedBalance.verified, Zatoshi(10000)) //FIXME: this should be zero @@ -327,7 +327,7 @@ class ShieldFundsTests: XCTestCase { XCTAssertNotNil(clearedTransaction) XCTAssertEqual(coordinator.synchronizer.getShieldedBalance(), Zatoshi(9000)) - let postShieldingConfirmationShieldedBalance = try coordinator.synchronizer.getTransparentBalance(accountIndex: 0) + let postShieldingConfirmationShieldedBalance = try await coordinator.synchronizer.getTransparentBalance(accountIndex: 0) XCTAssertEqual(postShieldingConfirmationShieldedBalance.total, .zero) XCTAssertEqual(postShieldingConfirmationShieldedBalance.verified, .zero) diff --git a/Tests/DarksideTests/SynchronizerTests.swift b/Tests/DarksideTests/SynchronizerTests.swift index d12c3fc1..12918a99 100644 --- a/Tests/DarksideTests/SynchronizerTests.swift +++ b/Tests/DarksideTests/SynchronizerTests.swift @@ -67,7 +67,7 @@ final class SynchronizerTests: XCTestCase { reorgExpectation.fulfill() } - func testSynchronizerStops() throws { + func testSynchronizerStops() async throws { hookToReOrgNotification() /* @@ -102,14 +102,14 @@ final class SynchronizerTests: XCTestCase { XCTFail("Failed with error: \(testError)") }) - DispatchQueue.main.asyncAfter(deadline: .now() + 5) { - self.coordinator.synchronizer.stop() - } + try await Task.sleep(nanoseconds: 5_000_000_000) + self.coordinator.synchronizer.stop() - wait(for: [processorStoppedExpectation,syncStoppedExpectation], timeout: 6, enforceOrder: true) + wait(for: [syncStoppedExpectation, processorStoppedExpectation], timeout: 6, enforceOrder: true) XCTAssertEqual(coordinator.synchronizer.status, .stopped) - XCTAssertEqual(coordinator.synchronizer.blockProcessor.state.getState(), .stopped) + let state = await coordinator.synchronizer.blockProcessor.state + XCTAssertEqual(state, .stopped) } func handleError(_ error: Error?) { diff --git a/Tests/DarksideTests/TransactionEnhancementTests.swift b/Tests/DarksideTests/TransactionEnhancementTests.swift index 575f8b90..e83ea32a 100644 --- a/Tests/DarksideTests/TransactionEnhancementTests.swift +++ b/Tests/DarksideTests/TransactionEnhancementTests.swift @@ -110,7 +110,7 @@ class TransactionEnhancementTests: XCTestCase { NotificationCenter.default.removeObserver(self) } - private func startProcessing() throws { + private func startProcessing() async throws { XCTAssertNotNil(processor) // Subscribe to notifications @@ -120,17 +120,17 @@ class TransactionEnhancementTests: XCTestCase { startedValidatingNotificationExpectation.subscribe(to: Notification.Name.blockProcessorStartedValidating, object: processor) startedScanningNotificationExpectation.subscribe(to: Notification.Name.blockProcessorStartedScanning, object: processor) - try processor.start() + try await processor.start() } - func testBasicEnhacement() throws { + func testBasicEnhacement() async throws { let targetLatestHeight = BlockHeight(663250) let walletBirthday = Checkpoint.birthday(with: 663151, network: network).height - try basicEnhancementTest(latestHeight: targetLatestHeight, walletBirthday: walletBirthday) + try await basicEnhancementTest(latestHeight: targetLatestHeight, walletBirthday: walletBirthday) } - func basicEnhancementTest(latestHeight: BlockHeight, walletBirthday: BlockHeight) throws { + func basicEnhancementTest(latestHeight: BlockHeight, walletBirthday: BlockHeight) async throws { do { try darksideWalletService.reset(saplingActivation: 663150, branchID: branchID, chainName: chainName) try darksideWalletService.useDataset(DarksideDataset.beforeReOrg.rawValue) @@ -157,7 +157,7 @@ class TransactionEnhancementTests: XCTestCase { download and sync blocks from walletBirthday to firstLatestHeight */ do { - try startProcessing() + try await startProcessing() } catch { XCTFail("Error: \(error)") } diff --git a/Tests/NetworkTests/BlockScanTests.swift b/Tests/NetworkTests/BlockScanTests.swift index 920fb0e5..b0d99f7d 100644 --- a/Tests/NetworkTests/BlockScanTests.swift +++ b/Tests/NetworkTests/BlockScanTests.swift @@ -91,7 +91,7 @@ class BlockScanTests: XCTestCase { range: range ) XCTAssertFalse(Task.isCancelled) - try compactBlockProcessor.compactBlockScanning( + try await compactBlockProcessor.compactBlockScanning( rustWelding: rustWelding, cacheDb: cacheDbURL, dataDb: dataDbURL, diff --git a/Tests/NetworkTests/BlockStreamingTest.swift b/Tests/NetworkTests/BlockStreamingTest.swift index d174fe77..8b497750 100644 --- a/Tests/NetworkTests/BlockStreamingTest.swift +++ b/Tests/NetworkTests/BlockStreamingTest.swift @@ -78,9 +78,8 @@ class BlockStreamingTest: XCTestCase { blockBufferSize: 10, startHeight: startHeight ) - XCTAssertTrue(Task.isCancelled) } catch { - XCTFail("failed with error: \(error)") + XCTAssertTrue(Task.isCancelled) } } diff --git a/Tests/NetworkTests/CompactBlockProcessorTests.swift b/Tests/NetworkTests/CompactBlockProcessorTests.swift index 8e273b53..0e91b597 100644 --- a/Tests/NetworkTests/CompactBlockProcessorTests.swift +++ b/Tests/NetworkTests/CompactBlockProcessorTests.swift @@ -96,7 +96,7 @@ class CompactBlockProcessorTests: XCTestCase { } } - private func startProcessing() { + private func startProcessing() async { XCTAssertNotNil(processor) // Subscribe to notifications @@ -107,11 +107,15 @@ class CompactBlockProcessorTests: XCTestCase { startedScanningNotificationExpectation.subscribe(to: Notification.Name.blockProcessorStartedScanning, object: processor) idleNotificationExpectation.subscribe(to: Notification.Name.blockProcessorFinished, object: processor) - XCTAssertNoThrow(try processor.start()) + do { + try await processor.start() + } catch { + XCTFail("shouldn't fail") + } } - func testStartNotifiesSuscriptors() { - startProcessing() + func testStartNotifiesSuscriptors() async { + await startProcessing() wait( for: [ @@ -125,7 +129,7 @@ class CompactBlockProcessorTests: XCTestCase { ) } - func testProgressNotifications() { + func testProgressNotifications() async { let expectedUpdates = expectedBatches( currentHeight: processorConfig.walletBirthday, targetHeight: mockLatestHeight, @@ -133,7 +137,7 @@ class CompactBlockProcessorTests: XCTestCase { ) updatedNotificationExpectation.expectedFulfillmentCount = expectedUpdates - startProcessing() + await startProcessing() wait(for: [updatedNotificationExpectation], timeout: 300) } @@ -189,23 +193,23 @@ class CompactBlockProcessorTests: XCTestCase { ) } - func testDetermineLowerBoundPastBirthday() { + func testDetermineLowerBoundPastBirthday() async { let errorHeight = 781_906 let walletBirthday = 781_900 - let result = processor.determineLowerBound(errorHeight: errorHeight, consecutiveErrors: 1, walletBirthday: walletBirthday) + let result = await processor.determineLowerBound(errorHeight: errorHeight, consecutiveErrors: 1, walletBirthday: walletBirthday) let expected = 781_886 XCTAssertEqual(result, expected) } - func testDetermineLowerBound() { + func testDetermineLowerBound() async { let errorHeight = 781_906 let walletBirthday = 780_900 - let result = processor.determineLowerBound(errorHeight: errorHeight, consecutiveErrors: 0, walletBirthday: walletBirthday) + let result = await processor.determineLowerBound(errorHeight: errorHeight, consecutiveErrors: 0, walletBirthday: walletBirthday) let expected = 781_896 XCTAssertEqual(result, expected) diff --git a/Tests/NetworkTests/CompactBlockReorgTests.swift b/Tests/NetworkTests/CompactBlockReorgTests.swift index e8a3fbd8..22cdce16 100644 --- a/Tests/NetworkTests/CompactBlockReorgTests.swift +++ b/Tests/NetworkTests/CompactBlockReorgTests.swift @@ -127,7 +127,7 @@ class CompactBlockReorgTests: XCTestCase { } } - private func startProcessing() { + private func startProcessing() async { XCTAssertNotNil(processor) // Subscribe to notifications @@ -139,11 +139,15 @@ class CompactBlockReorgTests: XCTestCase { idleNotificationExpectation.subscribe(to: Notification.Name.blockProcessorFinished, object: processor) reorgNotificationExpectation.subscribe(to: Notification.Name.blockProcessorHandledReOrg, object: processor) - XCTAssertNoThrow(try processor.start()) + do { + try await processor.start() + } catch { + XCTFail("shouldn't fail") + } } - func testNotifiesReorg() { - startProcessing() + func testNotifiesReorg() async { + await startProcessing() wait( for: [ diff --git a/Tests/OfflineTests/WalletTests.swift b/Tests/OfflineTests/WalletTests.swift index 000d9c37..0ea9f154 100644 --- a/Tests/OfflineTests/WalletTests.swift +++ b/Tests/OfflineTests/WalletTests.swift @@ -33,7 +33,7 @@ class WalletTests: XCTestCase { } } - func testWalletInitialization() throws { + func testWalletInitialization() async throws { let derivationTool = DerivationTool(networkType: network.networkType) let uvk = try derivationTool.deriveUnifiedViewingKeysFromSeed(seedData.bytes, numberOfAccounts: 1) let wallet = Initializer( @@ -49,7 +49,11 @@ class WalletTests: XCTestCase { ) let synchronizer = try SDKSynchronizer(initializer: wallet) - XCTAssertNoThrow(try synchronizer.prepare()) + do { + try await synchronizer.prepare() + } catch { + XCTFail("shouldn't fail here") + } // fileExists actually sucks, so attempting to delete the file and checking what happens is far better :) XCTAssertNoThrow( try FileManager.default.removeItem(at: dbData!) ) diff --git a/Tests/TestUtils/FakeService.swift b/Tests/TestUtils/FakeService.swift index db061395..5a3f7389 100644 --- a/Tests/TestUtils/FakeService.swift +++ b/Tests/TestUtils/FakeService.swift @@ -25,18 +25,18 @@ class MockLightWalletService: LightWalletService { var queue = DispatchQueue(label: "mock service queue") func blockStream(startHeight: BlockHeight, endHeight: BlockHeight) -> AsyncThrowingStream { - AsyncThrowingStream { _ in } + service.blockStream(startHeight: startHeight, endHeight: endHeight) } func closeConnection() { } func fetchUTXOs(for tAddress: String, height: BlockHeight) -> AsyncThrowingStream { - AsyncThrowingStream { _ in } + service.fetchUTXOs(for: tAddress, height: height) } func fetchUTXOs(for tAddresses: [String], height: BlockHeight) -> AsyncThrowingStream { - AsyncThrowingStream { _ in } + service.fetchUTXOs(for: tAddresses, height: height) } private var service: LightWalletService diff --git a/Tests/TestUtils/TestCoordinator.swift b/Tests/TestUtils/TestCoordinator.swift index 0d79cb9c..c5ef85da 100644 --- a/Tests/TestUtils/TestCoordinator.swift +++ b/Tests/TestUtils/TestCoordinator.swift @@ -200,6 +200,12 @@ class TestCoordinator { } } +extension CompactBlockProcessor { + public func setConfig(_ config: Configuration) { + self.config = config + } +} + extension TestCoordinator { func resetBlocks(dataset: DarksideData) throws { switch dataset { @@ -233,19 +239,25 @@ extension TestCoordinator { } func reset(saplingActivation: BlockHeight, branchID: String, chainName: String) throws { - let config = self.synchronizer.blockProcessor.config - - self.synchronizer.blockProcessor.config = CompactBlockProcessor.Configuration( - cacheDb: config.cacheDb, - dataDb: config.dataDb, - downloadBatchSize: config.downloadBatchSize, - retries: config.retries, - maxBackoffInterval: config.maxBackoffInterval, - rewindDistance: config.rewindDistance, - walletBirthday: config.walletBirthday, - saplingActivation: config.saplingActivation, - network: config.network - ) + Task { + await self.synchronizer.blockProcessor.stop() + let config = await self.synchronizer.blockProcessor.config + + let newConfig = CompactBlockProcessor.Configuration( + cacheDb: config.cacheDb, + dataDb: config.dataDb, + downloadBatchSize: config.downloadBatchSize, + retries: config.retries, + maxBackoffInterval: config.maxBackoffInterval, + rewindDistance: config.rewindDistance, + walletBirthday: config.walletBirthday, + saplingActivation: config.saplingActivation, + network: config.network + ) + + await self.synchronizer.blockProcessor.setConfig(newConfig) + } + try service.reset(saplingActivation: saplingActivation, branchID: branchID, chainName: chainName) } @@ -308,7 +320,9 @@ enum TestSynchronizerBuilder { ) let synchronizer = try SDKSynchronizer(initializer: initializer) - try synchronizer.prepare() + Task { + try await synchronizer.prepare() + } return ([spendingKey], synchronizer) }