[#523] Make a CompactBlockProcessor an Actor (#565)

- Sample app refactored for the processor being an actor
- tests refactored as well
- dark side tests fixed
- utilities separated to new file
- synchronizer's start and stop are no longer in async context
- updating the UI for the scan fixed
This commit is contained in:
Lukas Korba 2022-10-27 12:51:38 +02:00 committed by GitHub
parent b7528b4bf8
commit cfe71d5da2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 442 additions and 359 deletions

View File

@ -16,6 +16,7 @@ enum DemoAppConfig {
static var birthdayHeight: BlockHeight = ZcashSDK.isMainnet ? 935000 : 1386000
static var seed = try! Mnemonic.deterministicSeedBytes(from: "live combine flight accident slow soda mind bright absent bid hen shy decade biology amazing mix enlist ensure biology rhythm snap duty soap armor")
static var address: String {
"\(host):\(port)"
}

View File

@ -29,11 +29,13 @@ class GetUTXOsViewController: UIViewController {
self.transparentAddressLabel.text = tAddress
// swiftlint:disable:next force_try
let balance = try! AppDelegate.shared.sharedSynchronizer.getTransparentBalance(accountIndex: 0)
self.totalBalanceLabel.text = NumberFormatter.zcashNumberFormatter.string(from: NSNumber(value: balance.total.amount))
self.verifiedBalanceLabel.text = NumberFormatter.zcashNumberFormatter.string(from: NSNumber(value: balance.verified.amount))
Task { @MainActor in
// swiftlint:disable:next force_try
let balance = try! await AppDelegate.shared.sharedSynchronizer.getTransparentBalance(accountIndex: 0)
self.totalBalanceLabel.text = NumberFormatter.zcashNumberFormatter.string(from: NSNumber(value: balance.total.amount))
self.verifiedBalanceLabel.text = NumberFormatter.zcashNumberFormatter.string(from: NSNumber(value: balance.verified.amount))
}
}
@IBAction func shieldFunds(_ sender: Any) {

View File

@ -32,8 +32,10 @@ class SendViewController: UIViewController {
override func viewDidLoad() {
super.viewDidLoad()
synchronizer = AppDelegate.shared.sharedSynchronizer
// swiftlint:disable:next force_try
try! synchronizer.prepare()
Task { @MainActor in
// swiftlint:disable:next force_try
try! await synchronizer.prepare()
}
let tapRecognizer = UITapGestureRecognizer(target: self, action: #selector(viewTapped(_:)))
self.view.addGestureRecognizer(tapRecognizer)
setUp()
@ -41,12 +43,14 @@ class SendViewController: UIViewController {
override func viewDidAppear(_ animated: Bool) {
super.viewDidAppear(animated)
do {
try synchronizer.start(retry: false)
self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: synchronizer.status)
} catch {
self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: synchronizer.status)
fail(error)
Task { @MainActor in
do {
try await synchronizer.start(retry: false)
self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: synchronizer.status)
} catch {
self.synchronizerStatusLabel.text = SDKSynchronizer.textFor(state: synchronizer.status)
fail(error)
}
}
}

View File

@ -31,7 +31,9 @@ class SyncBlocksViewController: UIViewController {
// swiftlint:disable:next force_try
try! wallet.initialize()
processor = CompactBlockProcessor(initializer: wallet)
statusLabel.text = textFor(state: processor?.state.getState() ?? .stopped)
Task { @MainActor in
statusLabel.text = textFor(state: await processor?.state ?? .stopped)
}
progressBar.progress = 0
NotificationCenter.default.addObserver(
@ -47,14 +49,16 @@ class SyncBlocksViewController: UIViewController {
NotificationCenter.default.removeObserver(self)
guard let processor = self.processor else { return }
processor.stop()
Task {
await processor.stop()
}
}
@objc func processorNotification(_ notification: Notification) {
DispatchQueue.main.async {
Task { @MainActor in
guard self.processor != nil else { return }
self.updateUI()
await self.updateUI()
switch notification.name {
case let not where not == Notification.Name.blockProcessorUpdated:
@ -70,30 +74,28 @@ class SyncBlocksViewController: UIViewController {
@IBAction func startStop() {
guard let processor = processor else { return }
switch processor.state.getState() {
case .stopped:
startProcessor()
default:
stopProcessor()
Task { @MainActor in
switch await processor.state {
case .stopped:
await startProcessor()
default:
await stopProcessor()
}
}
}
func startProcessor() {
func startProcessor() async {
guard let processor = processor else { return }
do {
try processor.start()
updateUI()
} catch {
fail(error: error)
}
await processor.start()
await updateUI()
}
func stopProcessor() {
func stopProcessor() async {
guard let processor = processor else { return }
processor.stop()
updateUI()
await processor.stop()
await updateUI()
}
func fail(error: Error) {
@ -110,11 +112,13 @@ class SyncBlocksViewController: UIViewController {
)
self.present(alert, animated: true, completion: nil)
updateUI()
Task { @MainActor in
await updateUI()
}
}
func updateUI() {
guard let state = processor?.state.getState() else { return }
func updateUI() async {
guard let state = await processor?.state else { return }
statusLabel.text = textFor(state: state)
startPause.setTitle(buttonText(for: state), for: .normal)

View File

@ -16,7 +16,7 @@ extension CompactBlockProcessor {
) async throws {
try Task.checkCancellation()
setState(.downloading)
state = .downloading
var buffer: [ZcashCompactBlock] = []
var targetHeightInternal: BlockHeight?

View File

@ -56,8 +56,8 @@ extension CompactBlockProcessor {
try Task.checkCancellation()
LoggerProxy.debug("Started Enhancing range: \(range)")
setState(.enhancing)
state = .enhancing
let blockRange = range.blockRange()
var retries = 0
let maxRetries = 5

View File

@ -103,7 +103,7 @@ public enum CompactBlockProgress {
}
protocol EnhancementStreamDelegate: AnyObject {
func transactionEnhancementProgressUpdated(_ progress: EnhancementProgress)
func transactionEnhancementProgressUpdated(_ progress: EnhancementProgress) async
}
public protocol EnhancementProgress {
@ -213,7 +213,7 @@ public extension Notification.Name {
/// The compact block processor is in charge of orchestrating the download and caching of compact blocks from a LightWalletEndpoint
/// when started the processor downloads does a download - validate - scan cycle until it reaches latest height on the blockchain.
public class CompactBlockProcessor {
public actor CompactBlockProcessor {
/// Compact Block Processor configuration
///
@ -312,31 +312,12 @@ public class CompactBlockProcessor {
case synced
}
// TODO: this isn't an Actor even though it looks like a good candidate, the reason:
// `state` lives in both sync and async environments. An Actor is demanding async context only
// so we can't take the advantage unless we encapsulate all `state` reads/writes to async context.
// Therefore solution with class + lock works for us butr eventually will be replaced.
// The future of CompactBlockProcessor is an actor (we won't need to encapsulate the state separately), issue 523,
// https://github.com/zcash/ZcashLightClientKit/issues/523
public class ThreadSafeState {
private var state: State = .stopped
let lock = NSLock()
func setState(_ newState: State) {
lock.lock()
defer { lock.unlock() }
state = newState
}
public func getState() -> State {
lock.lock()
defer { lock.unlock() }
return state
public internal(set) var state: State = .stopped {
didSet {
transitionState(from: oldValue, to: self.state)
}
}
public internal(set) var state = ThreadSafeState()
var config: Configuration {
willSet {
self.stop()
@ -348,7 +329,7 @@ public class CompactBlockProcessor {
}
var shouldStart: Bool {
switch self.state.getState() {
switch self.state {
case .stopped, .synced, .error:
return !maxAttemptsReached
default:
@ -386,7 +367,7 @@ public class CompactBlockProcessor {
/// - storage: concrete implementation of `CompactBlockStorage` protocol
/// - backend: a class that complies to `ZcashRustBackendWelding`
/// - config: `Configuration` struct for this processor
convenience init(
init(
service: LightWalletService,
storage: CompactBlockStorage,
backend: ZcashRustBackendWelding.Type,
@ -407,7 +388,7 @@ public class CompactBlockProcessor {
/// Initializes a CompactBlockProcessor instance from an Initialized object
/// - Parameters:
/// - initializer: an instance that complies to CompactBlockDownloading protocol
public convenience init(initializer: Initializer) {
public init(initializer: Initializer) {
self.init(
service: initializer.lightWalletService,
storage: initializer.storage,
@ -448,12 +429,6 @@ public class CompactBlockProcessor {
cancelableTask?.cancel()
}
func setState(_ newState: State) {
let oldValue = state.getState()
state.setState(newState)
transitionState(from: oldValue, to: newState)
}
static func validateServerInfo(
_ info: LightWalletdInfo,
saplingActivation: BlockHeight,
@ -500,7 +475,7 @@ public class CompactBlockProcessor {
/// triggers the blockProcessorStartedDownloading notification
///
/// - Important: subscribe to the notifications before calling this method
public func start(retry: Bool = false) throws {
public func start(retry: Bool = false) async {
if retry {
self.retryAttempts = 0
self.processingError = nil
@ -509,12 +484,12 @@ public class CompactBlockProcessor {
}
guard shouldStart else {
switch self.state.getState() {
switch self.state {
case .error(let e):
// max attempts have been reached
LoggerProxy.info("max retry attempts reached with error: \(e)")
notifyError(CompactBlockProcessorError.maxAttemptsReached(attempts: self.maxAttempts))
setState(.stopped)
state = .stopped
case .stopped:
// max attempts have been reached
LoggerProxy.info("max retry attempts reached")
@ -529,7 +504,7 @@ public class CompactBlockProcessor {
return
}
self.nextBatch()
await self.nextBatch()
}
/**
@ -545,14 +520,13 @@ public class CompactBlockProcessor {
cancelableTask?.cancel()
self.retryAttempts = 0
setState(.stopped)
}
/**
Rewinds to provided height.
If nil is provided, it will rescan to nearest height (quick rescan)
*/
public func rewindTo(_ height: BlockHeight?) throws -> BlockHeight {
public func rewindTo(_ height: BlockHeight?) async throws -> BlockHeight {
self.stop()
let lastDownloaded = try downloader.lastDownloadedBlockHeight()
@ -563,7 +537,7 @@ public class CompactBlockProcessor {
let error = rustBackend.lastError() ?? RustWeldingError.genericError(
message: "unknown error getting nearest rewind height for height: \(height)"
)
fail(error)
await fail(error)
throw error
}
@ -571,7 +545,7 @@ public class CompactBlockProcessor {
let rewindHeight = max(Int32(nearestHeight - 1), Int32(config.walletBirthday))
guard rustBackend.rewindToHeight(dbData: config.dataDb, height: rewindHeight, networkType: self.config.network.networkType) else {
let error = rustBackend.lastError() ?? RustWeldingError.genericError(message: "unknown error rewinding to height \(height)")
fail(error)
await fail(error)
throw error
}
@ -588,7 +562,7 @@ public class CompactBlockProcessor {
- Throws CompactBlockProcessorError.invalidConfiguration if block height is invalid or if processor is already started
*/
func setStartHeight(_ startHeight: BlockHeight) throws {
guard self.state.getState() == .stopped, startHeight >= config.network.constants.saplingActivationHeight else {
guard self.state == .stopped, startHeight >= config.network.constants.saplingActivationHeight else {
throw CompactBlockProcessorError.invalidConfiguration
}
@ -597,27 +571,24 @@ public class CompactBlockProcessor {
self.config = config
}
func validateServer(completionBlock: @escaping (() -> Void)) {
Task { @MainActor in
do {
let info = try await self.service.getInfo()
try Self.validateServerInfo(
info,
saplingActivation: self.config.saplingActivation,
localNetwork: self.config.network,
rustBackend: self.rustBackend
)
completionBlock()
} catch let error as LightWalletServiceError {
self.severeFailure(error.mapToProcessorError())
} catch {
self.severeFailure(error)
}
func validateServer() async {
do {
let info = try await self.service.getInfo()
try Self.validateServerInfo(
info,
saplingActivation: self.config.saplingActivation,
localNetwork: self.config.network,
rustBackend: self.rustBackend
)
} catch let error as LightWalletServiceError {
self.severeFailure(error.mapToProcessorError())
} catch {
self.severeFailure(error)
}
}
/// Processes new blocks on the given range based on the configuration set for this instance
func processNewBlocks(range: CompactBlockRange) {
func processNewBlocks(range: CompactBlockRange) async {
self.foundBlocks = true
self.backoffTimer?.invalidate()
self.backoffTimer = nil
@ -633,13 +604,12 @@ public class CompactBlockProcessor {
try await compactBlockBatchScanning(range: range)
try await compactBlockEnhancement(range: range)
try await fetchUnspentTxOutputs(range: range)
//state = .stopped
} catch {
if error is CancellationError {
}
if !(Task.isCancelled) {
fail(error)
await fail(error)
} else {
state = .stopped
}
}
}
@ -658,7 +628,7 @@ public class CompactBlockProcessor {
LoggerProxy.debug("progress: \(progress)")
NotificationCenter.default.post(
NotificationCenter.default.mainThreadPost(
name: Notification.Name.blockProcessorUpdated,
object: self,
userInfo: userInfo
@ -666,7 +636,7 @@ public class CompactBlockProcessor {
}
func notifyTransactions(_ txs: [ConfirmedTransactionEntity], in range: BlockRange) {
NotificationCenter.default.post(
NotificationCenter.default.mainThreadPost(
name: .blockProcessorFoundTransactions,
object: self,
userInfo: [
@ -691,29 +661,29 @@ public class CompactBlockProcessor {
self.backoffTimer?.invalidate()
self.retryAttempts = config.retries
self.processingError = error
setState(.error(error))
state = .error(error)
self.notifyError(error)
}
func fail(_ error: Error) {
func fail(_ error: Error) async {
// todo specify: failure
LoggerProxy.error("\(error)")
cancelableTask?.cancel()
self.retryAttempts += 1
self.processingError = error
switch self.state.getState() {
switch self.state {
case .error:
notifyError(error)
default:
break
}
setState(.error(error))
state = .error(error)
guard self.maxAttemptsReached else { return }
// don't set a new timer if there are no more attempts.
self.setTimer()
await self.setTimer()
}
func retryProcessing(range: CompactBlockRange) {
func retryProcessing(range: CompactBlockRange) async {
cancelableTask?.cancel()
// update retries
self.retryAttempts += 1
@ -728,10 +698,9 @@ public class CompactBlockProcessor {
try downloader.rewind(to: max(range.lowerBound, self.config.walletBirthday))
// process next batch
// processNewBlocks(range: Self.nextBatchBlockRange(latestHeight: latestBlockHeight, latestDownloadedHeight: try downloader.lastDownloadedBlockHeight(), walletBirthday: config.walletBirthday))
nextBatch()
await nextBatch()
} catch {
self.fail(error)
await self.fail(error)
}
}
@ -766,41 +735,39 @@ public class CompactBlockProcessor {
}
}
private func nextBatch() {
setState(.downloading)
Task { @MainActor [self] in
do {
let nextState = try await NextStateHelper.nextStateAsync(
service: self.service,
downloader: self.downloader,
config: self.config,
rustBackend: self.rustBackend
private func nextBatch() async {
state = .downloading
do {
let nextState = try await NextStateHelper.nextStateAsync(
service: self.service,
downloader: self.downloader,
config: self.config,
rustBackend: self.rustBackend
)
switch nextState {
case .finishProcessing(let height):
self.latestBlockHeight = height
await self.processingFinished(height: height)
case .processNewBlocks(let range):
self.latestBlockHeight = range.upperBound
self.lowerBoundHeight = range.lowerBound
await self.processNewBlocks(range: range)
case let .wait(latestHeight, latestDownloadHeight):
// Lightwalletd might be syncing
self.lowerBoundHeight = latestDownloadHeight
self.latestBlockHeight = latestHeight
LoggerProxy.info(
"Lightwalletd might be syncing: latest downloaded block height is: \(latestDownloadHeight)" +
"while latest blockheight is reported at: \(latestHeight)"
)
switch nextState {
case .finishProcessing(let height):
self.latestBlockHeight = height
self.processingFinished(height: height)
case .processNewBlocks(let range):
self.latestBlockHeight = range.upperBound
self.lowerBoundHeight = range.lowerBound
self.processNewBlocks(range: range)
case let .wait(latestHeight, latestDownloadHeight):
// Lightwalletd might be syncing
self.lowerBoundHeight = latestDownloadHeight
self.latestBlockHeight = latestHeight
LoggerProxy.info(
"Lightwalletd might be syncing: latest downloaded block height is: \(latestDownloadHeight)" +
"while latest blockheight is reported at: \(latestHeight)"
)
self.processingFinished(height: latestDownloadHeight)
}
} catch {
self.severeFailure(error)
await self.processingFinished(height: latestDownloadHeight)
}
} catch {
self.severeFailure(error)
}
}
internal func validationFailed(at height: BlockHeight) {
internal func validationFailed(at height: BlockHeight) async {
// cancel all Tasks
cancelableTask?.cancel()
@ -816,7 +783,7 @@ public class CompactBlockProcessor {
)
guard rustBackend.rewindToHeight(dbData: config.dataDb, height: Int32(rewindHeight), networkType: self.config.network.networkType) else {
fail(rustBackend.lastError() ?? RustWeldingError.genericError(message: "unknown error rewinding to height \(height)"))
await fail(rustBackend.lastError() ?? RustWeldingError.genericError(message: "unknown error rewinding to height \(height)"))
return
}
@ -824,7 +791,7 @@ public class CompactBlockProcessor {
try downloader.rewind(to: rewindHeight)
// notify reorg
NotificationCenter.default.post(
NotificationCenter.default.mainThreadPost(
name: Notification.Name.blockProcessorHandledReOrg,
object: self,
userInfo: [
@ -833,15 +800,15 @@ public class CompactBlockProcessor {
)
// process next batch
self.nextBatch()
await self.nextBatch()
} catch {
self.fail(error)
await self.fail(error)
}
}
internal func processBatchFinished(range: CompactBlockRange) {
internal func processBatchFinished(range: CompactBlockRange) async {
guard processingError == nil else {
retryProcessing(range: range)
await retryProcessing(range: range)
return
}
@ -849,15 +816,15 @@ public class CompactBlockProcessor {
consecutiveChainValidationErrors = 0
guard !range.isEmpty else {
processingFinished(height: range.upperBound)
await processingFinished(height: range.upperBound)
return
}
nextBatch()
await nextBatch()
}
private func processingFinished(height: BlockHeight) {
NotificationCenter.default.post(
private func processingFinished(height: BlockHeight) async {
NotificationCenter.default.mainThreadPost(
name: Notification.Name.blockProcessorFinished,
object: self,
userInfo: [
@ -865,35 +832,33 @@ public class CompactBlockProcessor {
CompactBlockProcessorNotificationKey.foundBlocks: self.foundBlocks
]
)
setState(.synced)
setTimer()
state = .synced
await setTimer()
}
private func setTimer() {
private func setTimer() async {
let interval = self.config.blockPollInterval
self.backoffTimer?.invalidate()
let timer = Timer(
timeInterval: interval,
repeats: true,
block: { [weak self] _ in
guard let self = self else { return }
do {
if self.shouldStart {
Task { [self] in
guard let self = self else { return }
if await self.shouldStart {
LoggerProxy.debug(
"""
Timer triggered: Starting compact Block processor!.
Processor State: \(self.state)
latestHeight: \(self.latestBlockHeight)
attempts: \(self.retryAttempts)
lowerbound: \(String(describing: self.lowerBoundHeight))
"""
"""
Timer triggered: Starting compact Block processor!.
Processor State: \(await self.state)
latestHeight: \(await self.latestBlockHeight)
attempts: \(await self.retryAttempts)
lowerbound: \(String(describing: await self.lowerBoundHeight))
"""
)
try self.start()
} else if self.maxAttemptsReached {
self.fail(CompactBlockProcessorError.maxAttemptsReached(attempts: self.config.retries))
await self.start()
} else if await self.maxAttemptsReached {
await self.fail(CompactBlockProcessorError.maxAttemptsReached(attempts: self.config.retries))
}
} catch {
self.fail(error)
}
}
)
@ -907,7 +872,7 @@ public class CompactBlockProcessor {
return
}
NotificationCenter.default.post(
NotificationCenter.default.mainThreadPost(
name: .blockProcessorStatusChanged,
object: self,
userInfo: [
@ -918,27 +883,27 @@ public class CompactBlockProcessor {
switch newValue {
case .downloading:
NotificationCenter.default.post(name: Notification.Name.blockProcessorStartedDownloading, object: self)
NotificationCenter.default.mainThreadPost(name: Notification.Name.blockProcessorStartedDownloading, object: self)
case .synced:
// transition to this state is handled by `processingFinished(height: BlockHeight)`
break
case .error(let err):
notifyError(err)
case .scanning:
NotificationCenter.default.post(name: Notification.Name.blockProcessorStartedScanning, object: self)
NotificationCenter.default.mainThreadPost(name: Notification.Name.blockProcessorStartedScanning, object: self)
case .stopped:
NotificationCenter.default.post(name: Notification.Name.blockProcessorStopped, object: self)
NotificationCenter.default.mainThreadPost(name: Notification.Name.blockProcessorStopped, object: self)
case .validating:
NotificationCenter.default.post(name: Notification.Name.blockProcessorStartedValidating, object: self)
NotificationCenter.default.mainThreadPost(name: Notification.Name.blockProcessorStartedValidating, object: self)
case .enhancing:
NotificationCenter.default.post(name: Notification.Name.blockProcessorStartedEnhancing, object: self)
NotificationCenter.default.mainThreadPost(name: Notification.Name.blockProcessorStartedEnhancing, object: self)
case .fetching:
NotificationCenter.default.post(name: Notification.Name.blockProcessorStartedFetching, object: self)
NotificationCenter.default.mainThreadPost(name: Notification.Name.blockProcessorStartedFetching, object: self)
}
}
private func notifyError(_ err: Error) {
NotificationCenter.default.post(
NotificationCenter.default.mainThreadPost(
name: Notification.Name.blockProcessorFailed,
object: self,
userInfo: [CompactBlockProcessorNotificationKey.error: mapError(err)]
@ -1157,7 +1122,7 @@ extension CompactBlockProcessorError: LocalizedError {
extension CompactBlockProcessor: EnhancementStreamDelegate {
func transactionEnhancementProgressUpdated(_ progress: EnhancementProgress) {
NotificationCenter.default.post(
NotificationCenter.default.mainThreadPost(
name: .blockProcessorEnhancementProgress,
object: self,
userInfo: [CompactBlockProcessorNotificationKey.enhancementProgress: progress]

View File

@ -12,7 +12,7 @@ extension CompactBlockProcessor {
func compactBlockBatchScanning(range: CompactBlockRange) async throws {
try Task.checkCancellation()
setState(.scanning)
state = .scanning
let batchSize = UInt32(config.scanningBatchSize)
do {
@ -24,7 +24,7 @@ extension CompactBlockProcessor {
throw error
}
let scanFinishTime = Date()
NotificationCenter.default.post(
NotificationCenter.default.mainThreadPostNotification(
SDKMetrics.progressReportNotification(
progress: BlockProgress(
startHeight: range.lowerBound,
@ -68,7 +68,7 @@ extension CompactBlockProcessor {
if scannedNewBlocks {
let progress = BlockProgress(startHeight: scanStartHeight, targetHeight: targetScanHeight, progressHeight: lastScannedHeight)
notifyProgress(.scan(progress))
NotificationCenter.default.post(
NotificationCenter.default.mainThreadPostNotification(
SDKMetrics.progressReportNotification(
progress: progress,
start: scanStartTime,
@ -81,9 +81,11 @@ extension CompactBlockProcessor {
let seconds = scanFinishTime.timeIntervalSinceReferenceDate - scanStartTime.timeIntervalSinceReferenceDate
LoggerProxy.debug("Scanned \(heightCount) blocks in \(seconds) seconds")
}
await Task.yield()
} while !Task.isCancelled && scannedNewBlocks && lastScannedHeight < targetScanHeight
if Task.isCancelled {
setState(.stopped)
state = .stopped
LoggerProxy.debug("Warning: compactBlockBatchScanning cancelled")
}
}

View File

@ -17,7 +17,7 @@ extension CompactBlockProcessor {
func compactBlockValidation() async throws {
try Task.checkCancellation()
setState(.validating)
state = .validating
let result = rustBackend.validateCombinedChain(dbCache: config.cacheDb, dbData: config.dataDb, networkType: config.network.networkType)
@ -30,7 +30,7 @@ extension CompactBlockProcessor {
case ZcashRustBackendWeldingConstants.validChain:
if Task.isCancelled {
setState(.stopped)
state = .stopped
LoggerProxy.debug("Warning: compactBlockValidation cancelled")
}
LoggerProxy.debug("validateChainFinished")
@ -50,11 +50,11 @@ extension CompactBlockProcessor {
switch validationError {
case .validationFailed(let height):
LoggerProxy.debug("chain validation at height: \(height)")
validationFailed(at: height)
await validationFailed(at: height)
case .failedWithError(let err):
guard let validationFailure = err else {
LoggerProxy.error("validation failed without a specific error")
self.fail(CompactBlockProcessorError.generalError(message: "validation failed without a specific error"))
await self.fail(CompactBlockProcessorError.generalError(message: "validation failed without a specific error"))
return
}

View File

@ -16,8 +16,8 @@ extension CompactBlockProcessor {
func fetchUnspentTxOutputs(range: CompactBlockRange) async throws {
try Task.checkCancellation()
setState(.fetching)
state = .fetching
do {
let tAddresses = try accountRepository.getAll().map({ $0.transparentAddress })
do {
@ -64,7 +64,7 @@ extension CompactBlockProcessor {
let result = (inserted: refreshed, skipped: skipped)
NotificationCenter.default.post(
NotificationCenter.default.mainThreadPost(
name: .blockProcessorStoredUTXOs,
object: self,
userInfo: [CompactBlockProcessorNotificationKey.refreshedUTXOs: result]
@ -73,7 +73,7 @@ extension CompactBlockProcessor {
if Task.isCancelled {
LoggerProxy.debug("Warning: fetchUnspentTxOutputs on range \(range) cancelled")
} else {
processBatchFinished(range: range)
await processBatchFinished(range: range)
}
} catch {
throw error

View File

@ -0,0 +1,30 @@
//
// NotificationCenter+Post.swift
//
//
// Created by Lukáš Korba on 12.10.2022.
//
import Foundation
extension NotificationCenter {
func mainThreadPost(
name aName: NSNotification.Name,
object anObject: Any?,
userInfo aUserInfo: [AnyHashable : Any]? = nil
) {
DispatchQueue.main.async {
NotificationCenter.default.post(
name: aName,
object: anObject,
userInfo: aUserInfo
)
}
}
func mainThreadPostNotification(_ notification: Notification) {
DispatchQueue.main.async {
NotificationCenter.default.post(notification)
}
}
}

View File

@ -417,7 +417,7 @@ extension LightWalletServiceError {
class ConnectionStatusManager: ConnectivityStateDelegate {
func connectivityStateDidChange(from oldState: ConnectivityState, to newState: ConnectivityState) {
LoggerProxy.event("Connection Changed from \(oldState) to \(newState)")
NotificationCenter.default.post(
NotificationCenter.default.mainThreadPost(
name: .blockProcessorConnectivityStateChanged,
object: self,
userInfo: [

View File

@ -80,7 +80,7 @@ public protocol Synchronizer {
/// prepares this initializer to operate. Initializes the internal state with the given
/// Extended Viewing Keys and a wallet birthday found in the initializer object
func prepare() throws
func prepare() async throws
///Starts this synchronizer within the given scope.
///
@ -95,18 +95,18 @@ public protocol Synchronizer {
/// Gets the sapling shielded address for the given account.
/// - Parameter accountIndex: the optional accountId whose address is of interest. By default, the first account is used.
/// - Returns the address or nil if account index is incorrect
func getShieldedAddress(accountIndex: Int) -> SaplingShieldedAddress?
func getShieldedAddress(accountIndex: Int) async -> SaplingShieldedAddress?
/// Gets the unified address for the given account.
/// - Parameter accountIndex: the optional accountId whose address is of interest. By default, the first account is used.
/// - Returns the address or nil if account index is incorrect
func getUnifiedAddress(accountIndex: Int) -> UnifiedAddress?
func getUnifiedAddress(accountIndex: Int) async -> UnifiedAddress?
/// Gets the transparent address for the given account.
/// - Parameter accountIndex: the optional accountId whose address is of interest. By default, the first account is used.
/// - Returns the address or nil if account index is incorrect
func getTransparentAddress(accountIndex: Int) -> TransparentAddress?
func getTransparentAddress(accountIndex: Int) async -> TransparentAddress?
/// Sends zatoshi.
/// - Parameter spendingKey: the key that allows spends to occur.
@ -167,7 +167,7 @@ public protocol Synchronizer {
func allConfirmedTransactions(from transaction: ConfirmedTransactionEntity?, limit: Int) throws -> [ConfirmedTransactionEntity]?
/// Returns the latest downloaded height from the compact block cache
func latestDownloadedHeight() throws -> BlockHeight
func latestDownloadedHeight() async throws -> BlockHeight
/// Returns the latest block height from the provided Lightwallet endpoint
@ -176,14 +176,14 @@ public protocol Synchronizer {
/// Returns the latest block height from the provided Lightwallet endpoint
/// Blocking
func latestHeight() throws -> BlockHeight
func latestHeight() async throws -> BlockHeight
/// Returns the latests UTXOs for the given address from the specified height on
func refreshUTXOs(address: String, from height: BlockHeight) async throws -> RefreshedUTXOs
/// Returns the last stored unshielded balance
func getTransparentBalance(accountIndex: Int) throws -> WalletBalance
func getTransparentBalance(accountIndex: Int) async throws -> WalletBalance
/// Returns the shielded total balance (includes verified and unverified balance)
@ -207,7 +207,7 @@ public protocol Synchronizer {
/// - Throws rewindErrorUnknownArchorHeight when the rewind points to an invalid height
/// - Throws rewindError for other errors
/// - Note rewind does not trigger notifications as a reorg would. You need to restart the synchronizer afterwards
func rewind(_ policy: RewindPolicy) throws
func rewind(_ policy: RewindPolicy) async throws
}
public enum SyncStatus: Equatable {

View File

@ -151,17 +151,19 @@ public class SDKSynchronizer: Synchronizer {
deinit {
NotificationCenter.default.removeObserver(self)
self.blockProcessor.stop()
Task { [blockProcessor] in
await blockProcessor.stop()
}
}
public func initialize() throws {
public func initialize() async throws {
try self.initializer.initialize()
try self.blockProcessor.setStartHeight(initializer.walletBirthday)
try await self.blockProcessor.setStartHeight(initializer.walletBirthday)
}
public func prepare() throws {
public func prepare() async throws {
try self.initializer.initialize()
try self.blockProcessor.setStartHeight(initializer.walletBirthday)
try await self.blockProcessor.setStartHeight(initializer.walletBirthday)
self.status = .disconnected
}
@ -177,10 +179,8 @@ public class SDKSynchronizer: Synchronizer {
return
case .stopped, .synced, .disconnected, .error:
do {
try blockProcessor.start(retry: retry)
} catch {
throw mapError(error)
Task {
await blockProcessor.start(retry: retry)
}
}
}
@ -192,8 +192,10 @@ public class SDKSynchronizer: Synchronizer {
return
}
blockProcessor.stop()
self.status = .stopped
Task(priority: .high) {
await blockProcessor.stop()
self.status = .stopped
}
}
private func subscribeToProcessorNotifications(_ processor: CompactBlockProcessor) {
@ -314,7 +316,7 @@ public class SDKSynchronizer: Synchronizer {
}
let currentState = ConnectionState(current)
NotificationCenter.default.post(
NotificationCenter.default.mainThreadPost(
name: .synchronizerConnectionStateChanged,
object: self,
userInfo: [
@ -336,7 +338,7 @@ public class SDKSynchronizer: Synchronizer {
return
}
NotificationCenter.default.post(
NotificationCenter.default.mainThreadPost(
name: .synchronizerFoundTransactions,
object: self,
userInfo: [
@ -489,7 +491,7 @@ public class SDKSynchronizer: Synchronizer {
do {
let tAddr = try derivationTool.deriveTransparentAddressFromPrivateKey(transparentSecretKey)
let tBalance = try utxoRepository.balance(address: tAddr, latestHeight: self.latestDownloadedHeight())
let tBalance = try await utxoRepository.balance(address: tAddr, latestHeight: self.latestDownloadedHeight())
// Verify that at least there are funds for the fee. Ideally this logic will be improved by the shielding wallet.
guard tBalance.verified >= self.network.constants.defaultFee(for: self.latestScannedHeight) else {
@ -566,8 +568,8 @@ public class SDKSynchronizer: Synchronizer {
PagedTransactionRepositoryBuilder.build(initializer: initializer, kind: .all)
}
public func latestDownloadedHeight() throws -> BlockHeight {
try blockProcessor.downloader.lastDownloadedBlockHeight()
public func latestDownloadedHeight() async throws -> BlockHeight {
try await blockProcessor.downloader.lastDownloadedBlockHeight()
}
public func latestHeight(result: @escaping (Result<BlockHeight, Error>) -> Void) {
@ -581,8 +583,8 @@ public class SDKSynchronizer: Synchronizer {
}
}
public func latestHeight() throws -> BlockHeight {
try blockProcessor.downloader.latestBlockHeight()
public func latestHeight() async throws -> BlockHeight {
try await blockProcessor.downloader.latestBlockHeight()
}
public func latestUTXOs(address: String) async throws -> [UnspentTransactionOutputEntity] {
@ -626,34 +628,34 @@ public class SDKSynchronizer: Synchronizer {
initializer.getVerifiedBalance(account: accountIndex)
}
public func getShieldedAddress(accountIndex: Int) -> SaplingShieldedAddress? {
blockProcessor.getShieldedAddress(accountIndex: accountIndex)
public func getShieldedAddress(accountIndex: Int) async -> SaplingShieldedAddress? {
await blockProcessor.getShieldedAddress(accountIndex: accountIndex)
}
public func getUnifiedAddress(accountIndex: Int) -> UnifiedAddress? {
blockProcessor.getUnifiedAddres(accountIndex: accountIndex)
public func getUnifiedAddress(accountIndex: Int) async -> UnifiedAddress? {
await blockProcessor.getUnifiedAddres(accountIndex: accountIndex)
}
public func getTransparentAddress(accountIndex: Int) -> TransparentAddress? {
blockProcessor.getTransparentAddress(accountIndex: accountIndex)
public func getTransparentAddress(accountIndex: Int) async -> TransparentAddress? {
await blockProcessor.getTransparentAddress(accountIndex: accountIndex)
}
public func getTransparentBalance(accountIndex: Int) throws -> WalletBalance {
try blockProcessor.getTransparentBalance(accountIndex: accountIndex)
public func getTransparentBalance(accountIndex: Int) async throws -> WalletBalance {
try await blockProcessor.getTransparentBalance(accountIndex: accountIndex)
}
/**
Returns the last stored unshielded balance
*/
public func getTransparentBalance(address: String) throws -> WalletBalance {
public func getTransparentBalance(address: String) async throws -> WalletBalance {
do {
return try self.blockProcessor.utxoCacheBalance(tAddress: address)
return try await self.blockProcessor.utxoCacheBalance(tAddress: address)
} catch {
throw SynchronizerError.uncategorized(underlyingError: error)
}
}
public func rewind(_ policy: RewindPolicy) throws {
public func rewind(_ policy: RewindPolicy) async throws {
self.stop()
var height: BlockHeight?
@ -663,7 +665,7 @@ public class SDKSynchronizer: Synchronizer {
break
case .birthday:
let birthday = self.blockProcessor.config.walletBirthday
let birthday = await self.blockProcessor.config.walletBirthday
height = birthday
case .height(let rewindHeight):
@ -677,7 +679,7 @@ public class SDKSynchronizer: Synchronizer {
}
do {
let rewindHeight = try self.blockProcessor.rewindTo(height)
let rewindHeight = try await self.blockProcessor.rewindTo(height)
try self.transactionManager.handleReorg(at: rewindHeight)
} catch {
throw SynchronizerError.rewindError(underlyingError: error)
@ -691,11 +693,11 @@ public class SDKSynchronizer: Synchronizer {
userInfo[NotificationKeys.blockHeight] = progress.progressHeight
self.status = SyncStatus(progress)
NotificationCenter.default.post(name: Notification.Name.synchronizerProgressUpdated, object: self, userInfo: userInfo)
NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerProgressUpdated, object: self, userInfo: userInfo)
}
private func notifyStatusChange(newValue: SyncStatus, oldValue: SyncStatus) {
NotificationCenter.default.post(
NotificationCenter.default.mainThreadPost(
name: .synchronizerStatusWillUpdate,
object: self,
userInfo:
@ -709,38 +711,40 @@ public class SDKSynchronizer: Synchronizer {
private func notify(status: SyncStatus) {
switch status {
case .disconnected:
NotificationCenter.default.post(name: Notification.Name.synchronizerDisconnected, object: self)
NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerDisconnected, object: self)
case .stopped:
NotificationCenter.default.post(name: Notification.Name.synchronizerStopped, object: self)
NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerStopped, object: self)
case .synced:
NotificationCenter.default.post(
name: Notification.Name.synchronizerSynced,
object: self,
userInfo: [
SDKSynchronizer.NotificationKeys.blockHeight: self.latestScannedHeight,
SDKSynchronizer.NotificationKeys.synchronizerState: SynchronizerState(
shieldedBalance: WalletBalance(
verified: initializer.getVerifiedBalance(),
total: initializer.getBalance()
),
transparentBalance: (try? self.getTransparentBalance(accountIndex: 0)) ?? WalletBalance.zero,
syncStatus: status,
latestScannedHeight: self.latestScannedHeight
)
]
)
Task {
NotificationCenter.default.mainThreadPost(
name: Notification.Name.synchronizerSynced,
object: self,
userInfo: [
SDKSynchronizer.NotificationKeys.blockHeight: self.latestScannedHeight,
SDKSynchronizer.NotificationKeys.synchronizerState: SynchronizerState(
shieldedBalance: WalletBalance(
verified: initializer.getVerifiedBalance(),
total: initializer.getBalance()
),
transparentBalance: (try? await self.getTransparentBalance(accountIndex: 0)) ?? WalletBalance.zero,
syncStatus: status,
latestScannedHeight: self.latestScannedHeight
)
]
)
}
case .unprepared:
break
case .downloading:
NotificationCenter.default.post(name: Notification.Name.synchronizerDownloading, object: self)
NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerDownloading, object: self)
case .validating:
NotificationCenter.default.post(name: Notification.Name.synchronizerValidating, object: self)
NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerValidating, object: self)
case .scanning:
NotificationCenter.default.post(name: Notification.Name.synchronizerScanning, object: self)
NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerScanning, object: self)
case .enhancing:
NotificationCenter.default.post(name: Notification.Name.synchronizerEnhancing, object: self)
NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerEnhancing, object: self)
case .fetching:
NotificationCenter.default.post(name: Notification.Name.synchronizerFetching, object: self)
NotificationCenter.default.mainThreadPost(name: Notification.Name.synchronizerFetching, object: self)
case .error(let e):
self.notifyFailure(e)
}
@ -782,7 +786,7 @@ public class SDKSynchronizer: Synchronizer {
DispatchQueue.main.async { [weak self] in
guard let self = self else { return }
NotificationCenter.default.post(
NotificationCenter.default.mainThreadPost(
name: Notification.Name.synchronizerMinedTransaction,
object: self,
userInfo: [NotificationKeys.minedTransaction: transaction]
@ -832,7 +836,7 @@ public class SDKSynchronizer: Synchronizer {
private func notifyFailure(_ error: Error) {
DispatchQueue.main.async { [weak self] in
guard let self = self else { return }
NotificationCenter.default.post(
NotificationCenter.default.mainThreadPost(
name: Notification.Name.synchronizerFailed,
object: self,
userInfo: [NotificationKeys.error: self.mapError(error)]

View File

@ -83,7 +83,7 @@ class AdvancedReOrgTests: XCTestCase {
10. sync up to received_Tx_height + 3
11. verify that balance equals initial balance + tx amount
*/
func testReOrgChangesInboundTxMinedHeight() throws {
func testReOrgChangesInboundTxMinedHeight() async throws {
hookToReOrgNotification()
try FakeChainBuilder.buildChain(darksideWallet: coordinator.service, branchID: branchID, chainName: chainName)
var shouldContinue = false
@ -101,16 +101,23 @@ class AdvancedReOrgTests: XCTestCase {
var synchronizer: SDKSynchronizer?
try coordinator.sync(
completion: { synchro in
synchronizer = synchro
initialVerifiedBalance = synchro.initializer.getVerifiedBalance()
initialTotalBalance = synchro.initializer.getBalance()
preTxExpectation.fulfill()
shouldContinue = true
},
error: self.handleError
)
try await withCheckedThrowingContinuation { continuation in
do {
try coordinator.sync(
completion: { synchro in
synchronizer = synchro
initialVerifiedBalance = synchro.initializer.getVerifiedBalance()
initialTotalBalance = synchro.initializer.getBalance()
preTxExpectation.fulfill()
shouldContinue = true
continuation.resume()
},
error: self.handleError
)
} catch {
continuation.resume(with: .failure(error))
}
}
wait(for: [preTxExpectation], timeout: 10)
@ -132,12 +139,20 @@ class AdvancedReOrgTests: XCTestCase {
var receivedTxTotalBalance = Zatoshi(-1)
var receivedTxVerifiedBalance = Zatoshi(-1)
try coordinator.sync(completion: { synchro in
synchronizer = synchro
receivedTxVerifiedBalance = synchro.initializer.getVerifiedBalance()
receivedTxTotalBalance = synchro.initializer.getBalance()
receivedTxExpectation.fulfill()
}, error: self.handleError)
try await withCheckedThrowingContinuation { continuation in
do {
try coordinator.sync(completion: { synchro in
synchronizer = synchro
receivedTxVerifiedBalance = synchro.initializer.getVerifiedBalance()
receivedTxTotalBalance = synchro.initializer.getBalance()
receivedTxExpectation.fulfill()
continuation.resume()
}, error: self.handleError)
} catch {
continuation.resume(with: .failure(error))
}
}
sleep(2)
wait(for: [receivedTxExpectation], timeout: 10)
@ -196,11 +211,21 @@ class AdvancedReOrgTests: XCTestCase {
var afterReorgTxTotalBalance = Zatoshi(-1)
var afterReorgTxVerifiedBalance = Zatoshi(-1)
try coordinator.sync(completion: { synchronizer in
afterReorgTxTotalBalance = synchronizer.initializer.getBalance()
afterReorgTxVerifiedBalance = synchronizer.initializer.getVerifiedBalance()
reorgSyncexpectation.fulfill()
}, error: self.handleError(_:))
try await withCheckedThrowingContinuation { continuation in
do {
try coordinator.sync(
completion: { synchronizer in
afterReorgTxTotalBalance = synchronizer.initializer.getBalance()
afterReorgTxVerifiedBalance = synchronizer.initializer.getVerifiedBalance()
reorgSyncexpectation.fulfill()
continuation.resume()
},
error: self.handleError
)
} catch {
continuation.resume(with: .failure(error))
}
}
/*
8. assert that reorg happened at received_Tx_height
@ -224,11 +249,22 @@ class AdvancedReOrgTests: XCTestCase {
try coordinator.applyStaged(blockheight: reorgedTxheight + 1)
sleep(3)
try coordinator.sync(completion: { synchronizer in
finalReorgTxTotalBalance = synchronizer.initializer.getBalance()
finalReorgTxVerifiedBalance = synchronizer.initializer.getVerifiedBalance()
finalsyncExpectation.fulfill()
}, error: self.handleError(_:))
try await withCheckedThrowingContinuation { continuation in
do {
try coordinator.sync(
completion: { synchronizer in
finalReorgTxTotalBalance = synchronizer.initializer.getBalance()
finalReorgTxVerifiedBalance = synchronizer.initializer.getVerifiedBalance()
finalsyncExpectation.fulfill()
continuation.resume()
},
error: self.handleError
)
} catch {
continuation.resume(with: .failure(error))
}
}
wait(for: [finalsyncExpectation], timeout: 5)
sleep(3)
@ -1239,7 +1275,7 @@ class AdvancedReOrgTests: XCTestCase {
XCTAssertEqual(coordinator.synchronizer.initializer.getBalance(), initialTotalBalance)
}
func testLongSync() throws {
func testLongSync() async throws {
hookToReOrgNotification()
/*
@ -1258,21 +1294,31 @@ class AdvancedReOrgTests: XCTestCase {
/*
sync to latest height
*/
try coordinator.sync(completion: { _ in
firstSyncExpectation.fulfill()
}, error: { error in
_ = try? self.coordinator.stop()
firstSyncExpectation.fulfill()
guard let testError = error else {
XCTFail("failed with nil error")
return
try await withCheckedThrowingContinuation { continuation in
do {
try coordinator.sync(
completion: { _ in
firstSyncExpectation.fulfill()
continuation.resume()
}, error: { error in
_ = try? self.coordinator.stop()
firstSyncExpectation.fulfill()
guard let testError = error else {
XCTFail("failed with nil error")
return
}
XCTFail("Failed with error: \(testError)")
}
)
} catch {
continuation.resume(throwing: error)
}
XCTFail("Failed with error: \(testError)")
})
}
wait(for: [firstSyncExpectation], timeout: 500)
XCTAssertEqual(try coordinator.synchronizer.latestDownloadedHeight(), birthday + fullSyncLength)
let latestDownloadedHeight = try await coordinator.synchronizer.latestDownloadedHeight()
XCTAssertEqual(latestDownloadedHeight, birthday + fullSyncLength)
}
func handleError(_ error: Error?) {

View File

@ -64,7 +64,7 @@ class RewindRescanTests: XCTestCase {
XCTFail("Failed with error: \(testError)")
}
func testBirthdayRescan() throws {
func testBirthdayRescan() async throws {
// 1 sync and get spendable funds
try FakeChainBuilder.buildChain(darksideWallet: coordinator.service, branchID: branchID, chainName: chainName)
@ -86,7 +86,7 @@ class RewindRescanTests: XCTestCase {
XCTAssertEqual(verifiedBalance, totalBalance)
// rewind to birthday
try coordinator.synchronizer.rewind(.birthday)
try await coordinator.synchronizer.rewind(.birthday)
// assert that after the new height is
XCTAssertEqual(try coordinator.synchronizer.initializer.transactionRepository.lastScannedHeight(), self.birthday)
@ -145,7 +145,7 @@ class RewindRescanTests: XCTestCase {
height: Int32(targetHeight),
networkType: network.networkType
)
try coordinator.synchronizer.rewind(.height(blockheight: targetHeight))
try await coordinator.synchronizer.rewind(.height(blockheight: targetHeight))
guard rewindHeight > 0 else {
XCTFail("get nearest height failed error: \(ZcashRustBackend.getLastError() ?? "null")")
@ -190,7 +190,7 @@ class RewindRescanTests: XCTestCase {
wait(for: [sendExpectation], timeout: 15)
}
func testRescanToTransaction() throws {
func testRescanToTransaction() async throws {
// 1 sync and get spendable funds
try FakeChainBuilder.buildChain(darksideWallet: coordinator.service, branchID: branchID, chainName: chainName)
@ -216,7 +216,7 @@ class RewindRescanTests: XCTestCase {
return
}
try coordinator.synchronizer.rewind(.transaction(transaction.transactionEntity))
try await coordinator.synchronizer.rewind(.transaction(transaction.transactionEntity))
// assert that after the new height is
XCTAssertEqual(
@ -363,7 +363,7 @@ class RewindRescanTests: XCTestCase {
// rewind 5 blocks prior to sending
try coordinator.synchronizer.rewind(.height(blockheight: sentTxHeight - 5))
try await coordinator.synchronizer.rewind(.height(blockheight: sentTxHeight - 5))
guard
let pendingEntity = try coordinator.synchronizer.allPendingTransactions()

View File

@ -93,7 +93,7 @@ class ShieldFundsTests: XCTestCase {
var shouldContinue = false
var initialTotalBalance = Zatoshi(-1)
var initialVerifiedBalance = Zatoshi(-1)
var initialTransparentBalance: WalletBalance = try coordinator.synchronizer.getTransparentBalance(accountIndex: 0)
var initialTransparentBalance: WalletBalance = try await coordinator.synchronizer.getTransparentBalance(accountIndex: 0)
let utxo = try GetAddressUtxosReply(jsonString: """
{
@ -135,7 +135,7 @@ class ShieldFundsTests: XCTestCase {
// at this point the balance should be all zeroes for transparent and shielded funds
XCTAssertEqual(initialTotalBalance, Zatoshi.zero)
XCTAssertEqual(initialVerifiedBalance, Zatoshi.zero)
initialTransparentBalance = try coordinator.synchronizer.getTransparentBalance(accountIndex: 0)
initialTransparentBalance = try await coordinator.synchronizer.getTransparentBalance(accountIndex: 0)
XCTAssertEqual(initialTransparentBalance.total, .zero)
XCTAssertEqual(initialTransparentBalance.verified, .zero)
@ -169,7 +169,7 @@ class ShieldFundsTests: XCTestCase {
// at this point the balance should be zero for shielded, then zero verified transparent funds
// and 10000 zatoshi of total (not verified) transparent funds.
let tFundsDetectedBalance = try coordinator.synchronizer.getTransparentBalance(accountIndex: 0)
let tFundsDetectedBalance = try await coordinator.synchronizer.getTransparentBalance(accountIndex: 0)
XCTAssertEqual(tFundsDetectedBalance.total, Zatoshi(10000))
XCTAssertEqual(tFundsDetectedBalance.verified, Zatoshi(10000)) //FIXME: this should be zero
@ -199,7 +199,7 @@ class ShieldFundsTests: XCTestCase {
wait(for: [tFundsConfirmationSyncExpectation], timeout: 5)
// the transparent funds should be 10000 zatoshis both total and verified
let confirmedTFundsBalance = try coordinator.synchronizer.getTransparentBalance(accountIndex: 0)
let confirmedTFundsBalance = try await coordinator.synchronizer.getTransparentBalance(accountIndex: 0)
XCTAssertEqual(confirmedTFundsBalance.total, Zatoshi(10000))
XCTAssertEqual(confirmedTFundsBalance.verified, Zatoshi(10000))
@ -239,7 +239,7 @@ class ShieldFundsTests: XCTestCase {
guard shouldContinue else { return }
let postShieldingBalance = try coordinator.synchronizer.getTransparentBalance(accountIndex: 0)
let postShieldingBalance = try await coordinator.synchronizer.getTransparentBalance(accountIndex: 0)
// when funds are shielded the UTXOs should be marked as spend and not shown on the balance.
// now balance should be zero shielded, zero transaparent.
// verify that the balance has been marked as spent regardless of confirmation
@ -290,7 +290,7 @@ class ShieldFundsTests: XCTestCase {
// Now it should verify that the balance has been shielded. The resulting balance should be zero
// transparent funds and `10000 - fee` total shielded funds, zero verified shielded funds.
// Fees at the time of writing the tests are 1000 zatoshi as defined on ZIP-313
let postShieldingShieldedBalance = try coordinator.synchronizer.getTransparentBalance(accountIndex: 0)
let postShieldingShieldedBalance = try await coordinator.synchronizer.getTransparentBalance(accountIndex: 0)
XCTAssertEqual(postShieldingShieldedBalance.total, Zatoshi(10000)) //FIXME: this should be zero
XCTAssertEqual(postShieldingShieldedBalance.verified, Zatoshi(10000)) //FIXME: this should be zero
@ -327,7 +327,7 @@ class ShieldFundsTests: XCTestCase {
XCTAssertNotNil(clearedTransaction)
XCTAssertEqual(coordinator.synchronizer.getShieldedBalance(), Zatoshi(9000))
let postShieldingConfirmationShieldedBalance = try coordinator.synchronizer.getTransparentBalance(accountIndex: 0)
let postShieldingConfirmationShieldedBalance = try await coordinator.synchronizer.getTransparentBalance(accountIndex: 0)
XCTAssertEqual(postShieldingConfirmationShieldedBalance.total, .zero)
XCTAssertEqual(postShieldingConfirmationShieldedBalance.verified, .zero)

View File

@ -67,7 +67,7 @@ final class SynchronizerTests: XCTestCase {
reorgExpectation.fulfill()
}
func testSynchronizerStops() throws {
func testSynchronizerStops() async throws {
hookToReOrgNotification()
/*
@ -102,14 +102,14 @@ final class SynchronizerTests: XCTestCase {
XCTFail("Failed with error: \(testError)")
})
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
self.coordinator.synchronizer.stop()
}
try await Task.sleep(nanoseconds: 5_000_000_000)
self.coordinator.synchronizer.stop()
wait(for: [processorStoppedExpectation,syncStoppedExpectation], timeout: 6, enforceOrder: true)
wait(for: [syncStoppedExpectation, processorStoppedExpectation], timeout: 6, enforceOrder: true)
XCTAssertEqual(coordinator.synchronizer.status, .stopped)
XCTAssertEqual(coordinator.synchronizer.blockProcessor.state.getState(), .stopped)
let state = await coordinator.synchronizer.blockProcessor.state
XCTAssertEqual(state, .stopped)
}
func handleError(_ error: Error?) {

View File

@ -110,7 +110,7 @@ class TransactionEnhancementTests: XCTestCase {
NotificationCenter.default.removeObserver(self)
}
private func startProcessing() throws {
private func startProcessing() async throws {
XCTAssertNotNil(processor)
// Subscribe to notifications
@ -120,17 +120,17 @@ class TransactionEnhancementTests: XCTestCase {
startedValidatingNotificationExpectation.subscribe(to: Notification.Name.blockProcessorStartedValidating, object: processor)
startedScanningNotificationExpectation.subscribe(to: Notification.Name.blockProcessorStartedScanning, object: processor)
try processor.start()
try await processor.start()
}
func testBasicEnhacement() throws {
func testBasicEnhacement() async throws {
let targetLatestHeight = BlockHeight(663250)
let walletBirthday = Checkpoint.birthday(with: 663151, network: network).height
try basicEnhancementTest(latestHeight: targetLatestHeight, walletBirthday: walletBirthday)
try await basicEnhancementTest(latestHeight: targetLatestHeight, walletBirthday: walletBirthday)
}
func basicEnhancementTest(latestHeight: BlockHeight, walletBirthday: BlockHeight) throws {
func basicEnhancementTest(latestHeight: BlockHeight, walletBirthday: BlockHeight) async throws {
do {
try darksideWalletService.reset(saplingActivation: 663150, branchID: branchID, chainName: chainName)
try darksideWalletService.useDataset(DarksideDataset.beforeReOrg.rawValue)
@ -157,7 +157,7 @@ class TransactionEnhancementTests: XCTestCase {
download and sync blocks from walletBirthday to firstLatestHeight
*/
do {
try startProcessing()
try await startProcessing()
} catch {
XCTFail("Error: \(error)")
}

View File

@ -91,7 +91,7 @@ class BlockScanTests: XCTestCase {
range: range
)
XCTAssertFalse(Task.isCancelled)
try compactBlockProcessor.compactBlockScanning(
try await compactBlockProcessor.compactBlockScanning(
rustWelding: rustWelding,
cacheDb: cacheDbURL,
dataDb: dataDbURL,

View File

@ -78,9 +78,8 @@ class BlockStreamingTest: XCTestCase {
blockBufferSize: 10,
startHeight: startHeight
)
XCTAssertTrue(Task.isCancelled)
} catch {
XCTFail("failed with error: \(error)")
XCTAssertTrue(Task.isCancelled)
}
}

View File

@ -96,7 +96,7 @@ class CompactBlockProcessorTests: XCTestCase {
}
}
private func startProcessing() {
private func startProcessing() async {
XCTAssertNotNil(processor)
// Subscribe to notifications
@ -107,11 +107,15 @@ class CompactBlockProcessorTests: XCTestCase {
startedScanningNotificationExpectation.subscribe(to: Notification.Name.blockProcessorStartedScanning, object: processor)
idleNotificationExpectation.subscribe(to: Notification.Name.blockProcessorFinished, object: processor)
XCTAssertNoThrow(try processor.start())
do {
try await processor.start()
} catch {
XCTFail("shouldn't fail")
}
}
func testStartNotifiesSuscriptors() {
startProcessing()
func testStartNotifiesSuscriptors() async {
await startProcessing()
wait(
for: [
@ -125,7 +129,7 @@ class CompactBlockProcessorTests: XCTestCase {
)
}
func testProgressNotifications() {
func testProgressNotifications() async {
let expectedUpdates = expectedBatches(
currentHeight: processorConfig.walletBirthday,
targetHeight: mockLatestHeight,
@ -133,7 +137,7 @@ class CompactBlockProcessorTests: XCTestCase {
)
updatedNotificationExpectation.expectedFulfillmentCount = expectedUpdates
startProcessing()
await startProcessing()
wait(for: [updatedNotificationExpectation], timeout: 300)
}
@ -189,23 +193,23 @@ class CompactBlockProcessorTests: XCTestCase {
)
}
func testDetermineLowerBoundPastBirthday() {
func testDetermineLowerBoundPastBirthday() async {
let errorHeight = 781_906
let walletBirthday = 781_900
let result = processor.determineLowerBound(errorHeight: errorHeight, consecutiveErrors: 1, walletBirthday: walletBirthday)
let result = await processor.determineLowerBound(errorHeight: errorHeight, consecutiveErrors: 1, walletBirthday: walletBirthday)
let expected = 781_886
XCTAssertEqual(result, expected)
}
func testDetermineLowerBound() {
func testDetermineLowerBound() async {
let errorHeight = 781_906
let walletBirthday = 780_900
let result = processor.determineLowerBound(errorHeight: errorHeight, consecutiveErrors: 0, walletBirthday: walletBirthday)
let result = await processor.determineLowerBound(errorHeight: errorHeight, consecutiveErrors: 0, walletBirthday: walletBirthday)
let expected = 781_896
XCTAssertEqual(result, expected)

View File

@ -127,7 +127,7 @@ class CompactBlockReorgTests: XCTestCase {
}
}
private func startProcessing() {
private func startProcessing() async {
XCTAssertNotNil(processor)
// Subscribe to notifications
@ -139,11 +139,15 @@ class CompactBlockReorgTests: XCTestCase {
idleNotificationExpectation.subscribe(to: Notification.Name.blockProcessorFinished, object: processor)
reorgNotificationExpectation.subscribe(to: Notification.Name.blockProcessorHandledReOrg, object: processor)
XCTAssertNoThrow(try processor.start())
do {
try await processor.start()
} catch {
XCTFail("shouldn't fail")
}
}
func testNotifiesReorg() {
startProcessing()
func testNotifiesReorg() async {
await startProcessing()
wait(
for: [

View File

@ -33,7 +33,7 @@ class WalletTests: XCTestCase {
}
}
func testWalletInitialization() throws {
func testWalletInitialization() async throws {
let derivationTool = DerivationTool(networkType: network.networkType)
let uvk = try derivationTool.deriveUnifiedViewingKeysFromSeed(seedData.bytes, numberOfAccounts: 1)
let wallet = Initializer(
@ -49,7 +49,11 @@ class WalletTests: XCTestCase {
)
let synchronizer = try SDKSynchronizer(initializer: wallet)
XCTAssertNoThrow(try synchronizer.prepare())
do {
try await synchronizer.prepare()
} catch {
XCTFail("shouldn't fail here")
}
// fileExists actually sucks, so attempting to delete the file and checking what happens is far better :)
XCTAssertNoThrow( try FileManager.default.removeItem(at: dbData!) )

View File

@ -25,18 +25,18 @@ class MockLightWalletService: LightWalletService {
var queue = DispatchQueue(label: "mock service queue")
func blockStream(startHeight: BlockHeight, endHeight: BlockHeight) -> AsyncThrowingStream<ZcashCompactBlock, Error> {
AsyncThrowingStream { _ in }
service.blockStream(startHeight: startHeight, endHeight: endHeight)
}
func closeConnection() {
}
func fetchUTXOs(for tAddress: String, height: BlockHeight) -> AsyncThrowingStream<UnspentTransactionOutputEntity, Error> {
AsyncThrowingStream { _ in }
service.fetchUTXOs(for: tAddress, height: height)
}
func fetchUTXOs(for tAddresses: [String], height: BlockHeight) -> AsyncThrowingStream<UnspentTransactionOutputEntity, Error> {
AsyncThrowingStream { _ in }
service.fetchUTXOs(for: tAddresses, height: height)
}
private var service: LightWalletService

View File

@ -200,6 +200,12 @@ class TestCoordinator {
}
}
extension CompactBlockProcessor {
public func setConfig(_ config: Configuration) {
self.config = config
}
}
extension TestCoordinator {
func resetBlocks(dataset: DarksideData) throws {
switch dataset {
@ -233,19 +239,25 @@ extension TestCoordinator {
}
func reset(saplingActivation: BlockHeight, branchID: String, chainName: String) throws {
let config = self.synchronizer.blockProcessor.config
self.synchronizer.blockProcessor.config = CompactBlockProcessor.Configuration(
cacheDb: config.cacheDb,
dataDb: config.dataDb,
downloadBatchSize: config.downloadBatchSize,
retries: config.retries,
maxBackoffInterval: config.maxBackoffInterval,
rewindDistance: config.rewindDistance,
walletBirthday: config.walletBirthday,
saplingActivation: config.saplingActivation,
network: config.network
)
Task {
await self.synchronizer.blockProcessor.stop()
let config = await self.synchronizer.blockProcessor.config
let newConfig = CompactBlockProcessor.Configuration(
cacheDb: config.cacheDb,
dataDb: config.dataDb,
downloadBatchSize: config.downloadBatchSize,
retries: config.retries,
maxBackoffInterval: config.maxBackoffInterval,
rewindDistance: config.rewindDistance,
walletBirthday: config.walletBirthday,
saplingActivation: config.saplingActivation,
network: config.network
)
await self.synchronizer.blockProcessor.setConfig(newConfig)
}
try service.reset(saplingActivation: saplingActivation, branchID: branchID, chainName: chainName)
}
@ -308,7 +320,9 @@ enum TestSynchronizerBuilder {
)
let synchronizer = try SDKSynchronizer(initializer: initializer)
try synchronizer.prepare()
Task {
try await synchronizer.prepare()
}
return ([spendingKey], synchronizer)
}