FIX: processor stalls on reconnection

This commit is contained in:
Francisco Gindre 2021-06-15 18:53:21 -03:00
parent b58c20b3b0
commit 7b76f721cf
5 changed files with 41 additions and 8 deletions

View File

@ -956,7 +956,7 @@ public class CompactBlockProcessor {
func severeFailure(_ error: Error) {
queue.cancelAllOperations()
LoggerProxy.error("severe failure: \(error)")
LoggerProxy.error("show stoppper failure: \(error)")
self.backoffTimer?.invalidate()
self.retryAttempts = config.retries
self.processingError = error

View File

@ -27,6 +27,8 @@ public struct LightWalletEndpoint {
public var host: String
public var port: Int
public var secure: Bool
public var singleCallTimeoutInMillis: Int64
public var streamingCallTimeoutInMillis: Int64
/**
initializes a LightWalletEndpoint
@ -34,11 +36,19 @@ public struct LightWalletEndpoint {
- address: a String containing the host address
- port: string with the port of the host address
- secure: true if connecting through TLS. Default value is true
- singleCallTimeoutInMillis: timeout for single calls in Milliseconds
- streamingCallTimeoutInMillis: timeout for streaming calls in Milliseconds
*/
public init(address: String, port: Int, secure: Bool = true) {
public init(address: String,
port: Int,
secure: Bool = true,
singleCallTimeoutInMillis: Int64 = 10000,
streamingCallTimeoutInMillis: Int64 = 100000) {
self.host = address
self.port = port
self.secure = secure
self.singleCallTimeoutInMillis = singleCallTimeoutInMillis
self.streamingCallTimeoutInMillis = streamingCallTimeoutInMillis
}
}

View File

@ -104,7 +104,11 @@ public class LightWalletGRPCService {
let streamingCallTimeout: TimeLimit
public convenience init(endpoint: LightWalletEndpoint) {
self.init(host: endpoint.host, port: endpoint.port, secure: endpoint.secure)
self.init(host: endpoint.host,
port: endpoint.port,
secure: endpoint.secure,
singleCallTimeout: endpoint.singleCallTimeoutInMillis,
streamingCallTimeout: endpoint.streamingCallTimeoutInMillis)
}
deinit {
@ -191,7 +195,7 @@ extension LightWalletGRPCService: LightWalletService {
}
public func getInfo(result: @escaping (Result<LightWalletdInfo, LightWalletServiceError>) -> Void) {
compactTxStreamer.getLightdInfo(Empty()).response.whenComplete { r in
compactTxStreamer.getLightdInfo(Empty(), callOptions: Self.callOptions(timeLimit: .timeout(.seconds(3)))).response.whenComplete { r in
switch r {
case .success(let info):
result(.success(info))

View File

@ -87,7 +87,12 @@ public protocol Synchronizer {
var status: SyncStatus { get }
/**
prepares this initializer to operate. Initializes the internal state with the given Extended Viewing Keys and a wallet birthday found in the initializer object
reflects current connection state to LightwalletEndpoint
*/
var connectionState: ConnectionState { get }
/**
prepares this initializer to operate. Initializes the internal state with the given Extended Viewing Keys and a wallet birthday found in the initializer object
*/
func prepare() throws
/**
@ -288,6 +293,15 @@ public enum SyncStatus: Equatable {
return false
}
}
public var isSynced: Bool {
switch self {
case .synced:
return true
default:
return false
}
}
}
/**

View File

@ -125,6 +125,7 @@ public class SDKSynchronizer: Synchronizer {
public private(set) var blockProcessor: CompactBlockProcessor
public private(set) var initializer: Initializer
public private(set) var connectionState: ConnectionState
private var transactionManager: OutboundTransactionManager
private var transactionRepository: TransactionRepository
private var utxoRepository: UnspentTransactionOutputRepository
@ -150,6 +151,7 @@ public class SDKSynchronizer: Synchronizer {
transactionRepository: TransactionRepository,
utxoRepository: UnspentTransactionOutputRepository,
blockProcessor: CompactBlockProcessor) throws {
self.connectionState = .idle
self.status = status
self.initializer = initializer
self.transactionManager = transactionManager
@ -276,7 +278,7 @@ public class SDKSynchronizer: Synchronizer {
center.addObserver(self,
selector: #selector(connectivityStateChanged(_:)),
name: Notification.Name.blockProcessorConnectivityStateChanged,
object: processor)
object: nil)
}
// MARK: Block Processor notifications
@ -287,15 +289,18 @@ public class SDKSynchronizer: Synchronizer {
LoggerProxy.error("found \(Notification.Name.blockProcessorConnectivityStateChanged) but lacks dictionary information. this is probably a programming error")
return
}
let currentState = ConnectionState(current)
NotificationCenter.default.post(
name: .synchronizerConnectionStateChanged,
object: self,
userInfo: [
NotificationKeys.previousConnectionState : ConnectionState(previous),
NotificationKeys.currentConnectionState : ConnectionState(current)
NotificationKeys.currentConnectionState : currentState
])
DispatchQueue.main.async { [weak self] in
self?.connectionState = currentState
}
}
@objc func transactionsFound(_ notification: Notification) {