2019-06-14 16:24:52 -07:00
|
|
|
package cash.z.wallet.sdk.block
|
|
|
|
|
|
|
|
import androidx.annotation.VisibleForTesting
|
|
|
|
import cash.z.wallet.sdk.annotation.OpenForTesting
|
2019-10-21 03:26:02 -07:00
|
|
|
import cash.z.wallet.sdk.block.CompactBlockProcessor.State.*
|
2019-06-14 16:24:52 -07:00
|
|
|
import cash.z.wallet.sdk.exception.CompactBlockProcessorException
|
2019-11-01 13:25:28 -07:00
|
|
|
import cash.z.wallet.sdk.exception.RustLayerException
|
|
|
|
import cash.z.wallet.sdk.ext.*
|
2019-09-26 09:58:37 -07:00
|
|
|
import cash.z.wallet.sdk.ext.ZcashSdk.DOWNLOAD_BATCH_SIZE
|
|
|
|
import cash.z.wallet.sdk.ext.ZcashSdk.MAX_BACKOFF_INTERVAL
|
|
|
|
import cash.z.wallet.sdk.ext.ZcashSdk.MAX_REORG_SIZE
|
|
|
|
import cash.z.wallet.sdk.ext.ZcashSdk.POLL_INTERVAL
|
|
|
|
import cash.z.wallet.sdk.ext.ZcashSdk.RETRIES
|
|
|
|
import cash.z.wallet.sdk.ext.ZcashSdk.REWIND_DISTANCE
|
|
|
|
import cash.z.wallet.sdk.ext.ZcashSdk.SAPLING_ACTIVATION_HEIGHT
|
2019-11-01 13:25:28 -07:00
|
|
|
import cash.z.wallet.sdk.jni.RustBackend
|
2019-06-14 16:24:52 -07:00
|
|
|
import cash.z.wallet.sdk.jni.RustBackendWelding
|
2019-11-01 13:25:28 -07:00
|
|
|
import cash.z.wallet.sdk.transaction.TransactionRepository
|
2019-06-14 16:24:52 -07:00
|
|
|
import kotlinx.coroutines.Dispatchers.IO
|
|
|
|
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
|
|
|
|
import kotlinx.coroutines.delay
|
2019-10-21 03:26:02 -07:00
|
|
|
import kotlinx.coroutines.flow.asFlow
|
2019-06-14 16:24:52 -07:00
|
|
|
import kotlinx.coroutines.isActive
|
|
|
|
import kotlinx.coroutines.withContext
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger
|
2019-09-26 09:58:37 -07:00
|
|
|
import kotlin.math.max
|
|
|
|
import kotlin.math.min
|
|
|
|
import kotlin.math.roundToInt
|
2019-06-14 16:24:52 -07:00
|
|
|
|
|
|
|
/**
 * Drives the sync pipeline for the compact blocks received from the lightwallet server: every pass downloads
 * the blocks that are missing, validates the combined chain and then scans it. All of the business logic needed
 * to validate and scan the blockchain lives here, which ties this class closely to librustzcash.
 *
 * @param downloader source of compact blocks; also queried for the latest and the last downloaded heights.
 * @param repository queried for the height of the last scanned block.
 * @param rustBackend performs chain validation, scanning, rewinding and the balance/address lookups.
 * @param minimumHeight height from which the lower bound for downloads and rewinds is derived. Defaults to
 * [SAPLING_ACTIVATION_HEIGHT].
 */
@OpenForTesting
class CompactBlockProcessor(
    internal val downloader: CompactBlockDownloader,
    private val repository: TransactionRepository,
    private val rustBackend: RustBackendWelding,
    minimumHeight: Int = SAPLING_ACTIVATION_HEIGHT
) {
    /**
     * Optional callback that receives every error passed to the connection error handler. Its Boolean result is
     * handed back to the retry helper; when no listener is set, `true` is used instead.
     */
    var onErrorListener: ((Throwable) -> Boolean)? = null

    /** Number of chain errors seen since the last pass that completed without one. */
    private val consecutiveChainErrors = AtomicInteger()

    /**
     * Lowest height this processor downloads from or rewinds to: [MAX_REORG_SIZE] blocks below the requested
     * minimum height, but never below [SAPLING_ACTIVATION_HEIGHT].
     */
    private val lowerBoundHeight: Int = max(SAPLING_ACTIVATION_HEIGHT, minimumHeight - MAX_REORG_SIZE)

    /** Backing channel for [state]; starts out as [State.Initialized]. */
    private val _state = ConflatedBroadcastChannel<State>(Initialized)

    /** Backing channel for [progress]; starts out at 0. */
    private val _progress = ConflatedBroadcastChannel<Int>(0)

    /** Stream of the [State] values published by this processor. */
    val state = _state.asFlow()

    /** Stream of download progress values, published as a percentage where 100 means the download finished. */
    val progress = _progress.asFlow()
|
2019-06-14 16:24:52 -07:00
|
|
|
|
|
|
|
/**
 * Runs the processing loop on the IO dispatcher: download compact blocks, then validate and scan them.
 *
 * A pass without chain errors resets the chain error counter and sleeps for [POLL_INTERVAL]. A pass that reports
 * a chain error triggers a rewind and is repeated without sleeping; once [RETRIES] consecutive corrections have
 * been attempted the processor fails with [CompactBlockProcessorException.FailedReorgRepair]. The loop ends when
 * the coroutine is no longer active or the state becomes [State.Stopped], after which [stop] is called.
 */
suspend fun start() = withContext(IO) {
    twig("processor starting")

    // A do/while always executes one complete pass, even when the processor is stopped immediately after it
    // was started, which makes it easy to run exactly one loop in tests.
    do {
        retryWithBackoff(::onConnectionError, maxDelayMillis = MAX_BACKOFF_INTERVAL) {
            val errorHeight = processNewBlocks()
            if (errorHeight >= 0) {
                // a chain error was found: repair it and process again right away, without sleeping, so that
                // the replacement blocks are downloaded immediately
                when {
                    consecutiveChainErrors.get() >= RETRIES -> {
                        val errorMessage = "ERROR: unable to resolve reorg at height $errorHeight after ${consecutiveChainErrors.get()} correction attempts!"
                        fail(CompactBlockProcessorException.FailedReorgRepair(errorMessage))
                    }
                    else -> handleChainError(errorHeight)
                }
                consecutiveChainErrors.getAndIncrement()
            } else {
                consecutiveChainErrors.set(0)
                twig("Successfully processed new blocks. Sleeping for ${POLL_INTERVAL}ms")
                delay(POLL_INTERVAL)
            }
        }
    } while (isActive && _state.value !is Stopped)

    twig("processor complete")
    stop()
}
|
|
|
|
|
2019-10-21 03:26:02 -07:00
|
|
|
/**
 * Publishes [State.Stopped], which is the state the processing loop in [start] checks before beginning
 * another pass.
 */
suspend fun stop() = setState(Stopped)
|
|
|
|
|
2019-10-21 03:26:02 -07:00
|
|
|
/**
 * Stops the processor, logs the message of [error] and then throws it, so this function never returns
 * normally.
 *
 * @param error the failure to report and throw.
 */
private suspend fun fail(error: Throwable) {
    stop()
    throw error.also { twig("${it.message}") }
}
|
|
|
|
|
|
|
|
/**
 * Runs one complete pass: works out which heights are missing, downloads them, validates the chain and, when
 * validation reports no error, scans the new blocks. The matching [State] is published before each phase and
 * [State.Scanned] is published after a successful scan.
 *
 * @return -1 when processing was successful and did not encounter errors during validation or scanning.
 * Otherwise, the block height where validation found an error.
 * @throws CompactBlockProcessorException.FailedScan when scanning reports a failure after validation passed.
 */
private suspend fun processNewBlocks(): Int = withContext(IO) {
    twig("beginning to process new blocks...")

    // gather the heights that define the work for this pass
    val latestBlockHeight = downloader.getLatestBlockHeight()
    val lastDownloadedHeight = getLastDownloadedHeight()
    val lastScannedHeight = getLastScannedHeight()
    val boundedLastDownloadedHeight = max(lastDownloadedHeight, lowerBoundHeight - 1)

    twig(
        "latestBlockHeight: $latestBlockHeight\tlastDownloadedHeight: $lastDownloadedHeight" +
            "\tlastScannedHeight: $lastScannedHeight\tlowerBoundHeight: $lowerBoundHeight"
    )

    // When the database already holds the sapling tree (e.g. it was initialized from a checkpoint) the earlier
    // blocks do not need to be downloaded, so start after whichever height is larger.
    val firstHeightToDownload = max(boundedLastDownloadedHeight, lastScannedHeight) + 1
    val rangeToDownload = firstHeightToDownload..latestBlockHeight
    val rangeToScan = (lastScannedHeight + 1)..latestBlockHeight

    setState(Downloading)
    downloadNewBlocks(rangeToDownload)

    setState(Validating)
    val errorHeight = validateNewBlocks(rangeToScan)
    if (errorHeight >= 0) return@withContext errorHeight

    // In theory a scan should not fail once validation has succeeded. It may still be worth changing the rust
    // layer to return the failed block height, rather than a boolean, whenever a scan does fail.
    setState(Scanning)
    if (!scanNewBlocks(rangeToScan)) throw CompactBlockProcessorException.FailedScan
    setState(Scanned(rangeToScan))
    -1
}
|
|
|
|
|
|
|
|
|
2019-11-22 23:18:20 -08:00
|
|
|
|
2019-06-14 16:24:52 -07:00
|
|
|
/**
 * Downloads the compact blocks in [range] in batches of [DOWNLOAD_BATCH_SIZE] blocks.
 *
 * A percentage is published on the progress channel after every batch, and 100 is always published at the
 * end, including when [range] is empty. Each batch runs inside `retryUpTo(RETRIES)`; the download cursor only
 * advances after a batch has completed, so a batch that is attempted again starts from the same height.
 *
 * @param range the inclusive range of block heights to download.
 */
@VisibleForTesting //allow mocks to verify how this is called, rather than the downloader, which is more complex
internal suspend fun downloadNewBlocks(range: IntRange) = withContext<Unit>(IO) {
    if (range.isEmpty()) {
        twig("no blocks to download")
    } else {
        _state.send(Downloading)
        Twig.sprout("downloading")
        try {
            twig("downloading blocks in range $range")

            var downloadedBlockHeight = range.first
            val missingBlockCount = range.last - range.first + 1
            // number of batches, rounded up so that a partial final batch is included
            val batches = (missingBlockCount / DOWNLOAD_BATCH_SIZE
                + (if (missingBlockCount.rem(DOWNLOAD_BATCH_SIZE) == 0) 0 else 1))
            twig("found $missingBlockCount missing blocks, downloading in $batches batches of ${DOWNLOAD_BATCH_SIZE}...")
            for (i in 1..batches) {
                retryUpTo(RETRIES) {
                    // exclusive end of this batch, capped at the end of the requested range
                    val end = min(range.first + (i * DOWNLOAD_BATCH_SIZE), range.last + 1)
                    twig("downloaded $downloadedBlockHeight..${(end - 1)} (batch $i of $batches)") {
                        downloader.downloadBlockRange(downloadedBlockHeight until end)
                    }
                    // Progress is currently reported after every batch, regardless of the download size.
                    // TODO: only report during large downloads and allow for configuration of "large"
                    _progress.send((i / batches.toFloat() * 100).roundToInt())
                    downloadedBlockHeight = end
                }
            }
        } finally {
            // always pair the sprout above with a clip, even when a batch fails after all of its retries
            Twig.clip("downloading")
        }
    }
    _progress.send(100)
}
|
|
|
|
|
|
|
|
/**
 * Validates the combined chain through the rust backend.
 *
 * @param range the heights that are about to be scanned. It is only used to decide whether there is anything
 * to validate and for logging; a null or empty range skips validation.
 * @return -1 when there is nothing to validate. Otherwise the result of `rustBackend.validateCombinedChain()`,
 * which callers interpret as -1 for success or the height of the block where an error was found.
 */
private fun validateNewBlocks(range: IntRange?): Int {
    if (range?.isEmpty() != false) {
        twig("no blocks to validate: $range")
        return -1
    }
    Twig.sprout("validating")
    try {
        // The backend is only known through its interface, so use a safe cast for this log-only detail. A hard
        // cast would throw a ClassCastException for any other implementation (such as a test double) before
        // validation even starts.
        val dbCachePath = (rustBackend as? RustBackend)?.dbCachePath
        twig("validating blocks in range $range in db: $dbCachePath")
        return rustBackend.validateCombinedChain()
    } finally {
        // always pair the sprout above with a clip, even when validation throws
        Twig.clip("validating")
    }
}
|
|
|
|
|
|
|
|
/**
 * Scans the downloaded blocks through the rust backend.
 *
 * @param range the heights that are expected to be scanned. It is only used to decide whether there is
 * anything to scan and for logging; a null or empty range skips the scan.
 * @return true when there is nothing to scan. Otherwise the result of `rustBackend.scanBlocks()`.
 */
private fun scanNewBlocks(range: IntRange?): Boolean {
    if (range?.isEmpty() != false) {
        twig("no blocks to scan")
        return true
    }
    Twig.sprout("scanning")
    try {
        twig("scanning blocks in range $range")
        return rustBackend.scanBlocks()
    } finally {
        // always pair the sprout above with a clip, even when the scan throws
        Twig.clip("scanning")
    }
}
|
|
|
|
|
|
|
|
/**
 * Responds to a chain error by rewinding both the rust backend and the downloader to the height selected by
 * [determineLowerBound]. Runs on the IO dispatcher.
 *
 * @param errorHeight the block height where the error was found.
 */
private suspend fun handleChainError(errorHeight: Int) = withContext(IO) {
    determineLowerBound(errorHeight).let { rewindHeight ->
        twig("handling chain error at $errorHeight by rewinding to block $rewindHeight")
        rustBackend.rewindToHeight(rewindHeight)
        downloader.rewindToHeight(rewindHeight)
    }
}
|
|
|
|
|
2019-06-17 02:01:29 -07:00
|
|
|
/**
 * Error handler passed to the retry helper in [start]. Offers [State.Disconnected] to the state channel and
 * then defers to [onErrorListener].
 *
 * @param throwable the error that was encountered.
 * @return the listener's result, or true when no listener has been set.
 */
private fun onConnectionError(throwable: Throwable): Boolean {
    _state.offer(Disconnected)
    val listener = onErrorListener ?: return true
    return listener(throwable)
}
|
|
|
|
|
2019-06-14 16:24:52 -07:00
|
|
|
/**
 * Chooses the height to rewind to after a chain error. The rewind distance grows by [REWIND_DISTANCE] with
 * every consecutive chain error and is capped at [MAX_REORG_SIZE]; the result is never below the lower bound
 * height of this processor.
 *
 * @param errorHeight the block height where the error was found.
 * @return the height to rewind to.
 */
private fun determineLowerBound(errorHeight: Int): Int {
    val attempt = consecutiveChainErrors.get() + 1
    val offset = min(MAX_REORG_SIZE, REWIND_DISTANCE * attempt)
    return max(errorHeight - offset, lowerBoundHeight)
}
|
|
|
|
|
|
|
|
/**
 * Asks the downloader, on the IO dispatcher, for the height of the last block it has downloaded.
 */
suspend fun getLastDownloadedHeight() =
    withContext(IO) { downloader.getLastDownloadedHeight() }
|
|
|
|
|
|
|
|
/**
 * Asks the transaction repository, on the IO dispatcher, for the height of the last scanned block.
 */
suspend fun getLastScannedHeight() =
    withContext(IO) { repository.lastScannedHeight() }
|
2019-10-21 03:26:02 -07:00
|
|
|
|
2019-11-01 13:25:28 -07:00
|
|
|
/**
 * Returns the result of `rustBackend.getAddress` for the given account, evaluated on the IO dispatcher.
 *
 * @param accountId the account whose address is requested.
 */
suspend fun getAddress(accountId: Int) =
    withContext(IO) { rustBackend.getAddress(accountId) }
|
|
|
|
|
|
|
|
/**
 * Reads the latest balance info from the rust backend on the IO dispatcher: the total comes from `getBalance`
 * and the available amount from `getVerifiedBalance`.
 *
 * @param accountIndex the account to check for balance info. Defaults to the first account.
 * @return the total and available balance of the account.
 * @throws RustLayerException.BalanceException wrapping anything thrown while the balances are being read.
 */
suspend fun getBalanceInfo(accountIndex: Int = 0): WalletBalance = withContext(IO) {
    twigTask("checking balance info") {
        try {
            val total = rustBackend.getBalance(accountIndex)
                .also { twig("found total balance of: $it") }
            val available = rustBackend.getVerifiedBalance(accountIndex)
                .also { twig("found available balance of: $it") }
            WalletBalance(total, available)
        } catch (t: Throwable) {
            twig("failed to get balance due to $t")
            throw RustLayerException.BalanceException(t)
        }
    }
}
|
|
|
|
|
2019-10-21 03:26:02 -07:00
|
|
|
/**
 * Sends [newState] to the state channel so that it is delivered to collectors of the state flow.
 *
 * @param newState the state to publish.
 */
suspend fun setState(newState: State) = _state.send(newState)
|
|
|
|
|
|
|
|
/**
 * The states that this processor publishes on its state channel.
 */
sealed class State {
    /** Marker implemented by every state that is published while blocks are being processed. */
    interface Connected

    /** Marker implemented by the states that belong to a processing pass. */
    interface Syncing

    /** Published before new blocks are downloaded. */
    object Downloading : State(), Connected, Syncing

    /** Published before the downloaded blocks are validated. */
    object Validating : State(), Connected, Syncing

    /** Published before the validated blocks are scanned. */
    object Scanning : State(), Connected, Syncing

    /**
     * Published after a scan has succeeded.
     *
     * @param scannedRange the range of block heights that was handed to the scan.
     */
    class Scanned(val scannedRange: IntRange) : State(), Connected, Syncing

    /** Offered when the connection error handler is invoked. */
    object Disconnected : State()

    /** Published when the processor is stopped. */
    object Stopped : State()

    /** The initial state of a newly created processor. */
    object Initialized : State()
}
|
2019-11-01 13:25:28 -07:00
|
|
|
|
|
|
|
/**
 * Holds the total and the available balance of the wallet, as returned by the balance lookup of the
 * processor.
 *
 * @param total the total balance, ignoring funds that cannot be used.
 * @param available the amount of funds that are available for use. Funds are typically unavailable because
 * they belong to fairly new transactions that do not have enough confirmations yet, or because they are notes
 * that are tied up while change from a transaction is awaited. When a note has been spent, its change cannot be
 * used until there are enough confirmations.
 */
data class WalletBalance(
    val total: Long = -1L,
    val available: Long = -1L
)
|
2019-09-26 09:58:37 -07:00
|
|
|
}
|