Beginning to improve the use of channels.

Bringing over lessons learned from the Zcon1 app. There are a lot of changes to make, this is just the beginning of what was necessary to support that app.
This commit is contained in:
Kevin Gorham 2019-06-19 04:14:00 -04:00 committed by Kevin Gorham
parent f222bde2fa
commit 4dd8d408cb
9 changed files with 71 additions and 83 deletions

View File

@ -59,7 +59,7 @@ class IntegrationTest {
downloader = CompactBlockDownloader(lightwalletService, compactBlockStore)
processor = CompactBlockProcessor(config, downloader, repository, rustBackend)
repository = PollingTransactionRepository(context, dataDbName, 10_000L)
repository = PollingTransactionRepository(context, dataDbName, rustBackend, 10_000L)
wallet = Wallet(
context,
rustBackend,
@ -86,7 +86,7 @@ class IntegrationTest {
companion object {
private lateinit var synchronizer: Synchronizer
private lateinit var repository: TransactionRepository
private lateinit var repository: PollingTransactionRepository
@AfterClass
fun tearDown() {
repository.stop()

View File

@ -0,0 +1,6 @@
{
"height": 518000,
"hash": "000ba586d734c295f0bc034be229b1c96cb040f9d4929efdb5d2b187eeb238fb",
"time": 1560645743,
"tree": "01a4f5240a88a6eb4ffbda7961a1430506aad1a50ba011593f02c243d968feb0550010000140f91773b4ab669846e5bcb96f60e68256c49a27872a98e9d5ce50b30a0c434e0000018968663d6a7b444591de83f8a07223113f5de7e8203807adacc7677c3bcd4f420194c7ecac0ef6d702d475680ec32051fdf6368af0c459ab450009c001bcbf7a5300000001f0eead5192c3b3ab7208429877570676647e448210332c6da7e18660b142b80e01b98b14cab05247195b3b3be3dd8639bae99a0dd10bed1282ac25b62a134afd7200000000011f8322ef806eb2430dc4a7a41c1b344bea5be946efc7b4349c1c9edb14ff9d39"
}

View File

@ -64,5 +64,7 @@ data class WalletTransaction(
// does the raw transaction contain a memo?
val rawMemoExists: Boolean = false,
// TODO: investigate populating this with SQL rather than a separate SDK call. then get rid of rawMemoExists.
var memo: String? = null
var memo: String? = null,
// set/maintain a custom status
var status: String? = null
)

View File

@ -34,7 +34,7 @@ class ActiveTransactionManager(
// mutableMapOf gives the same result but we're explicit about preserving insertion order, since we rely on that
private val activeTransactions = LinkedHashMap<ActiveTransaction, TransactionState>()
private val channel = ConflatedBroadcastChannel<Map<ActiveTransaction, TransactionState>>()
private val transactionSubscription = repository.allTransactions()
private val clearedTransactions = ConflatedBroadcastChannel<List<WalletTransaction>>()
// private val latestHeightSubscription = service.latestHeights()
fun subscribe(): ReceiveChannel<Map<ActiveTransaction, TransactionState>> {
@ -49,10 +49,10 @@ class ActiveTransactionManager(
fun stop() {
twig("ActiveTransactionManager stopping")
clearedTransactions.cancel()
channel.cancel()
job.cancel()
sentTransactionMonitorJob.cancel()
transactionSubscription.cancel()
// confirmationMonitorJob.cancel() <- TODO: enable confirmation monitor
}
@ -124,11 +124,14 @@ class ActiveTransactionManager(
private fun CoroutineScope.launchSentTransactionMonitor() = launch {
withContext(Dispatchers.IO) {
while(isActive && !transactionSubscription.isClosedForReceive) {
(repository as PollingTransactionRepository).poll(clearedTransactions)
val results = clearedTransactions.openSubscription()
while(isActive && !clearedTransactions.isClosedForSend && !results.isClosedForReceive) {
twig("awaiting next modification to transactions...")
val transactions = transactionSubscription.receive()
val transactions = results.receive()
updateSentTransactions(transactions)
}
results.cancel()
}
}

View File

@ -8,12 +8,10 @@ import cash.z.wallet.sdk.dao.TransactionDao
import cash.z.wallet.sdk.dao.WalletTransaction
import cash.z.wallet.sdk.db.DerivedDataDb
import cash.z.wallet.sdk.entity.Transaction
import cash.z.wallet.sdk.exception.RepositoryException
import cash.z.wallet.sdk.jni.RustBackendWelding
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
/**
* Repository that does polling for simplicity. We will implement an alternative version that uses live data as well as
@ -49,33 +47,7 @@ open class PollingTransactionRepository(
internal val blocks: BlockDao = derivedDataDb.blockDao()
private val transactions: TransactionDao = derivedDataDb.transactionDao()
private lateinit var pollingJob: Job
private val allTransactionsChannel = ConflatedBroadcastChannel<List<WalletTransaction>>()
private val existingTransactions = listOf<WalletTransaction>()
private val wasPreviouslyStarted
get() = !existingTransactions.isEmpty() || allTransactionsChannel.isClosedForSend
override fun start(parentScope: CoroutineScope) {
// prevent restarts so the behavior of this class is easier to reason about
if (wasPreviouslyStarted) throw RepositoryException.FalseStart
twig("starting")
pollingJob = parentScope.launch {
poll()
}
}
override fun stop() {
twig("stopping but doing nothing")
pollingJob.cancel()
derivedDataDb.close()
// TODO: verify that the channels behave as expected in this scenario
}
override fun allTransactions(): ReceiveChannel<List<WalletTransaction>> {
return allTransactionsChannel.openSubscription()
}
private var pollingJob: Job? = null
override fun lastScannedHeight(): Int {
return blocks.lastScannedHeight()
@ -98,23 +70,31 @@ open class PollingTransactionRepository(
}
}
private suspend fun poll() = withContext(IO) {
var previousTransactions: List<WalletTransaction>? = null
while (isActive && !allTransactionsChannel.isClosedForSend) {
twigTask("polling for transactions every ${pollFrequencyMillis}ms") {
val newTransactions = transactions.getAll()
suspend fun poll(channel: SendChannel<List<WalletTransaction>>, frequency: Long = pollFrequencyMillis) = withContext(IO) {
pollingJob?.cancel()
pollingJob = launch {
var previousTransactions: List<WalletTransaction>? = null
while (isActive && !channel.isClosedForSend) {
twigTask("polling for cleared transactions every ${frequency}ms") {
val newTransactions = transactions.getAll()
if (hasChanged(previousTransactions, newTransactions)) {
twig("loaded ${newTransactions.count()} transactions and changes were detected!")
allTransactionsChannel.send(addMemos(newTransactions))
previousTransactions = newTransactions
} else {
twig("loaded ${newTransactions.count()} transactions but no changes detected.")
if (hasChanged(previousTransactions, newTransactions)) {
twig("loaded ${newTransactions.count()} cleared transactions and changes were detected!")
channel.send(addMemos(newTransactions))
previousTransactions = newTransactions
} else {
twig("loaded ${newTransactions.count()} cleared transactions but no changes detected.")
}
}
delay(pollFrequencyMillis)
}
delay(pollFrequencyMillis)
twig("Done polling for cleared transactions")
}
twig("Done polling for transactions")
}
fun stop() {
pollingJob?.cancel().also { pollingJob = null }
derivedDataDb.close()
}
private suspend fun addMemos(newTransactions: List<WalletTransaction>): List<WalletTransaction> = withContext(IO){

View File

@ -7,6 +7,7 @@ import cash.z.wallet.sdk.exception.WalletException
import cash.z.wallet.sdk.secure.Wallet
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.distinct
import kotlin.coroutines.CoroutineContext
@ -32,7 +33,7 @@ import kotlin.coroutines.CoroutineContext
*/
class SdkSynchronizer(
private val processor: CompactBlockProcessor,
private val repository: TransactionRepository,
private val repository: PollingTransactionRepository,
private val activeTransactionManager: ActiveTransactionManager,
private val wallet: Wallet,
private val staleTolerance: Int = 10
@ -76,6 +77,15 @@ class SdkSynchronizer(
if (failure != null) value?.invoke(failure)
}
/**
* Channel of transactions from the repository.
*/
private val transactionChannel = ConflatedBroadcastChannel<List<WalletTransaction>>()
/**
* Channel of balance information.
*/
private val balanceChannel = ConflatedBroadcastChannel<Wallet.WalletBalance>()
//
// Public API
@ -119,7 +129,7 @@ class SdkSynchronizer(
*/
override fun stop() {
twig("stopping")
repository.stop().also { twig("repository stopped") }
(repository as? PollingTransactionRepository)?.stop().also { twig("repository stopped") }
activeTransactionManager.stop().also { twig("activeTransactionManager stopped") }
// TODO: investigate whether this is necessary and remove or improve, accordingly
Thread.sleep(5000L)
@ -138,7 +148,7 @@ class SdkSynchronizer(
* A stream of all the wallet transactions, delegated to the [repository].
*/
override fun allTransactions(): ReceiveChannel<List<WalletTransaction>> {
return repository.allTransactions()
return transactionChannel.openSubscription()
}
/**
@ -155,7 +165,7 @@ class SdkSynchronizer(
* A stream of balance values, delegated to the [wallet].
*/
override fun balances(): ReceiveChannel<Wallet.WalletBalance> {
return wallet.balances()
return balanceChannel.openSubscription()
}
@ -230,10 +240,10 @@ class SdkSynchronizer(
*/
private fun CoroutineScope.onReady() = launch {
twig("synchronization is ready to begin!")
launch { monitorTransactions(repository.allTransactions().distinct()) }
launch { monitorTransactions(transactionChannel.openSubscription().distinct()) }
activeTransactionManager.start()
repository.start(this)
repository.poll(transactionChannel)
processor.start()
}
@ -243,10 +253,12 @@ class SdkSynchronizer(
private suspend fun monitorTransactions(transactionChannel: ReceiveChannel<List<WalletTransaction>>) =
withContext(IO) {
twig("beginning to monitor transactions in order to update the balance")
for (i in transactionChannel) {
twig("triggering a balance update because transactions have changed")
wallet.sendBalanceInfo()
twig("done triggering balance check!")
launch {
for (i in transactionChannel) {
twig("triggering a balance update because transactions have changed")
balanceChannel.send(wallet.getBalanceInfo())
twig("done triggering balance check!")
}
}
twig("done monitoring transactions in order to update the balance")
}

View File

@ -1,14 +1,8 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.dao.WalletTransaction
import cash.z.wallet.sdk.entity.Transaction
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
interface TransactionRepository {
fun start(parentScope: CoroutineScope)
fun stop()
fun allTransactions(): ReceiveChannel<List<WalletTransaction>>
fun lastScannedHeight(): Int
fun isInitialized(): Boolean
suspend fun findTransactionById(txId: Long): Transaction?

View File

@ -25,6 +25,11 @@ const val SAPLING_ACTIVATION_HEIGHT = 280_000
*/
const val MAX_REORG_SIZE = 100
/**
* The amount of blocks ahead of the current height where new transactions are set to expire. This value is controlled
* by the rust backend but it is helpful to know what it is set to and shdould be kept in sync.
*/
const val EXPIRY_OFFSET = 20
//
// Defaults

View File

@ -15,8 +15,6 @@ import com.google.gson.stream.JsonReader
import com.squareup.okhttp.OkHttpClient
import com.squareup.okhttp.Request
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.withContext
import okio.Okio
import java.io.File
@ -64,11 +62,6 @@ class Wallet(
*/
private var spendingKeyStore by spendingKeyProvider
/**
* Channel where balance info will be emitted.
*/
private val balanceChannel = ConflatedBroadcastChannel<WalletBalance>()
/**
* Initializes the wallet by creating the DataDb and pre-populating it with data corresponding to the birthday for
* this wallet.
@ -113,13 +106,6 @@ class Wallet(
return rustBackend.getAddress(dataDbPath, accountId)
}
/**
* Stream of balances.
*/
fun balances(): ReceiveChannel<WalletBalance> {
return balanceChannel.openSubscription()
}
/**
* Return a quick snapshot of the available balance. In most cases, the stream of balances
* provided by [balances] should be used instead of this funciton.
@ -135,14 +121,14 @@ class Wallet(
*
* @param accountId the account to check for balance info.
*/
suspend fun sendBalanceInfo(accountId: Int = accountIds[0]) = withContext(IO) {
suspend fun getBalanceInfo(accountId: Int = accountIds[0]): WalletBalance = withContext(IO) {
twigTask("checking balance info") {
try {
val balanceTotal = rustBackend.getBalance(dataDbPath, accountId)
twig("found total balance of: $balanceTotal")
val balanceAvailable = rustBackend.getVerifiedBalance(dataDbPath, accountId)
twig("found available balance of: $balanceAvailable")
balanceChannel.send(WalletBalance(balanceTotal, balanceAvailable))
WalletBalance(balanceTotal, balanceAvailable)
} catch (t: Throwable) {
twig("failed to get balance due to $t")
throw RustLayerException.BalanceException(t)
@ -163,7 +149,7 @@ class Wallet(
suspend fun createRawSendTransaction(value: Long, toAddress: String, memo: String = "", fromAccountId: Int = accountIds[0]): Long =
withContext(IO) {
var result = -1L
twigTask("creating raw transaction to send $value zatoshi to ${toAddress.masked()}") {
twigTask("creating raw transaction to send $value zatoshi to ${toAddress.masked()} with memo $memo") {
result = try {
ensureParams(paramDestinationDir)
twig("params exist at $paramDestinationDir! attempting to send...")