zcash-android-wallet-sdk/sdk-lib/src/main/java/cash/z/ecc/android/sdk/block/CompactBlockProcessor.kt

1328 lines
60 KiB
Kotlin
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package cash.z.ecc.android.sdk.block
import androidx.annotation.VisibleForTesting
import cash.z.ecc.android.sdk.BuildConfig
import cash.z.ecc.android.sdk.annotation.OpenForTesting
import cash.z.ecc.android.sdk.block.CompactBlockProcessor.State.Disconnected
import cash.z.ecc.android.sdk.block.CompactBlockProcessor.State.Downloading
import cash.z.ecc.android.sdk.block.CompactBlockProcessor.State.Enhancing
import cash.z.ecc.android.sdk.block.CompactBlockProcessor.State.Initialized
import cash.z.ecc.android.sdk.block.CompactBlockProcessor.State.Scanned
import cash.z.ecc.android.sdk.block.CompactBlockProcessor.State.Scanning
import cash.z.ecc.android.sdk.block.CompactBlockProcessor.State.Stopped
import cash.z.ecc.android.sdk.block.CompactBlockProcessor.State.Validating
import cash.z.ecc.android.sdk.exception.CompactBlockProcessorException
import cash.z.ecc.android.sdk.exception.CompactBlockProcessorException.EnhanceTransactionError.EnhanceTxDecryptError
import cash.z.ecc.android.sdk.exception.CompactBlockProcessorException.EnhanceTransactionError.EnhanceTxDownloadError
import cash.z.ecc.android.sdk.exception.CompactBlockProcessorException.MismatchedNetwork
import cash.z.ecc.android.sdk.exception.InitializeException
import cash.z.ecc.android.sdk.exception.RustLayerException
import cash.z.ecc.android.sdk.ext.BatchMetrics
import cash.z.ecc.android.sdk.ext.ZcashSdk
import cash.z.ecc.android.sdk.ext.ZcashSdk.DOWNLOAD_BATCH_SIZE
import cash.z.ecc.android.sdk.ext.ZcashSdk.MAX_BACKOFF_INTERVAL
import cash.z.ecc.android.sdk.ext.ZcashSdk.MAX_REORG_SIZE
import cash.z.ecc.android.sdk.ext.ZcashSdk.POLL_INTERVAL
import cash.z.ecc.android.sdk.ext.ZcashSdk.RETRIES
import cash.z.ecc.android.sdk.ext.ZcashSdk.REWIND_DISTANCE
import cash.z.ecc.android.sdk.ext.ZcashSdk.SCAN_BATCH_SIZE
import cash.z.ecc.android.sdk.internal.Twig
import cash.z.ecc.android.sdk.internal.block.CompactBlockDownloader
import cash.z.ecc.android.sdk.internal.ext.retryUpTo
import cash.z.ecc.android.sdk.internal.ext.retryWithBackoff
import cash.z.ecc.android.sdk.internal.ext.toHexReversed
import cash.z.ecc.android.sdk.internal.isEmpty
import cash.z.ecc.android.sdk.internal.model.from
import cash.z.ecc.android.sdk.internal.model.toBlockHeight
import cash.z.ecc.android.sdk.internal.repository.DerivedDataRepository
import cash.z.ecc.android.sdk.internal.twig
import cash.z.ecc.android.sdk.internal.twigTask
import cash.z.ecc.android.sdk.jni.RustBackend
import cash.z.ecc.android.sdk.jni.RustBackendWelding
import cash.z.ecc.android.sdk.model.Account
import cash.z.ecc.android.sdk.model.BlockHeight
import cash.z.ecc.android.sdk.model.TransactionOverview
import cash.z.ecc.android.sdk.model.UnifiedSpendingKey
import cash.z.ecc.android.sdk.model.WalletBalance
import cash.z.ecc.android.sdk.model.ZcashNetwork
import cash.z.wallet.sdk.internal.rpc.Service
import co.electriccoin.lightwallet.client.ext.BenchmarkingExt
import co.electriccoin.lightwallet.client.fixture.BlockRangeFixture
import co.electriccoin.lightwallet.client.model.BlockHeightUnsafe
import co.electriccoin.lightwallet.client.model.LightWalletEndpointInfoUnsafe
import co.electriccoin.lightwallet.client.model.Response
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger
import kotlin.math.max
import kotlin.math.min
import kotlin.math.roundToInt
import kotlin.time.Duration.Companion.days
/**
* Responsible for processing the compact blocks that are received from the lightwallet server. This class encapsulates
* all the business logic required to validate and scan the blockchain and is therefore tightly coupled with
* librustzcash.
*
* @property downloader the component responsible for downloading compact blocks and persisting them
* locally for processing.
* @property repository the repository holding transaction information.
* @property rustBackend the librustzcash functionality available and exposed to the SDK.
* @param minimumHeight the lowest height that we could care about. This is mostly used during
* reorgs as a backstop to make sure we do not rewind beyond sapling activation. It also is factored
* in when considering initial range to download. In most cases, this should be the birthday height
* of the current wallet--the height before which we do not need to scan for transactions.
*/
@OpenForTesting
@Suppress("TooManyFunctions", "LargeClass")
class CompactBlockProcessor internal constructor(
val downloader: CompactBlockDownloader,
private val repository: DerivedDataRepository,
private val rustBackend: RustBackendWelding,
minimumHeight: BlockHeight = rustBackend.network.saplingActivationHeight
) {
/**
* Callback for any non-trivial errors that occur while processing compact blocks.
*
* @return true when processing should continue. Return false when the error is unrecoverable
* and all processing should halt and stop retrying.
*/
var onProcessorErrorListener: ((Throwable) -> Boolean)? = null
/**
* Callback for reorgs. This callback is invoked when validation fails with the height at which
* an error was found and the lower bound to which the data will rewind, at most.
*/
var onChainErrorListener: ((errorHeight: BlockHeight, rewindHeight: BlockHeight) -> Any)? = null
/**
* Callback for setup errors that occur prior to processing compact blocks. Can be used to
* override any errors from [verifySetup]. When this listener is missing then all setup errors
* will result in the processor not starting. This is particularly useful for wallets to receive
* a callback right before the SDK will reject a lightwalletd server because it appears not to
* match.
*
* @return true when the setup error should be ignored and processing should be allowed to
* start. Otherwise, processing will not begin.
*/
var onSetupErrorListener: ((Throwable) -> Boolean)? = null
/**
* Callback for apps to report scan times. As blocks are scanned in batches, this listener is
* invoked at the end of every batch and the second parameter is only true when all batches are
* complete. The first parameter contains useful information on the blocks scanned per second.
*
* The Boolean param (isComplete) is true when this event represents the completion of a scan
*/
var onScanMetricCompleteListener: ((BatchMetrics, Boolean) -> Unit)? = null
private val consecutiveChainErrors = AtomicInteger(0)
private val lowerBoundHeight: BlockHeight = BlockHeight(
max(
rustBackend.network.saplingActivationHeight.value,
minimumHeight.value - MAX_REORG_SIZE
)
)
private val _state: MutableStateFlow<State> = MutableStateFlow(Initialized)
private val _progress = MutableStateFlow(0)
private val _processorInfo = MutableStateFlow(ProcessorInfo(null, null, null, null, null))
private val _networkHeight = MutableStateFlow<BlockHeight?>(null)
private val processingMutex = Mutex()
/**
* Flow of birthday heights. The birthday is essentially the first block that the wallet cares
* about. Any prior block can be ignored. This is not a fixed value because the height is
* influenced by the first transaction, which isn't always known. So we start with an estimation
* and improve it as the wallet progresses. Once the first transaction occurs, this value is
* effectively fixed.
*/
private val _birthdayHeight = MutableStateFlow(lowerBoundHeight)
/**
* The root source of truth for the processor's progress. All processing must be done
* sequentially, due to the way sqlite works so it is okay for this not to be threadsafe or
* coroutine safe because processing cannot be concurrent.
*/
// This accessed by the Dispatchers.IO thread, which means multiple threads are reading/writing
// concurrently.
@Volatile
internal var currentInfo = ProcessorInfo(null, null, null, null, null)
/**
* The zcash network that is being processed. Either Testnet or Mainnet.
*/
val network = rustBackend.network
/**
* The flow of state values so that a wallet can monitor the state of this class without needing
* to poll.
*/
val state = _state.asStateFlow()
/**
* The flow of progress values so that a wallet can monitor how much downloading remains
* without needing to poll.
*/
val progress = _progress.asStateFlow()
/**
* The flow of detailed processorInfo like the range of blocks that shall be downloaded and
* scanned. This gives the wallet a lot of insight into the work of this processor.
*/
val processorInfo = _processorInfo.asStateFlow()
/**
* The flow of network height. This value is updated at the same time that [currentInfo] is
* updated but this allows consumers to have the information pushed instead of polling.
*/
val networkHeight = _networkHeight.asStateFlow()
/**
* The first block this wallet cares about anything prior can be ignored. If a wallet has no
* transactions, this value will later update to 100 blocks before the first transaction,
* rounded down to the nearest 100. So in some cases, this is a dynamic value.
*/
val birthdayHeight = _birthdayHeight.value
/**
* Download compact blocks, verify and scan them until [stop] is called.
*/
@Suppress("LongMethod")
suspend fun start() = withContext(IO) {
verifySetup()
updateBirthdayHeight()
twig("setup verified. processor starting")
// using do/while makes it easier to execute exactly one loop which helps with testing this processor quickly
// (because you can start and then immediately set isStopped=true to always get precisely one loop)
do {
retryWithBackoff(::onProcessorError, maxDelayMillis = MAX_BACKOFF_INTERVAL) {
val result = processingMutex.withLockLogged("processNewBlocks") {
processNewBlocks()
}
// immediately process again after failures in order to download new blocks right away
when (result) {
BlockProcessingResult.Reconnecting -> {
val napTime = calculatePollInterval(true)
twig(
"Unable to process new blocks because we are disconnected! Attempting to " +
"reconnect in ${napTime}ms"
)
delay(napTime)
}
BlockProcessingResult.NoBlocksToProcess, BlockProcessingResult.FailedEnhance -> {
val noWorkDone =
currentInfo.lastDownloadRange?.isEmpty() ?: true &&
currentInfo.lastScanRange?.isEmpty() ?: true
val summary = if (noWorkDone) {
"Nothing to process: no new blocks to download or scan"
} else {
"Done processing blocks"
}
consecutiveChainErrors.set(0)
val napTime = calculatePollInterval()
twig(
"$summary${
if (result == BlockProcessingResult.FailedEnhance) {
" (but there were" +
" enhancement errors! We ignore those, for now. Memos in this block range are" +
" probably missing! This will be improved in a future release.)"
} else {
""
}
}! Sleeping" +
" for ${napTime}ms (latest height: ${currentInfo.networkBlockHeight})."
)
delay(napTime)
}
is BlockProcessingResult.Error -> {
if (consecutiveChainErrors.get() >= RETRIES) {
val errorMessage = "ERROR: unable to resolve reorg at height $result after " +
"${consecutiveChainErrors.get()} correction attempts!"
fail(CompactBlockProcessorException.FailedReorgRepair(errorMessage))
} else {
handleChainError(result.failedAtHeight)
}
consecutiveChainErrors.getAndIncrement()
}
is BlockProcessingResult.Success -> {
// Do nothing. We are done.
}
}
}
} while (isActive && _state.value !is Stopped)
twig("processor complete")
stop()
}
/**
* Sets the state to [Stopped], which causes the processor loop to exit.
*/
suspend fun stop() {
runCatching {
setState(Stopped)
downloader.stop()
}
}
/**
* Stop processing and throw an error.
*/
private suspend fun fail(error: Throwable) {
stop()
twig("${error.message}")
throw error
}
private suspend fun processNewBlocks(): BlockProcessingResult = withContext(IO) {
twig("beginning to process new blocks (with lower bound: $lowerBoundHeight)...", -1)
if (!updateRanges()) {
twig("Disconnection detected! Attempting to reconnect!")
setState(Disconnected)
downloader.lightWalletClient.reconnect()
BlockProcessingResult.Reconnecting
} else if (currentInfo.lastDownloadRange.isEmpty() && currentInfo.lastScanRange.isEmpty()) {
setState(Scanned(currentInfo.lastScanRange))
BlockProcessingResult.NoBlocksToProcess
} else {
if (BenchmarkingExt.isBenchmarking()) {
// We inject a benchmark test blocks range at this point to process only a restricted range of blocks
// for a more reliable benchmark results.
val benchmarkBlockRange = BlockRangeFixture.new().let {
// Convert range of Longs to range of BlockHeights
BlockHeight.new(ZcashNetwork.Mainnet, it.start)..(
BlockHeight.new(ZcashNetwork.Mainnet, it.endInclusive)
)
}
downloadNewBlocks(benchmarkBlockRange)
val error = validateAndScanNewBlocks(benchmarkBlockRange)
if (error != BlockProcessingResult.Success) {
error
} else {
enhanceTransactionDetails(benchmarkBlockRange)
}
} else {
downloadNewBlocks(currentInfo.lastDownloadRange)
val error = validateAndScanNewBlocks(currentInfo.lastScanRange)
if (error != BlockProcessingResult.Success) {
error
} else {
currentInfo.lastScanRange?.let { enhanceTransactionDetails(it) }
?: BlockProcessingResult.NoBlocksToProcess
}
}
}
}
sealed class BlockProcessingResult {
object NoBlocksToProcess : BlockProcessingResult()
object Success : BlockProcessingResult()
object Reconnecting : BlockProcessingResult()
object FailedEnhance : BlockProcessingResult()
data class Error(val failedAtHeight: BlockHeight) : BlockProcessingResult()
}
/**
* Gets the latest range info and then uses that initialInfo to update (and transmit)
* the scan/download ranges that require processing.
*
* @return true when the update succeeds.
*/
private suspend fun updateRanges(): Boolean = withContext(IO) {
// This fetches the latest height each time this method is called, which can be very inefficient
// when downloading all of the blocks from the server
val networkBlockHeight = run {
val networkBlockHeightUnsafe =
when (val response = downloader.getLatestBlockHeight()) {
is Response.Success -> response.result
else -> null
}
runCatching { networkBlockHeightUnsafe?.toBlockHeight(network) }.getOrNull()
} ?: return@withContext false
// TODO [#683]: rethink this and make it easier to understand what's happening. Can we reduce this
// so that we only work with actual changing info rather than periodic snapshots? Do we need
// to calculate these derived values every time?
// TODO [#683]: https://github.com/zcash/zcash-android-wallet-sdk/issues/683
ProcessorInfo(
networkBlockHeight = networkBlockHeight,
lastScannedHeight = getLastScannedHeight(),
lastDownloadedHeight = getLastDownloadedHeight()?.let {
BlockHeight.new(
network,
max(
it.value,
lowerBoundHeight.value - 1
)
)
},
lastDownloadRange = null,
lastScanRange = null
).let { initialInfo ->
updateProgress(
networkBlockHeight = initialInfo.networkBlockHeight,
lastScannedHeight = initialInfo.lastScannedHeight,
lastDownloadedHeight = initialInfo.lastDownloadedHeight,
lastScanRange = if (
initialInfo.lastScannedHeight != null &&
initialInfo.networkBlockHeight != null
) {
initialInfo.lastScannedHeight + 1..initialInfo.networkBlockHeight
} else {
null
},
lastDownloadRange = if (initialInfo.networkBlockHeight != null) {
BlockHeight.new(
network,
buildList {
add(network.saplingActivationHeight.value)
initialInfo.lastDownloadedHeight?.let { add(it.value + 1) }
initialInfo.lastScannedHeight?.let { add(it.value + 1) }
}.max()
)..initialInfo.networkBlockHeight
} else {
null
}
)
}
true
}
/**
* Given a range, validate and then scan all blocks. Validation is ensuring that the blocks are
* in ascending order, with no gaps and are also chain-sequential. This means every block's
* prevHash value matches the preceding block in the chain.
*
* @param lastScanRange the range to be validated and scanned.
*/
private suspend fun validateAndScanNewBlocks(lastScanRange: ClosedRange<BlockHeight>?): BlockProcessingResult =
withContext(IO) {
setState(Validating)
val result = validateNewBlocks(lastScanRange)
if (result == BlockProcessingResult.Success) {
// in theory, a scan should not fail after validation succeeds but maybe consider
// changing the rust layer to return the failed block height whenever scan does fail
// rather than a boolean
setState(Scanning)
val success = scanNewBlocks(lastScanRange)
if (!success) {
throw CompactBlockProcessorException.FailedScan()
} else {
setState(Scanned(lastScanRange))
}
}
result
}
private suspend fun enhanceTransactionDetails(lastScanRange: ClosedRange<BlockHeight>): BlockProcessingResult {
Twig.sprout("enhancing")
twig("Enhancing transaction details for blocks $lastScanRange")
setState(Enhancing)
@Suppress("TooGenericExceptionCaught")
return try {
val newTxs = repository.findNewTransactions(lastScanRange)
if (newTxs.isEmpty()) {
twig("no new transactions found in $lastScanRange")
} else {
twig("enhancing ${newTxs.size} transaction(s)!")
// if the first transaction has been added
if (newTxs.size.toLong() == repository.getTransactionCount()) {
twig("Encountered the first transaction. This changes the birthday height!")
updateBirthdayHeight()
}
}
newTxs.filter { it.minedHeight != null }.onEach { newTransaction ->
enhance(newTransaction)
}
twig("Done enhancing transaction details")
BlockProcessingResult.Success
} catch (t: Throwable) {
twig("Failed to enhance due to: ${t.message} caused by: ${t.cause}")
BlockProcessingResult.FailedEnhance
} finally {
Twig.clip("enhancing")
}
}
// TODO [#683]: we still need a way to identify those transactions that failed to be enhanced
// TODO [#683]: https://github.com/zcash/zcash-android-wallet-sdk/issues/683
private suspend fun enhance(transaction: TransactionOverview) = withContext(Dispatchers.IO) {
transaction.minedHeight?.let { minedHeight ->
enhanceHelper(transaction.id, transaction.rawId.byteArray, minedHeight)
}
}
private suspend fun enhanceHelper(id: Long, rawTransactionId: ByteArray, minedHeight: BlockHeight) {
twig("START: enhancing transaction (id:$id block:$minedHeight)")
when (val response = downloader.fetchTransaction(rawTransactionId)) {
is Response.Success -> {
runCatching {
twig("decrypting and storing transaction (id:$id block:$minedHeight)")
rustBackend.decryptAndStoreTransaction(response.result.data)
}.onSuccess {
twig("DONE: enhancing transaction (id:$id block:$minedHeight)")
}.onFailure { error ->
onProcessorError(EnhanceTxDecryptError(minedHeight, error))
}
}
is Response.Failure -> {
onProcessorError(EnhanceTxDownloadError(minedHeight, response.toThrowable()))
}
}
}
/**
* Confirm that the wallet data is properly setup for use.
*/
// Need to refactor this to be less ugly and more testable
@Suppress("NestedBlockDepth")
private suspend fun verifySetup() {
// verify that the data is initialized
val error = if (!repository.isInitialized()) {
CompactBlockProcessorException.Uninitialized
} else if (repository.getAccountCount() == 0) {
CompactBlockProcessorException.NoAccount
} else {
// verify that the server is correct
// How do we handle network connection issues?
downloader.getServerInfo()?.let { info ->
val serverBlockHeight =
runCatching { info.blockHeightUnsafe.toBlockHeight(network) }.getOrNull()
if (null == serverBlockHeight) {
// TODO Better signal network connection issue
CompactBlockProcessorException.BadBlockHeight(info.blockHeightUnsafe)
} else {
val clientBranch = "%x".format(
Locale.ROOT,
rustBackend.getBranchIdForHeight(serverBlockHeight)
)
val network = rustBackend.network.networkName
if (!clientBranch.equals(info.consensusBranchId, true)) {
MismatchedNetwork(
clientNetwork = network,
serverNetwork = info.chainName
)
} else if (!info.matchingNetwork(network)) {
MismatchedNetwork(
clientNetwork = network,
serverNetwork = info.chainName
)
} else {
null
}
}
}
}
if (error != null) {
twig("Validating setup prior to scanning . . . ISSUE FOUND! - ${error.javaClass.simpleName}")
// give listener a chance to override
if (onSetupErrorListener?.invoke(error) != true) {
throw error
} else {
twig(
"Warning: An ${error::class.java.simpleName} was encountered while verifying setup but " +
"it was ignored by the onSetupErrorHandler. Ignoring message: ${error.message}"
)
}
}
}
private suspend fun updateBirthdayHeight() {
@Suppress("TooGenericExceptionCaught")
try {
val betterBirthday = calculateBirthdayHeight()
if (betterBirthday > birthdayHeight) {
twig("Better birthday found! Birthday height updated from $birthdayHeight to $betterBirthday")
_birthdayHeight.value = betterBirthday
}
} catch (e: Throwable) {
twig("Warning: updating the birthday height failed due to $e")
}
}
var failedUtxoFetches = 0
@Suppress("MagicNumber")
internal suspend fun refreshUtxos(tAddress: String, startHeight: BlockHeight): Int? =
withContext(IO) {
var count: Int? = null
// TODO [683]: cleanup the way that we prevent this from running excessively
// For now, try for about 3 blocks per app launch. If the service fails it is
// probably disabled on ligthtwalletd, so then stop trying until the next app launch.
// TODO [#683]: https://github.com/zcash/zcash-android-wallet-sdk/issues/683
if (failedUtxoFetches < 9) { // there are 3 attempts per block
@Suppress("TooGenericExceptionCaught")
try {
retryUpTo(3) {
val result = downloader.lightWalletClient.fetchUtxos(
tAddress,
BlockHeightUnsafe.from(startHeight)
)
count = processUtxoResult(result, tAddress, startHeight)
}
} catch (e: Throwable) {
failedUtxoFetches++
twig(
"Warning: Fetching UTXOs is repeatedly failing! We will only try about " +
"${(9 - failedUtxoFetches + 2) / 3} more times then give up for this session. " +
"Exception message: ${e.message}, caused by: ${e.cause}."
)
}
} else {
twig(
"Warning: gave up on fetching UTXOs for this session. It seems to unavailable on " +
"lightwalletd."
)
}
count
}
internal suspend fun processUtxoResult(
result: Sequence<Service.GetAddressUtxosReply>,
tAddress: String,
startHeight: BlockHeight
): Int = withContext(IO) {
var skipped = 0
val aboveHeight = startHeight
// TODO(str4d): We no longer clear UTXOs here, as rustBackend.putUtxo now uses an upsert instead of an insert.
// This means that now-spent UTXOs would previously have been deleted, but now are left in the database (like
// shielded notes). Due to the fact that the lightwalletd query only returns _current_ UTXOs, we don't learn
// about recently-spent UTXOs here, so the transparent balance does not get updated here. Instead, when a
// received shielded note is "enhanced" by downloading the full transaction, we mark any UTXOs spent in that
// transaction as spent in the database. This relies on two current properties: UTXOs are only ever spent in
// shielding transactions, and at least one shielded note from each shielding transaction is always enhanced.
// However, for greater reliability, we may want to alter the Data Access API to support "inferring spentness"
// from what is _not_ returned as a UTXO, or alternatively fetch TXOs from lightwalletd instead of just UTXOs.
twig("Checking for UTXOs above height $aboveHeight")
result.forEach { utxo: Service.GetAddressUtxosReply ->
twig("Found UTXO at height ${utxo.height.toInt()} with ${utxo.valueZat} zatoshi")
@Suppress("TooGenericExceptionCaught")
try {
rustBackend.putUtxo(
tAddress,
utxo.txid.toByteArray(),
utxo.index,
utxo.script.toByteArray(),
utxo.valueZat,
BlockHeight(utxo.height)
)
} catch (t: Throwable) {
// TODO [#683]: more accurately track the utxos that were skipped (in theory, this could fail for other
// reasons)
// TODO [#683]: https://github.com/zcash/zcash-android-wallet-sdk/issues/683
skipped++
twig(
"Warning: Ignoring transaction at height ${utxo.height} @ index ${utxo.index} because " +
"it already exists. Exception message: ${t.message}, caused by: ${t.cause}."
)
}
}
// return the number of UTXOs that were downloaded
result.count() - skipped
}
/**
* Request all blocks in the given range and persist them locally for processing, later.
*
* @param range the range of blocks to download.
*/
@VisibleForTesting
// allow mocks to verify how this is called, rather than the downloader, which is more complex
@Suppress("MagicNumber")
internal suspend fun downloadNewBlocks(range: ClosedRange<BlockHeight>?) =
withContext<Unit>(IO) {
if (null == range || range.isEmpty()) {
twig("no blocks to download")
} else {
_state.value = Downloading
Twig.sprout("downloading")
twig("downloading blocks in range $range", -1)
var downloadedBlockHeight = range.start
val missingBlockCount = range.endInclusive.value - range.start.value + 1
val batches = (
missingBlockCount / DOWNLOAD_BATCH_SIZE +
(if (missingBlockCount.rem(DOWNLOAD_BATCH_SIZE) == 0L) 0 else 1)
)
var progress: Int
twig(
"found $missingBlockCount missing blocks, downloading in $batches batches of " +
"$DOWNLOAD_BATCH_SIZE..."
)
for (i in 1..batches) {
retryUpTo(RETRIES, { CompactBlockProcessorException.FailedDownload(it) }) {
val end = BlockHeight.new(
network,
min(
(range.start.value + (i * DOWNLOAD_BATCH_SIZE)) - 1,
range.endInclusive.value
)
) // subtract 1 on the first value because the range is inclusive
var count = 0
twig(
"downloaded $downloadedBlockHeight..$end (batch $i of $batches) " +
"[${downloadedBlockHeight..end}]"
) {
count = downloader.downloadBlockRange(downloadedBlockHeight..end)
}
twig("downloaded $count blocks!")
progress = (i / batches.toFloat() * 100).roundToInt()
_progress.value = progress
val lastDownloadedHeight = downloader.getLastDownloadedHeight()
updateProgress(lastDownloadedHeight = lastDownloadedHeight)
downloadedBlockHeight = end + 1
}
}
Twig.clip("downloading")
}
_progress.value = 100
}
/**
* Validate all blocks in the given range, ensuring that the blocks are in ascending order, with
* no gaps and are also chain-sequential. This means every block's prevHash value matches the
* preceding block in the chain. Validation starts at the back of the chain and works toward the tip.
*
* @param range the range of blocks to validate.
*/
private suspend fun validateNewBlocks(range: ClosedRange<BlockHeight>?): BlockProcessingResult {
if (null == range || range.isEmpty()) {
twig("no blocks to validate: $range")
return BlockProcessingResult.NoBlocksToProcess
}
Twig.sprout("validating")
twig("validating blocks in range $range in db: ${(rustBackend as RustBackend).cacheDbFile.absolutePath}")
val result = rustBackend.validateCombinedChain()
Twig.clip("validating")
return if (null == result) {
BlockProcessingResult.Success
} else {
BlockProcessingResult.Error(result)
}
}
/**
* Scan all blocks in the given range, decrypting and persisting anything that matches our
* wallet. Scanning starts at the back of the chain and works toward the tip.
*
* @param range the range of blocks to scan.
*/
@Suppress("MagicNumber")
private suspend fun scanNewBlocks(range: ClosedRange<BlockHeight>?): Boolean = withContext(IO) {
if (null == range || range.isEmpty()) {
twig("no blocks to scan for range $range")
true
} else {
Twig.sprout("scanning")
twig("scanning blocks for range $range in batches")
var result = false
var metrics = BatchMetrics(range, SCAN_BATCH_SIZE, onScanMetricCompleteListener)
// Attempt to scan a few times to work around any concurrent modification errors, then
// rethrow as an official processorError which is handled by [start.retryWithBackoff]
retryUpTo(3, { CompactBlockProcessorException.FailedScan(it) }) { failedAttempts ->
if (failedAttempts > 0) twig("retrying the scan after $failedAttempts failure(s)...")
do {
var scannedNewBlocks = false
metrics.beginBatch()
result = rustBackend.scanBlocks(SCAN_BATCH_SIZE)
metrics.endBatch()
val lastScannedHeight =
BlockHeight.new(network, range.start.value + metrics.cumulativeItems - 1)
val percentValue =
(lastScannedHeight.value - range.start.value) /
(range.endInclusive.value - range.start.value + 1).toFloat() * 100.0f
val percent = "%.0f".format(
percentValue.coerceAtMost(100f)
.coerceAtLeast(0f)
)
twig(
"batch scanned ($percent%): $lastScannedHeight/${range.endInclusive} | " +
"${metrics.batchTime}ms, ${metrics.batchItems}blks, ${metrics.batchIps.format()}bps"
)
if (currentInfo.lastScannedHeight != lastScannedHeight) {
scannedNewBlocks = true
updateProgress(lastScannedHeight = lastScannedHeight)
}
// if we made progress toward our scan, then keep trying
} while (result && scannedNewBlocks && lastScannedHeight < range.endInclusive)
twig(
"batch scan complete! Total time: ${metrics.cumulativeTime} Total blocks measured: " +
"${metrics.cumulativeItems} Cumulative bps: ${metrics.cumulativeIps.format()}"
)
}
Twig.clip("scanning")
result
}
}
private fun Float.format(places: Int = 0) = "%.${places}f".format(this)
/**
* Emit an instance of processorInfo, corresponding to the provided data.
*
* @param networkBlockHeight the latest block available to lightwalletd that may or may not be
* downloaded by this wallet yet.
* @param lastScannedHeight the height up to which the wallet last scanned. This determines
* where the next scan will begin.
* @param lastDownloadedHeight the last compact block that was successfully downloaded.
* @param lastScanRange the inclusive range to scan. This represents what we most recently
* wanted to scan. In most cases, it will be an invalid range because we'd like to scan blocks
* that we don't yet have.
* @param lastDownloadRange the inclusive range to download. This represents what we most
* recently wanted to scan. In most cases, it will be an invalid range because we'd like to scan
* blocks that we don't yet have.
*/
private suspend fun updateProgress(
networkBlockHeight: BlockHeight? = currentInfo.networkBlockHeight,
lastScannedHeight: BlockHeight? = currentInfo.lastScannedHeight,
lastDownloadedHeight: BlockHeight? = currentInfo.lastDownloadedHeight,
lastScanRange: ClosedRange<BlockHeight>? = currentInfo.lastScanRange,
lastDownloadRange: ClosedRange<BlockHeight>? = currentInfo.lastDownloadRange
) {
currentInfo = currentInfo.copy(
networkBlockHeight = networkBlockHeight,
lastScannedHeight = lastScannedHeight,
lastDownloadedHeight = lastDownloadedHeight,
lastScanRange = lastScanRange,
lastDownloadRange = lastDownloadRange
)
withContext(IO) {
_networkHeight.value = networkBlockHeight
_processorInfo.value = currentInfo
}
}
private suspend fun handleChainError(errorHeight: BlockHeight) {
// TODO [#683]: consider an error object containing hash information
// TODO [#683]: https://github.com/zcash/zcash-android-wallet-sdk/issues/683
printValidationErrorInfo(errorHeight)
determineLowerBound(errorHeight).let { lowerBound ->
twig("handling chain error at $errorHeight by rewinding to block $lowerBound")
onChainErrorListener?.invoke(errorHeight, lowerBound)
rewindToNearestHeight(lowerBound, true)
}
}
suspend fun getNearestRewindHeight(height: BlockHeight): BlockHeight {
// TODO [#683]: add a concept of original checkpoint height to the processor. For now, derive it
// add one because we already have the checkpoint. Add one again because we delete ABOVE the block
// TODO [#683]: https://github.com/zcash/zcash-android-wallet-sdk/issues/683
val originalCheckpoint = lowerBoundHeight + MAX_REORG_SIZE + 2
return if (height < originalCheckpoint) {
originalCheckpoint
} else {
// tricky: subtract one because we delete ABOVE this block
// This could create an invalid height if if height was saplingActivationHeight
val rewindHeight = BlockHeight(height.value - 1)
rustBackend.getNearestRewindHeight(rewindHeight)
}
}
/**
* Rewind back at least two weeks worth of blocks.
*/
suspend fun quickRewind() {
val height = max(currentInfo.lastScannedHeight, repository.lastScannedHeight())
val blocksPer14Days = 14.days.inWholeMilliseconds / ZcashSdk.BLOCK_INTERVAL_MILLIS.toInt()
val twoWeeksBack = BlockHeight.new(
network,
(height.value - blocksPer14Days).coerceAtLeast(lowerBoundHeight.value)
)
rewindToNearestHeight(twoWeeksBack, false)
}
/**
* @param alsoClearBlockCache when true, also clear the block cache which forces a redownload of
* blocks. Otherwise, the cached blocks will be used in the rescan, which in most cases, is fine.
*/
@Suppress("LongMethod")
suspend fun rewindToNearestHeight(
height: BlockHeight,
alsoClearBlockCache: Boolean = false
) =
withContext(IO) {
processingMutex.withLockLogged("rewindToHeight") {
val lastScannedHeight = currentInfo.lastScannedHeight
val lastLocalBlock = repository.lastScannedHeight()
val targetHeight = getNearestRewindHeight(height)
twig(
"Rewinding from $lastScannedHeight to requested height: $height using target height: " +
"$targetHeight with last local block: $lastLocalBlock"
)
if (null == lastScannedHeight && targetHeight < lastLocalBlock) {
twig("Rewinding because targetHeight is less than lastLocalBlock.")
rustBackend.rewindToHeight(targetHeight)
} else if (null != lastScannedHeight && targetHeight < lastScannedHeight) {
twig("Rewinding because targetHeight is less than lastScannedHeight.")
rustBackend.rewindToHeight(targetHeight)
} else {
twig(
"not rewinding dataDb because the last scanned height is $lastScannedHeight and the" +
" last local block is $lastLocalBlock both of which are less than the target height of " +
"$targetHeight"
)
}
val currentNetworkBlockHeight = currentInfo.networkBlockHeight
if (alsoClearBlockCache) {
twig(
"Also clearing block cache back to $targetHeight. These rewound blocks will download " +
"in the next scheduled scan"
)
downloader.rewindToHeight(targetHeight)
// communicate that the wallet is no longer synced because it might remain this way for 20+ second
// because we only download on 20s time boundaries so we can't trigger any immediate action
setState(Downloading)
if (null == currentNetworkBlockHeight) {
updateProgress(
lastScannedHeight = targetHeight,
lastDownloadedHeight = targetHeight,
lastScanRange = null,
lastDownloadRange = null
)
} else {
updateProgress(
lastScannedHeight = targetHeight,
lastDownloadedHeight = targetHeight,
lastScanRange = (targetHeight + 1)..currentNetworkBlockHeight,
lastDownloadRange = (targetHeight + 1)..currentNetworkBlockHeight
)
}
_progress.value = 0
} else {
if (null == currentNetworkBlockHeight) {
updateProgress(
lastScannedHeight = targetHeight,
lastScanRange = null
)
} else {
updateProgress(
lastScannedHeight = targetHeight,
lastScanRange = (targetHeight + 1)..currentNetworkBlockHeight
)
}
_progress.value = 0
if (null != lastScannedHeight) {
val range = (targetHeight + 1)..lastScannedHeight
twig(
"We kept the cache blocks in place so we don't need to wait for the next " +
"scheduled download to rescan. Instead we will rescan and validate blocks " +
"${range.start}..${range.endInclusive}"
)
if (validateAndScanNewBlocks(range) == BlockProcessingResult.Success) {
enhanceTransactionDetails(range)
}
}
}
}
}
/** insightful function for debugging these critical errors */
private suspend fun printValidationErrorInfo(errorHeight: BlockHeight, count: Int = 11) {
// Note: blocks are public information so it's okay to print them but, still, let's not unless we're
// debugging something
if (!BuildConfig.DEBUG) return
var errorInfo = fetchValidationErrorInfo(errorHeight)
twig(
"validation failed at block ${errorInfo.errorHeight} which had hash " +
"${errorInfo.actualPrevHash} but the expected hash was ${errorInfo.expectedPrevHash}"
)
errorInfo = fetchValidationErrorInfo(errorHeight + 1)
twig(
"The next block block: ${errorInfo.errorHeight} which had hash ${errorInfo.actualPrevHash} but " +
"the expected hash was ${errorInfo.expectedPrevHash}"
)
twig("=================== BLOCKS [$errorHeight..${errorHeight.value + count - 1}]: START ========")
repeat(count) { i ->
val height = errorHeight + i
val block = downloader.compactBlockRepository.findCompactBlock(height)
// sometimes the initial block was inserted via checkpoint and will not appear in the cache. We can get
// the hash another way but prevHash is correctly null.
val hash = block?.hash?.toByteArray()
?: repository.findBlockHash(height)
twig(
"block: $height\thash=${hash?.toHexReversed()} \tprevHash=${
block?.prevHash?.toByteArray()?.toHexReversed()
}"
)
}
twig("=================== BLOCKS [$errorHeight..${errorHeight.value + count - 1}]: END ========")
}
private suspend fun fetchValidationErrorInfo(errorHeight: BlockHeight): ValidationErrorInfo {
val hash = repository.findBlockHash(errorHeight + 1)
?.toHexReversed()
val prevHash = repository.findBlockHash(errorHeight)?.toHexReversed()
val compactBlock = downloader.compactBlockRepository.findCompactBlock(errorHeight + 1)
val expectedPrevHash = compactBlock?.prevHash?.toByteArray()?.toHexReversed()
return ValidationErrorInfo(errorHeight, hash, expectedPrevHash, prevHash)
}
/**
* Called for every noteworthy error.
*
* @return true when processing should continue. Return false when the error is unrecoverable
* and all processing should halt and stop retrying.
*/
private fun onProcessorError(throwable: Throwable): Boolean {
return onProcessorErrorListener?.invoke(throwable) ?: true
}
private fun determineLowerBound(errorHeight: BlockHeight): BlockHeight {
val offset = min(MAX_REORG_SIZE, REWIND_DISTANCE * (consecutiveChainErrors.get() + 1))
return BlockHeight(max(errorHeight.value - offset, lowerBoundHeight.value)).also {
twig(
"offset = min($MAX_REORG_SIZE, $REWIND_DISTANCE * (${consecutiveChainErrors.get() + 1})) = " +
"$offset"
)
twig("lowerBound = max($errorHeight - $offset, $lowerBoundHeight) = $it")
}
}
/**
* Poll on time boundaries. Per Issue #95, we want to avoid exposing computation time to a
* network observer. Instead, we poll at regular time intervals that are large enough for all
* computation to complete so no intervals are skipped. See 95 for more details.
*
* @param fastIntervalDesired currently not used but sometimes we want to poll quickly, such as
* when we unexpectedly lose server connection or are waiting for an event to happen on the
* chain. We can pass this desire along now and later figure out how to handle it, privately.
*/
@Suppress("UNUSED_PARAMETER")
private fun calculatePollInterval(fastIntervalDesired: Boolean = false): Long {
val interval = POLL_INTERVAL
val now = System.currentTimeMillis()
val deltaToNextInteral = interval - (now + interval).rem(interval)
// twig("sleeping for ${deltaToNextInteral}ms from $now in order to wake at ${now + deltaToNextInteral}")
return deltaToNextInteral
}
suspend fun calculateBirthdayHeight(): BlockHeight {
var oldestTransactionHeight: BlockHeight? = null
@Suppress("TooGenericExceptionCaught")
try {
val tempOldestTransactionHeight = repository.getOldestTransaction()?.minedHeight
?: lowerBoundHeight
// to be safe adjust for reorgs (and generally a little cushion is good for privacy)
// so we round down to the nearest 100 and then subtract 100 to ensure that the result is always at least
// 100 blocks away
oldestTransactionHeight = BlockHeight.new(
network,
tempOldestTransactionHeight.value -
tempOldestTransactionHeight.value.rem(MAX_REORG_SIZE) - MAX_REORG_SIZE.toLong()
)
} catch (t: Throwable) {
twig("failed to calculate birthday due to: $t")
}
return buildList<BlockHeight> {
add(lowerBoundHeight)
add(rustBackend.network.saplingActivationHeight)
oldestTransactionHeight?.let { add(it) }
}.maxOf { it }
}
/**
* Get the height of the last block that was downloaded by this processor.
*
* @return the last downloaded height reported by the downloader.
*/
suspend fun getLastDownloadedHeight() =
downloader.getLastDownloadedHeight()
/**
* Get the height of the last block that was scanned by this processor.
*
* @return the last scanned height reported by the repository.
*/
suspend fun getLastScannedHeight() =
repository.lastScannedHeight()
// CompactBlockProcessor is the wrong place for this, but it's where all the other APIs that need
// access to the RustBackend live. This should be refactored.
internal suspend fun createAccount(seed: ByteArray): UnifiedSpendingKey =
rustBackend.createAccount(seed)
/**
* Get the current unified address for the given wallet account.
*
* @return the current unified address of this account.
*/
suspend fun getCurrentAddress(account: Account) =
rustBackend.getCurrentAddress(account.value)
/**
* Get the legacy Sapling address corresponding to the current unified address for the given wallet account.
*
* @return a Sapling address.
*/
suspend fun getLegacySaplingAddress(account: Account) =
rustBackend.getSaplingReceiver(
rustBackend.getCurrentAddress(account.value)
)
?: throw InitializeException.MissingAddressException("legacy Sapling")
/**
* Get the legacy transparent address corresponding to the current unified address for the given wallet account.
*
* @return a transparent address.
*/
suspend fun getTransparentAddress(account: Account) =
rustBackend.getTransparentReceiver(
rustBackend.getCurrentAddress(account.value)
)
?: throw InitializeException.MissingAddressException("legacy transparent")
/**
* Calculates the latest balance info.
*
* @param account the account to check for balance info.
*
* @return an instance of WalletBalance containing information about available and total funds.
*/
suspend fun getBalanceInfo(account: Account): WalletBalance =
twigTask("checking balance info", -1) {
@Suppress("TooGenericExceptionCaught")
try {
val balanceTotal = rustBackend.getBalance(account.value)
twig("found total balance: $balanceTotal")
val balanceAvailable = rustBackend.getVerifiedBalance(account.value)
twig("found available balance: $balanceAvailable")
WalletBalance(balanceTotal, balanceAvailable)
} catch (t: Throwable) {
twig("failed to get balance due to $t")
throw RustLayerException.BalanceException(t)
}
}
suspend fun getUtxoCacheBalance(address: String): WalletBalance =
rustBackend.getDownloadedUtxoBalance(address)
/**
* Transmits the given state for this processor.
*/
private suspend fun setState(newState: State) {
_state.value = newState
}
/**
* Sealed class representing the various states of this processor.
*/
sealed class State {
/**
* Marker interface for [State] instances that represent when the wallet is connected.
*/
interface Connected
/**
* Marker interface for [State] instances that represent when the wallet is syncing.
*/
interface Syncing
/**
* [State] for when the wallet is actively downloading compact blocks because the latest
* block height available from the server is greater than what we have locally. We move out
* of this state once our local height matches the server.
*/
object Downloading : Connected, Syncing, State()
/**
* [State] for when the blocks that have been downloaded are actively being validated to
* ensure that there are no gaps and that every block is chain-sequential to the previous
* block, which determines whether a reorg has happened on our watch.
*/
object Validating : Connected, Syncing, State()
/**
* [State] for when the blocks that have been downloaded are actively being decrypted.
*/
object Scanning : Connected, Syncing, State()
/**
* [State] for when we are done decrypting blocks, for now.
*/
class Scanned(val scannedRange: ClosedRange<BlockHeight>?) : Connected, Syncing, State()
/**
* [State] for when transaction details are being retrieved. This typically means the wallet
* has downloaded and scanned blocks and is now processing any transactions that were
* discovered. Once a transaction is discovered, followup network requests are needed in
* order to retrieve memos or outbound transaction information, like the recipient address.
* The existing information we have about transactions is enhanced by the new information.
*/
object Enhancing : Connected, Syncing, State()
/**
* [State] for when we have no connection to lightwalletd.
*/
object Disconnected : State()
/**
* [State] for when [stop] has been called. For simplicity, processors should not be
* restarted but they are not prevented from this behavior.
*/
object Stopped : State()
/**
* [State] the initial state of the processor, once it is constructed.
*/
object Initialized : State()
}
/**
* Data class for holding detailed information about the processor.
*
* @param networkBlockHeight the latest block available to lightwalletd that may or may not be
* downloaded by this wallet yet.
* @param lastScannedHeight the height up to which the wallet last scanned. This determines
* where the next scan will begin.
* @param lastDownloadedHeight the last compact block that was successfully downloaded.
*
* @param lastDownloadRange inclusive range to download. Meaning, if the range is 10..10,
* then we will download exactly block 10. If the range is 11..10, then we want to download
* block 11 but can't.
* @param lastScanRange inclusive range to scan.
*/
data class ProcessorInfo(
val networkBlockHeight: BlockHeight?,
val lastScannedHeight: BlockHeight?,
val lastDownloadedHeight: BlockHeight?,
val lastDownloadRange: ClosedRange<BlockHeight>?,
val lastScanRange: ClosedRange<BlockHeight>?
) {
/**
* Determines whether this instance has data.
*
* @return false when all values match their defaults.
*/
val hasData
get() = networkBlockHeight != null ||
lastScannedHeight != null ||
lastDownloadedHeight != null ||
lastDownloadRange != null ||
lastScanRange != null
/**
* Determines whether this instance is actively downloading compact blocks.
*
* @return true when there are more than zero blocks remaining to download.
*/
val isDownloading: Boolean
get() =
lastDownloadedHeight != null &&
lastDownloadRange != null &&
!lastDownloadRange.isEmpty() &&
lastDownloadedHeight < lastDownloadRange.endInclusive
/**
* Determines whether this instance is actively scanning or validating compact blocks.
*
* @return true when downloading has completed and there are more than zero blocks remaining
* to be scanned.
*/
val isScanning: Boolean
get() =
!isDownloading &&
lastScannedHeight != null &&
lastScanRange != null &&
!lastScanRange.isEmpty() &&
lastScannedHeight < lastScanRange.endInclusive
/**
* The amount of scan progress from 0 to 100.
*/
@Suppress("MagicNumber")
val scanProgress
get() = when {
lastScannedHeight == null -> 0
lastScanRange == null -> 100
lastScannedHeight >= lastScanRange.endInclusive -> 100
else -> {
// when lastScannedHeight == lastScanRange.first, we have scanned one block, thus the offsets
val blocksScanned =
(lastScannedHeight.value - lastScanRange.start.value + 1).coerceAtLeast(0)
// we scan the range inclusively so 100..100 is one block to scan, thus the offset
val numberOfBlocks =
lastScanRange.endInclusive.value - lastScanRange.start.value + 1
// take the percentage then convert and round
((blocksScanned.toFloat() / numberOfBlocks) * 100.0f).let { percent ->
percent.coerceAtMost(100.0f).roundToInt()
}
}
}
}
data class ValidationErrorInfo(
val errorHeight: BlockHeight,
val hash: String?,
val expectedPrevHash: String?,
val actualPrevHash: String?
)
//
// Helper Extensions
//
/**
* Log the mutex in great detail just in case we need it for troubleshooting deadlock.
*/
private suspend inline fun <T> Mutex.withLockLogged(name: String, block: () -> T): T {
twig("$name MUTEX: acquiring lock...", -1)
this.withLock {
twig("$name MUTEX: ...lock acquired!", -1)
return block().also {
twig("$name MUTEX: releasing lock", -1)
}
}
}
}
private fun LightWalletEndpointInfoUnsafe.matchingNetwork(network: String): Boolean {
fun String.toId() = lowercase(Locale.ROOT).run {
when {
contains("main") -> "mainnet"
contains("test") -> "testnet"
else -> this
}
}
return chainName.toId() == network.toId()
}
private fun max(a: BlockHeight?, b: BlockHeight) = if (null == a) {
b
} else if (a.value > b.value) {
a
} else {
b
}