[#435] thread sanitizer issues (#448)

* [#435] this commit attempts to fix thread being starved dues to inversion
of priorities where a .userInitiated thread ends up depending on a lower
priority one on GRPC.

* Add an Synchronizer State struct to report state at once

* Make CompactBlockProcessor's downloader available internally for SDKSynchronizer
remove duplicate handling of processor finished

* PR Suggestions
This commit is contained in:
Francisco Gindre 2022-07-29 15:20:55 -03:00 committed by GitHub
parent fef4bccce8
commit a5d0e44774
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 38 additions and 23 deletions

View File

@ -86,7 +86,7 @@ extension CompactBlockStorage: CompactBlockRepository {
} }
func latestHeight(result: @escaping (Swift.Result<BlockHeight, Error>) -> Void) { func latestHeight(result: @escaping (Swift.Result<BlockHeight, Error>) -> Void) {
DispatchQueue.global(qos: .userInitiated).async { DispatchQueue.global(qos: .default).async {
do { do {
result(.success(try self.latestBlockHeight())) result(.success(try self.latestBlockHeight()))
} catch { } catch {
@ -100,7 +100,7 @@ extension CompactBlockStorage: CompactBlockRepository {
} }
func write(blocks: [ZcashCompactBlock], completion: ((Error?) -> Void)?) { func write(blocks: [ZcashCompactBlock], completion: ((Error?) -> Void)?) {
DispatchQueue.global(qos: .userInitiated).async { DispatchQueue.global(qos: .default).async {
do { do {
try self.insert(blocks) try self.insert(blocks)
completion?(nil) completion?(nil)
@ -111,7 +111,7 @@ extension CompactBlockStorage: CompactBlockRepository {
} }
func rewind(to height: BlockHeight, completion: ((Error?) -> Void)?) { func rewind(to height: BlockHeight, completion: ((Error?) -> Void)?) {
DispatchQueue.global(qos: .userInitiated).async { DispatchQueue.global(qos: .default).async {
do { do {
try self.rewind(to: height) try self.rewind(to: height)
completion?(nil) completion?(nil)

View File

@ -322,9 +322,11 @@ public class CompactBlockProcessor {
self.stop() self.stop()
} }
} }
var maxAttemptsReached: Bool { var maxAttemptsReached: Bool {
self.retryAttempts >= self.config.retries self.retryAttempts >= self.config.retries
} }
var shouldStart: Bool { var shouldStart: Bool {
switch self.state { switch self.state {
case .stopped, .synced, .error: case .stopped, .synced, .error:
@ -335,7 +337,7 @@ public class CompactBlockProcessor {
} }
private var service: LightWalletService private var service: LightWalletService
private var downloader: CompactBlockDownloading private(set) var downloader: CompactBlockDownloading
private var storage: CompactBlockStorage private var storage: CompactBlockStorage
private var transactionRepository: TransactionRepository private var transactionRepository: TransactionRepository
private var accountRepository: AccountRepository private var accountRepository: AccountRepository
@ -1130,7 +1132,8 @@ public class CompactBlockProcessor {
case .downloading: case .downloading:
NotificationCenter.default.post(name: Notification.Name.blockProcessorStartedDownloading, object: self) NotificationCenter.default.post(name: Notification.Name.blockProcessorStartedDownloading, object: self)
case .synced: case .synced:
NotificationCenter.default.post(name: Notification.Name.blockProcessorFinished, object: self) // transition to this state is handled by `processingFinished(height: BlockHeight)`
break
case .error(let err): case .error(let err):
notifyError(err) notifyError(err)
case .scanning: case .scanning:
@ -1396,7 +1399,7 @@ extension CompactBlockProcessor {
queue: DispatchQueue?, queue: DispatchQueue?,
result: @escaping (Result<FigureNextBatchOperation.NextState, Error>) -> Void result: @escaping (Result<FigureNextBatchOperation.NextState, Error>) -> Void
) { ) {
let dispatchQueue = queue ?? DispatchQueue.global(qos: .userInitiated) let dispatchQueue = queue ?? DispatchQueue.global(qos: .default)
dispatchQueue.async { dispatchQueue.async {
do { do {

View File

@ -50,7 +50,7 @@ class PagedTransactionDAO: PaginatedTransactionRepository {
} }
func page(_ number: Int, result: @escaping (Result<[TransactionEntity]?, Error>) -> Void) { func page(_ number: Int, result: @escaping (Result<[TransactionEntity]?, Error>) -> Void) {
DispatchQueue.global(qos: .userInitiated).async { [weak self] in DispatchQueue.global(qos: .default).async { [weak self] in
guard let self = self else { return } guard let self = self else { return }
do { do {
result(.success(try self.page(number))) result(.success(try self.page(number)))

View File

@ -77,6 +77,12 @@ public extension Notification.Name {
/// Synchronizer implementation for UIKit and iOS 12+ /// Synchronizer implementation for UIKit and iOS 12+
// swiftlint:disable type_body_length // swiftlint:disable type_body_length
public class SDKSynchronizer: Synchronizer { public class SDKSynchronizer: Synchronizer {
public struct SynchronizerState {
public var shieldedBalance: WalletBalance
public var transparentBalance: WalletBalance
public var syncStatus: SyncStatus
public var latestScannedHeight: BlockHeight
}
public enum NotificationKeys { public enum NotificationKeys {
public static let progress = "SDKSynchronizer.progress" public static let progress = "SDKSynchronizer.progress"
@ -89,6 +95,7 @@ public class SDKSynchronizer: Synchronizer {
public static let nextStatus = "SDKSynchronizer.nextStatus" public static let nextStatus = "SDKSynchronizer.nextStatus"
public static let currentConnectionState = "SDKSynchronizer.currentConnectionState" public static let currentConnectionState = "SDKSynchronizer.currentConnectionState"
public static let previousConnectionState = "SDKSynchronizer.previousConnectionState" public static let previousConnectionState = "SDKSynchronizer.previousConnectionState"
public static let synchronizerState = "SDKSynchronizer.synchronizerState"
} }
public private(set) var status: SyncStatus { public private(set) var status: SyncStatus {
@ -435,16 +442,12 @@ public class SDKSynchronizer: Synchronizer {
} }
@objc func processorFinished(_ notification: Notification) { @objc func processorFinished(_ notification: Notification) {
// FIX: Pending transaction updates fail if done from another thread. Improvement needed: explicitly define queues for sql repositories // FIX: Pending transaction updates fail if done from another thread. Improvement needed: explicitly define queues for sql repositories see: https://github.com/zcash/ZcashLightClientKit/issues/450
if let blockHeight = notification.userInfo?[CompactBlockProcessorNotificationKey.latestScannedBlockHeight] as? BlockHeight {
DispatchQueue.main.async { [weak self] in self.latestScannedHeight = blockHeight
guard let self = self else { return } }
if let blockHeight = notification.userInfo?[CompactBlockProcessorNotificationKey.latestScannedBlockHeight] as? BlockHeight { self.refreshPendingTransactions()
self.latestScannedHeight = blockHeight self.status = .synced
}
self.refreshPendingTransactions()
self.status = .synced
}
} }
@objc func processorTransitionUnknown(_ notification: Notification) { @objc func processorTransitionUnknown(_ notification: Notification) {
@ -624,15 +627,15 @@ public class SDKSynchronizer: Synchronizer {
} }
public func latestDownloadedHeight() throws -> BlockHeight { public func latestDownloadedHeight() throws -> BlockHeight {
try initializer.downloader.lastDownloadedBlockHeight() try blockProcessor.downloader.lastDownloadedBlockHeight()
} }
public func latestHeight(result: @escaping (Result<BlockHeight, Error>) -> Void) { public func latestHeight(result: @escaping (Result<BlockHeight, Error>) -> Void) {
initializer.downloader.latestBlockHeight(result: result) blockProcessor.downloader.latestBlockHeight(result: result)
} }
public func latestHeight() throws -> BlockHeight { public func latestHeight() throws -> BlockHeight {
try initializer.downloader.latestBlockHeight() try blockProcessor.downloader.latestBlockHeight()
} }
public func latestUTXOs(address: String, result: @escaping (Result<[UnspentTransactionOutputEntity], Error>) -> Void) { public func latestUTXOs(address: String, result: @escaping (Result<[UnspentTransactionOutputEntity], Error>) -> Void) {
@ -770,7 +773,16 @@ public class SDKSynchronizer: Synchronizer {
name: Notification.Name.synchronizerSynced, name: Notification.Name.synchronizerSynced,
object: self, object: self,
userInfo: [ userInfo: [
SDKSynchronizer.NotificationKeys.blockHeight: self.latestScannedHeight SDKSynchronizer.NotificationKeys.blockHeight: self.latestScannedHeight,
SDKSynchronizer.NotificationKeys.synchronizerState: SynchronizerState(
shieldedBalance: WalletBalance(
verified: initializer.getVerifiedBalance(),
total: initializer.getBalance()
),
transparentBalance: (try? self.getTransparentBalance(accountIndex: 0)) ?? WalletBalance.zero,
syncStatus: status,
latestScannedHeight: self.latestScannedHeight
)
] ]
) )
case .unprepared: case .unprepared:

View File

@ -35,7 +35,7 @@ class PersistentTransactionManager: OutboundTransactionManager {
self.encoder = encoder self.encoder = encoder
self.service = service self.service = service
self.network = networkType self.network = networkType
self.queue = DispatchQueue.init(label: "PersistentTransactionManager.serial.queue", qos: .userInitiated) self.queue = DispatchQueue.init(label: "PersistentTransactionManager.serial.queue", qos: .default)
} }
func initSpend( func initSpend(

View File

@ -111,7 +111,7 @@ class MockLightWalletService: LightWalletService {
} }
func submit(spendTransaction: Data, result: @escaping (Result<LightWalletServiceResponse, LightWalletServiceError>) -> Void) { func submit(spendTransaction: Data, result: @escaping (Result<LightWalletServiceResponse, LightWalletServiceError>) -> Void) {
DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 1) { DispatchQueue.global(qos: .default).asyncAfter(deadline: .now() + 1) {
result(.success(LightWalletServiceMockResponse(errorCode: 0, errorMessage: "", unknownFields: UnknownStorage()))) result(.success(LightWalletServiceMockResponse(errorCode: 0, errorMessage: "", unknownFields: UnknownStorage())))
} }
} }