Improve the handling of disconnected state.

This commit is contained in:
Kevin Gorham 2020-03-27 16:28:42 -04:00
parent d36e4f276b
commit cdabfc8f4c
No known key found for this signature in database
GPG Key ID: CCA55602DF49FC38
5 changed files with 82 additions and 47 deletions

View File

@ -17,7 +17,9 @@ import cash.z.wallet.sdk.ext.ZcashSdk.SAPLING_ACTIVATION_HEIGHT
import cash.z.wallet.sdk.ext.ZcashSdk.SCAN_BATCH_SIZE
import cash.z.wallet.sdk.jni.RustBackend
import cash.z.wallet.sdk.jni.RustBackendWelding
import cash.z.wallet.sdk.service.LightWalletGrpcService
import cash.z.wallet.sdk.transaction.TransactionRepository
import io.grpc.StatusRuntimeException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
@ -66,6 +68,7 @@ class CompactBlockProcessor(
private val consecutiveChainErrors = AtomicInteger(0)
private val lowerBoundHeight: Int = max(SAPLING_ACTIVATION_HEIGHT, minimumHeight - MAX_REORG_SIZE)
private val reconnectError = -20
private val _state: ConflatedBroadcastChannel<State> = ConflatedBroadcastChannel(Initialized)
private val _progress = ConflatedBroadcastChannel(0)
@ -108,10 +111,13 @@ class CompactBlockProcessor(
retryWithBackoff(::onProcessorError, maxDelayMillis = MAX_BACKOFF_INTERVAL) {
val result = processNewBlocks()
// immediately process again after failures in order to download new blocks right away
if (result < 0) {
consecutiveChainErrors.set(0)
twig("Successfully processed new blocks. Sleeping for ${POLL_INTERVAL}ms")
delay(POLL_INTERVAL)
if (result == reconnectError) {
twig("Unable to process new blocks because we are disconnected! Attempting to reconnect in ${POLL_INTERVAL/4}ms")
delay(POLL_INTERVAL/4)
} else if (result < 0) {
consecutiveChainErrors.set(0)
twig("Successfully processed new blocks. Sleeping for ${POLL_INTERVAL}ms")
delay(POLL_INTERVAL)
} else {
if(consecutiveChainErrors.get() >= RETRIES) {
val errorMessage = "ERROR: unable to resolve reorg at height $result after ${consecutiveChainErrors.get()} correction attempts!"
@ -156,8 +162,12 @@ class CompactBlockProcessor(
verifySetup()
twig("beginning to process new blocks (with lower bound: $lowerBoundHeight)...")
updateRanges()
if (currentInfo.lastDownloadRange.isEmpty() && currentInfo.lastScanRange.isEmpty()) {
if (!updateRanges()) {
twig("Disconnection detected! Attempting to reconnect!")
setState(Disconnected)
downloader.lightwalletService.reconnect()
reconnectError
} else if (currentInfo.lastDownloadRange.isEmpty() && currentInfo.lastScanRange.isEmpty()) {
twig("Nothing to process: no new blocks to download or scan, right now.")
setState(Scanned(currentInfo.lastScanRange))
-1
@ -172,26 +182,34 @@ class CompactBlockProcessor(
/**
* Gets the latest range info and then uses that initialInfo to update (and transmit)
* the scan/download ranges that require processing.
*
* @return true when the update succeeds.
*/
private suspend fun updateRanges() = withContext(IO) {
// TODO: rethink this and make it easier to understand what's happening. Can we reduce this
// so that we only work with actual changing info rather than periodic snapshots? Do we need
// to calculate these derived values every time?
ProcessorInfo(
networkBlockHeight = downloader.getLatestBlockHeight(),
lastScannedHeight = getLastScannedHeight(),
lastDownloadedHeight = max(getLastDownloadedHeight(), lowerBoundHeight - 1)
).let { initialInfo ->
updateProgress(
networkBlockHeight = initialInfo.networkBlockHeight,
lastScannedHeight = initialInfo.lastScannedHeight,
lastDownloadedHeight = initialInfo.lastDownloadedHeight,
lastScanRange = (initialInfo.lastScannedHeight + 1)..initialInfo.networkBlockHeight,
lastDownloadRange = (max(
initialInfo.lastDownloadedHeight,
initialInfo.lastScannedHeight
) + 1)..initialInfo.networkBlockHeight
)
private suspend fun updateRanges(): Boolean = withContext(IO) {
try {
// TODO: rethink this and make it easier to understand what's happening. Can we reduce this
// so that we only work with actual changing info rather than periodic snapshots? Do we need
// to calculate these derived values every time?
ProcessorInfo(
networkBlockHeight = downloader.getLatestBlockHeight(),
lastScannedHeight = getLastScannedHeight(),
lastDownloadedHeight = max(getLastDownloadedHeight(), lowerBoundHeight - 1)
).let { initialInfo ->
updateProgress(
networkBlockHeight = initialInfo.networkBlockHeight,
lastScannedHeight = initialInfo.lastScannedHeight,
lastDownloadedHeight = initialInfo.lastDownloadedHeight,
lastScanRange = (initialInfo.lastScannedHeight + 1)..initialInfo.networkBlockHeight,
lastDownloadRange = (max(
initialInfo.lastDownloadedHeight,
initialInfo.lastScannedHeight
) + 1)..initialInfo.networkBlockHeight
)
}
true
} catch (t: StatusRuntimeException) {
twig("Warning: failed to update ranges due to $t caused by ${t.cause}")
false
}
}

View File

@ -53,6 +53,7 @@ sealed class CompactBlockProcessorException(message: String, cause: Throwable? =
"likely means the server is down or slow to respond. See logs for details.", cause)
class FailedScan(cause: Throwable? = null) : CompactBlockProcessorException("Error while scanning blocks. This most " +
"likely means a block was missed or a reorg was mishandled. See logs for details.", cause)
class Disconnected(cause: Throwable? = null) : CompactBlockProcessorException("Disconnected Error. Unable to download blocks due to ${cause?.message}", cause)
object Uninitialized : CompactBlockProcessorException("Cannot process blocks because the wallet has not been" +
" initialized. Verify that the seed phrase was properly created or imported. If so, then this problem" +
" can be fixed by re-importing the wallet.")

View File

@ -86,19 +86,7 @@ suspend inline fun retryWithBackoff(noinline onErrorListener: ((Throwable) -> Bo
duration = maxDelayMillis - Random.nextLong(1000L) // include jitter but don't exceed max delay
sequence /= 2
}
//TODO: return here and check whether adding more cause info helps identify why the connection is timing out after the device goes to sleep
var cause = t.cause
var depth = 0
val causes = buildString {
while(cause != null && depth < 5) {
append(" [$depth] caused by $cause")
cause = cause?.cause
depth++
}
if (depth == 0) append(" (no additional causes provided)")
if (cause != null) append(" (additional causes omitted)")
}
twig("Failed due to $t$causes backing off and retrying in ${duration}ms...")
twig("Failed due to $t caused by ${t.cause} backing off and retrying in ${duration}ms...")
delay(duration)
}
}

View File

@ -24,11 +24,14 @@ import java.util.concurrent.TimeUnit
* created for streaming requests, it will use a deadline that is after the given duration from now.
*/
class LightWalletGrpcService private constructor(
private val channel: ManagedChannel,
private var channel: ManagedChannel,
private val singleRequestTimeoutSec: Long = 10L,
private val streamingRequestTimeoutSec: Long = 90L
) : LightWalletService {
//TODO: find a better way to do this, maybe change the constructor to keep the properties
lateinit var connectionInfo: ConnectionInfo
/**
* Construct an instance that corresponds to the given host and port.
*
@ -44,7 +47,9 @@ class LightWalletGrpcService private constructor(
host: String,
port: Int = DEFAULT_LIGHTWALLETD_PORT,
usePlaintext: Boolean = appContext.resources.getBoolean(R.bool.lightwalletd_allow_very_insecure_connections)
) : this(createDefaultChannel(appContext, host, port, usePlaintext))
) : this(createDefaultChannel(appContext, host, port, usePlaintext)) {
connectionInfo = ConnectionInfo(appContext.applicationContext, host, port, usePlaintext)
}
/* LightWalletService implementation */
@ -65,7 +70,7 @@ class LightWalletGrpcService private constructor(
}
override fun shutdown() {
channel.shutdownNow()
channel.shutdown()
}
override fun fetchTransaction(txId: ByteArray): Service.RawTransaction? {
@ -73,6 +78,16 @@ class LightWalletGrpcService private constructor(
return channel.createStub().getTransaction(Service.TxFilter.newBuilder().setHash(ByteString.copyFrom(txId)).build())
}
override fun reconnect() {
channel.shutdown()
channel = createDefaultChannel(
connectionInfo.appContext,
connectionInfo.host,
connectionInfo.port,
connectionInfo.usePlaintext
)
}
//
// Utilities
@ -98,6 +113,13 @@ class LightWalletGrpcService private constructor(
}
}
inner class ConnectionInfo(
val appContext: Context,
val host: String,
val port: Int,
val usePlaintext: Boolean
)
companion object {
/**
* Convenience function for creating the default channel to be used for all connections. It
@ -110,7 +132,7 @@ class LightWalletGrpcService private constructor(
port: Int,
usePlaintext: Boolean
): ManagedChannel {
twig("Creating connection to $host:$port")
twig("Creating channel that will connect to $host:$port")
return AndroidChannelBuilder
.forAddress(host, port)
.context(appContext)

View File

@ -33,15 +33,21 @@ interface LightWalletService {
*/
fun submitTransaction(spendTransaction: ByteArray): Service.SendResponse
/**
* Cleanup any connections when the service is shutting down and not going to be used again.
*/
fun shutdown()
/**
* Fetch the details of a known transaction.
*
* @return the full transaction info.
*/
fun fetchTransaction(txId: ByteArray): Service.RawTransaction?
/**
* Cleanup any connections when the service is shutting down and not going to be used again.
*/
fun shutdown()
/**
* Reconnect to the same or a different server. This is useful when the connection is
* unrecoverable. That might be time to switch to a mirror or just reconnect.
*/
fun reconnect()
}