Add logic for handling reorgs and simplify synchronization.

This commit is contained in:
Kevin Gorham 2019-06-14 19:24:52 -04:00 committed by Kevin Gorham
parent b14401eaeb
commit 4c635362a8
14 changed files with 589 additions and 648 deletions

View File

@ -0,0 +1,46 @@
package cash.z.wallet.sdk.block
import android.content.Context
import androidx.room.Room
import androidx.room.RoomDatabase
import cash.z.wallet.sdk.dao.CompactBlockDao
import cash.z.wallet.sdk.db.CompactBlockDb
import cash.z.wallet.sdk.entity.CompactBlock
import cash.z.wallet.sdk.ext.SAPLING_ACTIVATION_HEIGHT
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.withContext
class CompactBlockDbStore(
applicationContext: Context,
cacheDbName: String
) : CompactBlockStore {
private val cacheDao: CompactBlockDao
private val cacheDb: CompactBlockDb
init {
cacheDb = createCompactBlockCacheDb(applicationContext, cacheDbName)
cacheDao = cacheDb.complactBlockDao()
}
private fun createCompactBlockCacheDb(applicationContext: Context, cacheDbName: String): CompactBlockDb {
return Room.databaseBuilder(applicationContext, CompactBlockDb::class.java, cacheDbName)
.setJournalMode(RoomDatabase.JournalMode.TRUNCATE)
// this is a simple cache of blocks. destroying the db should be benign
.fallbackToDestructiveMigration()
.build()
}
override suspend fun getLatestHeight(): Int = withContext(IO) {
val lastBlock = Math.max(0, cacheDao.latestBlockHeight() - 1)
if (lastBlock < SAPLING_ACTIVATION_HEIGHT) -1 else lastBlock
}
override suspend fun write(result: List<CompactBlock>) = withContext(IO) {
cacheDao.insert(result)
}
override suspend fun rewindTo(height: Int) = withContext(IO) {
cacheDao.rewindTo(height)
}
}

View File

@ -0,0 +1,41 @@
package cash.z.wallet.sdk.block
import cash.z.wallet.sdk.data.twig
import cash.z.wallet.sdk.service.LightWalletService
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.withContext
/**
* Serves as a source of compact blocks received from the light wallet server. Once started, it will use the given
* lightwallet service to request all the appropriate blocks and compact block store to persist them. By delegating to
* these dependencies, the downloader remains agnostic to the particular implementation of how to retrieve and store
* data; although, by default the SDK uses gRPC and SQL.
*
* @property lightwalletService the service used for requesting compact blocks
* @property compactBlockStore responsible for persisting the compact blocks that are received
*/
open class CompactBlockDownloader(
val lightwalletService: LightWalletService,
val compactBlockStore: CompactBlockStore
) {
suspend fun downloadBlockRange(heightRange: IntRange) = withContext(IO) {
val result = lightwalletService.getBlockRange(heightRange)
compactBlockStore.write(result)
}
suspend fun rewindTo(height: Int) = withContext(IO) {
// TODO: cancel anything in flight
compactBlockStore.rewindTo(height)
}
suspend fun getLatestBlockHeight() = withContext(IO) {
lightwalletService.getLatestBlockHeight()
}
suspend fun getLastDownloadedHeight() = withContext(IO) {
compactBlockStore.getLatestHeight()
}
}

View File

@ -0,0 +1,209 @@
package cash.z.wallet.sdk.block
import androidx.annotation.VisibleForTesting
import cash.z.wallet.sdk.annotation.OpenForTesting
import cash.z.wallet.sdk.data.TransactionRepository
import cash.z.wallet.sdk.data.Twig
import cash.z.wallet.sdk.data.twig
import cash.z.wallet.sdk.exception.CompactBlockProcessorException
import cash.z.wallet.sdk.ext.*
import cash.z.wallet.sdk.jni.RustBackend
import cash.z.wallet.sdk.jni.RustBackendWelding
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.withContext
import java.io.File
import java.util.concurrent.atomic.AtomicInteger
/**
* Responsible for processing the compact blocks that are received from the lightwallet server. This class encapsulates
* all the business logic required to validate and scan the blockchain and is therefore tightly coupled with
* librustzcash.
*/
@OpenForTesting
class CompactBlockProcessor(
internal val config: ProcessorConfig,
internal val downloader: CompactBlockDownloader,
private val repository: TransactionRepository,
private val rustBackend: RustBackendWelding = RustBackend()
) {
private val progressChannel = ConflatedBroadcastChannel<Int>()
private var isStopped = false
private val consecutiveErrors = AtomicInteger(0)
fun progress(): ReceiveChannel<Int> = progressChannel.openSubscription()
/**
* Download compact blocks, verify and scan them.
*/
suspend fun start() = withContext(IO) {
twig("processor starting")
validateConfig()
// using do/while makes it easier to execute exactly one loop which helps with testing this processor quickly
do {
retryUpTo(config.retries) {
val result = processNewBlocks()
// immediately process again after failures in order to download new blocks right away
if (result < 0) {
consecutiveErrors.set(0)
twig("Successfully processed new blocks. Sleeping for ${config.blockPollFrequencyMillis}ms")
delay(config.blockPollFrequencyMillis)
} else {
if(consecutiveErrors.get() >= config.retries) {
val errorMessage = "ERROR: unable to resolve reorg at height $result after ${consecutiveErrors.get()} correction attempts!"
fail(CompactBlockProcessorException.FailedReorgRepair(errorMessage))
} else {
handleChainError(result)
}
consecutiveErrors.getAndIncrement()
}
}
} while (isActive && !isStopped)
twig("processor complete")
stop()
}
/**
* Validate the config to expose a common pitfall.
*/
private fun validateConfig() {
if(!config.cacheDbPath.contains(File.separator))
throw CompactBlockProcessorException.FileInsteadOfPath(config.cacheDbPath)
if(!config.dataDbPath.contains(File.separator))
throw CompactBlockProcessorException.FileInsteadOfPath(config.dataDbPath)
}
fun stop() {
isStopped = true
}
fun fail(error: Throwable) {
stop()
twig("${error.message}")
throw error
}
/**
* Process new blocks returning false whenever an error was found.
*
* @return -1 when processing was successful and did not encounter errors during validation or scanning. Otherwise
* return the block height where an error was found.
*/
private suspend fun processNewBlocks(): Int {
twig("beginning to process new blocks...")
// define ranges
val latestBlockHeight = downloader.getLatestBlockHeight()
val lastDownloadedHeight = Math.max(getLastDownloadedHeight(), SAPLING_ACTIVATION_HEIGHT - 1)
val lastScannedHeight = getLastScannedHeight()
twig("latestBlockHeight: $latestBlockHeight\tlastDownloadedHeight: $lastDownloadedHeight" +
"\tlastScannedHeight: $lastScannedHeight")
// as long as the database has the sapling tree (like when it's initialized from a checkpoint) we can avoid
// downloading earlier blocks so take the larger of these two numbers
val rangeToDownload = (Math.max(lastDownloadedHeight, lastScannedHeight) + 1)..latestBlockHeight
val rangeToScan = (lastScannedHeight + 1)..latestBlockHeight
downloadNewBlocks(rangeToDownload)
val error = validateNewBlocks(rangeToScan)
return if (error < 0) {
scanNewBlocks(rangeToScan)
-1 // TODO: in theory scan should not fail when validate succeeds but maybe consider returning the failed block height whenever scan does fail
} else {
error
}
}
@VisibleForTesting //allow mocks to verify how this is called, rather than the downloader, which is more complex
internal suspend fun downloadNewBlocks(range: IntRange) {
if (range.isEmpty()) {
twig("no blocks to download")
return
}
Twig.sprout("downloading")
twig("downloading blocks in range $range")
var downloadedBlockHeight = range.start
val missingBlockCount = range.last - range.first + 1
val batches = (missingBlockCount / config.downloadBatchSize
+ (if (missingBlockCount.rem(config.downloadBatchSize) == 0) 0 else 1))
var progress: Int
twig("found $missingBlockCount missing blocks, downloading in $batches batches of ${config.downloadBatchSize}...")
for (i in 1..batches) {
retryUpTo(config.retries) {
val end = Math.min(range.first + (i * config.downloadBatchSize), range.last + 1)
val batchRange = downloadedBlockHeight..(end - 1)
twig("downloaded $batchRange (batch $i of $batches)") {
downloader.downloadBlockRange(batchRange)
}
progress = Math.round(i / batches.toFloat() * 100)
progressChannel.send(progress)
downloadedBlockHeight = end
}
}
Twig.clip("downloading")
}
private fun validateNewBlocks(range: IntRange?): Int {
if (range?.isEmpty() != false) {
twig("no blocks to validate: $range")
return -1
}
Twig.sprout("validating")
twig("validating blocks in range $range")
val result = rustBackend.validateCombinedChain(config.cacheDbPath, config.dataDbPath)
Twig.clip("validating")
return result
}
private fun scanNewBlocks(range: IntRange?): Boolean {
if (range?.isEmpty() != false) {
twig("no blocks to scan")
return true
}
Twig.sprout("scanning")
twig("scanning blocks in range $range")
val result = rustBackend.scanBlocks(config.cacheDbPath, config.dataDbPath)
Twig.clip("scanning")
return result
}
private suspend fun handleChainError(errorHeight: Int) = withContext(IO) {
val lowerBound = determineLowerBound(errorHeight)
twig("handling chain error at $errorHeight by rewinding to block $lowerBound")
rustBackend.rewindToHeight(config.dataDbPath, lowerBound)
downloader.rewindTo(lowerBound)
}
private fun determineLowerBound(errorHeight: Int): Int {
val offset = Math.min(MAX_REORG_SIZE, config.rewindDistance * (consecutiveErrors.get() + 1))
return Math.max(errorHeight - offset, SAPLING_ACTIVATION_HEIGHT)
}
suspend fun getLastDownloadedHeight() = withContext(IO) {
downloader.getLastDownloadedHeight()
}
suspend fun getLastScannedHeight() = withContext(IO) {
repository.lastScannedHeight()
}
}
/**
* @property cacheDbPath absolute file path of the DB where raw, unprocessed compact blocks are stored.
* @property dataDbPath absolute file path of the DB where all information derived from the cache DB is stored.
*/
data class ProcessorConfig(
val cacheDbPath: String = "",
val dataDbPath: String = "",
val downloadBatchSize: Int = DEFAULT_BATCH_SIZE,
val blockPollFrequencyMillis: Long = DEFAULT_POLL_INTERVAL,
val retries: Int = DEFAULT_RETRIES,
val rewindDistance: Int = DEFAULT_REWIND_DISTANCE
)

View File

@ -0,0 +1,26 @@
package cash.z.wallet.sdk.block
import cash.z.wallet.sdk.entity.CompactBlock
/**
* Interface for storing compact blocks.
*/
interface CompactBlockStore {
/**
* Gets the highest block that is currently stored.
*/
suspend fun getLatestHeight(): Int
/**
* Write the given blocks to this store, which may be anything from an in-memory cache to a DB.
*/
suspend fun write(result: List<CompactBlock>)
/**
* Remove every block above and including the given height.
*
* After this operation, the data store will look the same as one that has not yet stored the given block height.
* Meaning, if max height is 100 block and rewindTo(50) is called, then the highest block remaining will be 49.
*/
suspend fun rewindTo(height: Int)
}

View File

@ -8,9 +8,15 @@ import cash.z.wallet.sdk.entity.CompactBlock
@Dao
interface CompactBlockDao {
@Insert(onConflict = OnConflictStrategy.IGNORE)
@Insert(onConflict = OnConflictStrategy.REPLACE)
fun insert(block: CompactBlock)
@Insert(onConflict = OnConflictStrategy.REPLACE)
fun insert(block: List<CompactBlock>)
@Query("DELETE FROM compactblocks WHERE height >= :height")
fun rewindTo(height: Int)
@Query("SELECT MAX(height) FROM compactblocks")
fun latestBlockHeight(): Int
}

View File

@ -1,122 +0,0 @@
package cash.z.wallet.sdk.data
import android.content.Context
import androidx.room.Room
import androidx.room.RoomDatabase
import cash.z.wallet.sdk.dao.CompactBlockDao
import cash.z.wallet.sdk.db.CompactBlockDb
import cash.z.wallet.sdk.exception.CompactBlockProcessorException
import cash.z.wallet.sdk.jni.RustBackend
import cash.z.wallet.sdk.jni.RustBackendWelding
import cash.z.wallet.sdk.rpc.CompactFormats
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.isActive
import kotlinx.coroutines.withContext
import java.io.File
/**
* Responsible for processing the blocks on the stream. Saves them to the cacheDb and periodically scans for transactions.
*
* @property applicationContext used to connect to the DB on the device. No reference is kept beyond construction.
*/
class CompactBlockProcessor(
applicationContext: Context,
val rustBackend: RustBackendWelding = RustBackend(),
cacheDbName: String = DEFAULT_CACHE_DB_NAME,
dataDbName: String = DEFAULT_DATA_DB_NAME
) {
internal val cacheDao: CompactBlockDao
private val cacheDb: CompactBlockDb
private val cacheDbPath: String
private val dataDbPath: String
val dataDbExists get() = File(dataDbPath).exists()
val cachDbExists get() = File(cacheDbPath).exists()
init {
cacheDb = createCompactBlockCacheDb(applicationContext, cacheDbName)
cacheDao = cacheDb.complactBlockDao()
cacheDbPath = applicationContext.getDatabasePath(cacheDbName).absolutePath
dataDbPath = applicationContext.getDatabasePath(dataDbName).absolutePath
}
private fun createCompactBlockCacheDb(applicationContext: Context, cacheDbName: String): CompactBlockDb {
return Room.databaseBuilder(applicationContext, CompactBlockDb::class.java, cacheDbName)
.setJournalMode(RoomDatabase.JournalMode.TRUNCATE)
// this is a simple cache of blocks. destroying the db should be benign
.fallbackToDestructiveMigration()
.build()
}
/**
* Save blocks and periodically scan them.
*/
suspend fun processBlocks(incomingBlocks: ReceiveChannel<CompactFormats.CompactBlock>) = withContext(IO) {
ensureDataDb()
twigTask("processing blocks") {
var lastScanTime = System.currentTimeMillis()
var hasScanned = false
while (isActive && !incomingBlocks.isClosedForReceive) {
twig("awaiting next block")
val nextBlock = incomingBlocks.receive()
val nextBlockHeight = nextBlock.height
twig("received block with height ${nextBlockHeight} on thread ${Thread.currentThread().name}")
cacheDao.insert(cash.z.wallet.sdk.entity.CompactBlock(nextBlockHeight.toInt(), nextBlock.toByteArray()))
if (shouldScanBlocks(lastScanTime, hasScanned)) {
twig("last block prior to scan ${nextBlockHeight}")
scanBlocks()
lastScanTime = System.currentTimeMillis()
hasScanned = true
}
}
cacheDb.close()
}
}
private fun ensureDataDb() {
if (!dataDbExists) throw CompactBlockProcessorException.DataDbMissing(dataDbPath)
}
private fun shouldScanBlocks(lastScanTime: Long, hasScanned: Boolean): Boolean {
val deltaTime = System.currentTimeMillis() - lastScanTime
twig("${deltaTime}ms since last scan. Have we ever scanned? $hasScanned")
return (!hasScanned && deltaTime > INITIAL_SCAN_DELAY)
|| deltaTime > SCAN_FREQUENCY
}
suspend fun scanBlocks() = withContext(IO) {
Twig.sprout("scan")
twigTask("scanning blocks") {
if (isActive) {
try {
rustBackend.scanBlocks(cacheDbPath, dataDbPath)
} catch (t: Throwable) {
twig("error while scanning blocks: $t")
}
}
}
Twig.clip("scan")
}
/**
* Returns the height of the last processed block or -1 if no blocks have been processed.
*/
suspend fun lastProcessedBlock(): Int = withContext(IO) {
val lastBlock = Math.max(0, cacheDao.latestBlockHeight() - 1)
if (lastBlock < SAPLING_ACTIVATION_HEIGHT) -1 else lastBlock
}
companion object {
const val DEFAULT_CACHE_DB_NAME = "DownloadedCompactBlocks.db"
const val DEFAULT_DATA_DB_NAME = "CompactBlockScanResults.db"
/** Default amount of time to synchronize before initiating the first scan. This allows time to download a few blocks. */
const val INITIAL_SCAN_DELAY = 3000L
/** Minimum amount of time between scans. The frequency with which we check whether the block height has changed and, if so, trigger a scan */
const val SCAN_FREQUENCY = 75_000L
// TODO: find a better home for this constant
const val SAPLING_ACTIVATION_HEIGHT = 280_000
}
}

View File

@ -1,214 +0,0 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.exception.CompactBlockStreamException
import cash.z.wallet.sdk.ext.toBlockRange
import cash.z.wallet.sdk.rpc.CompactFormats.CompactBlock
import cash.z.wallet.sdk.rpc.CompactTxStreamerGrpc
import cash.z.wallet.sdk.rpc.CompactTxStreamerGrpc.CompactTxStreamerBlockingStub
import cash.z.wallet.sdk.rpc.Service
import com.google.protobuf.ByteString
import io.grpc.Channel
import io.grpc.ManagedChannelBuilder
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.distinct
import java.io.Closeable
import java.util.concurrent.TimeUnit
/**
* Serves as a source of compact blocks received from the light wallet server. Once started, it will
* request all the appropriate blocks and then stream them into the channel returned when calling [start].
*/
class CompactBlockStream private constructor() {
lateinit var connection: Connection
// TODO: improve the creation of this channel (tweak its settings to use mobile device responsibly) and make sure it is properly cleaned up
constructor(host: String, port: Int) : this(
ManagedChannelBuilder.forAddress(host, port).usePlaintext().build()
)
constructor(channel: Channel) : this() {
connection = Connection(channel)
}
fun start(
scope: CoroutineScope,
startingBlockHeight: Int = Int.MAX_VALUE,
batchSize: Int = DEFAULT_BATCH_SIZE,
pollFrequencyMillis: Long = DEFAULT_POLL_INTERVAL
): ReceiveChannel<CompactBlock> {
if(connection.isClosed()) throw CompactBlockStreamException.ConnectionClosed
twig("starting")
scope.launch {
twig("preparing to stream blocks...")
delay(1000L) // TODO: we can probably get rid of this delay.
try {
connection.use {
twig("requesting latest block height")
var latestBlockHeight = it.getLatestBlockHeight()
twig("responded with latest block height of $latestBlockHeight")
if (startingBlockHeight < latestBlockHeight) {
twig("downloading missing blocks from $startingBlockHeight to $latestBlockHeight")
latestBlockHeight = it.downloadMissingBlocks(startingBlockHeight, batchSize)
twig("done downloading missing blocks")
}
it.streamBlocks(pollFrequencyMillis, latestBlockHeight)
}
} catch (t: Throwable) {
twig("throwing $t")
throw CompactBlockStreamException.FalseStart(t)
}
}
return connection.subscribe()
}
fun progress() = connection.progress().distinct()
fun stop() {
twig("stopping")
connection.close()
}
companion object {
const val DEFAULT_BATCH_SIZE = 10_000
const val DEFAULT_POLL_INTERVAL = 75_000L
const val DEFAULT_RETRIES = 5
}
inner class Connection(private val channel: Channel): Closeable {
private var job: Job? = null
private var syncJob: Job? = null
private val compactBlockChannel = BroadcastChannel<CompactBlock>(100)
private val latestBlockHeightChannel = ConflatedBroadcastChannel<Int>()
private val progressChannel = ConflatedBroadcastChannel<Int>()
fun createStub(timeoutMillis: Long = 60_000L): CompactTxStreamerBlockingStub {
return CompactTxStreamerGrpc.newBlockingStub(channel).withDeadlineAfter(timeoutMillis, TimeUnit.MILLISECONDS)
}
fun subscribe() = compactBlockChannel.openSubscription()
fun progress() = progressChannel.openSubscription()
fun latestHeights() = latestBlockHeightChannel.openSubscription()
/**
* Download all the missing blocks and return the height of the last block downloaded, which can be used to
* calculate the total number of blocks downloaded.
*/
suspend fun downloadMissingBlocks(startingBlockHeight: Int, batchSize: Int = DEFAULT_BATCH_SIZE): Int {
twig("downloadingMissingBlocks starting at $startingBlockHeight")
val latestBlockHeight = getLatestBlockHeight()
var downloadedBlockHeight = startingBlockHeight
// if blocks are missing then download them
if (startingBlockHeight < latestBlockHeight) {
val missingBlockCount = latestBlockHeight - startingBlockHeight + 1
val batches = missingBlockCount / batchSize + (if (missingBlockCount.rem(batchSize) == 0) 0 else 1)
var progress: Int
twig("found $missingBlockCount missing blocks, downloading in $batches batches...")
for (i in 1..batches) {
retryUpTo(DEFAULT_RETRIES) {
twig("beginning batch $i")
val end = Math.min(startingBlockHeight + (i * batchSize), latestBlockHeight + 1)
loadBlockRange(downloadedBlockHeight..(end-1))
progress = Math.round(i/batches.toFloat() * 100)
progressChannel.send(progress)
downloadedBlockHeight = end
twig("finished batch $i of $batches\n")
}
}
// progressChannel.cancel()
} else {
twig("no missing blocks to download!")
}
return downloadedBlockHeight
}
suspend fun getLatestBlockHeight(): Int = withContext(IO) {
createStub().getLatestBlock(Service.ChainSpec.newBuilder().build()).height.toInt()
}
suspend fun submitTransaction(raw: ByteArray) = withContext(IO) {
val request = Service.RawTransaction.newBuilder().setData(ByteString.copyFrom(raw)).build()
createStub().sendTransaction(request)
}
suspend fun streamBlocks(pollFrequencyMillis: Long = DEFAULT_POLL_INTERVAL, startingBlockHeight: Int = Int.MAX_VALUE) = withContext(IO) {
twig("streamBlocks started at $startingBlockHeight with interval $pollFrequencyMillis")
progressChannel.send(100) // anytime we make it to this method, we're done catching up
// start with the next block, unless we were asked to start before then
var nextBlockHeight = Math.min(startingBlockHeight, getLatestBlockHeight() + 1)
while (isActive && !compactBlockChannel.isClosedForSend) {
retryUpTo(DEFAULT_RETRIES) {
twig("polling for next block in stream on thread ${Thread.currentThread().name} . . .")
val latestBlockHeight = getLatestBlockHeight()
if (latestBlockHeight >= nextBlockHeight) {
twig("found a new block! (latest: $latestBlockHeight) on thread ${Thread.currentThread().name}")
loadBlockRange(nextBlockHeight..latestBlockHeight)
nextBlockHeight = latestBlockHeight + 1
} else {
twig("no new block yet (latest: $latestBlockHeight) on thread ${Thread.currentThread().name}")
}
twig("delaying $pollFrequencyMillis before polling for next block in stream")
delay(pollFrequencyMillis)
}
}
}
private suspend fun retryUpTo(retries: Int, initialDelay:Int = 10, block: suspend () -> Unit) {
var failedAttempts = 0
while (failedAttempts < retries) {
try {
block()
return
} catch (t: Throwable) {
failedAttempts++
if (failedAttempts >= retries) throw t
val duration = Math.pow(initialDelay.toDouble(), failedAttempts.toDouble()).toLong()
twig("failed due to $t retrying (${failedAttempts+1}/$retries) in ${duration}s...")
delay(duration)
}
}
}
suspend fun loadBlockRange(range: IntRange): Int = withContext(IO) {
twig("requesting block range $range on thread ${Thread.currentThread().name}")
val result = createStub(90_000L).getBlockRange(range.toBlockRange())
twig("done requesting block range")
var resultCount = 0
while (checkNextBlock(result)) { //calls result.hasNext, which blocks because we use a blockingStub
resultCount++
val nextBlock = result.next()
twig("...while loading block range $range, received new block ${nextBlock.height} on thread ${Thread.currentThread().name}. Sending...")
compactBlockChannel.send(nextBlock)
latestBlockHeightChannel.send(nextBlock.height.toInt())
twig("...done sending block ${nextBlock.height}")
}
twig("done loading block range $range")
resultCount
}
/* this helper method is used to allow for logic (like logging) before blocking on the current thread */
private fun checkNextBlock(result: MutableIterator<CompactBlock>): Boolean {
twig("awaiting next block...")
return result.hasNext()
}
fun isClosed(): Boolean {
return compactBlockChannel.isClosedForSend
}
override fun close() {
compactBlockChannel.cancel()
progressChannel.cancel()
syncJob?.cancel()
syncJob = null
job?.cancel()
job = null
}
}
}

View File

@ -1,6 +1,9 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.block.CompactBlockProcessor
import cash.z.wallet.sdk.block.ProcessorConfig
import cash.z.wallet.sdk.dao.WalletTransaction
import cash.z.wallet.sdk.ext.MINERS_FEE_ZATOSHI
import cash.z.wallet.sdk.secure.Wallet
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
@ -24,7 +27,6 @@ import kotlin.random.nextLong
* will send regular updates such that it reaches 100 in this amount of time.
* @param activeTransactionUpdateFrequency the amount of time in milliseconds between updates to an active
* transaction's state. Active transactions move through their lifecycle and increment their state at this rate.
* @param isFirstRun whether this Mock should return `true` for isFirstRun. Defaults to a random boolean.
* @param isStale whether this Mock should return `true` for isStale. When null, this will follow the default behavior
* of returning true about 10% of the time.
* @param onSynchronizerErrorListener presently ignored because there are not yet any errors in mock.
@ -33,7 +35,6 @@ open class MockSynchronizer(
private val transactionInterval: Long = 30_000L,
private val initialLoadDuration: Long = 5_000L,
private val activeTransactionUpdateFrequency: Long = 3_000L,
private val isFirstRun: Boolean = Random.nextBoolean(),
private var isStale: Boolean? = null,
override var onSynchronizerErrorListener: ((Throwable?) -> Boolean)? = null // presently ignored (there are no errors in mock yet)
) : Synchronizer, CoroutineScope {
@ -96,14 +97,6 @@ open class MockSynchronizer(
return result
}
/**
* Returns [isFirstRun] as provided during initialization of this MockSynchronizer.
*/
override suspend fun isFirstRun(): Boolean {
twig("checking isFirstRun: $isFirstRun")
return isFirstRun
}
/**
* Returns the [mockAddress]. This address is not usable.
*/
@ -116,7 +109,7 @@ open class MockSynchronizer(
if (transactions.size != 0) {
return transactions.fold(0L) { acc, tx ->
if (tx.isSend && tx.isMined) acc - tx.value else acc + tx.value
} - 10_000L // miner's fee
} - MINERS_FEE_ZATOSHI
}
return 0L
}
@ -262,7 +255,7 @@ open class MockSynchronizer(
if (tx.isSend && tx.isMined) acc - tx.value else acc + tx.value
}
}
balanceChannel.send(Wallet.WalletBalance(balance, balance - 10000 /* miner's fee */))
balanceChannel.send(Wallet.WalletBalance(balance, balance - MINERS_FEE_ZATOSHI))
}
// other collaborators add to the list, periodically. This simulates, real-world, non-distinct updates.
delay(Random.nextLong(transactionInterval / 2))

View File

@ -1,8 +1,9 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.block.CompactBlockProcessor
import cash.z.wallet.sdk.dao.WalletTransaction
import cash.z.wallet.sdk.data.SdkSynchronizer.SyncState.*
import cash.z.wallet.sdk.exception.SynchronizerException
import cash.z.wallet.sdk.exception.WalletException
import cash.z.wallet.sdk.secure.Wallet
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
@ -18,7 +19,6 @@ import kotlin.coroutines.CoroutineContext
* Another way of thinking about this class is the reference that demonstrates how all the pieces can be tied
* together.
*
* @param downloader the component that downloads compact blocks and exposes them as a stream
* @param processor the component that saves the downloaded compact blocks to the cache and then scans those blocks for
* data related to this wallet.
* @param repository the component that exposes streams of wallet transaction information.
@ -31,14 +31,11 @@ import kotlin.coroutines.CoroutineContext
* number represents the number of milliseconds the synchronizer will wait before checking for newly mined blocks.
*/
class SdkSynchronizer(
private val downloader: CompactBlockStream,
private val processor: CompactBlockProcessor,
private val repository: TransactionRepository,
private val activeTransactionManager: ActiveTransactionManager,
private val wallet: Wallet,
private val batchSize: Int = 1000,
private val staleTolerance: Int = 10,
private val blockPollFrequency: Long = CompactBlockStream.DEFAULT_POLL_INTERVAL
private val staleTolerance: Int = 10
) : Synchronizer {
/**
@ -103,7 +100,13 @@ class SdkSynchronizer(
failure = null
blockJob = parentScope.launch(CoroutineExceptionHandler(exceptionHandler)) {
supervisorScope {
continueWithState(determineState())
try {
wallet.initialize()
} catch (e: WalletException.AlreadyInitializedException) {
twig("Warning: wallet already initialized but this is safe to ignore " +
"because the SDK now automatically detects where to start downloading.")
}
onReady()
}
}
return this
@ -116,7 +119,6 @@ class SdkSynchronizer(
*/
override fun stop() {
twig("stopping")
downloader.stop().also { twig("downloader stopped") }
repository.stop().also { twig("repository stopped") }
activeTransactionManager.stop().also { twig("activeTransactionManager stopped") }
// TODO: investigate whether this is necessary and remove or improve, accordingly
@ -146,7 +148,7 @@ class SdkSynchronizer(
* switches from catching up on missed blocks to periodically monitoring for newly mined blocks.
*/
override fun progress(): ReceiveChannel<Int> {
return downloader.progress()
return processor.progress()
}
/**
@ -170,27 +172,15 @@ class SdkSynchronizer(
* @return true when the local data is significantly out of sync with the remote server and the app data is stale.
*/
override suspend fun isStale(): Boolean = withContext(IO) {
val latestBlockHeight = downloader.connection.getLatestBlockHeight()
val ourHeight = processor.cacheDao.latestBlockHeight()
val tolerance = 10
val latestBlockHeight = processor.downloader.getLatestBlockHeight()
val ourHeight = processor.downloader.getLastDownloadedHeight()
val tolerance = staleTolerance
val delta = latestBlockHeight - ourHeight
twig("checking whether out of sync. " +
"LatestHeight: $latestBlockHeight ourHeight: $ourHeight Delta: $delta tolerance: $tolerance")
delta > tolerance
}
/**
* A flag to indicate that the initial state of this synchronizer was firstRun. This is useful for knowing whether
* initializing the database is required and whether to show things like"first run walk-throughs."
*
* @return true when this synchronizer has not been run before on this device or when cache has been cleared since
* the last run.
*/
override suspend fun isFirstRun(): Boolean = withContext(IO) {
initialState is FirstRun
}
/* Operations */
/**
@ -234,80 +224,17 @@ class SdkSynchronizer(
// Private API
//
/**
* After determining the initial state, continue based on those findings.
*
* @param syncState the sync state found
*/
private fun CoroutineScope.continueWithState(syncState: SyncState): Job {
return when (syncState) {
FirstRun -> onFirstRun()
is CacheOnly -> onCacheOnly(syncState)
is ReadyToProcess -> onReady(syncState)
}
}
/**
* Logic for the first run. This is when the wallet gets initialized, which includes setting up the dataDB and
* preloading it with data corresponding to the wallet birthday.
*/
private fun CoroutineScope.onFirstRun(): Job {
twig("this appears to be a fresh install, beginning first run of application")
val firstRunStartHeight = wallet.initialize() // should get the latest sapling tree and return that height
twig("wallet firstRun returned a value of $firstRunStartHeight")
return continueWithState(ReadyToProcess(firstRunStartHeight))
}
/**
* Logic for starting the Synchronizer when no scans have yet occurred. Takes care of initializing the dataDb and
* then
*/
private fun CoroutineScope.onCacheOnly(syncState: CacheOnly): Job {
twig("we have cached blocks but no data DB, beginning pre-cached version of application")
val firstRunStartHeight = wallet.initialize(syncState.startingBlockHeight)
twig("wallet has already cached up to a height of $firstRunStartHeight")
return continueWithState(ReadyToProcess(firstRunStartHeight))
}
/**
* Logic for starting the Synchronizer once it is ready for processing. All starts eventually end with this method.
*/
private fun CoroutineScope.onReady(syncState: ReadyToProcess) = launch {
twig("synchronization is ready to begin at height ${syncState.startingBlockHeight}")
// TODO: for PIR concerns, introduce some jitter here for where, exactly, the downloader starts
val blockChannel =
downloader.start(
this,
syncState.startingBlockHeight,
batchSize,
pollFrequencyMillis = blockPollFrequency
)
launch { monitorProgress(downloader.progress()) }
private fun CoroutineScope.onReady() = launch {
twig("synchronization is ready to begin!")
launch { monitorTransactions(repository.allTransactions().distinct()) }
activeTransactionManager.start()
repository.start(this)
processor.processBlocks(blockChannel)
}
/**
* Monitor download progress in order to trigger a scan the moment all blocks have been received. This reduces the
* amount of time it takes to get accurate balance information since scan intervals are fairly long.
*/
private suspend fun monitorProgress(progressChannel: ReceiveChannel<Int>) = withContext(IO) {
twig("beginning to monitor download progress")
for (i in progressChannel) {
if(i >= 100) {
twig("triggering a proactive scan in a second because all missing blocks have been loaded")
delay(1000L)
launch {
twig("triggering proactive scan!")
processor.scanBlocks()
twig("done triggering proactive scan!")
}
break
}
}
twig("done monitoring download progress")
processor.start()
}
/**
@ -324,32 +251,6 @@ class SdkSynchronizer(
twig("done monitoring transactions in order to update the balance")
}
/**
* Determine the initial state of the data by checking whether the dataDB is initialized and the last scanned block
* height. This is considered a first run if no blocks have been processed.
*/
private suspend fun determineState(): SyncState = withContext(IO) {
twig("determining state (has the app run before, what block did we last see, etc.)")
initialState = if (processor.dataDbExists) {
val isInitialized = repository.isInitialized()
// this call blocks because it does IO
val startingBlockHeight = Math.max(processor.lastProcessedBlock(), repository.lastScannedHeight())
twig("cacheDb exists with last height of $startingBlockHeight and isInitialized = $isInitialized")
if (!repository.isInitialized()) FirstRun else ReadyToProcess(startingBlockHeight)
} else if(processor.cachDbExists) {
// this call blocks because it does IO
val startingBlockHeight = processor.lastProcessedBlock()
twig("cacheDb exists with last height of $startingBlockHeight")
if (startingBlockHeight <= 0) FirstRun else CacheOnly(startingBlockHeight)
} else {
FirstRun
}
twig("determined ${initialState::class.java.simpleName}")
initialState
}
/**
* Wraps exceptions, logs them and then invokes the [onSynchronizerErrorListener], if it exists.
*/

View File

@ -66,15 +66,6 @@ interface Synchronizer {
*/
suspend fun isStale(): Boolean
/**
* A flag to indicate that this is the first run of this Synchronizer on this device. This is useful for knowing
* whether to initialize databases or other required resources, as well as whether to show walk-throughs.
*
* @return true when this is the first run. Implementations can set criteria for that but typically it will be when
* the database needs to be initialized.
*/
suspend fun isFirstRun(): Boolean
/**
* Gets or sets a global error listener. This is a useful hook for handling unexpected critical errors.
*

View File

@ -0,0 +1,56 @@
package cash.z.wallet.sdk.service
import cash.z.wallet.sdk.entity.CompactBlock
import cash.z.wallet.sdk.ext.toBlockHeight
import cash.z.wallet.sdk.rpc.CompactFormats
import cash.z.wallet.sdk.rpc.CompactTxStreamerGrpc
import cash.z.wallet.sdk.rpc.Service
import com.google.protobuf.ByteString
import io.grpc.Channel
import io.grpc.ManagedChannelBuilder
import java.util.concurrent.TimeUnit
class LightWalletGrpcService(private val channel: Channel) : LightWalletService {
constructor(host: String, port: Int = 9067) : this(ManagedChannelBuilder.forAddress(host, port).usePlaintext().build())
/* LightWalletService implementation */
override fun getBlockRange(heightRange: IntRange): List<CompactBlock> {
return channel.createStub(90L).getBlockRange(heightRange.toBlockRange()).toList()
}
override fun getLatestBlockHeight(): Int {
return channel.createStub(10L).getLatestBlock(Service.ChainSpec.newBuilder().build()).height.toInt()
}
override fun submitTransaction(raw: ByteArray): Service.SendResponse {
val request = Service.RawTransaction.newBuilder().setData(ByteString.copyFrom(raw)).build()
return channel.createStub().sendTransaction(request)
}
//
// Utilities
//
private fun Channel.createStub(timeoutSec: Long = 60L): CompactTxStreamerGrpc.CompactTxStreamerBlockingStub =
CompactTxStreamerGrpc
.newBlockingStub(this)
.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS)
private fun IntRange.toBlockRange(): Service.BlockRange =
Service.BlockRange.newBuilder()
.setStart(this.first.toBlockHeight())
.setEnd(this.last.toBlockHeight())
.build()
private fun Iterator<CompactFormats.CompactBlock>.toList(): List<CompactBlock> =
mutableListOf<CompactBlock>().apply {
while (hasNext()) {
val compactBlock = next()
this@apply += CompactBlock(compactBlock.height.toInt(), compactBlock.toByteArray())
}
}
}

View File

@ -0,0 +1,28 @@
package cash.z.wallet.sdk.service
import cash.z.wallet.sdk.entity.CompactBlock
import cash.z.wallet.sdk.rpc.Service
/**
* Service for interacting with lightwalletd. Implementers of this service should make blocking calls because
* async concerns are handled at a higher level.
*/
interface LightWalletService {
/**
* Return the given range of blocks.
*
* @param heightRange the inclusive range to fetch. For instance if 1..5 is given, then every block in that range
* will be fetched, including 1 and 5.
*/
fun getBlockRange(heightRange: IntRange): List<CompactBlock>
/**
* Return the latest block height known to the service.
*/
fun getLatestBlockHeight(): Int
/**
* Submit a raw transaction.
*/
fun submitTransaction(transactionRaw: ByteArray): Service.SendResponse
}

View File

@ -0,0 +1,153 @@
package cash.z.wallet.sdk.block
import cash.z.wallet.sdk.data.TransactionRepository
import cash.z.wallet.sdk.data.TroubleshootingTwig
import cash.z.wallet.sdk.data.Twig
import cash.z.wallet.sdk.entity.CompactBlock
import cash.z.wallet.sdk.ext.SAPLING_ACTIVATION_HEIGHT
import cash.z.wallet.sdk.jni.RustBackendWelding
import cash.z.wallet.sdk.service.LightWalletService
import com.nhaarman.mockitokotlin2.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.junit.jupiter.MockitoSettings
import org.mockito.quality.Strictness
@ExtendWith(MockitoExtension::class)
@MockitoSettings(strictness = Strictness.LENIENT)
internal class CompactBlockProcessorTest {
private val frequency = 5L
// Mocks/Spys
@Mock lateinit var rustBackend: RustBackendWelding
lateinit var processor: CompactBlockProcessor
// Test variables
private var latestBlockHeight: Int = 500_000
private var lastDownloadedHeight: Int = SAPLING_ACTIVATION_HEIGHT
private var lastScannedHeight: Int = SAPLING_ACTIVATION_HEIGHT
private var errorBlock: Int = -1
@BeforeEach
fun setUp(
@Mock lightwalletService: LightWalletService,
@Mock compactBlockStore: CompactBlockStore,
@Mock repository: TransactionRepository
) {
Twig.plant(TroubleshootingTwig())
lightwalletService.stub {
onBlocking {
getBlockRange(any())
}.thenAnswer { invocation ->
val range = invocation.arguments[0] as IntRange
range.map { CompactBlock(it, ByteArray(0)) }
}
}
lightwalletService.stub {
onBlocking {
getLatestBlockHeight()
}.thenAnswer { latestBlockHeight }
}
compactBlockStore.stub {
onBlocking {
write(any())
}.thenAnswer { invocation ->
val lastBlockHeight = (invocation.arguments[0] as List<CompactBlock>).last().height
lastDownloadedHeight = lastBlockHeight
Unit
}
}
compactBlockStore.stub {
onBlocking {
getLatestHeight()
}.thenAnswer { lastDownloadedHeight }
}
compactBlockStore.stub {
onBlocking {
rewindTo(any())
}.thenAnswer { invocation ->
lastDownloadedHeight = invocation.arguments[0] as Int
Unit
}
}
repository.stub {
onBlocking {
lastScannedHeight()
}.thenAnswer { lastScannedHeight }
}
val config = ProcessorConfig(retries = 1, blockPollFrequencyMillis = frequency, downloadBatchSize = 50_000)
val downloader = spy(CompactBlockDownloader(lightwalletService, compactBlockStore))
processor = spy(CompactBlockProcessor(config, downloader, repository, rustBackend))
whenever(rustBackend.validateCombinedChain(any(), any())).thenAnswer {
errorBlock
}
whenever(rustBackend.scanBlocks(any(), any())).thenAnswer {
true
}
}
@AfterEach
fun tearDown() {
}
@Test
fun `check for OBOE when downloading`() = runBlocking {
// if the last block downloaded was 350_000, then we already have that block and should start with 350_001
lastDownloadedHeight = 350_000
processBlocks()
verify(processor).downloadNewBlocks(350_001..latestBlockHeight)
}
@Test
fun `chain error rewinds by expected amount`() = runBlocking {
// if the highest block whose prevHash doesn't match happens at block 300_010
errorBlock = 300_010
// then we should rewind the default (10) blocks
val expectedBlock = errorBlock - processor.config.rewindDistance
processBlocks(100L)
verify(processor.downloader, atLeastOnce()).rewindTo(expectedBlock)
verify(rustBackend, atLeastOnce()).rewindToHeight("", expectedBlock)
assertNotNull(processor)
}
@Test
fun `chain error downloads expected number of blocks`() = runBlocking {
// if the highest block whose prevHash doesn't match happens at block 300_010
// and our rewind distance is the default (10), then we want to download exactly ten blocks
errorBlock = 300_010
// plus 1 because the range is inclusive
val expectedRange = (errorBlock - processor.config.rewindDistance + 1)..latestBlockHeight
processBlocks(1500L)
verify(processor, atLeastOnce()).downloadNewBlocks(expectedRange)
}
private fun processBlocks(delayMillis: Long? = null) = runBlocking {
launch { processor.start() }
val progressChannel = processor.progress()
for (i in progressChannel) {
if(i >= 100) {
if(delayMillis != null) delay(delayMillis)
processor.stop()
break
}
}
}
}

View File

@ -1,173 +0,0 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.anyNotNull
import cash.z.wallet.sdk.ext.toBlockHeight
import cash.z.wallet.sdk.rpc.CompactFormats
import cash.z.wallet.sdk.rpc.CompactTxStreamerGrpc.CompactTxStreamerBlockingStub
import cash.z.wallet.sdk.rpc.Service
import com.nhaarman.mockitokotlin2.*
import kotlinx.coroutines.*
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.ArgumentMatchers.any
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.junit.jupiter.MockitoSettings
import org.mockito.quality.Strictness
import kotlin.system.measureTimeMillis
import org.junit.Rule
import io.grpc.testing.GrpcServerRule
import org.junit.jupiter.migrationsupport.rules.EnableRuleMigrationSupport
@ExtendWith(MockitoExtension::class)
@MockitoSettings(strictness = Strictness.LENIENT) // allows us to setup the blockingStub once, with everything, rather than using custom stubs for each test
@EnableRuleMigrationSupport
class CompactBlockDownloaderTest {
lateinit var downloader: CompactBlockStream
lateinit var connection: CompactBlockStream.Connection
val job = Job()
val io = CoroutineScope(Dispatchers.IO + job)
@Rule
var grpcServerRule = GrpcServerRule()
@BeforeEach
fun setUp(@Mock blockingStub: CompactTxStreamerBlockingStub) {
whenever(blockingStub.getLatestBlock(any())).doAnswer {
getLatestBlock()
}
// when asked for a block range, create an array of blocks and return an iterator over them with a slight delay between iterations
whenever(blockingStub.getBlockRange(any())).doAnswer {
val serviceRange = it.arguments[0] as Service.BlockRange
val range = serviceRange.start.height..serviceRange.end.height
val blocks = mutableListOf<CompactFormats.CompactBlock>()
System.err.println("[Mock Connection] creating blocks in range: $range")
for (i in range) {
blocks.add(CompactFormats.CompactBlock.newBuilder().setHeight(i).build())
}
val blockIterator = blocks.iterator()
val delayedIterator = object : Iterator<CompactFormats.CompactBlock> {
override fun hasNext() = blockIterator.hasNext()
override fun next(): CompactFormats.CompactBlock {
Thread.sleep(10L)
return blockIterator.next()
}
}
delayedIterator
}
downloader = CompactBlockStream(grpcServerRule.channel, TroubleshootingTwig())
connection = spy(downloader.connection)
whenever(connection.createStub(anyNotNull())).thenReturn(blockingStub)
}
@AfterEach
fun tearDown() {
downloader.stop()
io.cancel()
}
@Test
fun `mock configuration sanity check`() = runBlocking<Unit> {
assertEquals(getLatestBlock().height, connection.getLatestBlockHeight(), "Unexpected height. Verify that mocks are properly configured.")
}
@Test
fun `downloading missing blocks happens in chunks`() = runBlocking<Unit> {
val start = getLatestBlock().height.toInt() - 31
val downloadCount = connection.downloadMissingBlocks(start, 10) - start
assertEquals(32, downloadCount)
// verify(connection).getLatestBlockHeight()
// verify(connection).loadBlockRange(start..(start + 9)) // a range of 10 block is requested
// verify(connection, times(4)).loadBlockRange(anyNotNull()) // 4 batches are required
}
@Test
fun `channel contains expected blocks`() = runBlocking {
val mailbox = connection.subscribe()
var blockCount = 0
val start = getLatestBlock().height - 31L
io.launch {
connection.downloadMissingBlocks(start.toInt(), 10)
mailbox.cancel() // exits the for loop, below, once downloading is complete
}
for(block in mailbox) {
println("got block with height ${block.height} on thread ${Thread.currentThread().name}")
blockCount++
}
assertEquals(32, blockCount)
}
// lots of logging here because this is more of a sanity test for peace of mind
@Test
fun `streaming yields the latest blocks with proper timing`() = runBlocking {
// just tweak these a bit for sanity rather than making a bunch of tests that would be slow
val pollInterval = BLOCK_INTERVAL_MILLIS/2L
val repetitions = 3
println("${System.currentTimeMillis()} : starting with blockInterval $BLOCK_INTERVAL_MILLIS and pollInterval $pollInterval")
val mailbox = connection.subscribe()
io.launch {
connection.streamBlocks(pollInterval)
}
// sync up with the block interval, first
mailbox.receive()
// now, get a few blocks and measure the expected time
val deltaTime = measureTimeMillis {
repeat(repetitions) {
println("${System.currentTimeMillis()} : checking the mailbox on thread ${Thread.currentThread().name}...")
val mail = mailbox.receive()
println("${System.currentTimeMillis()} : ...got ${mail.height} in the mail! on thread ${Thread.currentThread().name}")
}
}
val totalIntervals = repetitions * BLOCK_INTERVAL_MILLIS
val bounds = (totalIntervals - pollInterval)..(totalIntervals + pollInterval)
println("${System.currentTimeMillis()} : finished in $deltaTime and it was between $bounds")
mailbox.cancel()
assertTrue(bounds.contains(deltaTime), "Blocks received ${if(bounds.first < deltaTime) "slower" else "faster"} than expected. $deltaTime should be in the range of $bounds")
}
@Test
fun `downloader gets missing blocks and then streams`() = runBlocking {
val targetHeight = getLatestBlock().height.toInt() + 3
val initialBlockHeight = targetHeight - 30
println("starting from $initialBlockHeight to $targetHeight")
val mailbox = downloader.start(io, initialBlockHeight, 10, 500L)
// receive from channel until we reach the target height, counting blocks along the way
var firstBlock: CompactFormats.CompactBlock? = null
var blockCount = 0
do {
println("waiting for block number $blockCount...")
val block = mailbox.receive()
println("...received block ${block.height} on thread ${Thread.currentThread().name}")
blockCount++
if (firstBlock == null) firstBlock = block
} while (block.height < targetHeight)
mailbox.cancel()
assertEquals(firstBlock?.height, initialBlockHeight, "Failed to start at block $initialBlockHeight")
assertEquals(targetHeight - initialBlockHeight + 1L, blockCount.toLong(), "Incorrect number of blocks, verify that there are no duplicates in the test output")
}
companion object {
const val BLOCK_INTERVAL_MILLIS = 1000L
private fun getLatestBlock(): Service.BlockID {
// number of intervals that have passed (without rounding...)
val intervalCount = System.currentTimeMillis() / BLOCK_INTERVAL_MILLIS
return intervalCount.toInt().toBlockHeight()
}
}
}