diff --git a/src/androidTest/java/cash/z/wallet/sdk/db/IntegrationTest.kt b/src/androidTest/java/cash/z/wallet/sdk/db/IntegrationTest.kt index c2440d2f..a31d945f 100644 --- a/src/androidTest/java/cash/z/wallet/sdk/db/IntegrationTest.kt +++ b/src/androidTest/java/cash/z/wallet/sdk/db/IntegrationTest.kt @@ -59,7 +59,7 @@ class IntegrationTest { downloader = CompactBlockDownloader(lightwalletService, compactBlockStore) processor = CompactBlockProcessor(config, downloader, repository, rustBackend) - repository = PollingTransactionRepository(context, dataDbName, 10_000L) + repository = PollingTransactionRepository(context, dataDbName, rustBackend, 10_000L) wallet = Wallet( context, rustBackend, @@ -86,7 +86,7 @@ class IntegrationTest { companion object { private lateinit var synchronizer: Synchronizer - private lateinit var repository: TransactionRepository + private lateinit var repository: PollingTransactionRepository @AfterClass fun tearDown() { repository.stop() diff --git a/src/main/assets/zcash/saplingtree/518000.json b/src/main/assets/zcash/saplingtree/518000.json new file mode 100644 index 00000000..2d271993 --- /dev/null +++ b/src/main/assets/zcash/saplingtree/518000.json @@ -0,0 +1,6 @@ +{ + "height": 518000, + "hash": "000ba586d734c295f0bc034be229b1c96cb040f9d4929efdb5d2b187eeb238fb", + "time": 1560645743, + "tree": "01a4f5240a88a6eb4ffbda7961a1430506aad1a50ba011593f02c243d968feb0550010000140f91773b4ab669846e5bcb96f60e68256c49a27872a98e9d5ce50b30a0c434e0000018968663d6a7b444591de83f8a07223113f5de7e8203807adacc7677c3bcd4f420194c7ecac0ef6d702d475680ec32051fdf6368af0c459ab450009c001bcbf7a5300000001f0eead5192c3b3ab7208429877570676647e448210332c6da7e18660b142b80e01b98b14cab05247195b3b3be3dd8639bae99a0dd10bed1282ac25b62a134afd7200000000011f8322ef806eb2430dc4a7a41c1b344bea5be946efc7b4349c1c9edb14ff9d39" +} diff --git a/src/main/java/cash/z/wallet/sdk/dao/TransactionDao.kt b/src/main/java/cash/z/wallet/sdk/dao/TransactionDao.kt index 45f07fc6..85355a8f 100644 --- a/src/main/java/cash/z/wallet/sdk/dao/TransactionDao.kt +++ b/src/main/java/cash/z/wallet/sdk/dao/TransactionDao.kt @@ -64,5 +64,7 @@ data class WalletTransaction( // does the raw transaction contain a memo? val rawMemoExists: Boolean = false, // TODO: investigate populating this with SQL rather than a separate SDK call. then get rid of rawMemoExists. - var memo: String? = null + var memo: String? = null, + // set/maintain a custom status + var status: String? = null ) \ No newline at end of file diff --git a/src/main/java/cash/z/wallet/sdk/data/ActiveTransactionManager.kt b/src/main/java/cash/z/wallet/sdk/data/ActiveTransactionManager.kt index 4c0d145e..2fa5f8eb 100644 --- a/src/main/java/cash/z/wallet/sdk/data/ActiveTransactionManager.kt +++ b/src/main/java/cash/z/wallet/sdk/data/ActiveTransactionManager.kt @@ -34,7 +34,7 @@ class ActiveTransactionManager( // mutableMapOf gives the same result but we're explicit about preserving insertion order, since we rely on that private val activeTransactions = LinkedHashMap() private val channel = ConflatedBroadcastChannel>() - private val transactionSubscription = repository.allTransactions() + private val clearedTransactions = ConflatedBroadcastChannel>() // private val latestHeightSubscription = service.latestHeights() fun subscribe(): ReceiveChannel> { @@ -49,10 +49,10 @@ class ActiveTransactionManager( fun stop() { twig("ActiveTransactionManager stopping") + clearedTransactions.cancel() channel.cancel() job.cancel() sentTransactionMonitorJob.cancel() - transactionSubscription.cancel() // confirmationMonitorJob.cancel() <- TODO: enable confirmation monitor } @@ -124,11 +124,14 @@ class ActiveTransactionManager( private fun CoroutineScope.launchSentTransactionMonitor() = launch { withContext(Dispatchers.IO) { - while(isActive && !transactionSubscription.isClosedForReceive) { + (repository as PollingTransactionRepository).poll(clearedTransactions) + val results = clearedTransactions.openSubscription() + while(isActive && !clearedTransactions.isClosedForSend && !results.isClosedForReceive) { twig("awaiting next modification to transactions...") - val transactions = transactionSubscription.receive() + val transactions = results.receive() updateSentTransactions(transactions) } + results.cancel() } } diff --git a/src/main/java/cash/z/wallet/sdk/data/PollingTransactionRepository.kt b/src/main/java/cash/z/wallet/sdk/data/PollingTransactionRepository.kt index d52f0efd..786671e4 100644 --- a/src/main/java/cash/z/wallet/sdk/data/PollingTransactionRepository.kt +++ b/src/main/java/cash/z/wallet/sdk/data/PollingTransactionRepository.kt @@ -8,12 +8,10 @@ import cash.z.wallet.sdk.dao.TransactionDao import cash.z.wallet.sdk.dao.WalletTransaction import cash.z.wallet.sdk.db.DerivedDataDb import cash.z.wallet.sdk.entity.Transaction -import cash.z.wallet.sdk.exception.RepositoryException import cash.z.wallet.sdk.jni.RustBackendWelding import kotlinx.coroutines.* import kotlinx.coroutines.Dispatchers.IO -import kotlinx.coroutines.channels.ConflatedBroadcastChannel -import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel /** * Repository that does polling for simplicity. We will implement an alternative version that uses live data as well as @@ -49,33 +47,7 @@ open class PollingTransactionRepository( internal val blocks: BlockDao = derivedDataDb.blockDao() private val transactions: TransactionDao = derivedDataDb.transactionDao() - private lateinit var pollingJob: Job - private val allTransactionsChannel = ConflatedBroadcastChannel>() - private val existingTransactions = listOf() - private val wasPreviouslyStarted - get() = !existingTransactions.isEmpty() || allTransactionsChannel.isClosedForSend - - override fun start(parentScope: CoroutineScope) { - // prevent restarts so the behavior of this class is easier to reason about - if (wasPreviouslyStarted) throw RepositoryException.FalseStart - - twig("starting") - - pollingJob = parentScope.launch { - poll() - } - } - - override fun stop() { - twig("stopping but doing nothing") - pollingJob.cancel() - derivedDataDb.close() - // TODO: verify that the channels behave as expected in this scenario - } - - override fun allTransactions(): ReceiveChannel> { - return allTransactionsChannel.openSubscription() - } + private var pollingJob: Job? = null override fun lastScannedHeight(): Int { return blocks.lastScannedHeight() @@ -98,23 +70,31 @@ open class PollingTransactionRepository( } } - private suspend fun poll() = withContext(IO) { - var previousTransactions: List? = null - while (isActive && !allTransactionsChannel.isClosedForSend) { - twigTask("polling for transactions every ${pollFrequencyMillis}ms") { - val newTransactions = transactions.getAll() + suspend fun poll(channel: SendChannel>, frequency: Long = pollFrequencyMillis) = withContext(IO) { + pollingJob?.cancel() + pollingJob = launch { + var previousTransactions: List? = null + while (isActive && !channel.isClosedForSend) { + twigTask("polling for cleared transactions every ${frequency}ms") { + val newTransactions = transactions.getAll() - if (hasChanged(previousTransactions, newTransactions)) { - twig("loaded ${newTransactions.count()} transactions and changes were detected!") - allTransactionsChannel.send(addMemos(newTransactions)) - previousTransactions = newTransactions - } else { - twig("loaded ${newTransactions.count()} transactions but no changes detected.") + if (hasChanged(previousTransactions, newTransactions)) { + twig("loaded ${newTransactions.count()} cleared transactions and changes were detected!") + channel.send(addMemos(newTransactions)) + previousTransactions = newTransactions + } else { + twig("loaded ${newTransactions.count()} cleared transactions but no changes detected.") + } } + delay(pollFrequencyMillis) } - delay(pollFrequencyMillis) + twig("Done polling for cleared transactions") } - twig("Done polling for transactions") + } + + fun stop() { + pollingJob?.cancel().also { pollingJob = null } + derivedDataDb.close() } private suspend fun addMemos(newTransactions: List): List = withContext(IO){ diff --git a/src/main/java/cash/z/wallet/sdk/data/SdkSynchronizer.kt b/src/main/java/cash/z/wallet/sdk/data/SdkSynchronizer.kt index 97c6fca0..78b8a6d6 100644 --- a/src/main/java/cash/z/wallet/sdk/data/SdkSynchronizer.kt +++ b/src/main/java/cash/z/wallet/sdk/data/SdkSynchronizer.kt @@ -7,6 +7,7 @@ import cash.z.wallet.sdk.exception.WalletException import cash.z.wallet.sdk.secure.Wallet import kotlinx.coroutines.* import kotlinx.coroutines.Dispatchers.IO +import kotlinx.coroutines.channels.ConflatedBroadcastChannel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.distinct import kotlin.coroutines.CoroutineContext @@ -32,7 +33,7 @@ import kotlin.coroutines.CoroutineContext */ class SdkSynchronizer( private val processor: CompactBlockProcessor, - private val repository: TransactionRepository, + private val repository: PollingTransactionRepository, private val activeTransactionManager: ActiveTransactionManager, private val wallet: Wallet, private val staleTolerance: Int = 10 @@ -76,6 +77,15 @@ class SdkSynchronizer( if (failure != null) value?.invoke(failure) } + /** + * Channel of transactions from the repository. + */ + private val transactionChannel = ConflatedBroadcastChannel>() + + /** + * Channel of balance information. + */ + private val balanceChannel = ConflatedBroadcastChannel() // // Public API @@ -119,7 +129,7 @@ class SdkSynchronizer( */ override fun stop() { twig("stopping") - repository.stop().also { twig("repository stopped") } + (repository as? PollingTransactionRepository)?.stop().also { twig("repository stopped") } activeTransactionManager.stop().also { twig("activeTransactionManager stopped") } // TODO: investigate whether this is necessary and remove or improve, accordingly Thread.sleep(5000L) @@ -138,7 +148,7 @@ class SdkSynchronizer( * A stream of all the wallet transactions, delegated to the [repository]. */ override fun allTransactions(): ReceiveChannel> { - return repository.allTransactions() + return transactionChannel.openSubscription() } /** @@ -155,7 +165,7 @@ class SdkSynchronizer( * A stream of balance values, delegated to the [wallet]. */ override fun balances(): ReceiveChannel { - return wallet.balances() + return balanceChannel.openSubscription() } @@ -230,10 +240,10 @@ class SdkSynchronizer( */ private fun CoroutineScope.onReady() = launch { twig("synchronization is ready to begin!") - launch { monitorTransactions(repository.allTransactions().distinct()) } + launch { monitorTransactions(transactionChannel.openSubscription().distinct()) } activeTransactionManager.start() - repository.start(this) + repository.poll(transactionChannel) processor.start() } @@ -243,10 +253,12 @@ class SdkSynchronizer( private suspend fun monitorTransactions(transactionChannel: ReceiveChannel>) = withContext(IO) { twig("beginning to monitor transactions in order to update the balance") - for (i in transactionChannel) { - twig("triggering a balance update because transactions have changed") - wallet.sendBalanceInfo() - twig("done triggering balance check!") + launch { + for (i in transactionChannel) { + twig("triggering a balance update because transactions have changed") + balanceChannel.send(wallet.getBalanceInfo()) + twig("done triggering balance check!") + } } twig("done monitoring transactions in order to update the balance") } diff --git a/src/main/java/cash/z/wallet/sdk/data/TransactionRepository.kt b/src/main/java/cash/z/wallet/sdk/data/TransactionRepository.kt index 9475667b..e1033b7c 100644 --- a/src/main/java/cash/z/wallet/sdk/data/TransactionRepository.kt +++ b/src/main/java/cash/z/wallet/sdk/data/TransactionRepository.kt @@ -1,14 +1,8 @@ package cash.z.wallet.sdk.data -import cash.z.wallet.sdk.dao.WalletTransaction import cash.z.wallet.sdk.entity.Transaction -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.channels.ReceiveChannel interface TransactionRepository { - fun start(parentScope: CoroutineScope) - fun stop() - fun allTransactions(): ReceiveChannel> fun lastScannedHeight(): Int fun isInitialized(): Boolean suspend fun findTransactionById(txId: Long): Transaction? diff --git a/src/main/java/cash/z/wallet/sdk/ext/ZcashSdk.kt b/src/main/java/cash/z/wallet/sdk/ext/ZcashSdk.kt index 940657c6..02acf852 100644 --- a/src/main/java/cash/z/wallet/sdk/ext/ZcashSdk.kt +++ b/src/main/java/cash/z/wallet/sdk/ext/ZcashSdk.kt @@ -25,6 +25,11 @@ const val SAPLING_ACTIVATION_HEIGHT = 280_000 */ const val MAX_REORG_SIZE = 100 +/** + * The amount of blocks ahead of the current height where new transactions are set to expire. This value is controlled + * by the rust backend but it is helpful to know what it is set to and shdould be kept in sync. + */ +const val EXPIRY_OFFSET = 20 // // Defaults diff --git a/src/main/java/cash/z/wallet/sdk/secure/Wallet.kt b/src/main/java/cash/z/wallet/sdk/secure/Wallet.kt index 46caf4b8..ce1425d6 100644 --- a/src/main/java/cash/z/wallet/sdk/secure/Wallet.kt +++ b/src/main/java/cash/z/wallet/sdk/secure/Wallet.kt @@ -15,8 +15,6 @@ import com.google.gson.stream.JsonReader import com.squareup.okhttp.OkHttpClient import com.squareup.okhttp.Request import kotlinx.coroutines.Dispatchers.IO -import kotlinx.coroutines.channels.ConflatedBroadcastChannel -import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.withContext import okio.Okio import java.io.File @@ -64,11 +62,6 @@ class Wallet( */ private var spendingKeyStore by spendingKeyProvider - /** - * Channel where balance info will be emitted. - */ - private val balanceChannel = ConflatedBroadcastChannel() - /** * Initializes the wallet by creating the DataDb and pre-populating it with data corresponding to the birthday for * this wallet. @@ -113,13 +106,6 @@ class Wallet( return rustBackend.getAddress(dataDbPath, accountId) } - /** - * Stream of balances. - */ - fun balances(): ReceiveChannel { - return balanceChannel.openSubscription() - } - /** * Return a quick snapshot of the available balance. In most cases, the stream of balances * provided by [balances] should be used instead of this funciton. @@ -135,14 +121,14 @@ class Wallet( * * @param accountId the account to check for balance info. */ - suspend fun sendBalanceInfo(accountId: Int = accountIds[0]) = withContext(IO) { + suspend fun getBalanceInfo(accountId: Int = accountIds[0]): WalletBalance = withContext(IO) { twigTask("checking balance info") { try { val balanceTotal = rustBackend.getBalance(dataDbPath, accountId) twig("found total balance of: $balanceTotal") val balanceAvailable = rustBackend.getVerifiedBalance(dataDbPath, accountId) twig("found available balance of: $balanceAvailable") - balanceChannel.send(WalletBalance(balanceTotal, balanceAvailable)) + WalletBalance(balanceTotal, balanceAvailable) } catch (t: Throwable) { twig("failed to get balance due to $t") throw RustLayerException.BalanceException(t) @@ -163,7 +149,7 @@ class Wallet( suspend fun createRawSendTransaction(value: Long, toAddress: String, memo: String = "", fromAccountId: Int = accountIds[0]): Long = withContext(IO) { var result = -1L - twigTask("creating raw transaction to send $value zatoshi to ${toAddress.masked()}") { + twigTask("creating raw transaction to send $value zatoshi to ${toAddress.masked()} with memo $memo") { result = try { ensureParams(paramDestinationDir) twig("params exist at $paramDestinationDir! attempting to send...")