Refactor after Zcon1

This commit is contained in:
Kevin Gorham 2019-07-14 18:13:12 -04:00 committed by Kevin Gorham
parent 8c7103d0ee
commit a83ace6e43
32 changed files with 1459 additions and 1738 deletions

View File

@ -49,7 +49,7 @@ apply plugin: 'org.mozilla.rust-android-gradle.rust-android'
apply plugin: 'org.owasp.dependencycheck'
group = 'cash.z.android.wallet'
version = '1.9.0'
version = '1.9.1'
repositories {
google()
@ -64,7 +64,7 @@ android {
defaultConfig {
minSdkVersion buildConfig.minSdkVersion
targetSdkVersion buildConfig.targetSdkVersion
versionCode = 1_09_00_00 // last digits are alpha(0X) beta(1X) rc(2X) release(3X). Ex: 1_08_04_20 is a RC build
versionCode = 1_09_01_00 // last digits are alpha(0X) beta(1X) rc(2X) release(3X). Ex: 1_08_04_20 is a RC build
versionName = "$version-alpha"
testInstrumentationRunner = "androidx.test.runner.AndroidJUnitRunner"
testInstrumentationRunnerArguments clearPackageData: 'true'

View File

@ -4,10 +4,11 @@ import androidx.arch.core.executor.testing.InstantTaskExecutorRule
import androidx.room.Room
import androidx.room.RoomDatabase
import androidx.test.core.app.ApplicationProvider
import cash.z.wallet.sdk.dao.CompactBlockDao
import cash.z.wallet.sdk.entity.CompactBlock
import org.junit.*
import org.junit.Assert.*
import org.junit.AfterClass
import org.junit.Assert.assertNotNull
import org.junit.BeforeClass
import org.junit.Rule
import org.junit.Test
class CacheDbIntegrationTest {
@get:Rule

View File

@ -4,8 +4,6 @@ import androidx.arch.core.executor.testing.InstantTaskExecutorRule
import androidx.room.Room
import androidx.room.RoomDatabase
import androidx.test.core.app.ApplicationProvider
import cash.z.wallet.sdk.dao.BlockDao
import cash.z.wallet.sdk.dao.TransactionDao
import org.junit.AfterClass
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotNull
@ -39,7 +37,7 @@ class DerivedDbIntegrationTest {
@Test
fun testNoteQuery() {
val all = transactions.getAll()
val all = transactions.getReceivedTransactions()
assertEquals(3, all.size)
}
@ -47,7 +45,7 @@ class DerivedDbIntegrationTest {
fun testTransactionDaoPrepopulated() {
val tran = transactions.findById(1)
assertEquals(343987, tran?.block)
assertEquals(343987, tran?.minedHeight)
}
companion object {

View File

@ -5,7 +5,6 @@ import androidx.arch.core.executor.testing.InstantTaskExecutorRule
import androidx.room.Room
import androidx.room.RoomDatabase
import androidx.test.core.app.ApplicationProvider
import cash.z.wallet.sdk.dao.CompactBlockDao
import cash.z.wallet.sdk.entity.CompactBlock
import cash.z.wallet.sdk.ext.toBlockHeight
import cash.z.wallet.sdk.jni.RustBackend

View File

@ -5,20 +5,21 @@ import androidx.arch.core.executor.testing.InstantTaskExecutorRule
import androidx.room.Room
import androidx.room.RoomDatabase
import androidx.test.core.app.ApplicationProvider
import cash.z.wallet.sdk.dao.BlockDao
import cash.z.wallet.sdk.dao.CompactBlockDao
import cash.z.wallet.sdk.dao.TransactionDao
import cash.z.wallet.sdk.entity.CompactBlock
import cash.z.wallet.sdk.ext.toBlockHeight
import cash.z.wallet.sdk.jni.RustBackend
import cash.z.wallet.sdk.jni.RustBackendWelding
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import org.junit.*
import org.junit.Assert.*
import cash.z.wallet.sdk.rpc.CompactTxStreamerGrpc
import cash.z.wallet.sdk.rpc.Service
import cash.z.wallet.sdk.rpc.Service.*
import cash.z.wallet.sdk.rpc.Service.BlockID
import cash.z.wallet.sdk.rpc.Service.BlockRange
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import org.junit.AfterClass
import org.junit.Assert.assertNotNull
import org.junit.BeforeClass
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.TimeUnit
class GlueSetupIntegrationTest {

View File

@ -61,7 +61,7 @@ class IntegrationTest {
downloader = CompactBlockDownloader(lightwalletService, compactBlockStore)
processor = CompactBlockProcessor(config, downloader, repository, rustBackend)
repository = PollingTransactionRepository(context, dataDbName, rustBackend, 10_000L)
repository = PollingTransactionRepository(context, dataDbName, 10_000L)
wallet = Wallet(
context = context,
rustBackend = rustBackend,
@ -71,17 +71,17 @@ class IntegrationTest {
)
// repository.start(this)
synchronizer = SdkSynchronizer(
processor,
repository,
ActiveTransactionManager(repository, lightwalletService, wallet),
wallet,
1000
).start(this)
for(i in synchronizer.progress()) {
twig("made progress: $i")
}
// synchronizer = StableSynchronizer(wallet, repository, , processor)
// processor,
// repository,
// ActiveTransactionManager(repository, lightwalletService, wallet),
// wallet,
// 1000
// ).start(this)
//
// for(i in synchronizer.progress()) {
// twig("made progress: $i")
// }
}
companion object {

View File

@ -4,7 +4,6 @@ import androidx.arch.core.executor.testing.InstantTaskExecutorRule
import androidx.room.Room
import androidx.room.RoomDatabase
import androidx.test.core.app.ApplicationProvider
import cash.z.wallet.sdk.dao.TransactionDao
import org.junit.AfterClass
import org.junit.Assert.assertEquals
import org.junit.BeforeClass

View File

@ -3,7 +3,7 @@ package cash.z.wallet.sdk.block
import android.content.Context
import androidx.room.Room
import androidx.room.RoomDatabase
import cash.z.wallet.sdk.dao.CompactBlockDao
import cash.z.wallet.sdk.db.CompactBlockDao
import cash.z.wallet.sdk.db.CompactBlockDb
import cash.z.wallet.sdk.entity.CompactBlock
import cash.z.wallet.sdk.ext.SAPLING_ACTIVATION_HEIGHT

View File

@ -1,13 +0,0 @@
package cash.z.wallet.sdk.dao
import androidx.room.*
import cash.z.wallet.sdk.entity.Block
@Dao
interface BlockDao {
@Query("SELECT COUNT(height) FROM blocks")
fun count(): Int
@Query("SELECT MAX(height) FROM blocks")
fun lastScannedHeight(): Int
}

View File

@ -1,22 +0,0 @@
package cash.z.wallet.sdk.dao
import androidx.room.Dao
import androidx.room.Insert
import androidx.room.OnConflictStrategy
import androidx.room.Query
import cash.z.wallet.sdk.entity.CompactBlock
@Dao
interface CompactBlockDao {
@Insert(onConflict = OnConflictStrategy.REPLACE)
fun insert(block: CompactBlock)
@Insert(onConflict = OnConflictStrategy.REPLACE)
fun insert(block: List<CompactBlock>)
@Query("DELETE FROM compactblocks WHERE height >= :height")
fun rewindTo(height: Int)
@Query("SELECT MAX(height) FROM compactblocks")
fun latestBlockHeight(): Int
}

View File

@ -1,147 +0,0 @@
package cash.z.wallet.sdk.dao
import androidx.room.Dao
import androidx.room.Delete
import androidx.room.Query
import cash.z.wallet.sdk.entity.Transaction
@Dao
interface TransactionDao {
@Query("SELECT * FROM transactions WHERE id_tx = :id")
fun findById(id: Long): Transaction?
@Delete
fun delete(transaction: Transaction)
@Query("DELETE FROM transactions WHERE id_tx = :id")
fun deleteById(id: Long)
/**
* Query transactions, aggregating information on send/receive, sorted carefully so the newest data is at the top
* and the oldest transactions are at the bottom.
*/
@Query("""
SELECT transactions.id_tx AS id,
transactions.txid AS rawTransactionId,
transactions.block AS height,
transactions.raw IS NOT NULL AS isSend,
transactions.block IS NOT NULL AS isMined,
blocks.time AS timeInSeconds,
sent_notes.address AS address,
CASE
WHEN transactions.raw IS NOT NULL THEN sent_notes.value
ELSE received_notes.value
END AS value,
CASE
WHEN transactions.raw IS NOT NULL THEN sent_notes.memo IS NOT NULL
ELSE received_notes.memo IS NOT NULL
END AS rawMemoExists,
CASE
WHEN transactions.raw IS NOT NULL THEN sent_notes.id_note
ELSE received_notes.id_note
END AS noteId
FROM transactions
LEFT JOIN sent_notes
ON transactions.id_tx = sent_notes.tx
LEFT JOIN received_notes
ON transactions.id_tx = received_notes.tx
LEFT JOIN blocks
ON transactions.block = blocks.height
WHERE received_notes.is_change != 1 or transactions.raw IS NOT NULL
ORDER BY block IS NOT NUll, height DESC, time DESC, txId DESC
""")
fun getAll(): List<ClearedTransaction>
/**
* Query transactions by rawTxId
*/
@Query("""
SELECT transactions.id_tx AS id,
transactions.txid AS rawTransactionId,
transactions.block AS height,
transactions.raw IS NOT NULL AS isSend,
transactions.block IS NOT NULL AS isMined,
blocks.time AS timeInSeconds,
sent_notes.address AS address,
CASE
WHEN transactions.raw IS NOT NULL THEN sent_notes.value
ELSE received_notes.value
END AS value,
CASE
WHEN transactions.raw IS NOT NULL THEN sent_notes.memo IS NOT NULL
ELSE received_notes.memo IS NOT NULL
END AS rawMemoExists,
CASE
WHEN transactions.raw IS NOT NULL THEN sent_notes.id_note
ELSE received_notes.id_note
END AS noteId
FROM transactions
LEFT JOIN sent_notes
ON transactions.id_tx = sent_notes.tx
LEFT JOIN received_notes
ON transactions.id_tx = received_notes.tx
LEFT JOIN blocks
ON transactions.block = blocks.height
WHERE (received_notes.is_change != 1 or transactions.raw IS NOT NULL) AND (height IS NOT NULL) and (rawTransactionId = :rawTxId)
ORDER BY block IS NOT NUll, height DESC, time DESC, txId DESC
""")
fun findByRawId(rawTxId: ByteArray): List<ClearedTransaction>
}
data class ClearedTransaction(
val id: Long = 0L,
val noteId: Long = 0L,
val rawTransactionId: ByteArray? = null,
val value: Long = 0L,
val height: Int? = null,
val isSend: Boolean = false,
val timeInSeconds: Long = 0L,
val address: String? = null,
val isMined: Boolean = false,
// does the raw transaction contain a memo?
val rawMemoExists: Boolean = false,
// TODO: investigate populating this with SQL rather than a separate SDK call. then get rid of rawMemoExists.
var memo: String? = null,
// set/maintain a custom status
var status: String? = null
) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is ClearedTransaction) return false
if (noteId != other.noteId) return false
if (id != other.id) return false
if (rawTransactionId != null) {
if (other.rawTransactionId == null) return false
if (!rawTransactionId.contentEquals(other.rawTransactionId)) return false
} else if (other.rawTransactionId != null) return false
if (value != other.value) return false
if (height != other.height) return false
if (isSend != other.isSend) return false
if (timeInSeconds != other.timeInSeconds) return false
if (address != other.address) return false
if (isMined != other.isMined) return false
if (rawMemoExists != other.rawMemoExists) return false
if (memo != other.memo) return false
if (status != other.status) return false
return true
}
override fun hashCode(): Int {
var result = noteId.hashCode()
result = 31 * result + id.hashCode()
result = 31 * result + (rawTransactionId?.contentHashCode() ?: 0)
result = 31 * result + value.hashCode()
result = 31 * result + (height ?: 0)
result = 31 * result + isSend.hashCode()
result = 31 * result + timeInSeconds.hashCode()
result = 31 * result + (address?.hashCode() ?: 0)
result = 31 * result + isMined.hashCode()
result = 31 * result + rawMemoExists.hashCode()
result = 31 * result + (memo?.hashCode() ?: 0)
result = 31 * result + (status?.hashCode() ?: 0)
return result
}
}

View File

@ -1,323 +0,0 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.dao.ClearedTransaction
import cash.z.wallet.sdk.ext.masked
import cash.z.wallet.sdk.secure.Wallet
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import java.util.*
import kotlin.coroutines.CoroutineContext
import cash.z.wallet.sdk.data.TransactionState.*
//import cash.z.wallet.sdk.rpc.CompactFormats
import cash.z.wallet.sdk.service.LightWalletService
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
/**
* Manages active send/receive transactions. These are transactions that have been initiated but not completed with
* sufficient confirmations. All other transactions are stored in a separate [TransactionRepository].
*/
class ActiveTransactionManager(
private val repository: TransactionRepository,
private val service: LightWalletService,
private val wallet: Wallet
) : CoroutineScope {
private val job = Job()
override val coroutineContext: CoroutineContext = Dispatchers.Main + job
private lateinit var sentTransactionMonitorJob: Job
// private lateinit var confirmationMonitorJob: Job
// mutableMapOf gives the same result but we're explicit about preserving insertion order, since we rely on that
private val activeTransactions = LinkedHashMap<ActiveTransaction, TransactionState>()
private val channel = ConflatedBroadcastChannel<Map<ActiveTransaction, TransactionState>>()
private val clearedTransactions = ConflatedBroadcastChannel<List<ClearedTransaction>>()
// private val latestHeightSubscription = service.latestHeights()
fun subscribe(): ReceiveChannel<Map<ActiveTransaction, TransactionState>> {
return channel.openSubscription()
}
fun start() {
twig("ActiveTransactionManager starting")
sentTransactionMonitorJob = launchSentTransactionMonitor()
// confirmationMonitorJob = launchConfirmationMonitor() <- monitoring received transactions is disabled, presently <- TODO: enable confirmation monitor
}
fun stop() {
twig("ActiveTransactionManager stopping")
clearedTransactions.cancel()
channel.cancel()
job.cancel()
sentTransactionMonitorJob.cancel()
// confirmationMonitorJob.cancel() <- TODO: enable confirmation monitor
}
//
// State API
//
fun create(zatoshi: Long, toAddress: String): ActiveSendTransaction {
return ActiveSendTransaction(value = zatoshi, toAddress = toAddress).let { setState(it, TransactionState.Creating); it }
}
fun failure(transaction: ActiveTransaction, reason: String) {
setState(transaction, TransactionState.Failure(activeTransactions[transaction], reason))
}
fun created(transaction: ActiveSendTransaction, transactionId: Long) {
transaction.transactionId.set(transactionId)
setState(transaction, TransactionState.Created(transactionId))
}
fun upload(transaction: ActiveSendTransaction) {
setState(transaction, TransactionState.SendingToNetwork)
}
/**
* Request a cancel for this transaction. Once a transaction has been submitted it cannot be cancelled.
*
* @param transaction the send transaction to cancel
*
* @return true when the transaction can be cancelled. False when it is already in flight to the network.
*/
fun cancel(transaction: ActiveSendTransaction): Boolean {
val currentState = activeTransactions[transaction]
return if (currentState != null && currentState.order < TransactionState.SendingToNetwork.order) {
setState(transaction, TransactionState.Cancelled)
true
} else {
false
}
}
fun awaitConfirmation(transaction: ActiveTransaction, confirmationCount: Int = 0) {
setState(transaction, TransactionState.AwaitingConfirmations(confirmationCount))
}
fun isCancelled(transaction: ActiveSendTransaction): Boolean {
return activeTransactions[transaction] == TransactionState.Cancelled
}
/**
* Sets the state for this transaction and sends an update to subscribers on the main thread. The given transaction
* will be added if it does not match any existing transactions. If the given transaction was previously cancelled,
* this method takes no action.
*
* @param transaction the transaction to update and manage
* @param state the state to set for the given transaction
*/
private fun setState(transaction: ActiveTransaction, state: TransactionState) {
if (transaction is ActiveSendTransaction && isCancelled(transaction)) {
twig("state change to $state ignored because this send transaction has been cancelled")
} else {
twig("state set to $state for active transaction $transaction on thread ${Thread.currentThread().name}")
activeTransactions[transaction] = state
launch {
channel.send(activeTransactions)
}
}
}
private fun CoroutineScope.launchSentTransactionMonitor() = launch {
withContext(Dispatchers.IO) {
(repository as PollingTransactionRepository).poll(clearedTransactions)
val results = clearedTransactions.openSubscription()
while(isActive && !clearedTransactions.isClosedForSend && !results.isClosedForReceive) {
twig("awaiting next modification to transactions...")
val transactions = results.receive()
updateSentTransactions(transactions)
}
results.cancel()
}
}
//TODO: enable confirmation monitor
// private fun CoroutineScope.launchConfirmationMonitor() = launch {
// withContext(Dispatchers.IO) {
// for (block in blockSubscription) {
// updateConfirmations(block)
// }
// }
// }
/**
* Synchronize our internal list of transactions to match any modifications that have occurred in the database.
*
* @param transactions the latest transactions received from our subscription to the transaction repository. That
* channel only publishes transactions when they have changed in some way.
*/
private fun updateSentTransactions(transactions: List<ClearedTransaction>) {
twig("transaction modification received! Updating active sent transactions based on new transaction list")
val sentTransactions = transactions.filter { it.isSend }
val activeSentTransactions =
activeTransactions.entries.filter { (it.key is ActiveSendTransaction) && it.value.isActive() }
if(sentTransactions.isEmpty() || activeSentTransactions.isEmpty()) {
twig("done updating because the new transaction list" +
" ${if(sentTransactions.isEmpty()) "did not have any" else "had"} transactions and the active" +
" sent transactions is ${if(activeSentTransactions.isEmpty()) "" else "not"} empty.")
return
}
/* for all our active send transactions, see if there is a match in the DB and if so, update the height accordingly */
activeSentTransactions.forEach { (transaction, _) ->
val tx = transaction as ActiveSendTransaction
val transactionId = tx.transactionId.get()
if (tx.height.get() < 0) {
twig("checking whether active transaction $transactionId has been mined")
val matchingDbTransaction = sentTransactions.find { it.id == transactionId }
if (matchingDbTransaction?.height != null) {
twig("transaction $transactionId HAS BEEN MINED!!! updating the corresponding active transaction.")
tx.height.set(matchingDbTransaction.height)
twig("active transaction height updated to ${matchingDbTransaction.height} and state updated to AwaitingConfirmations(0)")
setState(transaction, AwaitingConfirmations(1))
} else {
twig("transaction $transactionId has still not been mined.")
}
}
}
}
// TODO: enable confirmation monitor
// private fun updateConfirmations(block: CompactFormats.CompactBlock) {
// twig("updating confirmations for all active transactions")
// val txsAwaitingConfirmation =
// activeTransactions.entries.filter { it.value is AwaitingConfirmations }
// for (tx in txsAwaitingConfirmation) {
//
// }
// }
//
// Active Transaction Management
//
suspend fun sendToAddress(zatoshi: Long, toAddress: String, memo: String = "", fromAccountId: Int = 0) = withContext(Dispatchers.IO) {
twig("creating send transaction for zatoshi value $zatoshi")
val activeSendTransaction = create(zatoshi, toAddress.masked())
val transactionId: Long = try {
// this call takes up to 20 seconds
wallet.createRawSendTransaction(zatoshi, toAddress, memo, fromAccountId)
} catch (t: Throwable) {
val reason = "${t.message}"
twig("Failed to create transaction due to: $reason")
failure(activeSendTransaction, reason)
return@withContext
}
// cancellation basically just prevents sending to the network but we cannot cancel after this moment
// well, technically we could still allow cancellation in the split second between this line of code and the upload request but lets not complicate things
if(isCancelled(activeSendTransaction)) {
twig("transaction $transactionId will not be submitted because it has been cancelled")
revertTransaction(transactionId)
return@withContext
}
if (transactionId < 0) {
failure(activeSendTransaction, "Failed to create, for unknown reason")
return@withContext
}
val transactionRaw: ByteArray? = repository.findTransactionById(transactionId)?.raw
if (transactionRaw == null) {
failure(activeSendTransaction, "Failed to find the transaction that we just attempted to create in the dataDb")
return@withContext
}
created(activeSendTransaction, transactionId)
uploadRawTransaction(transactionId, activeSendTransaction, transactionRaw)
//TODO: synchronously await confirmations by checking periodically inside a while loop until confirmations = 10
}
private suspend fun uploadRawTransaction(
transactionId: Long,
activeSendTransaction: ActiveSendTransaction,
transactionRaw: ByteArray
) {
try {
twig("attempting to submit transaction $transactionId")
upload(activeSendTransaction)
val response = service.submitTransaction(transactionRaw)
if (response.errorCode < 0) {
twig("submit failed with error code: ${response.errorCode} and message ${response.errorMessage}")
failure(activeSendTransaction, "Send failed due to ${response.errorMessage}")
} else {
twig("successfully submitted. error code: ${response.errorCode}")
awaitConfirmation(activeSendTransaction)
}
} catch (t: Throwable) {
val logMessage = "submit failed due to $t."
twig(logMessage)
val revertMessage = revertTransaction(transactionId)
failure(activeSendTransaction, "$logMessage $revertMessage Failure caused by: ${t.message}")
}
}
private suspend fun revertTransaction(transactionId: Long): String = withContext(Dispatchers.IO) {
var revertMessage = "Failed to revert pending send id $transactionId in the dataDb."
try {
repository.deleteTransactionById(transactionId)
revertMessage = "The pending send with id $transactionId has been removed from the DB."
} catch (t: Throwable) {
}
revertMessage
}
}
data class ActiveSendTransaction(
/** height where the transaction was minded. -1 when unmined */
val height: AtomicInteger = AtomicInteger(-1),
/** Transaction row that corresponds with this send. -1 when the transaction hasn't been created yet. */
val transactionId: AtomicLong = AtomicLong(-1L),
override val value: Long = 0,
override val internalId: UUID = UUID.randomUUID(),
val toAddress: String = ""
) : ActiveTransaction
data class ActiveReceiveTransaction(
val height: Int = -1,
override val value: Long = 0,
override val internalId: UUID = UUID.randomUUID()
) : ActiveTransaction
interface ActiveTransaction {
val value: Long
/** only used by this class for purposes of managing unique transactions */
val internalId: UUID
}
sealed class TransactionState(val order: Int) {
val timestamp: Long = System.currentTimeMillis()
object Creating : TransactionState(0)
/** @param id row in the database where the raw transaction has been stored, temporarily, by the rust lib */
class Created(val id: Long) : TransactionState(10)
object SendingToNetwork : TransactionState(20)
class AwaitingConfirmations(val confirmationCount: Int) : TransactionState(30) {
override fun toString(): String {
return "${super.toString()}($confirmationCount)"
}
}
object Cancelled : TransactionState(-1)
/** @param failedStep the state of this transaction at the time, prior to failure */
class Failure(val failedStep: TransactionState?, val reason: String = "") : TransactionState(-2) {
override fun toString(): String {
return "${super.toString()}($failedStep) : $reason"
}
}
fun isActive(): Boolean {
return order > 0
}
override fun toString(): String {
return javaClass.simpleName
}
}

View File

@ -1,18 +1,6 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.dao.ClearedTransaction
import cash.z.wallet.sdk.ext.MINERS_FEE_ZATOSHI
import cash.z.wallet.sdk.secure.Wallet
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import kotlin.coroutines.CoroutineContext
import kotlin.random.Random
import kotlin.random.nextInt
import kotlin.random.nextLong
import kotlinx.coroutines.CoroutineScope
/**
* Utility for building UIs. It does the best it can to mock the SDKSynchronizer so that it can be dropped into any
@ -29,320 +17,322 @@ import kotlin.random.nextLong
* of returning true about 10% of the time.
* @param onSynchronizerErrorListener presently ignored because there are not yet any errors in mock.
*/
open class MockSynchronizer(
abstract class MockSynchronizer(
private val transactionInterval: Long = 30_000L,
private val initialLoadDuration: Long = 5_000L,
private val activeTransactionUpdateFrequency: Long = 3_000L,
private var isStale: Boolean? = null,
override var onSynchronizerErrorListener: ((Throwable?) -> Boolean)? = null // presently ignored (there are no errors in mock yet)
private var isStale: Boolean? = null
) : Synchronizer, CoroutineScope {
private val mockAddress = "ztestsaplingmock0000this0is0a0mock0address0do0not0send0funds0to0this0address0ok0thanks00"
//TODO: things have changed a lot and this class needs to be redone, from the ground up!
private val job = Job()
/**
* Coroutine context used for the CoroutineScope implementation, used to mock asynchronous behaviors.
*/
override val coroutineContext: CoroutineContext
get() = Dispatchers.IO + job
/* only accessed through mutual exclusion */
private val transactions = mutableListOf<ClearedTransaction>()
private val activeTransactions = mutableMapOf<ActiveTransaction, TransactionState>()
private val transactionMutex = Mutex()
private val activeTransactionMutex = Mutex()
private val forge = Forge()
private val balanceChannel = ConflatedBroadcastChannel<Wallet.WalletBalance>()
private val activeTransactionsChannel = ConflatedBroadcastChannel<Map<ActiveTransaction, TransactionState>>(mutableMapOf())
private val transactionsChannel = ConflatedBroadcastChannel<List<ClearedTransaction>>(listOf())
private val progressChannel = ConflatedBroadcastChannel<Int>()
/**
* Starts this mock Synchronizer.
*/
override fun start(parentScope: CoroutineScope): Synchronizer {
Twig.sprout("mock")
twig("synchronizer starting")
forge.start(parentScope)
return this
}
/**
* Stops this mock Synchronizer by cancelling its primary job.
*/
override fun stop() {
twig("synchronizer stopping!")
Twig.clip("mock")
job.cancel()
}
override fun activeTransactions() = activeTransactionsChannel.openSubscription()
override fun allTransactions() = transactionsChannel.openSubscription()
override fun balances() = balanceChannel.openSubscription()
override fun progress() = progressChannel.openSubscription()
/**
* Returns true roughly 10% of the time and then resets to false after some delay.
*/
override suspend fun isStale(): Boolean {
val result = isStale ?: (Random.nextInt(100) < 10)
twig("checking isStale: $result")
if(isStale == true) launch { delay(20_000L); isStale = false }
return result
}
/**
* Returns the [mockAddress]. This address is not usable.
*/
override fun getAddress(accountId: Int): String = mockAddress.also { twig("returning mock address $mockAddress") }
override suspend fun getBalance(accountId: Int): Wallet.WalletBalance {
if (transactions.size != 0) {
val balance = transactions.fold(0L) { acc, tx ->
if (tx.isSend && tx.isMined) acc - tx.value else acc + tx.value
} - MINERS_FEE_ZATOSHI
return Wallet.WalletBalance(balance, balance)
}
return Wallet.WalletBalance()
}
/**
* Uses the [forge] to fabricate a transaction and then walk it through the transaction lifecycle in a useful way.
* This method will validate the zatoshi amount and toAddress a bit to help with UI validation.
*
* @param zatoshi the amount to send. A transaction will be created matching this amount.
* @param toAddress the address to use. An active transaction will be created matching this address.
* @param memo the memo to use. This field is ignored.
* @param fromAccountId the account. This field is ignored.
*/
override suspend fun sendToAddress(zatoshi: Long, toAddress: String, memo: String, fromAccountId: Int) =
withContext<Unit>(Dispatchers.IO) {
Twig.sprout("send")
val walletTransaction = forge.createSendTransaction(zatoshi)
val activeTransaction = forge.createActiveSendTransaction(walletTransaction, toAddress)
val isInvalidForTestnet = toAddress.length != 88 && toAddress.startsWith("ztest")
val isInvalidForMainnet = toAddress.length != 78 && toAddress.startsWith("zs")
val state = when {
zatoshi < 0 -> TransactionState.Failure(TransactionState.Creating, "amount cannot be negative")
!toAddress.startsWith("z") -> TransactionState.Failure(
TransactionState.Creating,
"address must start with z"
)
isInvalidForTestnet -> TransactionState.Failure(TransactionState.Creating, "invalid testnet address")
isInvalidForMainnet -> TransactionState.Failure(TransactionState.Creating, "invalid mainnet address")
else -> TransactionState.Creating
}
twig("after input validation, state is being set to ${state::class.simpleName}")
setState(activeTransaction, state)
twig("active tx size is ${activeTransactions.size}")
// next, transition it through the states, if it got created
if (state !is TransactionState.Creating) {
twig("failed to create transaction")
return@withContext
} else {
// first, add the transaction
twig("adding transaction")
transactionMutex.withLock {
transactions.add(walletTransaction)
}
// then update the active transaction through the creation and submission steps
listOf(TransactionState.Created(walletTransaction.id), TransactionState.SendingToNetwork)
.forEach { newState ->
if (!job.isActive) return@withContext
delay(activeTransactionUpdateFrequency)
setState(activeTransaction, newState)
}
// then set the wallet transaction's height (to simulate it being mined)
val minedHeight = forge.latestHeight.getAndIncrement()
transactionMutex.withLock {
transactions.remove(walletTransaction)
transactions.add(walletTransaction.copy(height = minedHeight, isMined = true))
}
// simply transition it through the states
List(11) { TransactionState.AwaitingConfirmations(it) }
.forEach { newState ->
if (!job.isActive) return@withContext
delay(activeTransactionUpdateFrequency)
activeTransaction.height.set(minedHeight + newState.confirmationCount)
setState(activeTransaction, newState)
}
}
Twig.clip("send")
}
/**
* Helper method to update the state of the given active transaction.
*
* @param activeTransaction the transaction to update.
* @param state the new state to set.
*/
private suspend fun setState(activeTransaction: ActiveTransaction, state: TransactionState) {
var copyMap = mutableMapOf<ActiveTransaction, TransactionState>()
activeTransactionMutex.withLock {
val currentState = activeTransactions[activeTransaction]
if ((currentState?.order ?: 0) < 0) {
twig("ignoring state ${state::class.simpleName} " +
"because the current state is ${currentState!!::class.simpleName}")
return
}
activeTransactions[activeTransaction] = state
var count = if (state is TransactionState.AwaitingConfirmations) "(${state.confirmationCount})" else ""
twig("state set to ${state::class.simpleName}$count on thread ${Thread.currentThread().name}")
}
copyMap = activeTransactions.toMutableMap()
twig("sending ${copyMap.size} active transactions")
launch {
activeTransactionsChannel.send(copyMap)
}
}
/**
* Sets the state of the given transaction to 'Cancelled'.
*/
override fun cancelSend(transaction: ActiveSendTransaction): Boolean {
launch {
twig("cancelling transaction $transaction")
setState(transaction, TransactionState.Cancelled)
}
return true
}
/**
* Utility for forging transactions in both senses of the word.
*/
private inner class Forge {
val transactionId = AtomicLong(Random.nextLong(1L..100_000L))
val latestHeight = AtomicInteger(Random.nextInt(280000..600000))
/**
* Fire up this forge to begin fabricating transactions.
*/
fun start(scope: CoroutineScope) {
scope.launchAddReceiveTransactions()
scope.launchUpdateTransactionsAndBalance()
scope.launchUpdateProgress()
}
/**
* Take the current list of transactions in the outer class (in a thread-safe way) and send updates to the
* transaction and balance channels on a regular interval, regardless of what data is present in the
* transactions collection.
*/
fun CoroutineScope.launchUpdateTransactionsAndBalance() = launch {
while (job.isActive) {
if (transactions.size != 0) {
var balance = 0L
transactionMutex.withLock {
// does not factor in confirmations
balance =
transactions.fold(0L) { acc, tx ->
if (tx.isSend && tx.isMined) acc - tx.value else acc + tx.value
}
}
balanceChannel.send(Wallet.WalletBalance(balance, balance - MINERS_FEE_ZATOSHI))
}
// other collaborators add to the list, periodically. This simulates, real-world, non-distinct updates.
delay(Random.nextLong(transactionInterval / 2))
var copyList = listOf<ClearedTransaction>()
transactionMutex.withLock {
// shallow copy
copyList = transactions.map { it }
}
twig("sending ${copyList.size} transactions")
transactionsChannel.send(copyList)
}
}
/**
* Periodically create a transaction and add it to the running list of transactions in the outer class, knowing
* that this list of transactions will be periodically broadcast by the `launchUpdateTransactionsAndBalance`
* function.
*/
fun CoroutineScope.launchAddReceiveTransactions() = launch {
while (job.isActive) {
delay(transactionInterval)
transactionMutex.withLock {
twig("adding received transaction with random value")
transactions.add(
createReceiveTransaction()
.also { twig("adding received transaction with random value: ${it.value}") }
)
}
}
}
/**
* Fabricate a stream of progress events.
*/
fun CoroutineScope.launchUpdateProgress() = launch {
var progress = 0
while (job.isActive) {
delay(initialLoadDuration/100)
twig("sending progress of $progress")
progressChannel.send(progress++)
if(progress > 100) break
}
twig("progress channel complete!")
}
/**
* Fabricate a receive transaction.
*/
fun createReceiveTransaction(): ClearedTransaction {
return ClearedTransaction(
id = transactionId.getAndIncrement(),
value = Random.nextLong(20_000L..1_000_000_000L),
height = latestHeight.getAndIncrement(),
isSend = false,
timeInSeconds = System.currentTimeMillis() / 1000,
isMined = true
)
}
/**
* Fabricate a send transaction.
*/
fun createSendTransaction(
amount: Long = Random.nextLong(20_000L..1_000_000_000L),
txId: Long = -1L
): ClearedTransaction {
return ClearedTransaction(
id = if (txId == -1L) transactionId.getAndIncrement() else txId,
value = amount,
height = null,
isSend = true,
timeInSeconds = System.currentTimeMillis() / 1000,
isMined = false
)
}
/**
* Fabricate an active send transaction, based on the given wallet transaction instance.
*/
fun createActiveSendTransaction(walletTransaction: ClearedTransaction, toAddress: String)
= createActiveSendTransaction(walletTransaction.value, toAddress, walletTransaction.id)
/**
* Fabricate an active send transaction.
*/
fun createActiveSendTransaction(amount: Long, address: String, txId: Long = -1): ActiveSendTransaction {
return ActiveSendTransaction(
transactionId = AtomicLong(if (txId < 0) transactionId.getAndIncrement() else txId),
toAddress = address,
value = amount
)
}
}
//
// private val mockAddress = "ztestsaplingmock0000this0is0a0mock0address0do0not0send0funds0to0this0address0ok0thanks00"
//
// private val job = Job()
//
// /**
// * Coroutine context used for the CoroutineScope implementation, used to mock asynchronous behaviors.
// */
// override val coroutineContext: CoroutineContext
// get() = Dispatchers.IO + job
//
// /* only accessed through mutual exclusion */
// private val transactions = mutableListOf<ClearedTransaction>()
// private val activeTransactions = mutableMapOf<ActiveTransaction, TransactionState>()
//
// private val transactionMutex = Mutex()
// private val activeTransactionMutex = Mutex()
//
// private val forge = Forge()
//
// private val balanceChannel = ConflatedBroadcastChannel<Wallet.WalletBalance>()
// private val activeTransactionsChannel = ConflatedBroadcastChannel<Map<ActiveTransaction, TransactionState>>(mutableMapOf())
// private val transactionsChannel = ConflatedBroadcastChannel<List<ClearedTransaction>>(listOf())
// private val progressChannel = ConflatedBroadcastChannel<Int>()
//
// /**
// * Starts this mock Synchronizer.
// */
// override fun start(parentScope: CoroutineScope): Synchronizer {
// Twig.sprout("mock")
// twig("synchronizer starting")
// forge.start(parentScope)
// return this
// }
//
// /**
// * Stops this mock Synchronizer by cancelling its primary job.
// */
// override fun stop() {
// twig("synchronizer stopping!")
// Twig.clip("mock")
// job.cancel()
// }
//
// override fun activeTransactions() = activeTransactionsChannel.openSubscription()
// override fun allTransactions() = transactionsChannel.openSubscription()
// override fun balances() = balanceChannel.openSubscription()
// override fun progress() = progressChannel.openSubscription()
//
// /**
// * Returns true roughly 10% of the time and then resets to false after some delay.
// */
// override suspend fun isStale(): Boolean {
// val result = isStale ?: (Random.nextInt(100) < 10)
// twig("checking isStale: $result")
// if(isStale == true) launch { delay(20_000L); isStale = false }
// return result
// }
//
// /**
// * Returns the [mockAddress]. This address is not usable.
// */
// override fun getAddress(accountId: Int): String = mockAddress.also { twig("returning mock address $mockAddress") }
//
// override suspend fun lastBalance(accountId: Int): Wallet.WalletBalance {
// if (transactions.size != 0) {
// val balance = transactions.fold(0L) { acc, tx ->
// if (tx is SentTransaction) acc - tx.value else acc + tx.value
// } - MINERS_FEE_ZATOSHI
// return Wallet.WalletBalance(balance, balance)
// }
// return Wallet.WalletBalance()
// }
//
// /**
// * Uses the [forge] to fabricate a transaction and then walk it through the transaction lifecycle in a useful way.
// * This method will validate the zatoshi amount and toAddress a bit to help with UI validation.
// *
// * @param zatoshi the amount to send. A transaction will be created matching this amount.
// * @param toAddress the address to use. An active transaction will be created matching this address.
// * @param memo the memo to use. This field is ignored.
// * @param fromAccountId the account. This field is ignored.
// */
// override suspend fun sendToAddress(zatoshi: Long, toAddress: String, memo: String, fromAccountId: Int) =
// withContext<Unit>(Dispatchers.IO) {
// Twig.sprout("send")
// val walletTransaction = forge.createSendTransaction(zatoshi)
// val activeTransaction = forge.createActiveSendTransaction(walletTransaction, toAddress)
//
// val isInvalidForTestnet = toAddress.length != 88 && toAddress.startsWith("ztest")
// val isInvalidForMainnet = toAddress.length != 78 && toAddress.startsWith("zs")
//
// val state = when {
// zatoshi < 0 -> TransactionState.Failure(TransactionState.Creating, "amount cannot be negative")
// !toAddress.startsWith("z") -> TransactionState.Failure(
// TransactionState.Creating,
// "address must start with z"
// )
// isInvalidForTestnet -> TransactionState.Failure(TransactionState.Creating, "invalid testnet address")
// isInvalidForMainnet -> TransactionState.Failure(TransactionState.Creating, "invalid mainnet address")
// else -> TransactionState.Creating
// }
// twig("after input validation, state is being set to ${state::class.simpleName}")
// setState(activeTransaction, state)
//
// twig("active tx size is ${activeTransactions.size}")
//
// // next, transition it through the states, if it got created
// if (state !is TransactionState.Creating) {
// twig("failed to create transaction")
// return@withContext
// } else {
// // first, add the transaction
// twig("adding transaction")
// transactionMutex.withLock {
// transactions.add(walletTransaction)
// }
//
// // then update the active transaction through the creation and submission steps
// listOf(TransactionState.Created(walletTransaction.id), TransactionState.SendingToNetwork)
// .forEach { newState ->
// if (!job.isActive) return@withContext
// delay(activeTransactionUpdateFrequency)
// setState(activeTransaction, newState)
// }
//
// // then set the wallet transaction's height (to simulate it being mined)
// val minedHeight = forge.latestHeight.getAndIncrement()
// transactionMutex.withLock {
// transactions.remove(walletTransaction)
// transactions.add(walletTransaction.copy(height = minedHeight, isMined = true))
// }
//
// // simply transition it through the states
// List(11) { TransactionState.AwaitingConfirmations(it) }
// .forEach { newState ->
// if (!job.isActive) return@withContext
// delay(activeTransactionUpdateFrequency)
// activeTransaction.height.set(minedHeight + newState.confirmationCount)
// setState(activeTransaction, newState)
// }
// }
// Twig.clip("send")
// }
//
// /**
// * Helper method to update the state of the given active transaction.
// *
// * @param activeTransaction the transaction to update.
// * @param state the new state to set.
// */
// private suspend fun setState(activeTransaction: ActiveTransaction, state: TransactionState) {
// var copyMap = mutableMapOf<ActiveTransaction, TransactionState>()
// activeTransactionMutex.withLock {
// val currentState = activeTransactions[activeTransaction]
// if ((currentState?.order ?: 0) < 0) {
// twig("ignoring state ${state::class.simpleName} " +
// "because the current state is ${currentState!!::class.simpleName}")
// return
// }
// activeTransactions[activeTransaction] = state
// var count = if (state is TransactionState.AwaitingConfirmations) "(${state.confirmationCount})" else ""
// twig("state set to ${state::class.simpleName}$count on thread ${Thread.currentThread().name}")
// }
//
// copyMap = activeTransactions.toMutableMap()
// twig("sending ${copyMap.size} active transactions")
// launch {
// activeTransactionsChannel.send(copyMap)
// }
// }
//
// /**
// * Sets the state of the given transaction to 'Cancelled'.
// */
// override fun cancelSend(transaction: ActiveSendTransaction): Boolean {
// launch {
// twig("cancelling transaction $transaction")
// setState(transaction, TransactionState.Cancelled)
// }
// return true
// }
//
// /**
// * Utility for forging transactions in both senses of the word.
// */
// private inner class Forge {
// val transactionId = AtomicLong(Random.nextLong(1L..100_000L))
// val latestHeight = AtomicInteger(Random.nextInt(280000..600000))
//
// /**
// * Fire up this forge to begin fabricating transactions.
// */
// fun start(scope: CoroutineScope) {
// scope.launchAddReceiveTransactions()
// scope.launchUpdateTransactionsAndBalance()
// scope.launchUpdateProgress()
// }
//
// /**
// * Take the current list of transactions in the outer class (in a thread-safe way) and send updates to the
// * transaction and balance channels on a regular interval, regardless of what data is present in the
// * transactions collection.
// */
// fun CoroutineScope.launchUpdateTransactionsAndBalance() = launch {
// while (job.isActive) {
// if (transactions.size != 0) {
// var balance = 0L
// transactionMutex.withLock {
// // does not factor in confirmations
// balance =
// transactions.fold(0L) { acc, tx ->
// if (tx.isSend && tx.isMined) acc - tx.value else acc + tx.value
// }
// }
// balanceChannel.send(Wallet.WalletBalance(balance, balance - MINERS_FEE_ZATOSHI))
// }
// // other collaborators add to the list, periodically. This simulates, real-world, non-distinct updates.
// delay(Random.nextLong(transactionInterval / 2))
// var copyList = listOf<ClearedTransaction>()
// transactionMutex.withLock {
// // shallow copy
// copyList = transactions.map { it }
// }
// twig("sending ${copyList.size} transactions")
// transactionsChannel.send(copyList)
// }
// }
//
// /**
// * Periodically create a transaction and add it to the running list of transactions in the outer class, knowing
// * that this list of transactions will be periodically broadcast by the `launchUpdateTransactionsAndBalance`
// * function.
// */
// fun CoroutineScope.launchAddReceiveTransactions() = launch {
// while (job.isActive) {
// delay(transactionInterval)
// transactionMutex.withLock {
// twig("adding received transaction with random value")
// transactions.add(
// createReceiveTransaction()
// .also { twig("adding received transaction with random value: ${it.value}") }
// )
// }
// }
// }
//
// /**
// * Fabricate a stream of progress events.
// */
// fun CoroutineScope.launchUpdateProgress() = launch {
// var progress = 0
// while (job.isActive) {
// delay(initialLoadDuration/100)
// twig("sending progress of $progress")
// progressChannel.send(progress++)
// if(progress > 100) break
// }
// twig("progress channel complete!")
// }
//
// /**
// * Fabricate a receive transaction.
// */
// fun createReceiveTransaction(): ClearedTransaction {
// return ClearedTransaction(
// id = transactionId.getAndIncrement(),
// value = Random.nextLong(20_000L..1_000_000_000L),
// height = latestHeight.getAndIncrement(),
// isSend = false,
// timeInSeconds = System.currentTimeMillis() / 1000,
// isMined = true
// )
// }
//
// /**
// * Fabricate a send transaction.
// */
// fun createSendTransaction(
// amount: Long = Random.nextLong(20_000L..1_000_000_000L),
// txId: Long = -1L
// ): ClearedTransaction {
// return ClearedTransaction(
// id = if (txId == -1L) transactionId.getAndIncrement() else txId,
// value = amount,
// height = null,
// isSend = true,
// timeInSeconds = System.currentTimeMillis() / 1000,
// isMined = false
// )
// }
//
// /**
// * Fabricate an active send transaction, based on the given wallet transaction instance.
// */
// fun createActiveSendTransaction(walletTransaction: ClearedTransaction, toAddress: String)
// = createActiveSendTransaction(walletTransaction.value, toAddress, walletTransaction.id)
//
// /**
// * Fabricate an active send transaction.
// */
// fun createActiveSendTransaction(amount: Long, address: String, txId: Long = -1): ActiveSendTransaction {
// return ActiveSendTransaction(
// transactionId = AtomicLong(if (txId < 0) transactionId.getAndIncrement() else txId),
// toAddress = address,
// value = amount
// )
// }
// }
}

View File

@ -5,7 +5,8 @@ import androidx.room.Room
import androidx.room.RoomDatabase
import cash.z.wallet.sdk.db.PendingTransactionDao
import cash.z.wallet.sdk.db.PendingTransactionDb
import cash.z.wallet.sdk.db.PendingTransaction
import cash.z.wallet.sdk.entity.PendingTransaction
import cash.z.wallet.sdk.entity.Transaction
import cash.z.wallet.sdk.ext.EXPIRY_OFFSET
import cash.z.wallet.sdk.service.LightWalletService
import kotlinx.coroutines.Dispatchers.IO
@ -16,23 +17,22 @@ import kotlinx.coroutines.withContext
*/
// TODO: consider having the manager register the fail listeners rather than having that responsibility spread elsewhere (synchronizer and the broom)
class PersistentTransactionManager(private val db: PendingTransactionDb) : TransactionManager {
private lateinit var dao: PendingTransactionDao
private val dao: PendingTransactionDao = db.pendingTransactionDao()
/**
* Constructor that creates the database and then executes a callback on it.
*/
constructor(
appContext: Context,
dataDbName: String = "PendingTransactions.db",
dbCallback: (PendingTransactionDb) -> Unit = {}
dataDbName: String = "PendingTransactions.db"
) : this(
Room.databaseBuilder(appContext, PendingTransactionDb::class.java, dataDbName)
.setJournalMode(RoomDatabase.JournalMode.TRUNCATE)
.build()
) {
dbCallback(db)
dao = db.pendingTransactionDao()
}
Room.databaseBuilder(
appContext,
PendingTransactionDb::class.java,
dataDbName
).setJournalMode(RoomDatabase.JournalMode.TRUNCATE).build()
)
override fun start() {
twig("TransactionManager starting")
@ -66,7 +66,7 @@ class PersistentTransactionManager(private val db: PendingTransactionDb) : Trans
}
override suspend fun manageCreation(
encoder: RawTransactionEncoder,
encoder: TransactionEncoder,
zatoshiValue: Long,
toAddress: String,
memo: String,
@ -75,7 +75,7 @@ class PersistentTransactionManager(private val db: PendingTransactionDb) : Trans
suspend fun manageCreation(
encoder: RawTransactionEncoder,
encoder: TransactionEncoder,
transaction: PendingTransaction,
currentHeight: Int
): PendingTransaction = withContext(IO){
@ -83,9 +83,9 @@ class PersistentTransactionManager(private val db: PendingTransactionDb) : Trans
var tx = transaction.copy(expiryHeight = if (currentHeight == -1) -1 else currentHeight + EXPIRY_OFFSET)
try {
twig("beginning to encode transaction with : $encoder")
val encodedTx = encoder.create(tx.value, tx.address, tx.memo)
val encodedTx = encoder.create(tx.value, tx.toAddress, tx.memo ?: "")
twig("successfully encoded transaction for ${tx.memo}!!")
tx = tx.copy(raw = encodedTx.raw, txId = encodedTx.txId)
tx = tx.copy(raw = encodedTx.raw, rawTransactionId = encodedTx.txId)
tx
} catch (t: Throwable) {
val message = "failed to encode transaction due to : ${t.message} caused by: ${t.cause}"
@ -102,16 +102,16 @@ class PersistentTransactionManager(private val db: PendingTransactionDb) : Trans
}
}
override suspend fun manageSubmission(service: LightWalletService, pendingTransaction: RawTransaction) {
override suspend fun manageSubmission(service: LightWalletService, pendingTransaction: SignedTransaction) {
var tx = pendingTransaction as PendingTransaction
try {
twig("managing the preparation to submit transaction memo: ${tx.memo} amount: ${tx.value}")
val response = service.submitTransaction(pendingTransaction.raw!!)
twig("management of submit transaction completed with response: ${response.errorCode}: ${response.errorMessage}")
if (response.errorCode < 0) {
tx = tx.copy(errorMessage = response.errorMessage, errorCode = response.errorCode)
tx = if (response.errorCode < 0) {
tx.copy(errorMessage = response.errorMessage, errorCode = response.errorCode)
} else {
tx = tx.copy(errorMessage = null, errorCode = response.errorCode)
tx.copy(errorMessage = null, errorCode = response.errorCode)
}
} catch (t: Throwable) {
twig("error while managing submitting transaction: ${t.message} caused by: ${t.cause}")
@ -133,13 +133,13 @@ class PersistentTransactionManager(private val db: PendingTransactionDb) : Trans
): PendingTransaction {
return PendingTransaction(
value = value,
address = toAddress,
toAddress = toAddress,
memo = memo,
expiryHeight = if (currentHeight == -1) -1 else currentHeight + EXPIRY_OFFSET
)
}
suspend fun manageMined(pendingTx: PendingTransaction, matchingMinedTx: PendingTransaction) = withContext(IO) {
suspend fun manageMined(pendingTx: PendingTransaction, matchingMinedTx: Transaction) = withContext(IO) {
twig("a pending transaction has been mined!")
val tx = pendingTx.copy(minedHeight = matchingMinedTx.minedHeight)
dao.insert(tx)

View File

@ -1,16 +1,19 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.dao.ClearedTransaction
import cash.z.wallet.sdk.data.PersistentTransactionSender.ChangeType.*
import cash.z.wallet.sdk.data.TransactionUpdateRequest.RefreshSentTx
import cash.z.wallet.sdk.data.TransactionUpdateRequest.SubmitPendingTx
import cash.z.wallet.sdk.db.PendingTransaction
import cash.z.wallet.sdk.db.isMined
import cash.z.wallet.sdk.db.isPending
import cash.z.wallet.sdk.entity.PendingTransaction
import cash.z.wallet.sdk.entity.isMined
import cash.z.wallet.sdk.entity.isPending
import cash.z.wallet.sdk.ext.retryWithBackoff
import cash.z.wallet.sdk.service.LightWalletService
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
import kotlin.math.min
/**
* Monitors pending transactions and sends or retries them, when appropriate.
@ -25,14 +28,24 @@ class PersistentTransactionSender (
private var monitoringJob: Job? = null
private val initialMonitorDelay = 45_000L
private var listenerChannel: SendChannel<List<PendingTransaction>>? = null
override var onSubmissionError: ((Throwable) -> Unit)? = null
override var onSubmissionError: ((Throwable) -> Boolean)? = null
private var updateResult: CompletableDeferred<ChangeType>? = null
var lastChangeDetected: ChangeType = NoChange(0)
set(value) {
field = value
val details = when(value) {
is SizeChange -> " from ${value.oldSize} to ${value.newSize}"
is Modified -> " The culprit: ${value.tx}"
is NoChange -> " for the ${value.count.asOrdinal()} time"
else -> ""
}
twig("Checking pending tx detected: ${value.description}$details")
updateResult?.complete(field)
}
fun CoroutineScope.requestUpdate(triggerSend: Boolean) = launch {
twig("requesting update: $triggerSend")
if (!channel.isClosedForSend) {
twig("submitting request")
channel.send(if (triggerSend) SubmitPendingTx else RefreshSentTx)
twig("done submitting request")
} else {
twig("request ignored because the channel is closed for send!!!")
}
@ -45,7 +58,6 @@ class PersistentTransactionSender (
private fun CoroutineScope.startActor() = actor<TransactionUpdateRequest> {
var pendingTransactionDao = 0 // actor state:
for (msg in channel) { // iterate over incoming messages
twig("actor received message: ${msg.javaClass.simpleName}")
when (msg) {
is SubmitPendingTx -> updatePendingTransactions()
is RefreshSentTx -> refreshSentTransactions()
@ -56,14 +68,35 @@ class PersistentTransactionSender (
private fun CoroutineScope.startMonitor() = launch {
delay(5000) // todo see if we need a formal initial delay
while (!channel.isClosedForSend && isActive) {
// TODO: consider refactoring this since we actually want to wait on the return value of requestUpdate
updateResult = CompletableDeferred()
requestUpdate(true)
updateResult?.await()
delay(calculateDelay())
}
twig("TransactionMonitor stopping!")
}
private fun calculateDelay(): Long {
return initialMonitorDelay
// if we're actively waiting on results, then poll faster
val delay = when (lastChangeDetected) {
FirstChange -> initialMonitorDelay / 4
is NothingPending, is NoChange -> {
// simple linear offset when there has been no change
val count = (lastChangeDetected as? BackoffEnabled)?.count ?: 0
val offset = initialMonitorDelay / 5L * count
if (previousSentTxs?.isNotEmpty() == true) {
initialMonitorDelay / 4
} else {
initialMonitorDelay
} + offset
}
is SizeChange -> initialMonitorDelay / 4
is Modified -> initialMonitorDelay / 4
}
return min(delay, initialMonitorDelay * 8).also {
twig("Checking for pending tx changes again in ${it/1000L}s")
}
}
override fun start(scope: CoroutineScope) {
@ -88,7 +121,7 @@ class PersistentTransactionSender (
* Generates newly persisted information about a transaction so that other processes can send.
*/
override suspend fun sendToAddress(
encoder: RawTransactionEncoder,
encoder: TransactionEncoder,
zatoshi: Long,
toAddress: String,
memo: String,
@ -112,7 +145,7 @@ class PersistentTransactionSender (
}
override suspend fun sendPreparedTransaction(
encoder: RawTransactionEncoder,
encoder: TransactionEncoder,
tx: PendingTransaction
): PendingTransaction = withContext(IO) {
val currentHeight = service.safeLatestBlockHeight()
@ -123,7 +156,7 @@ class PersistentTransactionSender (
}
override suspend fun cleanupPreparedTransaction(tx: PendingTransaction) {
if (tx.raw == null) {
if (tx.raw.isEmpty()) {
(manager as PersistentTransactionManager).abortTransaction(tx)
}
}
@ -132,7 +165,6 @@ class PersistentTransactionSender (
var previousSentTxs: List<PendingTransaction>? = null
private suspend fun notifyIfChanged(currentSentTxs: List<PendingTransaction>) = withContext(IO) {
twig("notifyIfChanged: listener null? ${listenerChannel == null} closed? ${listenerChannel?.isClosedForSend}")
if (hasChanged(previousSentTxs, currentSentTxs) && listenerChannel?.isClosedForSend != true) {
twig("START notifying listenerChannel of changed txs")
listenerChannel?.send(currentSentTxs)
@ -154,14 +186,31 @@ class PersistentTransactionSender (
currentSents: List<PendingTransaction>
): Boolean {
// shortcuts first
if (currentSents.isEmpty() && previousSents == null) return false.also { twig("checking pending txs: detected nothing happened yet") } // if nothing has happened, that doesn't count as a change
if (previousSents == null) return true.also { twig("checking pending txs: detected first set of txs!") } // the first set of transactions is automatically a change
if (previousSents.size != currentSents.size) return true.also { twig("checking pending txs: detected size change from ${previousSents.size} to ${currentSents.size}") } // can't be the same and have different sizes, duh
for (tx in currentSents) {
if (!previousSents.contains(tx)) return true.also { twig("checking pending txs: detected change for $tx") }
if (currentSents.isEmpty() && previousSents.isNullOrEmpty()) return false.also {
val count = if (lastChangeDetected is BackoffEnabled) ((lastChangeDetected as? BackoffEnabled)?.count ?: 0) + 1 else 1
lastChangeDetected = NothingPending(count)
}
return false.also { twig("checking pending txs: detected no changes in pending txs") }
if (previousSents == null) return true.also { lastChangeDetected = FirstChange }
if (previousSents.size != currentSents.size) return true.also { lastChangeDetected = SizeChange(previousSentTxs?.size ?: -1, currentSents.size) }
for (tx in currentSents) {
// note: implicit .equals check inside `contains` will also detect modifications
if (!previousSents.contains(tx)) return true.also { lastChangeDetected = Modified(tx) }
}
return false.also {
val count = if (lastChangeDetected is BackoffEnabled) ((lastChangeDetected as? BackoffEnabled)?.count ?: 0) + 1 else 1
lastChangeDetected = NoChange(count)
}
}
sealed class ChangeType(val description: String) {
object FirstChange : ChangeType("This is the first time we've seen a change!")
data class NothingPending(override val count: Int) : ChangeType("Nothing happened yet!"), BackoffEnabled
data class NoChange(override val count: Int) : ChangeType("No changes"), BackoffEnabled
class SizeChange(val oldSize: Int, val newSize: Int) : ChangeType("The total number of pending transactions has changed")
class Modified(val tx: PendingTransaction) : ChangeType("At least one transaction has been modified")
}
interface BackoffEnabled {
val count: Int
}
/**
@ -169,7 +218,6 @@ class PersistentTransactionSender (
* when anything interesting has occurred with a transaction (via [requestUpdate]).
*/
private suspend fun refreshSentTransactions(): List<PendingTransaction> = withContext(IO) {
twig("refreshing all sent transactions")
val allSentTransactions = (manager as PersistentTransactionManager).getAll() // TODO: make this crash and catch error gracefully
notifyIfChanged(allSentTransactions)
allSentTransactions
@ -180,22 +228,20 @@ class PersistentTransactionSender (
*/
private suspend fun updatePendingTransactions() = withContext(IO) {
try {
twig("received request to submit pending transactions")
val allTransactions = refreshSentTransactions()
var pendingCount = 0
val currentHeight = service.safeLatestBlockHeight()
allTransactions.filter { !it.isMined() }.forEach { tx ->
if (tx.isPending(currentHeight)) {
pendingCount++
try {
retryWithBackoff(onSubmissionError, 1000L, 60_000L) {
manager.manageSubmission(service, tx)
} catch (t: Throwable) {
twig("Warning: manageSubmission failed")
onSubmissionError?.invoke(t)
}
} else {
findMatchingClearedTx(tx)?.let {
twig("matching cleared transaction found! $tx")
tx.rawTransactionId?.let {
ledger.findTransactionByRawId(tx.rawTransactionId)
}?.let {
twig("matching transaction found! $tx")
(manager as PersistentTransactionManager).manageMined(tx, it)
refreshSentTransactions()
}
@ -206,24 +252,15 @@ class PersistentTransactionSender (
twig("Error during updatePendingTransactions: $t caused by ${t.cause}")
}
}
private fun findMatchingClearedTx(tx: PendingTransaction): PendingTransaction? {
return if (tx.txId == null) null else {
(ledger as PollingTransactionRepository)
.findTransactionByRawId(tx.txId)?.firstOrNull()?.toPendingTransactionEntity()
}
}
}
private fun ClearedTransaction?.toPendingTransactionEntity(): PendingTransaction? {
if(this == null) return null
return PendingTransaction(
address = address ?: "",
value = value,
memo = memo ?: "",
minedHeight = height ?: -1,
txId = rawTransactionId
)
private fun Int.asOrdinal(): String {
return "$this" + if (this % 100 in 11..13) "th" else when(this % 10) {
1 -> "st"
2 -> "nd"
3 -> "rd"
else -> "th"
}
}
private fun LightWalletService.safeLatestBlockHeight(): Int {
@ -238,59 +275,4 @@ private fun LightWalletService.safeLatestBlockHeight(): Int {
sealed class TransactionUpdateRequest {
object SubmitPendingTx : TransactionUpdateRequest()
object RefreshSentTx : TransactionUpdateRequest()
}
private fun String?.toTxError(): TransactionError {
return FailedTransaction("$this")
}
data class FailedTransaction(override val message: String) : TransactionError
/*
states:
** creating
** failed to create
CREATED
EXPIRED
MINED
SUBMITTED
INVALID
** attempting submission
** attempted submission
bookkeeper, register, treasurer, mint, ledger
private fun checkTx(transactionId: Long) {
if (transactionId < 0) {
throw SweepException.Creation
} else {
twig("successfully created transaction!")
}
}
private fun checkRawTx(transactionRaw: ByteArray?) {
if (transactionRaw == null) {
throw SweepException.Disappeared
} else {
twig("found raw transaction in the dataDb")
}
}
private fun checkResponse(response: Service.SendResponse) {
if (response.errorCode < 0) {
throw SweepException.IncompletePass(response)
} else {
twig("successfully submitted. error code: ${response.errorCode}")
}
}
sealed class SweepException(val errorMessage: String) : RuntimeException(errorMessage) {
object Creation : SweepException("failed to create raw transaction")
object Disappeared : SweepException("unable to find a matching raw transaction. This means the rust backend said it created a TX but when we looked for it in the DB it was missing!")
class IncompletePass(response: Service.SendResponse) : SweepException("submit failed with error code: ${response.errorCode} and message ${response.errorMessage}")
}
*/
}

View File

@ -3,15 +3,11 @@ package cash.z.wallet.sdk.data
import android.content.Context
import androidx.room.Room
import androidx.room.RoomDatabase
import cash.z.wallet.sdk.dao.BlockDao
import cash.z.wallet.sdk.dao.TransactionDao
import cash.z.wallet.sdk.dao.ClearedTransaction
import cash.z.wallet.sdk.db.DerivedDataDb
import cash.z.wallet.sdk.db.*
import cash.z.wallet.sdk.entity.ClearedTransaction
import cash.z.wallet.sdk.entity.Transaction
import cash.z.wallet.sdk.jni.RustBackendWelding
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.SendChannel
/**
* Repository that does polling for simplicity. We will implement an alternative version that uses live data as well as
@ -19,35 +15,30 @@ import kotlinx.coroutines.channels.SendChannel
* changes.
*/
open class PollingTransactionRepository(
private val dataDbPath: String,
private val derivedDataDb: DerivedDataDb,
private val rustBackend: RustBackendWelding,
private val pollFrequencyMillis: Long = 2000L
private val pollFrequencyMillis: Long = 2000L,
private val limit: Int = Int.MAX_VALUE
) : TransactionRepository {
/**
* Constructor that creates the database and then executes a callback on it.
* Constructor that creates the database.
*/
constructor(
context: Context,
dataDbName: String,
rustBackend: RustBackendWelding,
pollFrequencyMillis: Long = 2000L,
dbCallback: (DerivedDataDb) -> Unit = {}
pollFrequencyMillis: Long = 2000L
) : this(
context.getDatabasePath(dataDbName).absolutePath,
Room.databaseBuilder(context, DerivedDataDb::class.java, dataDbName)
.setJournalMode(RoomDatabase.JournalMode.TRUNCATE)
.build(),
rustBackend,
pollFrequencyMillis
) {
dbCallback(derivedDataDb)
}
)
internal val blocks: BlockDao = derivedDataDb.blockDao()
private val blocks: BlockDao = derivedDataDb.blockDao()
private val receivedNotes: ReceivedDao = derivedDataDb.receivedDao()
private val sentNotes: SentDao = derivedDataDb.sentDao()
private val transactions: TransactionDao = derivedDataDb.transactionDao()
private var pollingJob: Job? = null
protected var pollingJob: Job? = null
override fun lastScannedHeight(): Int {
return blocks.lastScannedHeight()
@ -64,8 +55,8 @@ open class PollingTransactionRepository(
transaction
}
fun findTransactionByRawId(rawTxId: ByteArray): List<ClearedTransaction>? {
return transactions.findByRawId(rawTxId)
override suspend fun findTransactionByRawId(rawTxId: ByteArray): Transaction? = withContext(IO) {
transactions.findByRawId(rawTxId)
}
override suspend fun deleteTransactionById(txId: Long) = withContext(IO) {
@ -73,26 +64,35 @@ open class PollingTransactionRepository(
transactions.deleteById(txId)
}
}
override suspend fun getClearedTransactions(): List<ClearedTransaction> = withContext(IO) {
transactions.getSentTransactions(limit) + transactions.getReceivedTransactions(limit)
}
suspend fun poll(channel: SendChannel<List<ClearedTransaction>>, frequency: Long = pollFrequencyMillis) = withContext(IO) {
override suspend fun monitorChanges(listener: () -> Unit) = withContext(IO) {
// since the only thing mutable is unmined transactions, we can simply check for new data rows rather than doing any deep comparisons
// in the future we can leverage triggers instead
pollingJob?.cancel()
pollingJob = launch {
var previousTransactions: List<ClearedTransaction>? = null
while (isActive && !channel.isClosedForSend) {
twigTask("polling for cleared transactions every ${frequency}ms") {
val newTransactions = transactions.getAll()
val txCount = ValueHolder(-1, "Transaction Count")
val unminedCount = ValueHolder(-1, "Unmined Transaction Count")
val sentCount = ValueHolder(-1, "Sent Transaction Count")
val receivedCount = ValueHolder(-1, "Received Transaction Count")
if (hasChanged(previousTransactions, newTransactions)) {
twig("loaded ${newTransactions.count()} cleared transactions and changes were detected!")
channel.send(addMemos(newTransactions))
previousTransactions = newTransactions
} else {
twig("loaded ${newTransactions.count()} cleared transactions but no changes detected.")
}
while (coroutineContext.isActive) {
// we check all conditions to avoid duplicate notifications whenever a change impacts multiple tables
// if counting becomes slower than the blocktime (highly unlikely) then this could be optimized to call the listener early and continue counting afterward but there's no need for that complexity now
if (txCount.changed(transactions.count())
|| unminedCount.changed(transactions.countUnmined())
|| sentCount.changed(sentNotes.count())
|| receivedCount.changed(receivedNotes.count())
) {
twig("Notifying listener that changes have been detected in transactions!")
listener.invoke()
} else {
twig("No changes detected in transactions.")
}
delay(pollFrequencyMillis)
}
twig("Done polling for cleared transactions")
}
}
@ -101,42 +101,23 @@ open class PollingTransactionRepository(
derivedDataDb.close()
}
private suspend fun addMemos(newTransactions: List<ClearedTransaction>): List<ClearedTransaction> = withContext(IO){
for (tx in newTransactions) {
if (tx.rawMemoExists) {
tx.memo = if(tx.isSend) {
rustBackend.getSentMemoAsUtf8(dataDbPath, tx.noteId)
} else {
rustBackend.getReceivedMemoAsUtf8(dataDbPath, tx.noteId)
}
}
}
/**
* Reduces some of the boilerplate of checking a value for changes.
*/
internal class ValueHolder<T>(var value: T, val description: String = "Value") {
/**
* Hold the new value and report whether it has changed.
*/
fun changed(newValue: T): Boolean {
return if (newValue == value) {
false
} else {
twig("$description changed from $value to $newValue")
value = newValue
true
}
newTransactions
}
private fun hasChanged(oldTxs: List<ClearedTransaction>?, newTxs: List<ClearedTransaction>): Boolean {
fun pr(t: List<ClearedTransaction>?): String {
if(t == null) return "none"
val str = StringBuilder()
for (tx in t) {
str.append("\n@TWIG: ").append(tx.toString())
}
return str.toString()
}
val sends = newTxs.filter { it.isSend }
if(sends.isNotEmpty()) twig("SENDS hasChanged: old-txs: ${pr(oldTxs?.filter { it.isSend })}\n@TWIG: new-txs: ${pr(sends)}")
// shortcuts first
if (newTxs.isEmpty() && oldTxs == null) return false.also { twig("detected nothing happened yet") } // if nothing has happened, that doesn't count as a change
if (oldTxs == null) return true.also { twig("detected first set of txs!") } // the first set of transactions is automatically a change
if (oldTxs.size != newTxs.size) return true.also { twig("detected size difference") } // can't be the same and have different sizes, duh
for (note in newTxs) {
if (!oldTxs.contains(note)) return true.also { twig("detected change for $note") }
}
return false.also { twig("detected no changes in all new txs") }
}
}
}

View File

@ -1,26 +0,0 @@
package cash.z.wallet.sdk.data
interface RawTransactionEncoder {
/**
* Creates a raw transaction that is unsigned.
*/
suspend fun create(zatoshi: Long, toAddress: String, memo: String = ""): EncodedTransaction
}
data class EncodedTransaction(val txId: ByteArray, val raw: ByteArray) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is EncodedTransaction) return false
if (!txId.contentEquals(other.txId)) return false
if (!raw.contentEquals(other.raw)) return false
return true
}
override fun hashCode(): Int {
var result = txId.contentHashCode()
result = 31 * result + raw.contentHashCode()
return result
}
}

View File

@ -1,302 +1,305 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.block.CompactBlockProcessor
import cash.z.wallet.sdk.dao.ClearedTransaction
import cash.z.wallet.sdk.exception.SynchronizerException
import cash.z.wallet.sdk.exception.WalletException
import cash.z.wallet.sdk.secure.Wallet
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.distinct
import kotlin.coroutines.CoroutineContext
/**
* The glue. Downloads compact blocks to the database and then scans them for transactions. In order to serve that
* purpose, this class glues together a variety of key components. Each component contributes to the team effort of
* providing a simple source of truth to interact with.
*
* Another way of thinking about this class is the reference that demonstrates how all the pieces can be tied
* together.
*
* @param processor the component that saves the downloaded compact blocks to the cache and then scans those blocks for
* data related to this wallet.
* @param repository the component that exposes streams of wallet transaction information.
* @param activeTransactionManager the component that manages the lifecycle of active transactions. This includes sent
* transactions that have not been mined.
* @param wallet the component that wraps the JNI layer that interacts with librustzcash and manages wallet config.
* @param batchSize the number of compact blocks to download at a time.
* @param staleTolerance the number of blocks to allow before considering our data to be stale
* @param blockPollFrequency how often to poll for compact blocks. Once all missing blocks have been downloaded, this
* number represents the number of milliseconds the synchronizer will wait before checking for newly mined blocks.
*/
class SdkSynchronizer(
private val processor: CompactBlockProcessor,
private val repository: PollingTransactionRepository,
private val activeTransactionManager: ActiveTransactionManager,
private val wallet: Wallet,
private val staleTolerance: Int = 10
) : Synchronizer {
/**
* The primary job for this Synchronizer. It leverages structured concurrency to cancel all work when the
* `parentScope` provided to the [start] method ends.
*/
private lateinit var blockJob: Job
/**
* The state this Synchronizer was in when it started. This is helpful because the conditions that lead to FirstRun
* or isStale being detected can change quickly so retaining the initial state is useful for walkthroughs or other
* elements of an app that need to rely on this information later, rather than in realtime.
*/
private lateinit var initialState: SyncState
/**
* Returns true when `start` has been called on this synchronizer.
*/
private val wasPreviouslyStarted
get() = ::blockJob.isInitialized
/**
* Retains the error that caused this synchronizer to fail for future error handling or reporting.
*/
private var failure: Throwable? = null
/**
* The default exception handler for the block job. Calls [onException].
*/
private val exceptionHandler: (c: CoroutineContext, t: Throwable) -> Unit = { _, t -> onException(t) }
/**
* Sets a listener to be notified of uncaught Synchronizer errors. When null, errors will only be logged.
*/
override var onSynchronizerErrorListener: ((Throwable?) -> Boolean)? = null
set(value) {
field = value
if (failure != null) value?.invoke(failure)
}
/**
* Channel of transactions from the repository.
*/
private val transactionChannel = ConflatedBroadcastChannel<List<ClearedTransaction>>()
/**
* Channel of balance information.
*/
private val balanceChannel = ConflatedBroadcastChannel<Wallet.WalletBalance>()
//
// Public API
//
/* Lifecycle */
/**
* Starts this synchronizer within the given scope. For simplicity, attempting to start an instance that has already
* been started will throw a [SynchronizerException.FalseStart] exception. This reduces the complexity of managing
* resources that must be recycled. Instead, each synchronizer is designed to have a long lifespan and should be
* started from an activity, application or session.
*
* @param parentScope the scope to use for this synchronizer, typically something with a lifecycle such as an
* Activity for single-activity apps or a logged in user session. This scope is only used for launching this
* synchronzer's job as a child.
*/
override fun start(parentScope: CoroutineScope): Synchronizer {
// prevent restarts so the behavior of this class is easier to reason about
if (wasPreviouslyStarted) throw SynchronizerException.FalseStart
twig("starting")
failure = null
blockJob = parentScope.launch(CoroutineExceptionHandler(exceptionHandler)) {
supervisorScope {
try {
wallet.initialize()
} catch (e: WalletException.AlreadyInitializedException) {
twig("Warning: wallet already initialized but this is safe to ignore " +
"because the SDK now automatically detects where to start downloading.")
}
onReady()
}
}
return this
}
/**
* Stops this synchronizer by stopping the downloader, repository, and activeTransactionManager, then cancelling the
* parent job. Note that we do not cancel the parent scope that was passed into [start] because the synchronizer
* does not own that scope, it just uses it for launching children.
*/
override fun stop() {
twig("stopping")
(repository as? PollingTransactionRepository)?.stop().also { twig("repository stopped") }
activeTransactionManager.stop().also { twig("activeTransactionManager stopped") }
// TODO: investigate whether this is necessary and remove or improve, accordingly
Thread.sleep(5000L)
blockJob.cancel().also { twig("blockJob cancelled") }
}
// This has been replaced by "StableSynchronizer" We keep it around for the docs
/* Channels */
/**
* A stream of all the wallet transactions, delegated to the [activeTransactionManager].
*/
override fun activeTransactions() = activeTransactionManager.subscribe()
/**
* A stream of all the wallet transactions, delegated to the [repository].
*/
override fun allTransactions(): ReceiveChannel<List<ClearedTransaction>> {
return transactionChannel.openSubscription()
}
/**
* A stream of progress values, corresponding to this Synchronizer downloading blocks, delegated to the
* [downloader]. Any non-zero value below 100 indicates that progress indicators can be shown and a value of 100
* signals that progress is complete and any progress indicators can be hidden. At that point, the synchronizer
* switches from catching up on missed blocks to periodically monitoring for newly mined blocks.
*/
override fun progress(): ReceiveChannel<Int> {
return processor.progress()
}
/**
* A stream of balance values, delegated to the [wallet].
*/
override fun balances(): ReceiveChannel<Wallet.WalletBalance> {
return balanceChannel.openSubscription()
}
/* Status */
/**
* A flag to indicate that this Synchronizer is significantly out of sync with it's server. This is determined by
* the delta between the current block height reported by the server and the latest block we have stored in cache.
* Whenever this delta is greater than the [staleTolerance], this function returns true. This is intended for
* showing progress indicators when the user returns to the app after having not used it for a long period.
* Typically, this means the user may have to wait for downloading to occur and the current balance and transaction
* information cannot be trusted as 100% accurate.
*
* @return true when the local data is significantly out of sync with the remote server and the app data is stale.
*/
override suspend fun isStale(): Boolean = withContext(IO) {
val latestBlockHeight = processor.downloader.getLatestBlockHeight()
val ourHeight = processor.downloader.getLastDownloadedHeight()
val tolerance = staleTolerance
val delta = latestBlockHeight - ourHeight
twig("checking whether out of sync. " +
"LatestHeight: $latestBlockHeight ourHeight: $ourHeight Delta: $delta tolerance: $tolerance")
delta > tolerance
}
/* Operations */
/**
* Gets the address for the given account.
*
* @param accountId the optional accountId whose address of interest. Typically, this value is zero.
*/
override fun getAddress(accountId: Int): String = wallet.getAddress()
override suspend fun getBalance(accountId: Int): Wallet.WalletBalance = wallet.getBalanceInfo(accountId)
/**
* Sends zatoshi.
*
* @param zatoshi the amount of zatoshi to send.
* @param toAddress the recipient's address.
* @param memo the optional memo to include as part of the transaction.
* @param fromAccountId the optional account id to use. By default, the first account is used.
*/
override suspend fun sendToAddress(zatoshi: Long, toAddress: String, memo: String, fromAccountId: Int) =
activeTransactionManager.sendToAddress(zatoshi, toAddress, memo, fromAccountId)
/**
* Attempts to cancel a previously sent transaction. Transactions can only be cancelled during the calculation phase
* before they've been submitted to the server. This method will return false when it is too late to cancel. This
* logic is delegated to the activeTransactionManager, which knows the state of the given transaction.
*
* @param transaction the transaction to cancel.
* @return true when the cancellation request was successful. False when it is too late to cancel.
*/
override fun cancelSend(transaction: ActiveSendTransaction): Boolean = activeTransactionManager.cancel(transaction)
//
// Private API
//
/**
* Logic for starting the Synchronizer once it is ready for processing. All starts eventually end with this method.
*/
private fun CoroutineScope.onReady() = launch {
twig("synchronization is ready to begin!")
launch { monitorTransactions(transactionChannel.openSubscription().distinct()) }
activeTransactionManager.start()
repository.poll(transactionChannel)
processor.start()
}
/**
* Monitors transactions and recalculates the balance any time transactions have changed.
*/
private suspend fun monitorTransactions(transactionChannel: ReceiveChannel<List<ClearedTransaction>>) =
withContext(IO) {
twig("beginning to monitor transactions in order to update the balance")
launch {
for (i in transactionChannel) {
twig("triggering a balance update because transactions have changed")
balanceChannel.send(wallet.getBalanceInfo())
twig("done triggering balance check!")
}
}
twig("done monitoring transactions in order to update the balance")
}
/**
* Wraps exceptions, logs them and then invokes the [onSynchronizerErrorListener], if it exists.
*/
private fun onException(throwable: Throwable) {
twig("********")
twig("******** ERROR: $throwable")
if (throwable.cause != null) twig("******** caused by ${throwable.cause}")
if (throwable.cause?.cause != null) twig("******** caused by ${throwable.cause?.cause}")
twig("********")
val hasRecovered = onSynchronizerErrorListener?.invoke(throwable)
if (hasRecovered != true) stop().also { failure = throwable }
}
/**
* Represents the initial state of the Synchronizer.
*/
sealed class SyncState {
/**
* State for the first run of the Synchronizer, when the database has not been initialized.
*/
object FirstRun : SyncState()
/**
* State for when compact blocks have been downloaded but not scanned. This state is typically achieved when the
* app was previously started but killed before the first scan took place. In this case, we do not need to
* download compact blocks that we already have.
*
* @param startingBlockHeight the last block that has been downloaded into the cache. We do not need to download
* any blocks before this height because we already have them.
*/
class CacheOnly(val startingBlockHeight: Int = Int.MAX_VALUE) : SyncState()
/**
* The final state of the Synchronizer, when all initialization is complete and the starting block is known.
*
* @param startingBlockHeight the height that will be fed to the downloader. In most cases, it will represent
* either the wallet birthday or the last block that was processed in the previous session.
*/
class ReadyToProcess(val startingBlockHeight: Int = Int.MAX_VALUE) : SyncState()
}
}
//package cash.z.wallet.sdk.data
//
//import cash.z.wallet.sdk.block.CompactBlockProcessor
//import cash.z.wallet.sdk.entity.ClearedTransaction
//import cash.z.wallet.sdk.exception.SynchronizerException
//import cash.z.wallet.sdk.exception.WalletException
//import cash.z.wallet.sdk.secure.Wallet
//import kotlinx.coroutines.*
//import kotlinx.coroutines.Dispatchers.IO
//import kotlinx.coroutines.channels.ConflatedBroadcastChannel
//import kotlinx.coroutines.channels.ReceiveChannel
//import kotlinx.coroutines.channels.distinct
//import kotlin.coroutines.CoroutineContext
//
///**
// * The glue. Downloads compact blocks to the database and then scans them for transactions. In order to serve that
// * purpose, this class glues together a variety of key components. Each component contributes to the team effort of
// * providing a simple source of truth to interact with.
// *
// * Another way of thinking about this class is the reference that demonstrates how all the pieces can be tied
// * together.
// *
// * @param processor the component that saves the downloaded compact blocks to the cache and then scans those blocks for
// * data related to this wallet.
// * @param repository the component that exposes streams of wallet transaction information.
// * @param activeTransactionManager the component that manages the lifecycle of active transactions. This includes sent
// * transactions that have not been mined.
// * @param wallet the component that wraps the JNI layer that interacts with librustzcash and manages wallet config.
// * @param batchSize the number of compact blocks to download at a time.
// * @param staleTolerance the number of blocks to allow before considering our data to be stale
// * @param blockPollFrequency how often to poll for compact blocks. Once all missing blocks have been downloaded, this
// * number represents the number of milliseconds the synchronizer will wait before checking for newly mined blocks.
// */
//class SdkSynchronizer(
// private val processor: CompactBlockProcessor,
// private val repository: TransactionRepository,
// private val activeTransactionManager: ActiveTransactionManager,
// private val wallet: Wallet,
// private val staleTolerance: Int = 10
//) : Synchronizer {
//
// /**
// * The primary job for this Synchronizer. It leverages structured concurrency to cancel all work when the
// * `parentScope` provided to the [start] method ends.
// */
// private lateinit var blockJob: Job
//
// /**
// * The state this Synchronizer was in when it started. This is helpful because the conditions that lead to FirstRun
// * or isStale being detected can change quickly so retaining the initial state is useful for walkthroughs or other
// * elements of an app that need to rely on this information later, rather than in realtime.
// */
// private lateinit var initialState: SyncState
//
// /**
// * Returns true when `start` has been called on this synchronizer.
// */
// private val wasPreviouslyStarted
// get() = ::blockJob.isInitialized
//
// /**
// * Retains the error that caused this synchronizer to fail for future error handling or reporting.
// */
// private var failure: Throwable? = null
//
// /**
// * The default exception handler for the block job. Calls [onException].
// */
// private val exceptionHandler: (c: CoroutineContext, t: Throwable) -> Unit = { _, t -> onException(t) }
//
// /**
// * Sets a listener to be notified of uncaught Synchronizer errors. When null, errors will only be logged.
// */
// override var onSynchronizerErrorListener: ((Throwable?) -> Boolean)? = null
// set(value) {
// field = value
// if (failure != null) value?.invoke(failure)
// }
//
// /**
// * Channel of transactions from the repository.
// */
// private val transactionChannel = ConflatedBroadcastChannel<List<ClearedTransaction>>()
//
// /**
// * Channel of balance information.
// */
// private val balanceChannel = ConflatedBroadcastChannel<Wallet.WalletBalance>()
//
// //
// // Public API
// //
//
// /* Lifecycle */
//
// /**
// * Starts this synchronizer within the given scope. For simplicity, attempting to start an instance that has already
// * been started will throw a [SynchronizerException.FalseStart] exception. This reduces the complexity of managing
// * resources that must be recycled. Instead, each synchronizer is designed to have a long lifespan and should be
// * started from an activity, application or session.
// *
// * @param parentScope the scope to use for this synchronizer, typically something with a lifecycle such as an
// * Activity for single-activity apps or a logged in user session. This scope is only used for launching this
// * synchronzer's job as a child.
// */
// override fun start(parentScope: CoroutineScope): Synchronizer {
// // prevent restarts so the behavior of this class is easier to reason about
// if (wasPreviouslyStarted) throw SynchronizerException.FalseStart
// twig("starting")
// failure = null
// blockJob = parentScope.launch(CoroutineExceptionHandler(exceptionHandler)) {
// supervisorScope {
// try {
// wallet.initialize()
// } catch (e: WalletException.AlreadyInitializedException) {
// twig("Warning: wallet already initialized but this is safe to ignore " +
// "because the SDK now automatically detects where to start downloading.")
// }
// onReady()
// }
// }
// return this
// }
//
// /**
// * Stops this synchronizer by stopping the downloader, repository, and activeTransactionManager, then cancelling the
// * parent job. Note that we do not cancel the parent scope that was passed into [start] because the synchronizer
// * does not own that scope, it just uses it for launching children.
// */
// override fun stop() {
// twig("stopping")
// (repository as? PollingTransactionRepository)?.stop().also { twig("repository stopped") }
// activeTransactionManager.stop().also { twig("activeTransactionManager stopped") }
// // TODO: investigate whether this is necessary and remove or improve, accordingly
// Thread.sleep(5000L)
// blockJob.cancel().also { twig("blockJob cancelled") }
// }
//
//
// /* Channels */
//
// /**
// * A stream of all the wallet transactions, delegated to the [activeTransactionManager].
// */
// override fun activeTransactions() = activeTransactionManager.subscribe()
//
// /**
// * A stream of all the wallet transactions, delegated to the [repository].
// */
// override fun allTransactions(): ReceiveChannel<List<ClearedTransaction>> {
// return transactionChannel.openSubscription()
// }
//
// /**
// * A stream of progress values, corresponding to this Synchronizer downloading blocks, delegated to the
// * [downloader]. Any non-zero value below 100 indicates that progress indicators can be shown and a value of 100
// * signals that progress is complete and any progress indicators can be hidden. At that point, the synchronizer
// * switches from catching up on missed blocks to periodically monitoring for newly mined blocks.
// */
// override fun progress(): ReceiveChannel<Int> {
// return processor.progress()
// }
//
// /**
// * A stream of balance values, delegated to the [wallet].
// */
// override fun balances(): ReceiveChannel<Wallet.WalletBalance> {
// return balanceChannel.openSubscription()
// }
//
//
// /* Status */
//
// /**
// * A flag to indicate that this Synchronizer is significantly out of sync with it's server. This is determined by
// * the delta between the current block height reported by the server and the latest block we have stored in cache.
// * Whenever this delta is greater than the [staleTolerance], this function returns true. This is intended for
// * showing progress indicators when the user returns to the app after having not used it for a long period.
// * Typically, this means the user may have to wait for downloading to occur and the current balance and transaction
// * information cannot be trusted as 100% accurate.
// *
// * @return true when the local data is significantly out of sync with the remote server and the app data is stale.
// */
// override suspend fun isStale(): Boolean = withContext(IO) {
// val latestBlockHeight = processor.downloader.getLatestBlockHeight()
// val ourHeight = processor.downloader.getLastDownloadedHeight()
// val tolerance = staleTolerance
// val delta = latestBlockHeight - ourHeight
// twig("checking whether out of sync. " +
// "LatestHeight: $latestBlockHeight ourHeight: $ourHeight Delta: $delta tolerance: $tolerance")
// delta > tolerance
// }
//
// /* Operations */
//
// /**
// * Gets the address for the given account.
// *
// * @param accountId the optional accountId whose address of interest. Typically, this value is zero.
// */
// override fun getAddress(accountId: Int): String = wallet.getAddress()
//
// override suspend fun getBalance(accountId: Int): Wallet.WalletBalance = wallet.getBalanceInfo(accountId)
//
// /**
// * Sends zatoshi.
// *
// * @param zatoshi the amount of zatoshi to send.
// * @param toAddress the recipient's address.
// * @param memo the optional memo to include as part of the transaction.
// * @param fromAccountId the optional account id to use. By default, the first account is used.
// */
// override suspend fun sendToAddress(zatoshi: Long, toAddress: String, memo: String, fromAccountId: Int) =
// activeTransactionManager.sendToAddress(zatoshi, toAddress, memo, fromAccountId)
//
// /**
// * Attempts to cancel a previously sent transaction. Transactions can only be cancelled during the calculation phase
// * before they've been submitted to the server. This method will return false when it is too late to cancel. This
// * logic is delegated to the activeTransactionManager, which knows the state of the given transaction.
// *
// * @param transaction the transaction to cancel.
// * @return true when the cancellation request was successful. False when it is too late to cancel.
// */
// override fun cancelSend(transaction: ActiveSendTransaction): Boolean = activeTransactionManager.cancel(transaction)
//
//
// //
// // Private API
// //
//
//
// /**
// * Logic for starting the Synchronizer once it is ready for processing. All starts eventually end with this method.
// */
// private fun CoroutineScope.onReady() = launch {
// twig("synchronization is ready to begin!")
// launch { monitorTransactions(transactionChannel.openSubscription().distinct()) }
//
// activeTransactionManager.start()
// repository.poll(transactionChannel)
// processor.start()
// }
//
// /**
// * Monitors transactions and recalculates the balance any time transactions have changed.
// */
// private suspend fun monitorTransactions(transactionChannel: ReceiveChannel<List<ClearedTransaction>>) =
// withContext(IO) {
// twig("beginning to monitor transactions in order to update the balance")
// launch {
// for (i in transactionChannel) {
// twig("triggering a balance update because transactions have changed")
// balanceChannel.send(wallet.getBalanceInfo())
// twig("done triggering balance check!")
// }
// }
// twig("done monitoring transactions in order to update the balance")
// }
//
// /**
// * Wraps exceptions, logs them and then invokes the [onSynchronizerErrorListener], if it exists.
// */
// private fun onException(throwable: Throwable) {
// twig("********")
// twig("******** ERROR: $throwable")
// if (throwable.cause != null) twig("******** caused by ${throwable.cause}")
// if (throwable.cause?.cause != null) twig("******** caused by ${throwable.cause?.cause}")
// twig("********")
//
// val hasRecovered = onSynchronizerErrorListener?.invoke(throwable)
// if (hasRecovered != true) stop().also { failure = throwable }
// }
//
// /**
// * Represents the initial state of the Synchronizer.
// */
// sealed class SyncState {
// /**
// * State for the first run of the Synchronizer, when the database has not been initialized.
// */
// object FirstRun : SyncState()
//
// /**
// * State for when compact blocks have been downloaded but not scanned. This state is typically achieved when the
// * app was previously started but killed before the first scan took place. In this case, we do not need to
// * download compact blocks that we already have.
// *
// * @param startingBlockHeight the last block that has been downloaded into the cache. We do not need to download
// * any blocks before this height because we already have them.
// */
// class CacheOnly(val startingBlockHeight: Int = Int.MAX_VALUE) : SyncState()
//
// /**
// * The final state of the Synchronizer, when all initialization is complete and the starting block is known.
// *
// * @param startingBlockHeight the height that will be fed to the downloader. In most cases, it will represent
// * either the wallet birthday or the last block that was processed in the previous session.
// */
// class ReadyToProcess(val startingBlockHeight: Int = Int.MAX_VALUE) : SyncState()
// }
//
//}

View File

@ -1,8 +1,9 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.block.CompactBlockProcessor
import cash.z.wallet.sdk.dao.ClearedTransaction
import cash.z.wallet.sdk.db.PendingTransaction
import cash.z.wallet.sdk.entity.ClearedTransaction
import cash.z.wallet.sdk.entity.PendingTransaction
import cash.z.wallet.sdk.entity.SentTransaction
import cash.z.wallet.sdk.exception.WalletException
import cash.z.wallet.sdk.secure.Wallet
import kotlinx.coroutines.*
@ -17,11 +18,11 @@ import kotlin.coroutines.CoroutineContext
@ExperimentalCoroutinesApi
class StableSynchronizer (
private val wallet: Wallet,
private val ledger: PollingTransactionRepository,
private val ledger: TransactionRepository,
private val sender: TransactionSender,
private val processor: CompactBlockProcessor,
private val encoder: RawTransactionEncoder
) : DataSynchronizer {
private val encoder: TransactionEncoder
) : Synchronizer {
/**
* The lifespan of this Synchronizer. This scope is initialized once the Synchronizer starts because it will be a
@ -53,27 +54,34 @@ class StableSynchronizer (
//
// Error Callbacks
// Error Handling
//
/** This listener will not be called on the main thread. So it will need to switch to do anything with UI, like dialogs */
override var onCriticalErrorListener: ((Throwable) -> Boolean)? = null
/*
* These listeners will not be called on the main thread.
* So they will need to switch to do anything with UI, like dialogs
*/
override var onCriticalErrorHandler: ((Throwable?) -> Boolean)? = null
override var onProcessorErrorHandler: ((Throwable?) -> Boolean)? = null
override var onSubmissionErrorHandler: ((Throwable?) -> Boolean)? = null
override fun start(parentScope: CoroutineScope) {
override fun start(parentScope: CoroutineScope): Synchronizer {
// base this scope on the parent so that when the parent's job cancels, everything here cancels as well
// also use a supervisor job so that one failure doesn't bring down the whole synchronizer
coroutineScope = CoroutineScope(SupervisorJob(parentScope.coroutineContext[Job]!!) + Dispatchers.Main)
// TODO: this doesn't work as intended. Refactor to improve the cancellation behavior (i.e. what happens when one job fails) by making launchTransactionMonitor throw an exception
coroutineScope.launch {
initWallet()
startSender(this)
launchProgressMonitor()
launchPendingMonitor()
launchClearedMonitor()
launchTransactionMonitor()
onReady()
}
return this
}
private fun startSender(parentScope: CoroutineScope) {
@ -115,7 +123,7 @@ class StableSynchronizer (
//
// begin the monitor that will update the balance proactively whenever we're done a large scan
private fun CoroutineScope.launchProgressMonitor(): Job? = launch {
private fun CoroutineScope.launchProgressMonitor(): Job = launch {
twig("launching progress monitor")
val progressUpdates = progress()
for (progress in progressUpdates) {
@ -128,7 +136,7 @@ class StableSynchronizer (
}
// begin the monitor that will output pending transactions into the pending channel
private fun CoroutineScope.launchPendingMonitor(): Job? = launch {
private fun CoroutineScope.launchPendingMonitor(): Job = launch {
twig("launching pending monitor")
// ask to be notified when the sender notices anything new, while attempting to send
sender.notifyOnChange(pendingChannel)
@ -143,30 +151,27 @@ class StableSynchronizer (
twig("done monitoring for pending changes and balance changes")
}
// begin the monitor that will output cleared transactions into the cleared channel
private fun CoroutineScope.launchClearedMonitor(): Job? = launch {
twig("launching cleared monitor")
// poll for modifications and send them into the cleared channel
ledger.poll(clearedChannel, 10_000L)
private fun CoroutineScope.launchTransactionMonitor(): Job = launch {
ledger.monitorChanges(::onTransactionsChanged)
}
// when those notifications come in, also update the balance
val channel = clearedChannel.openSubscription()
for (cleared in channel) {
if(!balanceChannel.isClosedForSend) {
twig("triggering a balance update because cleared transactions have changed")
refreshBalance()
} else {
twig("WARNING: noticed new cleared transactions but the balance channel was closed for send so ignoring!")
}
fun onTransactionsChanged() {
coroutineScope.launch {
refreshBalance()
clearedChannel.send(ledger.getClearedTransactions())
}
twig("done monitoring for cleared changes and balance changes")
twig("done handling changed transactions")
}
suspend fun refreshBalance() = withContext(IO) {
balanceChannel.send(wallet.getBalanceInfo())
if (!balanceChannel.isClosedForSend) {
twig("triggering a balance update because transactions have changed")
balanceChannel.send(wallet.getBalanceInfo())
} else {
twig("WARNING: noticed new transactions but the balance channel was closed for send so ignoring!")
}
}
private fun CoroutineScope.onReady() = launch(CoroutineExceptionHandler(::onCriticalError)) {
twig("Synchronizer Ready. Starting processor!")
processor.onErrorListener = ::onProcessorError
@ -181,34 +186,21 @@ class StableSynchronizer (
if (error.cause?.cause != null) twig("******** caused by ${error.cause?.cause}")
twig("********")
onCriticalErrorListener?.invoke(error)
onCriticalErrorHandler?.invoke(error)
}
private fun onFailedSend(error: Throwable): Boolean {
twig("ERROR while submitting transaction: $error")
return onSubmissionErrorHandler?.invoke(error)?.also {
if (it) twig("submission error handler signaled that we should try again!")
} == true
}
var sameErrorCount = 1
var processorErrorMessage: String? = ""
private fun onProcessorError(error: Throwable): Boolean {
val dummyContext = CoroutineName("bob")
if (processorErrorMessage == error.message) sameErrorCount++
val isFrequent = sameErrorCount.rem(25) == 0
when {
sameErrorCount == 5 -> onCriticalError(dummyContext, error)
// isFrequent -> trackError(ProcessorRepeatedFailure(error, sameErrorCount))
sameErrorCount == 120 -> {
// trackError(ProcessorMaxFailureReached(error))
Thread.sleep(500)
throw error
}
}
processorErrorMessage = error.message
twig("synchronizer sees your error and ignores it, willfully! Keep retrying ($sameErrorCount), processor!")
return true
}
fun onFailedSend(throwable: Throwable) {
// trackError(ErrorSubmitting(throwable))
twig("ERROR while processing data: $error")
return onProcessorErrorHandler?.invoke(error)?.also {
if (it) twig("processor error handler signaled that we should try again!")
} == true
}
@ -232,15 +224,15 @@ class StableSynchronizer (
return clearedChannel.openSubscription()
}
override fun getPending(): List<PendingTransaction> {
override fun lastPending(): List<PendingTransaction> {
return if (pendingChannel.isClosedForSend) listOf() else pendingChannel.value
}
override fun getCleared(): List<ClearedTransaction> {
override fun lastCleared(): List<ClearedTransaction> {
return if (clearedChannel.isClosedForSend) listOf() else clearedChannel.value
}
override fun getBalance(): Wallet.WalletBalance {
override fun lastBalance(): Wallet.WalletBalance {
return balanceChannel.value
}
@ -249,6 +241,12 @@ class StableSynchronizer (
// Send / Receive
//
override fun cancelSend(transaction: SentTransaction): Boolean {
// not implemented
throw NotImplementedError("Cancellation is not yet implemented " +
"but should be pretty straight forward, using th PersistentTransactionManager")
}
override suspend fun getAddress(accountId: Int): String = withContext(IO) { wallet.getAddress() }
override suspend fun sendToAddress(
@ -261,27 +259,3 @@ class StableSynchronizer (
}
}
interface DataSynchronizer {
fun start(parentScope: CoroutineScope)
fun stop()
suspend fun getAddress(accountId: Int = 0): String
suspend fun sendToAddress(zatoshi: Long, toAddress: String, memo: String = "", fromAccountId: Int = 0): PendingTransaction
fun balances(): ReceiveChannel<Wallet.WalletBalance>
fun progress(): ReceiveChannel<Int>
fun pendingTransactions(): ReceiveChannel<List<PendingTransaction>>
fun clearedTransactions(): ReceiveChannel<List<ClearedTransaction>>
fun getPending(): List<PendingTransaction>
fun getCleared(): List<ClearedTransaction>
fun getBalance(): Wallet.WalletBalance
val isConnected: Boolean
val isSyncing: Boolean
val isScanning: Boolean
var onCriticalErrorListener: ((Throwable) -> Boolean)?
}

View File

@ -1,18 +1,23 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.dao.ClearedTransaction
import cash.z.wallet.sdk.entity.ClearedTransaction
import cash.z.wallet.sdk.entity.PendingTransaction
import cash.z.wallet.sdk.entity.SentTransaction
import cash.z.wallet.sdk.secure.Wallet
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
/**
* Primary interface for interacting with the SDK. Defines the contract that specific implementations like
* [MockSynchronizer] and [SdkSynchronizer] fulfill. Given the language-level support for coroutines, we favor their use
* in the SDK and incorporate that choice into this contract.
* [MockSynchronizer] and [StableSynchronizer] fulfill. Given the language-level support for coroutines, we favor their
* use in the SDK and incorporate that choice into this contract.
*/
interface Synchronizer {
/* Lifecycle */
//
// Lifecycle
//
/**
* Starts this synchronizer within the given scope.
*
@ -22,26 +27,18 @@ interface Synchronizer {
fun start(parentScope: CoroutineScope): Synchronizer
/**
* Stop this synchronizer.
* Stop this synchronizer. Implementations should ensure that calling this method cancels all jobs that were created
* by this instance.
*/
fun stop()
/* Channels */
// NOTE: each of these are expected to be a broadcast channel, such that [receive] always returns the latest value
//
// Channels
//
/**
* A stream of all the active transactions.
*/
fun activeTransactions(): ReceiveChannel<Map<ActiveTransaction, TransactionState>>
/**
* A stream of all the wallet transactions.
*/
fun allTransactions(): ReceiveChannel<List<ClearedTransaction>>
/**
* A stream of balance values.
* A stream of balance values, separately reflecting both the available and total balance.
*/
fun balances(): ReceiveChannel<Wallet.WalletBalance>
@ -52,46 +49,70 @@ interface Synchronizer {
*/
fun progress(): ReceiveChannel<Int>
/* Status */
/**
* A stream of all the outbound pending transaction that have been sent but are awaiting confirmations.
*/
fun pendingTransactions(): ReceiveChannel<List<PendingTransaction>>
/**
* A flag to indicate that this Synchronizer is significantly out of sync with it's server. Typically, this means
* that the balance and other data cannot be completely trusted because a significant amount of data has not been
* processed. This is intended for showing progress indicators when the user returns to the app after having not
* used it for days. Typically, this means minor sync issues should be ignored and this should be leveraged in order
* to alert a user that the balance information is stale.
*
* @return true when the local data is significantly out of sync with the remote server and the app data is stale.
* A stream of all the transactions that are on the blockchain. Implementations should consider only returning a
* subset like the most recent 100 transactions, perhaps through paging the underlying database.
*/
suspend fun isStale(): Boolean
fun clearedTransactions(): ReceiveChannel<List<ClearedTransaction>>
/**
* Gets or sets a global error listener. This is a useful hook for handling unexpected critical errors.
*
* @return true when the error has been handled and the Synchronizer should continue. False when the error is
* unrecoverable and the Synchronizer should [stop].
* Holds the most recent value that was transmitted through the [pendingTransactions] channel. Typically, if the
* underlying channel is a BroadcastChannel (and it should be),then this value is simply [pendingChannel.value]
*/
var onSynchronizerErrorListener: ((Throwable?) -> Boolean)?
fun lastPending(): List<PendingTransaction>
/**
* Holds the most recent value that was transmitted through the [clearedTransactions] channel. Typically, if the
* underlying channel is a BroadcastChannel (and it should be), then this value is simply [clearedChannel.value]
*/
fun lastCleared(): List<ClearedTransaction>
/**
* Holds the most recent value that was transmitted through the [balances] channel. Typically, if the
* underlying channel is a BroadcastChannel (and it should be), then this value is simply [balanceChannel.value]
*/
fun lastBalance(): Wallet.WalletBalance
/* Operations */
//
// Status
//
/**
* A flag indicating whether this Synchronizer is connected to its lightwalletd server. When false, a UI element
* may want to turn red.
*/
val isConnected: Boolean
/**
* A flag indicating whether this Synchronizer is actively downloading compact blocks. When true, a UI element
* may want to turn yellow.
*/
val isSyncing: Boolean
/**
* A flag indicating whether this Synchronizer is actively decrypting compact blocks, searching for transactions.
* When true, a UI element may want to turn yellow.
*/
val isScanning: Boolean
//
// Operations
//
/**
* Gets the address for the given account.
*
* @param accountId the optional accountId whose address is of interest. By default, the first account is used.
*/
fun getAddress(accountId: Int = 0): String
/**
* Gets the balance info for the given account. In most cases, the stream of balances provided by [balances]
* should be used instead of this function.
*
* @param accountId the optional accountId whose balance is of interest. By default, the first account is used.
* @return a wrapper around the available and total balances.
*/
suspend fun getBalance(accountId: Int = 0): Wallet.WalletBalance
suspend fun getAddress(accountId: Int = 0): String
/**
* Sends zatoshi.
@ -101,7 +122,12 @@ interface Synchronizer {
* @param memo the optional memo to include as part of the transaction.
* @param fromAccountId the optional account id to use. By default, the first account is used.
*/
suspend fun sendToAddress(zatoshi: Long, toAddress: String, memo: String = "", fromAccountId: Int = 0)
suspend fun sendToAddress(
zatoshi: Long,
toAddress: String,
memo: String = "",
fromAccountId: Int = 0
): PendingTransaction
/**
* Attempts to cancel a previously sent transaction. Typically, cancellation is only an option if the transaction
@ -110,5 +136,36 @@ interface Synchronizer {
* @param transaction the transaction to cancel.
* @return true when the cancellation request was successful. False when it is too late to cancel.
*/
fun cancelSend(transaction: ActiveSendTransaction): Boolean
fun cancelSend(transaction: SentTransaction): Boolean
//
// Error Handling
//
/**
* Gets or sets a global error handler. This is a useful hook for handling unexpected critical errors.
*
* @return true when the error has been handled and the Synchronizer should attempt to continue. False when the
* error is unrecoverable and the Synchronizer should [stop].
*/
var onCriticalErrorHandler: ((Throwable?) -> Boolean)?
/**
* An error handler for exceptions during processing. For instance, a block might be missing or a reorg may get
* mishandled or the database may get corrupted.
*
* @return true when the error has been handled and the processor should attempt to continue. False when the
* error is unrecoverable and the processor should [stop].
*/
var onProcessorErrorHandler: ((Throwable?) -> Boolean)?
/**
* An error handler for exceptions while submitting transactions to lightwalletd. For instance, a transaction may
* get rejected because it would be a double-spend or the user might lose their cellphone signal.
*
* @return true when the error has been handled and the sender should attempt to resend. False when the
* error is unrecoverable and the sender should [stop].
*/
var onSubmissionErrorHandler: ((Throwable?) -> Boolean)?
}

View File

@ -0,0 +1,10 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.entity.EncodedTransaction
interface TransactionEncoder {
/**
* Creates a signed transaction
*/
suspend fun create(zatoshi: Long, toAddress: String, memo: String = ""): EncodedTransaction
}

View File

@ -9,12 +9,12 @@ import cash.z.wallet.sdk.service.LightWalletService
interface TransactionManager {
fun start()
fun stop()
suspend fun manageCreation(encoder: RawTransactionEncoder, zatoshiValue: Long, toAddress: String, memo: String, currentHeight: Int): RawTransaction
suspend fun manageSubmission(service: LightWalletService, pendingTransaction: RawTransaction)
suspend fun getAll(): List<RawTransaction>
suspend fun manageCreation(encoder: TransactionEncoder, zatoshiValue: Long, toAddress: String, memo: String, currentHeight: Int): SignedTransaction
suspend fun manageSubmission(service: LightWalletService, pendingTransaction: SignedTransaction)
suspend fun getAll(): List<SignedTransaction>
}
interface RawTransaction {
val raw: ByteArray?
interface SignedTransaction {
val raw: ByteArray
}
interface TransactionError {

View File

@ -1,10 +1,14 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.entity.ClearedTransaction
import cash.z.wallet.sdk.entity.Transaction
interface TransactionRepository {
fun lastScannedHeight(): Int
fun isInitialized(): Boolean
suspend fun findTransactionById(txId: Long): Transaction?
suspend fun findTransactionByRawId(rawTransactionId: ByteArray): Transaction?
suspend fun deleteTransactionById(txId: Long)
suspend fun getClearedTransactions(): List<ClearedTransaction>
suspend fun monitorChanges(listener: () -> Unit)
}

View File

@ -1,6 +1,6 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.db.PendingTransaction
import cash.z.wallet.sdk.entity.PendingTransaction
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.SendChannel
@ -10,10 +10,10 @@ interface TransactionSender {
fun notifyOnChange(channel: SendChannel<List<PendingTransaction>>)
/** only necessary when there is a long delay between starting a transaction and beginning to create it. Like when sweeping a wallet that first needs to be scanned. */
suspend fun prepareTransaction(amount: Long, address: String, memo: String): PendingTransaction?
suspend fun sendPreparedTransaction(encoder: RawTransactionEncoder, tx: PendingTransaction): PendingTransaction
suspend fun sendPreparedTransaction(encoder: TransactionEncoder, tx: PendingTransaction): PendingTransaction
suspend fun cleanupPreparedTransaction(tx: PendingTransaction)
suspend fun sendToAddress(encoder: RawTransactionEncoder, zatoshi: Long, toAddress: String, memo: String = "", fromAccountId: Int = 0): PendingTransaction
suspend fun sendToAddress(encoder: TransactionEncoder, zatoshi: Long, toAddress: String, memo: String = "", fromAccountId: Int = 0): PendingTransaction
suspend fun cancel(existingTransaction: PendingTransaction): Unit?
var onSubmissionError: ((Throwable) -> Unit)?
var onSubmissionError: ((Throwable) -> Boolean)?
}

View File

@ -1,5 +1,8 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.entity.EncodedTransaction
import cash.z.wallet.sdk.exception.TransactionNotEncodedException
import cash.z.wallet.sdk.exception.TransactionNotFoundException
import cash.z.wallet.sdk.secure.Wallet
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.withContext
@ -7,7 +10,7 @@ import kotlinx.coroutines.withContext
class WalletTransactionEncoder(
private val wallet: Wallet,
private val repository: TransactionRepository
) : RawTransactionEncoder {
) : TransactionEncoder {
/**
* Creates a transaction, throwing an exception whenever things are missing. When the provided wallet implementation
@ -19,15 +22,7 @@ class WalletTransactionEncoder(
val transaction = repository.findTransactionById(transactionId)
?: throw TransactionNotFoundException(transactionId)
EncodedTransaction(transaction.transactionId, transaction.raw
?: throw TransactionNotEncodedException(transactionId))
?: throw TransactionNotEncodedException(transactionId)
)
}
}
class TransactionNotFoundException(transactionId: Long) : RuntimeException("Unable to find transactionId " +
"$transactionId in the repository. This means the wallet created a transaction and then returned a row ID " +
"that does not actually exist. This is a scenario where the wallet should have thrown an exception but failed " +
"to do so.")
class TransactionNotEncodedException(transactionId: Long) : RuntimeException("The transaction returned by the wallet," +
" with id $transactionId, does not have any raw data. This is a scenario where the wallet should have thrown" +
" an exception but failed to do so.")

View File

@ -1,10 +1,13 @@
package cash.z.wallet.sdk.db
import androidx.room.Database
import androidx.room.RoomDatabase
import cash.z.wallet.sdk.dao.CompactBlockDao
import androidx.room.*
import cash.z.wallet.sdk.entity.CompactBlock
//
// Database
//
@Database(
entities = [
CompactBlock::class],
@ -13,4 +16,24 @@ import cash.z.wallet.sdk.entity.CompactBlock
)
abstract class CompactBlockDb : RoomDatabase() {
abstract fun complactBlockDao(): CompactBlockDao
}
//
// Data Access Objects
//
@Dao
interface CompactBlockDao {
@Insert(onConflict = OnConflictStrategy.REPLACE)
fun insert(block: CompactBlock)
@Insert(onConflict = OnConflictStrategy.REPLACE)
fun insert(block: List<CompactBlock>)
@Query("DELETE FROM compactblocks WHERE height >= :height")
fun rewindTo(height: Int)
@Query("SELECT MAX(height) FROM compactblocks")
fun latestBlockHeight(): Int
}

View File

@ -1,16 +1,18 @@
package cash.z.wallet.sdk.db
import androidx.room.Database
import androidx.room.RoomDatabase
import cash.z.wallet.sdk.dao.BlockDao
import cash.z.wallet.sdk.dao.TransactionDao
import androidx.room.*
import cash.z.wallet.sdk.entity.*
import cash.z.wallet.sdk.entity.Transaction
//
// Database
//
@Database(
entities = [
Transaction::class,
Block::class,
Note::class,
Received::class,
Account::class,
Sent::class
],
@ -20,4 +22,106 @@ import cash.z.wallet.sdk.entity.*
abstract class DerivedDataDb : RoomDatabase() {
abstract fun transactionDao(): TransactionDao
abstract fun blockDao(): BlockDao
abstract fun receivedDao(): ReceivedDao
abstract fun sentDao(): SentDao
}
//
// Data Access Objects
//
@Dao
interface BlockDao {
@Query("SELECT COUNT(height) FROM blocks")
fun count(): Int
@Query("SELECT MAX(height) FROM blocks")
fun lastScannedHeight(): Int
}
@Dao
interface ReceivedDao {
@Query("SELECT COUNT(tx) FROM received_notes")
fun count(): Int
}
@Dao
interface SentDao {
@Query("SELECT COUNT(tx) FROM sent_notes")
fun count(): Int
}
@Dao
interface TransactionDao {
@Query("SELECT COUNT(id_tx) FROM transactions")
fun count(): Int
@Query("SELECT COUNT(block) FROM transactions WHERE block IS NULL")
fun countUnmined(): Int
@Query("SELECT * FROM transactions WHERE id_tx = :id")
fun findById(id: Long): Transaction?
@Query("SELECT * FROM transactions WHERE txid = :rawTransactionId LIMIT 1")
fun findByRawId(rawTransactionId: ByteArray): Transaction?
@Delete
fun delete(transaction: Transaction)
@Query("DELETE FROM transactions WHERE id_tx = :id")
fun deleteById(id: Long)
/**
* Query sent transactions that have been mined, sorted so the newest data is at the top.
*/
@Query("""
SELECT transactions.id_tx AS id,
transactions.block AS minedHeight,
transactions.tx_index AS transactionIndex,
transactions.txid AS rawTransactionId,
transactions.expiry_height AS expiryHeight,
transactions.raw AS raw,
sent_notes.address AS toAddress,
sent_notes.value AS value,
sent_notes.memo AS memo,
sent_notes.id_note AS noteId,
blocks.time AS blockTimeInSeconds
FROM transactions
LEFT JOIN sent_notes
ON transactions.id_tx = sent_notes.tx
LEFT JOIN blocks
ON transactions.block = blocks.height
WHERE transactions.raw IS NOT NULL
AND minedheight > 0
ORDER BY block IS NOT NULL, height DESC, time DESC, txid DESC
LIMIT :limit
""")
fun getSentTransactions(limit: Int = Int.MAX_VALUE): List<SentTransaction>
/**
* Query transactions, aggregating information on send/receive, sorted carefully so the newest data is at the top
* and the oldest transactions are at the bottom.
*/
@Query("""
SELECT transactions.id_tx AS id,
transactions.block AS minedHeight,
transactions.tx_index AS transactionIndex,
transactions.txid AS rawTransactionId,
received_notes.value AS value,
received_notes.memo AS memo,
received_notes.id_note AS noteId,
blocks.time AS blockTimeInSeconds
FROM transactions
LEFT JOIN received_notes
ON transactions.id_tx = received_notes.tx
LEFT JOIN blocks
ON transactions.block = blocks.height
WHERE received_notes.is_change != 1
ORDER BY minedheight DESC, blocktimeinseconds DESC, id DESC
LIMIT :limit
""")
fun getReceivedTransactions(limit: Int = Int.MAX_VALUE): List<ReceivedTransaction>
}

View File

@ -1,9 +1,12 @@
package cash.z.wallet.sdk.db
import androidx.room.*
import cash.z.wallet.sdk.dao.ClearedTransaction
import cash.z.wallet.sdk.data.RawTransaction
import cash.z.wallet.sdk.ext.masked
import cash.z.wallet.sdk.entity.PendingTransaction
//
// Database
//
@Database(
entities = [
@ -16,6 +19,11 @@ abstract class PendingTransactionDb : RoomDatabase() {
abstract fun pendingTransactionDao(): PendingTransactionDao
}
//
// Data Access Objects
//
@Dao
interface PendingTransactionDao {
@Insert(onConflict = OnConflictStrategy.REPLACE)
@ -23,157 +31,9 @@ interface PendingTransactionDao {
@Delete
fun delete(transaction: PendingTransaction)
//
// /**
// * Query all blocks that are not mined and not expired.
// */
// @Query(
// """
// SELECT id,
// address,
// value,
// memo,
// minedheight,
// expiryheight,
// submitcount,
// encodecount,
// errormessage,
// createtime,
// raw
// FROM pending_transactions
// WHERE minedHeight = -1 and (expiryHeight >= :currentHeight or expiryHeight = -1) and (raw IS NOT NULL)
// ORDER BY createtime
// """
// )
// fun getAllPending(currentHeight: Int): List<PendingTransactionEntity>
@Query("SELECT * from pending_transactions ORDER BY createTime")
fun getAll(): List<PendingTransaction>
}
@Entity(tableName = "pending_transactions")
data class PendingTransaction(
@PrimaryKey(autoGenerate = true)
val id: Long = 0,
val address: String = "",
val value: Long = -1,
val memo: String = "",
val minedHeight: Int = -1,
val expiryHeight: Int = -1,
val submitAttempts: Int = -1,
/** the number of times there was an attempt to encode this transaction */
val encodeAttempts: Int = -1,
val errorMessage: String? = null,
val errorCode: Int? = null,
val createTime: Long = System.currentTimeMillis(),
@ColumnInfo(typeAffinity = ColumnInfo.BLOB)
override val raw: ByteArray? = null,
@ColumnInfo(typeAffinity = ColumnInfo.BLOB)
val txId: ByteArray? = null
) : RawTransaction {
override fun toString(): String {
return if ((raw != null && raw.size > 1) || !address.contains("**mask")) {
copy(
raw = byteArrayOf(1),
address = address.masked()
).toString()
} else {
super.toString()
}
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is PendingTransaction) return false
if (id != other.id) return false
if (address != other.address) return false
if (value != other.value) return false
if (memo != other.memo) return false
if (minedHeight != other.minedHeight) return false
if (expiryHeight != other.expiryHeight) return false
if (submitAttempts != other.submitAttempts) return false
if (encodeAttempts != other.encodeAttempts) return false
if (errorMessage != other.errorMessage) return false
if (errorCode != other.errorCode) return false
if (createTime != other.createTime) return false
if (raw != null) {
if (other.raw == null) return false
if (!raw.contentEquals(other.raw)) return false
} else if (other.raw != null) return false
if (txId != null) {
if (other.txId == null) return false
if (!txId.contentEquals(other.txId)) return false
} else if (other.txId != null) return false
return true
}
override fun hashCode(): Int {
var result = id.hashCode()
result = 31 * result + address.hashCode()
result = 31 * result + value.hashCode()
result = 31 * result + memo.hashCode()
result = 31 * result + minedHeight
result = 31 * result + expiryHeight
result = 31 * result + submitAttempts
result = 31 * result + encodeAttempts
result = 31 * result + (errorMessage?.hashCode() ?: 0)
result = 31 * result + (errorCode ?: 0)
result = 31 * result + createTime.hashCode()
result = 31 * result + (raw?.contentHashCode() ?: 0)
result = 31 * result + (txId?.contentHashCode() ?: 0)
return result
}
}
fun PendingTransaction.isSameTxId(other: ClearedTransaction): Boolean {
return txId != null && other.rawTransactionId != null && txId.contentEquals(other.rawTransactionId!!)
}
fun PendingTransaction.isSameTxId(other: PendingTransaction): Boolean {
return txId != null && other.txId != null && txId.contentEquals(other.txId)
}
fun PendingTransaction.isCreating(): Boolean {
return raw == null && submitAttempts <= 0 && !isFailedSubmit() && !isFailedEncoding()
}
fun PendingTransaction.isFailedEncoding(): Boolean {
return raw == null && encodeAttempts > 0
}
fun PendingTransaction.isFailedSubmit(): Boolean {
return errorMessage != null || (errorCode != null && errorCode < 0)
}
fun PendingTransaction.isFailure(): Boolean {
return isFailedEncoding() || isFailedSubmit()
}
fun PendingTransaction.isSubmitted(): Boolean {
return submitAttempts > 0
}
fun PendingTransaction.isMined(): Boolean {
return minedHeight > 0
}
fun PendingTransaction.isPending(currentHeight: Int = -1): Boolean {
// not mined and not expired and successfully created
return !isSubmitSuccess() && minedHeight == -1 && (expiryHeight == -1 || expiryHeight > currentHeight) && raw != null
}
fun PendingTransaction.isSubmitSuccess(): Boolean {
return submitAttempts > 0 && (errorCode != null && errorCode >= 0) && errorMessage == null
}
/**
* The amount of time remaining until this transaction is stale
*/
fun PendingTransaction.ttl(): Long {
return (60L * 2L) - (System.currentTimeMillis()/1000 - createTime)
}

View File

@ -27,7 +27,7 @@ import androidx.room.ForeignKey
onDelete = ForeignKey.CASCADE
)]
)
data class Note(
data class Received(
@ColumnInfo(name = "id_note")
val id: Int = 0,
@ -66,7 +66,7 @@ data class Note(
override fun equals(other: Any?): Boolean {
if (this === other) return true
return (other is Note)
return (other is Received)
&& id == other.id
&& transactionId == other.transactionId
&& outputIndex == other.outputIndex

View File

@ -1,48 +0,0 @@
package cash.z.wallet.sdk.entity
import androidx.room.ColumnInfo
import androidx.room.Entity
import androidx.room.ForeignKey
import org.jetbrains.annotations.NotNull
@Entity(
primaryKeys = ["id_tx"], tableName = "transactions",
foreignKeys = [ForeignKey(
entity = Block::class,
parentColumns = ["height"],
childColumns = ["block"],
onDelete = ForeignKey.CASCADE
)]
)
data class Transaction(
@ColumnInfo(name = "id_tx")
val id: Long,
@ColumnInfo(typeAffinity = ColumnInfo.BLOB, name = "txid")
@NotNull
val transactionId: ByteArray,
val block: Int,
@ColumnInfo(typeAffinity = ColumnInfo.BLOB)
val raw: ByteArray?
) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
return (other is Transaction)
&& id == other.id
&& transactionId.contentEquals(other.transactionId)
&& block == other.block
&& ((raw == null && other.raw == null) || (raw != null && other.raw != null && raw.contentEquals(other.raw)))
}
override fun hashCode(): Int {
var result = id.toInt()
result = 31 * result + transactionId.contentHashCode()
result = 31 * result + block
result = 31 * result + (raw?.contentHashCode() ?: 0)
return result
}
}

View File

@ -0,0 +1,309 @@
package cash.z.wallet.sdk.entity
import androidx.room.ColumnInfo
import androidx.room.Entity
import androidx.room.ForeignKey
import androidx.room.PrimaryKey
import cash.z.wallet.sdk.data.SignedTransaction
import org.jetbrains.annotations.NotNull
//
// Entities
//
@Entity(
primaryKeys = ["id_tx"], tableName = "transactions",
foreignKeys = [ForeignKey(
entity = Block::class,
parentColumns = ["height"],
childColumns = ["block"],
onDelete = ForeignKey.CASCADE
)]
)
data class Transaction(
@ColumnInfo(name = "id_tx")
val id: Long,
@ColumnInfo(typeAffinity = ColumnInfo.BLOB, name = "txid")
@NotNull
val transactionId: ByteArray,
@ColumnInfo(name = "tx_index")
val transactionIndex: Int,
@ColumnInfo(name = "expiry_height")
val expiryHeight: Int,
@ColumnInfo(name = "block")
val minedHeight: Int,
@ColumnInfo(typeAffinity = ColumnInfo.BLOB)
val raw: ByteArray?
) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is Transaction) return false
if (id != other.id) return false
if (!transactionId.contentEquals(other.transactionId)) return false
if (transactionIndex != other.transactionIndex) return false
if (expiryHeight != other.expiryHeight) return false
if (minedHeight != other.minedHeight) return false
if (raw != null) {
if (other.raw == null) return false
if (!raw.contentEquals(other.raw)) return false
} else if (other.raw != null) return false
return true
}
override fun hashCode(): Int {
var result = id.hashCode()
result = 31 * result + transactionId.contentHashCode()
result = 31 * result + transactionIndex
result = 31 * result + expiryHeight
result = 31 * result + minedHeight
result = 31 * result + (raw?.contentHashCode() ?: 0)
return result
}
}
@Entity(tableName = "pending_transactions")
data class PendingTransaction(
@PrimaryKey(autoGenerate = true)
val id: Long = 0,
val toAddress: String = "",
val value: Long = -1,
val memo: String? = null,
val minedHeight: Int = -1,
val expiryHeight: Int = -1,
val encodeAttempts: Int = -1,
val submitAttempts: Int = -1,
val errorMessage: String? = null,
val errorCode: Int? = null,
val createTime: Long = System.currentTimeMillis(),
@ColumnInfo(typeAffinity = ColumnInfo.BLOB)
override val raw: ByteArray = ByteArray(0),
@ColumnInfo(typeAffinity = ColumnInfo.BLOB)
val rawTransactionId: ByteArray? = null
) : SignedTransaction {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is PendingTransaction) return false
if (id != other.id) return false
if (toAddress != other.toAddress) return false
if (value != other.value) return false
if (memo != other.memo) return false
if (minedHeight != other.minedHeight) return false
if (expiryHeight != other.expiryHeight) return false
if (encodeAttempts != other.encodeAttempts) return false
if (submitAttempts != other.submitAttempts) return false
if (errorMessage != other.errorMessage) return false
if (errorCode != other.errorCode) return false
if (createTime != other.createTime) return false
if (!raw.contentEquals(other.raw)) return false
if (rawTransactionId != null) {
if (other.rawTransactionId == null) return false
if (!rawTransactionId.contentEquals(other.rawTransactionId)) return false
} else if (other.rawTransactionId != null) return false
return true
}
override fun hashCode(): Int {
var result = id.hashCode()
result = 31 * result + toAddress.hashCode()
result = 31 * result + value.hashCode()
result = 31 * result + memo.hashCode()
result = 31 * result + minedHeight
result = 31 * result + expiryHeight
result = 31 * result + encodeAttempts
result = 31 * result + submitAttempts
result = 31 * result + (errorMessage?.hashCode() ?: 0)
result = 31 * result + (errorCode ?: 0)
result = 31 * result + createTime.hashCode()
result = 31 * result + raw.contentHashCode()
result = 31 * result + (rawTransactionId?.contentHashCode() ?: 0)
return result
}
}
//
// Query Objects
//
/**
* Parent type for transactions that have been mined. This is useful for putting all transactions in one list for things
* like history. A mined tx should have all properties, except possibly a memo.
*/
interface ClearedTransaction {
val id: Long
val value: Long
// val memo: String? --> we don't yet have a good way of privately retrieving incoming memos so let's make that clear
val noteId: Long
val minedHeight: Int
val blockTimeInSeconds: Long
val transactionIndex: Int
val rawTransactionId: ByteArray
}
/**
* A mined, inbound shielded transaction. Since this is a [ClearedTransaction], it represents data on the blockchain.
*/
data class ReceivedTransaction(
override val id: Long = 0L,
override val value: Long = 0L,
// override val memo: String? = null, --> for now we don't have a good way of privately retrieving incoming memos so let's make that clear by omitting this property
override val noteId: Long = 0L,
override val blockTimeInSeconds: Long = 0L,
override val minedHeight: Int = -1,
override val transactionIndex: Int,
override val rawTransactionId: ByteArray = ByteArray(0)
) : ClearedTransaction {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is ReceivedTransaction) return false
if (id != other.id) return false
if (value != other.value) return false
if (noteId != other.noteId) return false
if (blockTimeInSeconds != other.blockTimeInSeconds) return false
if (minedHeight != other.minedHeight) return false
if (transactionIndex != other.transactionIndex) return false
if (!rawTransactionId.contentEquals(other.rawTransactionId)) return false
return true
}
override fun hashCode(): Int {
var result = id.hashCode()
result = 31 * result + value.hashCode()
result = 31 * result + noteId.hashCode()
result = 31 * result + blockTimeInSeconds.hashCode()
result = 31 * result + minedHeight
result = 31 * result + transactionIndex
result = 31 * result + rawTransactionId.contentHashCode()
return result
}
}
/**
* A mined, outbound shielded transaction. Since this is a [ClearedTransaction], it represents data on the blockchain.
*/
data class SentTransaction(
override val id: Long = 0L,
override val value: Long = 0L,
override val noteId: Long = 0L,
override val blockTimeInSeconds: Long = 0L,
override val minedHeight: Int = -1,
override val transactionIndex: Int,
override val rawTransactionId: ByteArray = ByteArray(0),
// sent transactions have memos because we create them and don't have to worry about P.I.R.
val memo: String? = null,
val toAddress: String = "",
val expiryHeight: Int = -1,
override val raw: ByteArray = ByteArray(0)
) : ClearedTransaction, SignedTransaction {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is SentTransaction) return false
if (id != other.id) return false
if (value != other.value) return false
if (noteId != other.noteId) return false
if (blockTimeInSeconds != other.blockTimeInSeconds) return false
if (minedHeight != other.minedHeight) return false
if (transactionIndex != other.transactionIndex) return false
if (!rawTransactionId.contentEquals(other.rawTransactionId)) return false
if (memo != other.memo) return false
if (toAddress != other.toAddress) return false
if (expiryHeight != other.expiryHeight) return false
if (!raw.contentEquals(other.raw)) return false
return true
}
override fun hashCode(): Int {
var result = id.hashCode()
result = 31 * result + value.hashCode()
result = 31 * result + noteId.hashCode()
result = 31 * result + blockTimeInSeconds.hashCode()
result = 31 * result + minedHeight
result = 31 * result + transactionIndex
result = 31 * result + rawTransactionId.contentHashCode()
result = 31 * result + (memo?.hashCode() ?: 0)
result = 31 * result + toAddress.hashCode()
result = 31 * result + expiryHeight
result = 31 * result + raw.contentHashCode()
return result
}
}
data class EncodedTransaction(val txId: ByteArray, val raw: ByteArray) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is EncodedTransaction) return false
if (!txId.contentEquals(other.txId)) return false
if (!raw.contentEquals(other.raw)) return false
return true
}
override fun hashCode(): Int {
var result = txId.contentHashCode()
result = 31 * result + raw.contentHashCode()
return result
}
}
//
// Extension-oriented design
//
fun PendingTransaction.isSameTxId(other: ClearedTransaction): Boolean {
return rawTransactionId != null && other.rawTransactionId != null && rawTransactionId.contentEquals(other.rawTransactionId)
}
fun PendingTransaction.isSameTxId(other: PendingTransaction): Boolean {
return rawTransactionId != null && other.rawTransactionId != null && rawTransactionId.contentEquals(other.rawTransactionId)
}
fun PendingTransaction.isCreating(): Boolean {
return raw.isEmpty() && submitAttempts <= 0 && !isFailedSubmit() && !isFailedEncoding()
}
fun PendingTransaction.isFailedEncoding(): Boolean {
return raw.isEmpty() && encodeAttempts > 0
}
fun PendingTransaction.isFailedSubmit(): Boolean {
return errorMessage != null || (errorCode != null && errorCode < 0)
}
fun PendingTransaction.isFailure(): Boolean {
return isFailedEncoding() || isFailedSubmit()
}
fun PendingTransaction.isMined(): Boolean {
return minedHeight > 0
}
fun PendingTransaction.isSubmitted(): Boolean {
return submitAttempts > 0
}
fun PendingTransaction.isPending(currentHeight: Int = -1): Boolean {
// not mined and not expired and successfully created
return !isSubmitSuccess() && minedHeight == -1 && (expiryHeight == -1 || expiryHeight > currentHeight) && raw != null
}
fun PendingTransaction.isSubmitSuccess(): Boolean {
return submitAttempts > 0 && (errorCode != null && errorCode >= 0) && errorMessage == null
}

View File

@ -55,4 +55,14 @@ sealed class WalletException(message: String, cause: Throwable? = null) : Runtim
class AlreadyInitializedException(cause: Throwable) : WalletException("Failed to initialize the blocks table" +
" because it already exists.", cause)
class FalseStart(cause: Throwable?) : WalletException("Failed to initialize wallet due to: $cause", cause)
}
}
class TransactionNotFoundException(transactionId: Long) : RuntimeException("Unable to find transactionId " +
"$transactionId in the repository. This means the wallet created a transaction and then returned a row ID " +
"that does not actually exist. This is a scenario where the wallet should have thrown an exception but failed " +
"to do so.")
class TransactionNotEncodedException(transactionId: Long) : RuntimeException("The transaction returned by the wallet," +
" with id $transactionId, does not have any raw data. This is a scenario where the wallet should have thrown" +
" an exception but failed to do so.")