New: Add support for rewinding and wiping data.

A simple rewind feature in a wallet can recover from a wide variety of issues.
Kevin Gorham 2021-04-09 21:25:21 -04:00
parent 1eb1a6aa8e
commit 5133306c08
7 changed files with 82 additions and 20 deletions
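
For orientation, here is a hedged sketch of how a wallet built on this SDK might invoke the rewind entry point added below; the `processor` wiring and the helper name are assumptions, and only the `rewindToHeight(height, alsoClearBlockCache)` signature comes from this commit:

suspend fun recoverByRewinding(processor: CompactBlockProcessor, checkpointHeight: Int) {
    // Rewind scanned data and also drop the cached compact blocks so the next
    // scheduled scan re-downloads and re-scans everything above the checkpoint.
    processor.rewindToHeight(checkpointHeight, alsoClearBlockCache = true)
}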


@@ -454,6 +454,16 @@ class Initializer constructor(appContext: Context, onCriticalErrorHandler: ((Thr
return File(parentDir, "$prefix${network.networkName}_$dbFileName").absolutePath
}
/**
* Delete a database and its potential journal file at the given path.
*
* @param filePath the path of the db to erase.
* @return true when a file exists at the given path and was deleted.
*/
private fun deleteDb(filePath: String): Boolean {
// just try deleting the journal file; it doesn't matter if it isn't there
delete("$filePath-journal")
return delete(filePath)
}
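
A self-contained sketch of the same delete pattern using java.io.File directly (the `delete(...)` helper used above is not shown in this hunk); it removes the SQLite rollback journal first and reports success based on the main db file only:

import java.io.File

// Standalone illustration only: delete a db file plus its "-journal" sidecar.
// Returns true only when the main db file existed and was removed.
fun deleteDbFiles(filePath: String): Boolean {
    File("$filePath-journal").delete() // ignore the result; the journal may not exist
    return File(filePath).let { it.exists() && it.delete() }
}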
/**


@@ -48,6 +48,8 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.isActive
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger
@@ -117,6 +119,7 @@ class CompactBlockProcessor(
private val _state: ConflatedBroadcastChannel<State> = ConflatedBroadcastChannel(Initialized)
private val _progress = ConflatedBroadcastChannel(0)
private val _processorInfo = ConflatedBroadcastChannel(ProcessorInfo())
private val processingMutex = Mutex()
/**
* Flow of birthday heights. The birthday is essentially the first block that the wallet cares
@@ -176,7 +179,9 @@ class CompactBlockProcessor(
// (because you can start and then immediately set isStopped=true to always get precisely one loop)
do {
retryWithBackoff(::onProcessorError, maxDelayMillis = MAX_BACKOFF_INTERVAL) {
val result = processNewBlocks()
val result = processingMutex.withLockLogged("processNewBlocks") {
processNewBlocks()
}
// immediately process again after failures in order to download new blocks right away
if (result == ERROR_CODE_RECONNECT) {
val napTime = calculatePollInterval(true)
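
The new processingMutex is what makes a rewind safe to request while the scan loop is running. A minimal, self-contained sketch (not SDK code) of the serialization it provides:

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

fun main() = runBlocking {
    val processingMutex = Mutex()

    val scanner = launch {
        repeat(3) { pass ->
            processingMutex.withLock {
                println("scan pass $pass: processing new blocks")
                delay(100) // simulated block processing
            }
        }
    }

    launch {
        // A rewind requested mid-scan waits for the lock instead of racing the scanner.
        processingMutex.withLock {
            println("rewind: lock held, no scan in flight")
        }
    }

    scanner.join()
}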
@@ -592,24 +597,39 @@ class CompactBlockProcessor(
* blocks. Otherwise, the cached blocks will be used in the rescan, which in most cases, is fine.
*/
suspend fun rewindToHeight(height: Int, alsoClearBlockCache: Boolean = false) = withContext(IO) {
val lastScannedHeight = currentInfo.lastScannedHeight
twig("Rewinding from $lastScannedHeight to height: $height")
// TODO: think about how we might pause all processing during a rewind
if (height < lastScannedHeight) {
rustBackend.rewindToHeight(height)
} else {
twig("not rewinding dataDb because the last scanned height is $lastScannedHeight which is less than the target height of $height")
}
processingMutex.withLockLogged("rewindToHeight") {
val lastScannedHeight = currentInfo.lastScannedHeight
val targetHeight = determineTargetHeight(height)
twig("Rewinding from $lastScannedHeight to requested height: $height using target height: $targetHeight")
// TODO: think about how we might pause all processing during a rewind
if (targetHeight < lastScannedHeight) {
rustBackend.rewindToHeight(targetHeight)
} else {
twig("not rewinding dataDb because the last scanned height is $lastScannedHeight which is less than the target height of $targetHeight")
}
if (alsoClearBlockCache) {
twig("Also clearing block cache back to $height. These rewound blocks will download in the next scheduled scan")
downloader.rewindToHeight(height)
// communicate that the wallet is no longer synced; it may stay that way for 20+ seconds since we only download on 20-second boundaries, so we can't trigger an immediate download
setState(Downloading)
} else {
val range = (height + 1)..lastScannedHeight
twig("We kept the cache blocks in place so we don't need to wait for the next scheduled download to rescan. Instead we will rescan and validate blocks ${range.first}..${range.last}")
if (validateAndScanNewBlocks(range) == ERROR_CODE_NONE) enhanceTransactionDetails(range)
if (alsoClearBlockCache) {
twig("Also clearing block cache back to $targetHeight. These rewound blocks will download in the next scheduled scan")
downloader.rewindToHeight(targetHeight)
// communicate that the wallet is no longer synced; it may stay that way for 20+ seconds since we only download on 20-second boundaries, so we can't trigger an immediate download
setState(Downloading)
updateProgress(
lastScannedHeight = targetHeight,
lastDownloadedHeight = targetHeight,
lastScanRange = (targetHeight + 1)..currentInfo.networkBlockHeight,
lastDownloadRange = (targetHeight + 1)..currentInfo.networkBlockHeight
)
_progress.send(0)
} else {
updateProgress(
lastScannedHeight = targetHeight,
lastScanRange = (targetHeight + 1)..currentInfo.networkBlockHeight,
)
_progress.send(0)
val range = (targetHeight + 1)..lastScannedHeight
twig("We kept the cache blocks in place so we don't need to wait for the next scheduled download to rescan. Instead we will rescan and validate blocks ${range.first}..${range.last}")
if (validateAndScanNewBlocks(range) == ERROR_CODE_NONE) enhanceTransactionDetails(range)
}
}
}
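
determineTargetHeight is referenced above but its body is not in these hunks. A plausible sketch, given the firstScannedHeight() query added below and its documentation, is to clamp the requested height so a rewind never drops the wallet's first scanned (checkpoint) block; treat this as an assumption, not the commit's actual implementation:

// Hypothetical clamp; firstScannedHeight would come from the repository.
private fun clampRewindHeight(requestedHeight: Int, firstScannedHeight: Int): Int {
    // Overwriting the first scanned block would break the ability to spend funds,
    // so never rewind below it.
    return maxOf(requestedHeight, firstScannedHeight)
}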
@@ -931,6 +951,20 @@ class CompactBlockProcessor(
return chainName.toId() == network.toId()
}
/**
* Log the mutex in great detail just in case we need it for troubleshooting deadlock.
*/
private suspend inline fun <T> Mutex.withLockLogged(name: String, block: () -> T): T {
twig("$name MUTEX: acquiring lock...")
this.withLock {
twig("$name MUTEX: ...lock acquired!")
return block().also {
twig("$name MUTEX: releasing lock")
}
}
twig("$name MUTEX: withLock complete")
}
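
Because block() returns non-locally out of withLock, the final "withLock complete" log above is unreachable. An equivalent hedged sketch in which that completion log actually runs, using try/finally for the release message:

private suspend inline fun <T> Mutex.withLockLoggedAlt(name: String, block: () -> T): T {
    twig("$name MUTEX: acquiring lock...")
    val result = withLock {
        twig("$name MUTEX: ...lock acquired!")
        try {
            block()
        } finally {
            twig("$name MUTEX: releasing lock")
        }
    }
    twig("$name MUTEX: withLock complete")
    return result
}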
companion object {
const val ERROR_CODE_NONE = -1
const val ERROR_CODE_RECONNECT = 20


@@ -42,7 +42,7 @@ interface CompactBlockDao {
@Insert(onConflict = OnConflictStrategy.REPLACE)
fun insert(block: List<CompactBlockEntity>)
@Query("DELETE FROM compactblocks WHERE height >= :height")
@Query("DELETE FROM compactblocks WHERE height > :height")
fun rewindTo(height: Int)
@Query("SELECT MAX(height) FROM compactblocks")


@@ -200,6 +200,9 @@ interface BlockDao {
@Query("SELECT MAX(height) FROM blocks")
fun lastScannedHeight(): Int
@Query("SELECT MIN(height) FROM blocks")
fun firstScannedHeight(): Int
@Query("SELECT hash FROM BLOCKS WHERE height = :height")
fun findHashByHeight(height: Int): ByteArray?
}
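
A hedged sketch (not part of the commit) of how findHashByHeight could be used by calling code, for example to confirm that the block kept at a rewind target still matches a known hash before trusting the rewound state:

fun blockMatches(dao: BlockDao, height: Int, expectedHash: ByteArray): Boolean {
    // Null means that height was never scanned into the data db.
    val storedHash = dao.findHashByHeight(height) ?: return false
    return storedHash.contentEquals(expectedHash)
}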


@@ -97,7 +97,11 @@ class RustBackend private constructor() : RustBackendWelding {
override fun validateCombinedChain() = validateCombinedChain(pathCacheDb, pathDataDb, networkId = network.id,)
override fun rewindToHeight(height: Int) = rewindToHeight(pathDataDb, height)
/**
* Deletes data for all blocks above the given height. Boils down to:
*
* DELETE FROM blocks WHERE height > ?
*/
override fun rewindToHeight(height: Int) = rewindToHeight(pathDataDb, height, networkId = network.id)
override fun scanBlocks(limit: Int): Boolean {
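
A hedged sketch of the pairing this commit relies on elsewhere: the scanned data db (via the Rust backend) and the compact block cache are rewound to the same height, one after the other. The cache-rewind callback stands in for the downloader call shown in the processor hunk above:

suspend fun rewindBoth(
    backend: RustBackendWelding,
    rewindCache: suspend (Int) -> Unit, // e.g. downloader.rewindToHeight(...)
    height: Int
) {
    backend.rewindToHeight(height) // data db: DELETE FROM blocks WHERE height > ?
    rewindCache(height)            // cache db: DELETE FROM compactblocks WHERE height > ?
}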


@@ -72,6 +72,10 @@ open class PagedTransactionRepository(
return blocks.lastScannedHeight()
}
override fun firstScannedHeight(): Int {
return blocks.firstScannedHeight()
}
override fun isInitialized(): Boolean {
return blocks.count() > 0
}


@@ -18,6 +18,13 @@ interface TransactionRepository {
*/
fun lastScannedHeight(): Int
/**
* The height of the first block in this repository. This is typically the checkpoint that was
* used to initialize this wallet. If we overwrite this block, it breaks our ability to spend
* funds.
*/
fun firstScannedHeight(): Int
/**
* Returns true when this repository has been initialized and seeded with the initial checkpoint.
*