Create mock synchronizer to help with driving the UI.
After experiencing several issues that make it more difficult to test send behavior, including the amount of time required to wait for blocks to get mined when testnet is slow, it became obvious that it was time to investigate mocking. Most of the behavior in the SDK is driven by channels so the mock only has to focus on putting useful data in the expected channels at the right time. One tradeoff here was the need to make all the synchronizer properties private, that way any implementation can achieve compatability without necessasily leveraging the same combinations of building blocks. This tradeoff felt acceptable given that these dependencies can be injected and available as singletons, if needed. This also had a side effect of elevating several channels into the Synchronizer interface, rather than reaching into the synchronizer to directly access those dependencies. Another benefit is that it's now easier to see what matters most to the app, particularly which channels are essential.
This commit is contained in:
parent
bf7b3ee744
commit
1b37784e44
22
build.gradle
22
build.gradle
|
@ -8,7 +8,8 @@ buildscript {
|
|||
'architectureComponents': '2.0.0',
|
||||
'grpc':'1.17.1',
|
||||
'kotlin': '1.3.10',
|
||||
'coroutines': '1.1.0'
|
||||
'coroutines': '1.1.0',
|
||||
'junitJupiter': '5.4.0'
|
||||
]
|
||||
repositories {
|
||||
google()
|
||||
|
@ -53,7 +54,7 @@ android {
|
|||
versionCode = 1_06_00
|
||||
versionName = version
|
||||
testInstrumentationRunner = "androidx.test.runner.AndroidJUnitRunner"
|
||||
multiDexEnabled false
|
||||
multiDexEnabled true
|
||||
archivesBaseName = "zcash-android-wallet-sdk-$versionName"
|
||||
}
|
||||
|
||||
|
@ -143,18 +144,23 @@ dependencies {
|
|||
implementation "io.grpc:grpc-okhttp:${versions.grpc}"
|
||||
implementation "io.grpc:grpc-protobuf-lite:${versions.grpc}"
|
||||
implementation "io.grpc:grpc-stub:${versions.grpc}"
|
||||
implementation 'javax.annotation:javax.annotation-api:1.2'
|
||||
implementation 'javax.annotation:javax.annotation-api:1.3.2'
|
||||
|
||||
// other
|
||||
implementation "com.jakewharton.timber:timber:4.7.1"
|
||||
|
||||
// Tests
|
||||
// testImplementation 'org.mockito:mockito-junit-jupiter:2.23.0'
|
||||
// testImplementation 'com.nhaarman.mockitokotlin2:mockito-kotlin:2.1.0'
|
||||
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.3.2'
|
||||
testImplementation "org.jetbrains.kotlin:kotlin-reflect:${versions.kotlin}"
|
||||
testImplementation 'org.mockito:mockito-junit-jupiter:2.24.0'
|
||||
testImplementation 'com.nhaarman.mockitokotlin2:mockito-kotlin:2.1.0'
|
||||
testImplementation "org.junit.jupiter:junit-jupiter-api:${versions.junitJupiter}"
|
||||
testImplementation "org.junit.jupiter:junit-jupiter-engine:${versions.junitJupiter}"
|
||||
testImplementation "org.junit.jupiter:junit-jupiter-migrationsupport:${versions.junitJupiter}"
|
||||
testImplementation "io.grpc:grpc-testing:1.18.0"
|
||||
|
||||
androidTestImplementation 'com.nhaarman.mockitokotlin2:mockito-kotlin:2.1.0'
|
||||
androidTestImplementation 'org.mockito:mockito-android:2.23.4'
|
||||
androidTestImplementation 'org.junit.jupiter:junit-jupiter-api:5.3.2'
|
||||
androidTestImplementation 'org.mockito:mockito-android:2.24.0'
|
||||
androidTestImplementation "org.junit.jupiter:junit-jupiter-api:${versions.junitJupiter}"
|
||||
androidTestImplementation "androidx.test:runner:1.1.1"
|
||||
androidTestImplementation "androidx.test.espresso:espresso-core:3.1.1"
|
||||
androidTestImplementation "androidx.test:core:1.1.0"
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
|
||||
package="cash.z.wallet.sdk">
|
||||
<!--<application-->
|
||||
<!--android:name="androidx.multidex.MultiDexApplication" />-->
|
||||
<application
|
||||
android:name="androidx.multidex.MultiDexApplication" />
|
||||
<uses-permission android:name="android.permission.INTERNET" />
|
||||
</manifest>
|
||||
|
|
|
@ -0,0 +1,243 @@
|
|||
package cash.z.wallet.sdk.data
|
||||
|
||||
import cash.z.wallet.sdk.dao.WalletTransaction
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.random.Random
|
||||
import kotlin.random.nextInt
|
||||
import kotlin.random.nextLong
|
||||
|
||||
/**
|
||||
* Utility for building UIs. It does the best it can to mock the synchronizer so that it can be dropped right into any
|
||||
* project and drive the UI. It generates active transactions in response to funds being sent and generates random
|
||||
* received transactions periodically.
|
||||
*
|
||||
* @param transactionInterval the time in milliseconds between receive transactions being added because those are the
|
||||
* only ones auto-generated. Send transactions are triggered by the UI. Transactions are polled at half this interval.
|
||||
* @param activeTransactionUpdateFrequency the amount of time in milliseconds between updates to an active
|
||||
* transaction's state. Active transactions move through their lifecycle and increment their state at this rate.
|
||||
*/
|
||||
open class MockSynchronizer(
|
||||
private val transactionInterval: Long = 30_000L,
|
||||
private val activeTransactionUpdateFrequency: Long = 3_000L
|
||||
) : Synchronizer, CoroutineScope {
|
||||
|
||||
private val mockAddress = "ztestsaplingmock0000this0is0a0mock0address0do0not0send0funds0to0this0address0ok0thanks00"
|
||||
|
||||
private val job = Job()
|
||||
|
||||
override val coroutineContext: CoroutineContext
|
||||
get() = Dispatchers.IO + job
|
||||
|
||||
/* only accessed through mutual exclusion */
|
||||
private val transactions = mutableListOf<WalletTransaction>()
|
||||
private val activeTransactions = mutableMapOf<ActiveTransaction, TransactionState>()
|
||||
|
||||
private val transactionMutex = Mutex()
|
||||
private val activeTransactionMutex = Mutex()
|
||||
|
||||
private val forge = Forge()
|
||||
|
||||
private val balanceChannel = ConflatedBroadcastChannel(0L)
|
||||
private val activeTransactionsChannel = ConflatedBroadcastChannel<Map<ActiveTransaction, TransactionState>>(mutableMapOf())
|
||||
private val transactionsChannel = ConflatedBroadcastChannel<List<WalletTransaction>>(listOf())
|
||||
private val progressChannel = ConflatedBroadcastChannel(0)
|
||||
|
||||
override fun start(parentScope: CoroutineScope): Synchronizer {
|
||||
forge.start(parentScope)
|
||||
return this
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
job.cancel()
|
||||
}
|
||||
|
||||
override fun activeTransactions() = activeTransactionsChannel.openSubscription()
|
||||
override fun allTransactions() = transactionsChannel.openSubscription()
|
||||
override fun balance() = balanceChannel.openSubscription()
|
||||
override fun progress() = progressChannel.openSubscription()
|
||||
|
||||
override suspend fun isOutOfSync(): Boolean {
|
||||
return Random.nextInt(100) < 10
|
||||
}
|
||||
|
||||
override suspend fun isFirstRun(): Boolean {
|
||||
return Random.nextBoolean()
|
||||
}
|
||||
|
||||
override fun getAddress() = mockAddress
|
||||
|
||||
override suspend fun sendToAddress(zatoshi: Long, toAddress: String) = withContext<Unit>(Dispatchers.IO) {
|
||||
val walletTransaction = forge.createSendTransaction(zatoshi)
|
||||
val activeTransaction = forge.createActiveSendTransaction(walletTransaction, toAddress)
|
||||
val isInvalidForTestnet = toAddress.length != 88 && toAddress.startsWith("ztest")
|
||||
val isInvalidForMainnet = toAddress.length != 78 && toAddress.startsWith("zs")
|
||||
val state = when {
|
||||
zatoshi < 0 -> TransactionState.Failure(TransactionState.Creating, "amount cannot be negative")
|
||||
!toAddress.startsWith("z") -> TransactionState.Failure(TransactionState.Creating, "address must start with z")
|
||||
isInvalidForTestnet -> TransactionState.Failure(TransactionState.Creating, "invalid testnet address")
|
||||
isInvalidForMainnet -> TransactionState.Failure(TransactionState.Creating, "invalid mainnet address")
|
||||
else -> TransactionState.Creating
|
||||
}
|
||||
println("after input validation, state is being set to ${state::class.simpleName}")
|
||||
setState(activeTransaction, state)
|
||||
|
||||
println("active tx size is ${activeTransactions.size}")
|
||||
|
||||
// next, transition it through the states, if it got created
|
||||
if (state !is TransactionState.Creating) {
|
||||
println("failed to create transaction")
|
||||
return@withContext
|
||||
} else {
|
||||
// first, add the transaction
|
||||
println("adding transaction")
|
||||
transactionMutex.withLock {
|
||||
transactions.add(walletTransaction)
|
||||
}
|
||||
|
||||
// then update the active transaction through the creation and submission steps
|
||||
listOf(TransactionState.Created(walletTransaction.txId), TransactionState.SendingToNetwork)
|
||||
.forEach { newState ->
|
||||
if (!job.isActive) return@withContext
|
||||
delay(activeTransactionUpdateFrequency)
|
||||
setState(activeTransaction, newState)
|
||||
}
|
||||
|
||||
// then set the wallet transaction's height (to simulate it being mined)
|
||||
val minedHeight = forge.latestHeight.getAndIncrement()
|
||||
transactionMutex.withLock {
|
||||
transactions.remove(walletTransaction)
|
||||
transactions.add(walletTransaction.copy(height = minedHeight, isMined = true))
|
||||
}
|
||||
|
||||
// simply transition it through the states
|
||||
List(11) { TransactionState.AwaitingConfirmations(it) }
|
||||
.forEach { newState ->
|
||||
if (!job.isActive) return@withContext
|
||||
delay(activeTransactionUpdateFrequency)
|
||||
activeTransaction.height.set(minedHeight + newState.confirmationCount)
|
||||
setState(activeTransaction, newState)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun setState(activeTransaction: ActiveTransaction, state: TransactionState) {
|
||||
var copyMap = mutableMapOf<ActiveTransaction, TransactionState>()
|
||||
activeTransactionMutex.withLock {
|
||||
val currentState = activeTransactions[activeTransaction]
|
||||
if ((currentState?.order ?: 0) < 0) {
|
||||
println("ignoring state ${state::class.simpleName} because the current state is ${currentState!!::class.simpleName}")
|
||||
return
|
||||
}
|
||||
activeTransactions[activeTransaction] = state
|
||||
var count = if (state is TransactionState.AwaitingConfirmations) "(${state.confirmationCount})" else ""
|
||||
println("state set to ${state::class.simpleName}$count on thread ${Thread.currentThread().name}")
|
||||
}
|
||||
|
||||
copyMap = activeTransactions.toMutableMap()
|
||||
println("sending ${copyMap.size} active transactions")
|
||||
launch {
|
||||
activeTransactionsChannel.send(copyMap)
|
||||
}
|
||||
}
|
||||
|
||||
override fun cancelSend(transaction: ActiveSendTransaction): Boolean {
|
||||
launch {
|
||||
println("cancelling transaction $transaction")
|
||||
setState(transaction, TransactionState.Cancelled)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
/* creators */
|
||||
|
||||
private inner class Forge {
|
||||
val transactionId = AtomicLong(Random.nextLong(1L..100_000L))
|
||||
val latestHeight = AtomicInteger(Random.nextInt(280000..600000))
|
||||
|
||||
fun start(scope: CoroutineScope) {
|
||||
scope.launchAddReceiveTransactions()
|
||||
scope.launchUpdateTransactionsAndBalance()
|
||||
scope.launchUpdateProgress()
|
||||
}
|
||||
|
||||
fun CoroutineScope.launchUpdateTransactionsAndBalance() = launch {
|
||||
while (job.isActive) {
|
||||
if (transactions.size != 0) {
|
||||
var balance = 0L
|
||||
transactionMutex.withLock {
|
||||
// does not factor in confirmations
|
||||
balance =
|
||||
transactions.fold(0L) { acc, tx -> if (tx.isSend && tx.isMined) acc - tx.value else acc + tx.value }
|
||||
}
|
||||
balanceChannel.send(balance)
|
||||
}
|
||||
// other collaborators add to the list, periodically. So this simulates, real-world, non-distinct updates.
|
||||
delay(Random.nextLong(transactionInterval / 2))
|
||||
var copyList = listOf<WalletTransaction>()
|
||||
transactionMutex.withLock {
|
||||
// shallow copy
|
||||
copyList = transactions.map { it }
|
||||
}
|
||||
transactionsChannel.send(copyList)
|
||||
}
|
||||
}
|
||||
|
||||
fun CoroutineScope.launchAddReceiveTransactions() = launch {
|
||||
while (job.isActive) {
|
||||
delay(transactionInterval)
|
||||
transactionMutex.withLock {
|
||||
println("adding received transaction for random value")
|
||||
transactions.add(createReceiveTransaction())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun CoroutineScope.launchUpdateProgress() = launch {
|
||||
val progressInterval = Math.max(transactionInterval / 4, 1)
|
||||
while (job.isActive) {
|
||||
delay(Random.nextLong(1L..progressInterval))
|
||||
progressChannel.send(Math.min(transactions.size * 10, 100))
|
||||
}
|
||||
}
|
||||
|
||||
fun createReceiveTransaction(): WalletTransaction {
|
||||
return WalletTransaction(
|
||||
txId = transactionId.getAndIncrement(),
|
||||
value = Random.nextLong(20_000L..1_000_000_000L),
|
||||
height = latestHeight.getAndIncrement(),
|
||||
isSend = false,
|
||||
timeInSeconds = System.currentTimeMillis() / 1000,
|
||||
isMined = true
|
||||
)
|
||||
}
|
||||
|
||||
fun createSendTransaction(amount: Long = Random.nextLong(20_000L..1_000_000_000L), txId: Long = -1L): WalletTransaction {
|
||||
return WalletTransaction(
|
||||
txId = if(txId == -1L) transactionId.getAndIncrement() else txId,
|
||||
value = amount,
|
||||
height = null,
|
||||
isSend = true,
|
||||
timeInSeconds = System.currentTimeMillis() / 1000,
|
||||
isMined = false
|
||||
)
|
||||
}
|
||||
|
||||
fun createActiveSendTransaction(walletTransaction: WalletTransaction, toAddress: String)
|
||||
= createActiveSendTransaction(walletTransaction.value, toAddress, walletTransaction.txId)
|
||||
|
||||
fun createActiveSendTransaction(amount: Long, address: String, txId: Long = -1): ActiveSendTransaction {
|
||||
return ActiveSendTransaction(
|
||||
transactionId = AtomicLong(if (txId < 0) transactionId.getAndIncrement() else txId),
|
||||
toAddress = address,
|
||||
value = amount
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,184 @@
|
|||
package cash.z.wallet.sdk.data
|
||||
|
||||
import cash.z.wallet.sdk.dao.WalletTransaction
|
||||
import cash.z.wallet.sdk.data.SdkSynchronizer.SyncState.*
|
||||
import cash.z.wallet.sdk.exception.SynchronizerException
|
||||
import cash.z.wallet.sdk.secure.Wallet
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.Dispatchers.IO
|
||||
import kotlinx.coroutines.channels.ReceiveChannel
|
||||
|
||||
/**
|
||||
* The glue. Downloads compact blocks to the database and then scans them for transactions. In order to serve that
|
||||
* purpose, this class glues together a variety of key components. Each component contributes to the team effort of
|
||||
* providing a simple source of truth to interact with.
|
||||
*
|
||||
* Another way of thinking about this class is the reference that demonstrates how all the pieces can be tied
|
||||
* together.
|
||||
*/
|
||||
class SdkSynchronizer(
|
||||
private val downloader: CompactBlockStream,
|
||||
private val processor: CompactBlockProcessor,
|
||||
private val repository: TransactionRepository,
|
||||
private val activeTransactionManager: ActiveTransactionManager,
|
||||
private val wallet: Wallet,
|
||||
private val batchSize: Int = 1000,
|
||||
private val blockPollFrequency: Long = CompactBlockStream.DEFAULT_POLL_INTERVAL,
|
||||
logger: Twig = SilentTwig()
|
||||
) : Twig by logger, Synchronizer {
|
||||
|
||||
private lateinit var blockJob: Job
|
||||
|
||||
private val wasPreviouslyStarted
|
||||
get() = ::blockJob.isInitialized
|
||||
|
||||
|
||||
//
|
||||
// Public API
|
||||
//
|
||||
|
||||
/* Lifecycle */
|
||||
|
||||
override fun start(parentScope: CoroutineScope): Synchronizer {
|
||||
// prevent restarts so the behavior of this class is easier to reason about
|
||||
if (wasPreviouslyStarted) throw SynchronizerException.FalseStart
|
||||
twig("starting")
|
||||
blockJob = parentScope.launch {
|
||||
continueWithState(determineState())
|
||||
}
|
||||
return this
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
twig("stopping")
|
||||
blockJob.cancel()
|
||||
downloader.stop()
|
||||
repository.stop()
|
||||
activeTransactionManager.stop()
|
||||
}
|
||||
|
||||
/* Channels */
|
||||
|
||||
override fun activeTransactions() = activeTransactionManager.subscribe()
|
||||
override fun allTransactions(): ReceiveChannel<List<WalletTransaction>> {
|
||||
return repository.allTransactions()
|
||||
}
|
||||
override fun progress(): ReceiveChannel<Int> {
|
||||
return downloader.progress()
|
||||
}
|
||||
|
||||
override fun balance(): ReceiveChannel<Long> {
|
||||
return repository.balance()
|
||||
}
|
||||
|
||||
/* Status */
|
||||
|
||||
override suspend fun isOutOfSync(): Boolean = withContext(IO) {
|
||||
val latestBlockHeight = downloader.connection.getLatestBlockHeight()
|
||||
val ourHeight = processor.cacheDao.latestBlockHeight()
|
||||
val tolerance = 10
|
||||
val delta = latestBlockHeight - ourHeight
|
||||
twig("checking whether out of sync. LatestHeight: $latestBlockHeight ourHeight: $ourHeight Delta: $delta tolerance: $tolerance")
|
||||
delta > tolerance
|
||||
}
|
||||
|
||||
override suspend fun isFirstRun(): Boolean = withContext(IO) {
|
||||
// maybe just toggle a flag somewhere rather than inferring based on db status
|
||||
!processor.dataDbExists && (!processor.cachDbExists || processor.cacheDao.count() == 0)
|
||||
}
|
||||
|
||||
/* Operations */
|
||||
|
||||
override fun getAddress() = wallet.getAddress()
|
||||
|
||||
override suspend fun sendToAddress(zatoshi: Long, toAddress: String) =
|
||||
activeTransactionManager.sendToAddress(zatoshi, toAddress)
|
||||
|
||||
override fun cancelSend(transaction: ActiveSendTransaction): Boolean = activeTransactionManager.cancel(transaction)
|
||||
|
||||
|
||||
//
|
||||
// Private API
|
||||
//
|
||||
|
||||
private fun CoroutineScope.continueWithState(syncState: SyncState): Job {
|
||||
return when (syncState) {
|
||||
FirstRun -> onFirstRun()
|
||||
is CacheOnly -> onCacheOnly(syncState)
|
||||
is ReadyToProcess -> onReady(syncState)
|
||||
}
|
||||
}
|
||||
|
||||
private fun CoroutineScope.onFirstRun(): Job {
|
||||
twig("this appears to be a fresh install, beginning first run of application")
|
||||
val firstRunStartHeight = wallet.initialize() // should get the latest sapling tree and return that height
|
||||
twig("wallet firstRun returned a value of $firstRunStartHeight")
|
||||
return continueWithState(ReadyToProcess(firstRunStartHeight))
|
||||
}
|
||||
|
||||
private fun CoroutineScope.onCacheOnly(syncState: CacheOnly): Job {
|
||||
twig("we have cached blocks but no data DB, beginning pre-cached version of application")
|
||||
val firstRunStartHeight = wallet.initialize(syncState.startingBlockHeight)
|
||||
twig("wallet has already cached up to a height of $firstRunStartHeight")
|
||||
return continueWithState(ReadyToProcess(firstRunStartHeight))
|
||||
}
|
||||
|
||||
private fun CoroutineScope.onReady(syncState: ReadyToProcess) = launch {
|
||||
twig("synchronization is ready to begin at height ${syncState.startingBlockHeight}")
|
||||
try {
|
||||
// TODO: for PIR concerns, introduce some jitter here for where, exactly, the downloader starts
|
||||
val blockChannel =
|
||||
downloader.start(this, syncState.startingBlockHeight, batchSize, pollFrequencyMillis = blockPollFrequency)
|
||||
launch { monitorProgress(downloader.progress()) }
|
||||
activeTransactionManager.start()
|
||||
repository.start(this)
|
||||
processor.processBlocks(blockChannel)
|
||||
} finally {
|
||||
stop()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun monitorProgress(progressChannel: ReceiveChannel<Int>) = withContext(IO) {
|
||||
twig("beginning to monitor download progress")
|
||||
for (i in progressChannel) {
|
||||
if(i >= 100) {
|
||||
twig("triggering a proactive scan in a second because all missing blocks have been loaded")
|
||||
delay(1000L)
|
||||
launch {
|
||||
twig("triggering proactive scan!")
|
||||
processor.scanBlocks()
|
||||
twig("done triggering proactive scan!")
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
twig("done monitoring download progress")
|
||||
}
|
||||
|
||||
//TODO: add state for never scanned . . . where we have some cache but no entries in the data db
|
||||
private suspend fun determineState(): SyncState = withContext(IO) {
|
||||
twig("determining state (has the app run before, what block did we last see, etc.)")
|
||||
val state = if (processor.dataDbExists) {
|
||||
// this call blocks because it does IO
|
||||
val startingBlockHeight = processor.lastProcessedBlock()
|
||||
twig("cacheDb exists with last height of $startingBlockHeight")
|
||||
if (startingBlockHeight <= 0) FirstRun else ReadyToProcess(startingBlockHeight)
|
||||
} else if(processor.cachDbExists) {
|
||||
// this call blocks because it does IO
|
||||
val startingBlockHeight = processor.lastProcessedBlock()
|
||||
twig("cacheDb exists with last height of $startingBlockHeight")
|
||||
if (startingBlockHeight <= 0) FirstRun else CacheOnly(startingBlockHeight)
|
||||
} else {
|
||||
FirstRun
|
||||
}
|
||||
|
||||
twig("determined ${state::class.java.simpleName}")
|
||||
state
|
||||
}
|
||||
|
||||
sealed class SyncState {
|
||||
object FirstRun : SyncState()
|
||||
class CacheOnly(val startingBlockHeight: Int = Int.MAX_VALUE) : SyncState()
|
||||
class ReadyToProcess(val startingBlockHeight: Int = Int.MAX_VALUE) : SyncState()
|
||||
}
|
||||
}
|
|
@ -1,167 +1,28 @@
|
|||
package cash.z.wallet.sdk.data
|
||||
|
||||
import cash.z.wallet.sdk.data.Synchronizer.SyncState.*
|
||||
import cash.z.wallet.sdk.exception.SynchronizerException
|
||||
import cash.z.wallet.sdk.rpc.CompactFormats
|
||||
import cash.z.wallet.sdk.secure.Wallet
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.Dispatchers.IO
|
||||
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
|
||||
import cash.z.wallet.sdk.dao.WalletTransaction
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.channels.ReceiveChannel
|
||||
|
||||
/**
|
||||
* The glue. Downloads compact blocks to the database and then scans them for transactions. In order to serve that
|
||||
* purpose, this class glues together a variety of key components. Each component contributes to the team effort of
|
||||
* providing a simple source of truth to interact with.
|
||||
*
|
||||
* Another way of thinking about this class is the reference that demonstrates how all the pieces can be tied
|
||||
* together.
|
||||
*/
|
||||
class Synchronizer(
|
||||
val downloader: CompactBlockStream,
|
||||
val processor: CompactBlockProcessor,
|
||||
val repository: TransactionRepository,
|
||||
val activeTransactionManager: ActiveTransactionManager,
|
||||
val wallet: Wallet,
|
||||
val batchSize: Int = 1000,
|
||||
logger: Twig = SilentTwig()
|
||||
) : Twig by logger {
|
||||
interface Synchronizer {
|
||||
|
||||
// private val downloader = CompactBlockDownloader("10.0.2.2", 9067)
|
||||
private val savedBlockChannel = ConflatedBroadcastChannel<CompactFormats.CompactBlock>()
|
||||
/* Lifecycle */
|
||||
fun start(parentScope: CoroutineScope): Synchronizer
|
||||
fun stop()
|
||||
|
||||
private lateinit var blockJob: Job
|
||||
/* Channels */
|
||||
// NOTE: each of these are expected to be a broadcast channel, such that [receive] always returns the latest value
|
||||
fun activeTransactions(): ReceiveChannel<Map<ActiveTransaction, TransactionState>>
|
||||
fun allTransactions(): ReceiveChannel<List<WalletTransaction>>
|
||||
fun balance(): ReceiveChannel<Long>
|
||||
fun progress(): ReceiveChannel<Int>
|
||||
|
||||
private val wasPreviouslyStarted
|
||||
get() = savedBlockChannel.isClosedForSend || ::blockJob.isInitialized
|
||||
/* Status */
|
||||
suspend fun isOutOfSync(): Boolean
|
||||
suspend fun isFirstRun(): Boolean
|
||||
fun getAddress(): String
|
||||
|
||||
|
||||
//
|
||||
// Public API
|
||||
//
|
||||
|
||||
fun activeTransactions() = activeTransactionManager.subscribe()
|
||||
|
||||
fun start(parentScope: CoroutineScope): Synchronizer {
|
||||
// prevent restarts so the behavior of this class is easier to reason about
|
||||
if (wasPreviouslyStarted) throw SynchronizerException.FalseStart
|
||||
twig("starting")
|
||||
blockJob = parentScope.launch {
|
||||
continueWithState(determineState())
|
||||
}
|
||||
return this
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
twig("stopping")
|
||||
blockJob.cancel()
|
||||
downloader.stop()
|
||||
repository.stop()
|
||||
activeTransactionManager.stop()
|
||||
}
|
||||
|
||||
suspend fun isOutOfSync(): Boolean = withContext(IO) {
|
||||
val latestBlockHeight = downloader.connection.getLatestBlockHeight()
|
||||
val ourHeight = processor.cacheDao.latestBlockHeight()
|
||||
val tolerance = 10
|
||||
val delta = latestBlockHeight - ourHeight
|
||||
twig("checking whether out of sync. LatestHeight: $latestBlockHeight ourHeight: $ourHeight Delta: $delta tolerance: $tolerance")
|
||||
delta > tolerance
|
||||
}
|
||||
|
||||
suspend fun isFirstRun(): Boolean = withContext(IO) {
|
||||
// maybe just toggle a flag somewhere rather than inferring based on db status
|
||||
!processor.dataDbExists && (!processor.cachDbExists || processor.cacheDao.count() == 0)
|
||||
}
|
||||
|
||||
suspend fun sendToAddress(zatoshi: Long, toAddress: String) =
|
||||
activeTransactionManager.sendToAddress(zatoshi, toAddress)
|
||||
|
||||
fun cancelSend(transaction: ActiveSendTransaction): Boolean = activeTransactionManager.cancel(transaction)
|
||||
|
||||
|
||||
//
|
||||
// Private API
|
||||
//
|
||||
|
||||
private fun CoroutineScope.continueWithState(syncState: SyncState): Job {
|
||||
return when (syncState) {
|
||||
FirstRun -> onFirstRun()
|
||||
is CacheOnly -> onCacheOnly(syncState)
|
||||
is ReadyToProcess -> onReady(syncState)
|
||||
}
|
||||
}
|
||||
|
||||
private fun CoroutineScope.onFirstRun(): Job {
|
||||
twig("this appears to be a fresh install, beginning first run of application")
|
||||
val firstRunStartHeight = wallet.initialize() // should get the latest sapling tree and return that height
|
||||
twig("wallet firstRun returned a value of $firstRunStartHeight")
|
||||
return continueWithState(ReadyToProcess(firstRunStartHeight))
|
||||
}
|
||||
|
||||
private fun CoroutineScope.onCacheOnly(syncState: CacheOnly): Job {
|
||||
twig("we have cached blocks but no data DB, beginning pre-cached version of application")
|
||||
val firstRunStartHeight = wallet.initialize(syncState.startingBlockHeight)
|
||||
twig("wallet has already cached up to a height of $firstRunStartHeight")
|
||||
return continueWithState(ReadyToProcess(firstRunStartHeight))
|
||||
}
|
||||
|
||||
private fun CoroutineScope.onReady(syncState: ReadyToProcess) = launch {
|
||||
twig("synchronization is ready to begin at height ${syncState.startingBlockHeight}")
|
||||
try {
|
||||
// TODO: for PIR concerns, introduce some jitter here for where, exactly, the downloader starts
|
||||
val blockChannel =
|
||||
downloader.start(this, syncState.startingBlockHeight, batchSize)
|
||||
launch { monitorProgress(downloader.progress()) }
|
||||
activeTransactionManager.start()
|
||||
repository.start(this)
|
||||
processor.processBlocks(blockChannel)
|
||||
} finally {
|
||||
stop()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun monitorProgress(progressChannel: ReceiveChannel<Int>) = withContext(IO) {
|
||||
twig("beginning to monitor download progress")
|
||||
for (i in progressChannel) {
|
||||
if(i >= 100) {
|
||||
twig("triggering a proactive scan in a second because all missing blocks have been loaded")
|
||||
delay(1000L)
|
||||
launch {
|
||||
twig("triggering proactive scan!")
|
||||
processor.scanBlocks()
|
||||
twig("done triggering proactive scan!")
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
twig("done monitoring download progress")
|
||||
}
|
||||
|
||||
//TODO: add state for never scanned . . . where we have some cache but no entries in the data db
|
||||
private suspend fun determineState(): SyncState = withContext(IO) {
|
||||
twig("determining state (has the app run before, what block did we last see, etc.)")
|
||||
val state = if (processor.dataDbExists) {
|
||||
// this call blocks because it does IO
|
||||
val startingBlockHeight = processor.lastProcessedBlock()
|
||||
twig("cacheDb exists with last height of $startingBlockHeight")
|
||||
if (startingBlockHeight <= 0) FirstRun else ReadyToProcess(startingBlockHeight)
|
||||
} else if(processor.cachDbExists) {
|
||||
// this call blocks because it does IO
|
||||
val startingBlockHeight = processor.lastProcessedBlock()
|
||||
twig("cacheDb exists with last height of $startingBlockHeight")
|
||||
if (startingBlockHeight <= 0) FirstRun else CacheOnly(startingBlockHeight)
|
||||
} else {
|
||||
FirstRun
|
||||
}
|
||||
|
||||
twig("determined ${state::class.java.simpleName}")
|
||||
state
|
||||
}
|
||||
|
||||
sealed class SyncState {
|
||||
object FirstRun : SyncState()
|
||||
class CacheOnly(val startingBlockHeight: Int = Int.MAX_VALUE) : SyncState()
|
||||
class ReadyToProcess(val startingBlockHeight: Int = Int.MAX_VALUE) : SyncState()
|
||||
}
|
||||
/* Operations */
|
||||
suspend fun sendToAddress(zatoshi: Long, toAddress: String)
|
||||
fun cancelSend(transaction: ActiveSendTransaction): Boolean
|
||||
}
|
|
@ -20,11 +20,13 @@ import org.mockito.junit.jupiter.MockitoSettings
|
|||
import org.mockito.quality.Strictness
|
||||
import kotlin.system.measureTimeMillis
|
||||
import org.junit.Rule
|
||||
|
||||
import io.grpc.testing.GrpcServerRule
|
||||
import org.junit.jupiter.migrationsupport.rules.EnableRuleMigrationSupport
|
||||
|
||||
|
||||
@ExtendWith(MockitoExtension::class)
|
||||
@MockitoSettings(strictness = Strictness.LENIENT) // allows us to setup the blockingStub once, with everything, rather than using custom stubs for each test
|
||||
@EnableRuleMigrationSupport
|
||||
class CompactBlockDownloaderTest {
|
||||
|
||||
lateinit var downloader: CompactBlockStream
|
||||
|
|
|
@ -0,0 +1,228 @@
|
|||
package cash.z.wallet.sdk.data
|
||||
|
||||
import cash.z.wallet.sdk.dao.WalletTransaction
|
||||
import com.nhaarman.mockitokotlin2.timeout
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.channels.distinctBy
|
||||
import kotlinx.coroutines.channels.filter
|
||||
import org.junit.jupiter.api.AfterEach
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
|
||||
import org.junit.jupiter.api.Assertions.*
|
||||
import kotlin.random.Random
|
||||
import kotlin.random.nextLong
|
||||
import kotlin.system.measureTimeMillis
|
||||
|
||||
internal class MockSynchronizerTest {
|
||||
|
||||
private val transactionInterval = 200L
|
||||
private val activeTransactionInterval = 200L
|
||||
private val synchronizer = MockSynchronizer(transactionInterval, activeTransactionInterval)
|
||||
private val fastSynchronizer = MockSynchronizer(2L, 2L)
|
||||
private val allTransactionChannel = synchronizer.allTransactions()
|
||||
private val validAddress = "ztestsapling1yu2zy9aane2pje2qvm4qmn4k6q57y2d9ecs5vz0guthxx3m2aq57qm6hkx0520m9u9635xh6ttd"
|
||||
|
||||
@BeforeEach
|
||||
fun setUp() {
|
||||
synchronizer.start(CoroutineScope(Dispatchers.IO))
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
fun tearDown() {
|
||||
synchronizer.stop()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun allTransactions() = runBlocking {
|
||||
var total = 0
|
||||
val duration = measureTimeMillis {
|
||||
repeat(10) {
|
||||
val transactions = allTransactionChannel.receive()
|
||||
total += transactions.size
|
||||
println("received ${transactions.size} transactions")
|
||||
}
|
||||
}
|
||||
assertTrue(total > 0)
|
||||
assertTrue(duration > transactionInterval)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `never calling send yields zero sent transactions`() = runBlocking {
|
||||
val fastChannel = fastSynchronizer.start(fastSynchronizer).allTransactions()
|
||||
var transactions = fastChannel.receive()
|
||||
repeat(10_000) {
|
||||
transactions = fastChannel.receive()
|
||||
}
|
||||
assertTrue(transactions.size > 0, "no transactions created at all")
|
||||
assertTrue(transactions.none { it.isSend })
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `send - each call to send generates exactly one sent transaction`() = runBlocking {
|
||||
val fastChannel = fastSynchronizer.start(fastSynchronizer).allTransactions()
|
||||
var transactions = fastChannel.receive()
|
||||
repeat(10_000) {
|
||||
if (it.rem(2_000) == 0) {
|
||||
fastSynchronizer.sendToAddress(10, validAddress); println("yep")
|
||||
}
|
||||
transactions = fastChannel.receive()
|
||||
}
|
||||
assertEquals(5, transactions.count { it.isSend })
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `send - triggers an active transaction`() = runBlocking {
|
||||
synchronizer.sendToAddress(10, validAddress)
|
||||
delay(500L)
|
||||
assertNotNull(synchronizer.activeTransactions().receiveOrNull())
|
||||
synchronizer.stop()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `send - results in success`() = runBlocking {
|
||||
synchronizer.sendToAddress(10, validAddress)
|
||||
delay(500L)
|
||||
val result = synchronizer.activeTransactions().receive()
|
||||
assertTrue(result.isNotEmpty(), "result was empty")
|
||||
assertTrue(TransactionState.AwaitingConfirmations(0).order <= result.values.first().order)
|
||||
assertTrue((result.keys.first() as ActiveSendTransaction).transactionId.get() != -1L, "transactionId missing")
|
||||
assertTrue((result.keys.first() as ActiveSendTransaction).height.get() != -1, "height missing")
|
||||
synchronizer.stop()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `send - results in mined transaction`() = runBlocking {
|
||||
synchronizer.sendToAddress(10, validAddress)
|
||||
delay(500L)
|
||||
val result = synchronizer.activeTransactions().receive()
|
||||
assertTrue(result.isNotEmpty(), "result was empty")
|
||||
assertTrue(TransactionState.AwaitingConfirmations(0).order <= result.values.first().order)
|
||||
synchronizer.stop()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `send - a bad address fails`() = runBlocking {
|
||||
synchronizer.sendToAddress(10, "fail")
|
||||
delay(500L)
|
||||
val result = synchronizer.activeTransactions().receive()
|
||||
assertTrue(result.isNotEmpty(), "result was empty")
|
||||
assertTrue(0 > result.values.first().order)
|
||||
synchronizer.stop()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `send - a short address fails`() = runBlocking {
|
||||
// one character too short
|
||||
val toAddress = "ztestsapling1yu2zy9aane2pje2qvm4qmn4k6q57y2d9ecs5vz0guthxx3m2aq57qm6hkx0520m9u9635xh6tt"
|
||||
assertTrue(toAddress.length < 88, "sample address wasn't short enough (${toAddress.length})")
|
||||
|
||||
synchronizer.sendToAddress(10, toAddress)
|
||||
delay(500L)
|
||||
val result = synchronizer.activeTransactions().receive()
|
||||
assertTrue(result.isNotEmpty(), "result was empty")
|
||||
assertTrue(0 > result.values.first().order,
|
||||
"result should have been a failure but was ${result.values.first()::class.simpleName}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `send - a non-z prefix address fails`() = runBlocking {
|
||||
// one character too short
|
||||
val toAddress = "atestsapling1yu2zy9aane2pje2qvm4qmn4k6q57y2d9ecs5vz0guthxx3m2aq57qm6hkx0520m9u9635xh6ttd"
|
||||
assertTrue(toAddress.length == 88,
|
||||
"sample address was not the proper length (${toAddress.length}")
|
||||
assertFalse(toAddress.startsWith('z'),
|
||||
"sample address should not start with z")
|
||||
|
||||
synchronizer.sendToAddress(10, toAddress)
|
||||
delay(500L)
|
||||
val result = synchronizer.activeTransactions().receive()
|
||||
assertTrue(result.isNotEmpty(), "result was empty")
|
||||
assertTrue(0 > result.values.first().order,
|
||||
"result should have been a failure but was ${result.values.first()::class.simpleName}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `balance matches transactions without sends`() = runBlocking {
|
||||
val balances = fastSynchronizer.start(fastSynchronizer).balance()
|
||||
var transactions = listOf<WalletTransaction>()
|
||||
while (transactions.count() < 10) {
|
||||
transactions = fastSynchronizer.allTransactions().receive()
|
||||
println("got ${transactions.count()} transaction(s)")
|
||||
}
|
||||
assertEquals(transactions.fold(0L) { acc, tx -> acc + tx.value }, balances.receive())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `balance matches transactions with sends`() = runBlocking {
|
||||
var transactions = listOf<WalletTransaction>()
|
||||
val balances = fastSynchronizer.start(fastSynchronizer).balance()
|
||||
val transactionChannel = fastSynchronizer.allTransactions()
|
||||
while (transactions.count() < 10) {
|
||||
fastSynchronizer.sendToAddress(Random.nextLong(1L..10_000_000_000), validAddress)
|
||||
transactions = transactionChannel.receive()
|
||||
println("got ${transactions.count()} transaction(s)")
|
||||
}
|
||||
val transactionsSnapshot = transactionChannel.receive()
|
||||
val balanceSnapshot = balances.receive()
|
||||
|
||||
val positiveValue = transactionsSnapshot.fold(0L) { acc, tx -> acc + (if (tx.isSend) 0 else tx.value) }
|
||||
val negativeValue = transactionsSnapshot.fold(0L) { acc, tx -> acc + (if(!tx.isSend) 0 else tx.value) }
|
||||
assertEquals(positiveValue - negativeValue, balanceSnapshot, "incorrect balance. negative balance: $negativeValue positive balance: $positiveValue")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `progress hits 100`() = runBlocking {
|
||||
var channel = synchronizer.progress()
|
||||
var now = System.currentTimeMillis()
|
||||
var delta = 0L
|
||||
val expectedUpperBounds = transactionInterval * 10
|
||||
while (channel.receive() < 100) {
|
||||
delta = now - System.currentTimeMillis()
|
||||
if (delta > expectedUpperBounds) break
|
||||
}
|
||||
assertTrue(delta < expectedUpperBounds, "progress did not hit 100 within the expected time of $expectedUpperBounds")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `is out of sync about 10% of the time`() = runBlocking {
|
||||
var count = 0
|
||||
repeat(100_000) {
|
||||
if (synchronizer.isOutOfSync()) count++
|
||||
}
|
||||
assertTrue(count < 11_000, "a count of $count is too frequent")
|
||||
assertTrue(count > 9_000, "a count of $count is too infrequent")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun isFirstRun() {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun cancelSend() = runBlocking {
|
||||
val activeTransactions = synchronizer.activeTransactions()
|
||||
|
||||
// verify that send creates one transaction
|
||||
launch {
|
||||
synchronizer.sendToAddress(10, validAddress)
|
||||
}
|
||||
println("done sending to address")
|
||||
delay(300L)
|
||||
var actives = activeTransactions.receiveOrNull()
|
||||
assertEquals(1, actives?.size)
|
||||
assertTrue((actives?.values?.first()?.order ?: 0) > -1, "expected positive order but was ${actives?.values?.first()?.order}")
|
||||
val transaction = actives?.keys?.first() as? ActiveSendTransaction
|
||||
assertNotNull(transaction)
|
||||
|
||||
// and then verify that cancel changes its status
|
||||
synchronizer.cancelSend(transaction!!)
|
||||
delay(100L) // look for ignored state change
|
||||
actives = activeTransactions.receiveOrNull()
|
||||
assertNotNull(actives, "cancel changed nothing in 100ms")
|
||||
assertEquals(1, actives!!.size, "unexpected number of active transactions ${actives.size}")
|
||||
val finalState = actives!!.values.first()
|
||||
assertNotNull(finalState as? TransactionState.Cancelled, "transaction was ${finalState::class.simpleName} instead of cancelled for ${actives.keys.first()}")
|
||||
println("donso")
|
||||
synchronizer.stop()
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue