More robust error handling.

This commit is contained in:
Kevin Gorham 2019-06-17 05:01:29 -04:00 committed by Kevin Gorham
parent 7daeb20755
commit f222bde2fa
8 changed files with 90 additions and 23 deletions

View File

@ -6,10 +6,10 @@ buildscript {
]
ext.versions = [
'architectureComponents': [
'lifecycle': '2.1.0-alpha03',
'room': '2.1.0-alpha06'
'lifecycle': '2.2.0-alpha01',
'room': '2.1.0'
],
'grpc':'1.19.0',
'grpc':'1.21.0',
'kotlin': '1.3.21',
'coroutines': '1.3.0-M1',
'junitJupiter': '5.5.0-M1'
@ -46,9 +46,10 @@ apply plugin: 'com.github.ben-manes.versions'
apply plugin: 'com.github.dcendents.android-maven'
apply plugin: 'com.getkeepsafe.dexcount'
apply plugin: 'org.mozilla.rust-android-gradle.rust-android'
apply plugin: 'org.owasp.dependencycheck'
group = 'cash.z.android.wallet'
version = '1.7.4'
version = '1.8.0'
repositories {
google()
@ -63,7 +64,7 @@ android {
defaultConfig {
minSdkVersion buildConfig.minSdkVersion
targetSdkVersion buildConfig.targetSdkVersion
versionCode = 1_07_05_00 // last digits are alpha(0X) beta(1X) rc(2X) release(3X). Ex: 1_08_04_20 is a RC build
versionCode = 1_08_00_00 // last digits are alpha(0X) beta(1X) rc(2X) release(3X). Ex: 1_08_04_20 is a RC build
versionName = "$version-alpha"
testInstrumentationRunner = "androidx.test.runner.AndroidJUnitRunner"
testInstrumentationRunnerArguments clearPackageData: 'true'
@ -182,12 +183,12 @@ cargo {
}
dependencies {
implementation 'androidx.appcompat:appcompat:1.1.0-alpha03'
implementation 'androidx.appcompat:appcompat:1.1.0-beta01'
// Architecture Components: Lifecycle
implementation "androidx.lifecycle:lifecycle-runtime:${versions.architectureComponents.lifecycle}"
implementation "androidx.lifecycle:lifecycle-extensions:${versions.architectureComponents.lifecycle}"
kapt "androidx.lifecycle:lifecycle-compiler:${versions.architectureComponents.lifecycle}"
implementation "androidx.lifecycle:lifecycle-common-java8:${versions.architectureComponents.lifecycle}"
// Architecture Components: Room
implementation "androidx.room:room-runtime:${versions.architectureComponents.room}"
@ -201,6 +202,7 @@ dependencies {
// grpc-java
implementation "io.grpc:grpc-okhttp:${versions.grpc}"
implementation "io.grpc:grpc-android:${versions.grpc}"
implementation "io.grpc:grpc-protobuf-lite:${versions.grpc}"
implementation "io.grpc:grpc-stub:${versions.grpc}"
implementation 'javax.annotation:javax.annotation-api:1.3.2'
@ -214,7 +216,7 @@ dependencies {
// Tests
testImplementation 'androidx.multidex:multidex:2.0.1'
testImplementation "org.jetbrains.kotlin:kotlin-reflect:${versions.kotlin}"
testImplementation 'org.mockito:mockito-junit-jupiter:2.25.1'
testImplementation 'org.mockito:mockito-junit-jupiter:2.26.0'
testImplementation 'com.nhaarman.mockitokotlin2:mockito-kotlin:2.1.0'
testImplementation "org.junit.jupiter:junit-jupiter-api:${versions.junitJupiter}"
testImplementation "org.junit.jupiter:junit-jupiter-engine:${versions.junitJupiter}"
@ -227,11 +229,12 @@ dependencies {
// because "JUnit 5 uses Java 8-specific APIs that didn't exist on Android before the Oreo release."
androidTestImplementation 'com.nhaarman.mockitokotlin2:mockito-kotlin:2.1.0'
androidTestImplementation 'org.mockito:mockito-android:2.25.1'
androidTestImplementation "androidx.test:runner:1.1.2-alpha02"
androidTestImplementation "androidx.test:core:1.1.1-alpha02"
androidTestImplementation "androidx.test:runner:1.2.0"
androidTestImplementation "androidx.test:core:1.2.0"
androidTestImplementation "androidx.arch.core:core-testing:2.0.1"
androidTestImplementation 'androidx.test.ext:junit:1.1.1-alpha02'
androidTestImplementation 'androidx.test:runner:1.1.1'
androidTestImplementation 'androidx.test.ext:junit:1.1.1'
androidTestImplementation 'androidx.test:runner:1.2.0'
}
preBuild.dependsOn includeDirBugFix
check.dependsOn dependencyCheckAggregate

View File

@ -54,7 +54,7 @@ class IntegrationTest {
blockPollFrequencyMillis = 10_000L
)
val lightwalletService = LightWalletGrpcService("192.168.1.134")
val lightwalletService = LightWalletGrpcService(context,"192.168.1.134")
val compactBlockStore = CompactBlockDbStore(context, config.cacheDbPath)
downloader = CompactBlockDownloader(lightwalletService, compactBlockStore)

View File

@ -1,4 +1,5 @@
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
package="cash.z.wallet.sdk">
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
</manifest>

View File

@ -30,9 +30,13 @@ class CompactBlockProcessor(
private val repository: TransactionRepository,
private val rustBackend: RustBackendWelding = RustBackend()
) {
var onErrorListener: ((Throwable) -> Boolean)? = null
var isConnected: Boolean = false
var isSyncing: Boolean = false
var isScanning: Boolean = false
private val progressChannel = ConflatedBroadcastChannel<Int>()
private var isStopped = false
private val consecutiveErrors = AtomicInteger(0)
private val consecutiveChainErrors = AtomicInteger(0)
fun progress(): ReceiveChannel<Int> = progressChannel.openSubscription()
@ -44,22 +48,25 @@ class CompactBlockProcessor(
validateConfig()
// using do/while makes it easier to execute exactly one loop which helps with testing this processor quickly
// (because you can start and then immediately set isStopped=true to always get precisely one loop)
do {
retryUpTo(config.retries) {
retryWithBackoff(::onConnectionError, maxDelayMillis = config.maxBackoffInterval) {
val result = processNewBlocks()
// immediately process again after failures in order to download new blocks right away
if (result < 0) {
consecutiveErrors.set(0)
isSyncing = false
isScanning = false
consecutiveChainErrors.set(0)
twig("Successfully processed new blocks. Sleeping for ${config.blockPollFrequencyMillis}ms")
delay(config.blockPollFrequencyMillis)
} else {
if(consecutiveErrors.get() >= config.retries) {
val errorMessage = "ERROR: unable to resolve reorg at height $result after ${consecutiveErrors.get()} correction attempts!"
if(consecutiveChainErrors.get() >= config.retries) {
val errorMessage = "ERROR: unable to resolve reorg at height $result after ${consecutiveChainErrors.get()} correction attempts!"
fail(CompactBlockProcessorException.FailedReorgRepair(errorMessage))
} else {
handleChainError(result)
}
consecutiveErrors.getAndIncrement()
consecutiveChainErrors.getAndIncrement()
}
}
} while (isActive && !isStopped)
@ -98,6 +105,8 @@ class CompactBlockProcessor(
// define ranges
val latestBlockHeight = downloader.getLatestBlockHeight()
isConnected = true // no exception on downloader call
isSyncing = true
val lastDownloadedHeight = Math.max(getLastDownloadedHeight(), SAPLING_ACTIVATION_HEIGHT - 1)
val lastScannedHeight = getLastScannedHeight()
@ -169,7 +178,9 @@ class CompactBlockProcessor(
}
Twig.sprout("scanning")
twig("scanning blocks in range $range")
isScanning = true
val result = rustBackend.scanBlocks(config.cacheDbPath, config.dataDbPath)
isScanning = false
Twig.clip("scanning")
return result
}
@ -181,8 +192,15 @@ class CompactBlockProcessor(
downloader.rewindTo(lowerBound)
}
private fun onConnectionError(throwable: Throwable): Boolean {
isConnected = false
isSyncing = false
isScanning = false
return onErrorListener?.invoke(throwable) ?: true
}
private fun determineLowerBound(errorHeight: Int): Int {
val offset = Math.min(MAX_REORG_SIZE, config.rewindDistance * (consecutiveErrors.get() + 1))
val offset = Math.min(MAX_REORG_SIZE, config.rewindDistance * (consecutiveChainErrors.get() + 1))
return Math.max(errorHeight - offset, SAPLING_ACTIVATION_HEIGHT)
}
@ -205,5 +223,6 @@ data class ProcessorConfig(
val downloadBatchSize: Int = DEFAULT_BATCH_SIZE,
val blockPollFrequencyMillis: Long = DEFAULT_POLL_INTERVAL,
val retries: Int = DEFAULT_RETRIES,
val maxBackoffInterval: Long = DEFAULT_MAX_BACKOFF_INTERVAL,
val rewindDistance: Int = DEFAULT_REWIND_DISTANCE
)

View File

@ -69,6 +69,7 @@ open class PollingTransactionRepository(
override fun stop() {
twig("stopping but doing nothing")
pollingJob.cancel()
derivedDataDb.close()
// TODO: verify that the channels behave as expected in this scenario
}

View File

@ -5,6 +5,7 @@ import cash.z.wallet.sdk.data.twig
import cash.z.wallet.sdk.rpc.Service
import kotlinx.coroutines.delay
import java.io.File
import kotlin.random.Random
inline fun Int.toBlockHeight(): Service.BlockID = Service.BlockID.newBuilder().setHeight(this.toLong()).build()
@ -24,6 +25,31 @@ suspend inline fun retryUpTo(retries: Int, initialDelay: Int = 10, block: () ->
}
}
suspend inline fun retryWithBackoff(noinline onErrorListener: ((Throwable) -> Boolean)? = null, initialDelayMillis: Long = 1000L, maxDelayMillis: Long = DEFAULT_MAX_BACKOFF_INTERVAL, block: () -> Unit) {
var sequence = 0 // count up to the max and then reset to half. So that we don't repeat the max but we also don't repeat too much.
while (true) {
try {
block()
return
} catch (t: Throwable) {
// offer to listener first
if (onErrorListener?.invoke(t) == false) {
throw t
}
sequence++
// I^(1/4)n + jitter
var duration = Math.pow(initialDelayMillis.toDouble(), (sequence.toDouble()/4.0)).toLong() + Random.nextLong(1000L)
if (duration > maxDelayMillis) {
duration = maxDelayMillis - Random.nextLong(1000L) // include jitter but don't exceed max delay
sequence /= 2
}
twig("Failed due to $t retrying in ${duration}ms...")
delay(duration)
}
}
}
internal fun dbExists(appContext: Context, dbFileName: String): Boolean {
return File(appContext.getDatabasePath(dbFileName).absolutePath).exists()
}

View File

@ -46,6 +46,12 @@ const val DEFAULT_POLL_INTERVAL = 75_000L
*/
const val DEFAULT_RETRIES = 5
/**
* The default maximum amount of time to wait during retry backoff intervals. Failed loops will never wait longer than
* this before retyring.
*/
const val DEFAULT_MAX_BACKOFF_INTERVAL = 600_000L
/**
* Default number of blocks to rewind when a chain reorg is detected. This should be large enough to recover from the
* reorg but smaller than the theoretical max reorg size of 100.

View File

@ -1,5 +1,6 @@
package cash.z.wallet.sdk.service
import android.content.Context
import cash.z.wallet.sdk.entity.CompactBlock
import cash.z.wallet.sdk.ext.toBlockHeight
import cash.z.wallet.sdk.rpc.CompactFormats
@ -7,24 +8,34 @@ import cash.z.wallet.sdk.rpc.CompactTxStreamerGrpc
import cash.z.wallet.sdk.rpc.Service
import com.google.protobuf.ByteString
import io.grpc.Channel
import io.grpc.ManagedChannelBuilder
import io.grpc.ManagedChannel
import io.grpc.android.AndroidChannelBuilder
import java.util.concurrent.TimeUnit
class LightWalletGrpcService(private val channel: Channel) : LightWalletService {
class LightWalletGrpcService(private val channel: ManagedChannel) : LightWalletService {
constructor(host: String, port: Int = 9067) : this(ManagedChannelBuilder.forAddress(host, port).usePlaintext().build())
constructor(appContext: Context, host: String, port: Int = 9067) : this(
AndroidChannelBuilder
.forAddress(host, port)
.context(appContext)
.usePlaintext()
.build()
)
/* LightWalletService implementation */
override fun getBlockRange(heightRange: IntRange): List<CompactBlock> {
channel.resetConnectBackoff()
return channel.createStub(90L).getBlockRange(heightRange.toBlockRange()).toList()
}
override fun getLatestBlockHeight(): Int {
channel.resetConnectBackoff()
return channel.createStub(10L).getLatestBlock(Service.ChainSpec.newBuilder().build()).height.toInt()
}
override fun submitTransaction(raw: ByteArray): Service.SendResponse {
channel.resetConnectBackoff()
val request = Service.RawTransaction.newBuilder().setData(ByteString.copyFrom(raw)).build()
return channel.createStub().sendTransaction(request)
}