Pulled over classes from Zcon1 app.

Next step is to refactor and simplify.
This commit is contained in:
Kevin Gorham 2019-07-10 14:12:32 -04:00 committed by Kevin Gorham
parent 1de3ee9ed5
commit 862a4be480
15 changed files with 1034 additions and 16 deletions

View File

@ -18,13 +18,12 @@ object Injection {
App.instance.getDatabasePath(dataDbName).absoluteFile.delete()
return Wallet(
App.instance,
provideRustBackend(),
App.instance.getDatabasePath(dataDbName).absolutePath,
App.instance.cacheDir.absolutePath,
arrayOf(0),
seedProvider,
spendingKeyProvider
context = App.instance,
birthday = Wallet.loadBirthdayFromAssets(App.instance, 421720),
rustBackend = provideRustBackend(),
dataDbName = dataDbName,
seedProvider = seedProvider,
spendingKeyProvider = spendingKeyProvider
)
}

View File

@ -7,7 +7,6 @@ import androidx.appcompat.app.AppCompatActivity
import cash.z.wallet.sdk.data.SampleSeedProvider
import cash.z.wallet.sdk.data.TroubleshootingTwig
import cash.z.wallet.sdk.data.Twig
import cash.z.wallet.sdk.jni.RustBackend
import cash.z.wallet.sdk.jni.RustBackendWelding
import cash.z.wallet.sdk.secure.Wallet
import kotlinx.coroutines.runBlocking

View File

@ -9,7 +9,7 @@ buildscript {
}
dependencies {
classpath 'com.android.tools.build:gradle:3.5.0-beta03'
classpath 'com.android.tools.build:gradle:3.5.0-beta04'
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:${versions.kotlin}"
}
}

View File

@ -21,11 +21,16 @@ import org.junit.Test
import java.io.IOException
import kotlin.properties.Delegates
/**
* A tool for checking transactions since the given birthday and printing balances. This was useful for the Zcon1 app to
* ensure that we loaded all the pokerchips correctly.
*/
@ExperimentalCoroutinesApi
class BalancePrinterUtil {
private val host = "34.65.230.46"
private val downloadBatchSize = 9_000
private val birthday = 523240
private val context = InstrumentationRegistry.getInstrumentation().context
@ -34,7 +39,6 @@ class BalancePrinterUtil {
private val cacheDbPath = context.getDatabasePath("BalanceUtilCache.db").absolutePath
private val dataDbPath = context.getDatabasePath("BalanceUtilData.db").absolutePath
private val rustBackend = RustBackend()
private val birthday = 523240
private val downloader = CompactBlockDownloader(
LightWalletGrpcService(context, host),
@ -59,15 +63,14 @@ class BalancePrinterUtil {
assertEquals(-1, error)
}
private fun deleteDb() {
context.getDatabasePath(dataDbName).absoluteFile.delete()
private fun deleteDb(dbName: String) {
context.getDatabasePath(dbName).absoluteFile.delete()
}
@Test
fun printBalances() = runBlocking {
readLines().collect { seed ->
deleteDb()
deleteDb(dataDbName)
initWallet(seed)
twig("scanning blocks for seed <$seed>")
rustBackend.scanBlocks(cacheDbPath, dataDbPath)

View File

@ -0,0 +1,9 @@
package cash.z.android.wallet.data
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
class ChannelListValueProvider<T>(val channel: ConflatedBroadcastChannel<List<T>>) {
fun getLatestValue(): List<T> {
return if (channel.isClosedForSend) listOf() else channel.value
}
}

View File

@ -0,0 +1,155 @@
package cash.z.wallet.sdk.data
import android.content.Context
import androidx.room.Room
import androidx.room.RoomDatabase
import cash.z.wallet.sdk.db.PendingTransactionDao
import cash.z.wallet.sdk.db.PendingTransactionDb
import cash.z.wallet.sdk.db.PendingTransactionEntity
import cash.z.wallet.sdk.ext.EXPIRY_OFFSET
import cash.z.wallet.sdk.service.LightWalletService
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.withContext
/**
* Facilitates persistent attempts to ensure a transaction occurs.
*/
// TODO: consider having the manager register the fail listeners rather than having that responsibility spread elsewhere (synchronizer and the broom)
class PersistentTransactionManager(private val db: PendingTransactionDb) : TransactionManager {
private lateinit var dao: PendingTransactionDao
/**
* Constructor that creates the database and then executes a callback on it.
*/
constructor(
appContext: Context,
dataDbName: String = "PendingTransactions.db",
dbCallback: (PendingTransactionDb) -> Unit = {}
) : this(
Room.databaseBuilder(appContext, PendingTransactionDb::class.java, dataDbName)
.setJournalMode(RoomDatabase.JournalMode.TRUNCATE)
.build()
) {
dbCallback(db)
}
override fun start() {
twig("TransactionManager starting")
dao = db.pendingTransactionDao()
}
override fun stop() {
twig("TransactionManager stopping")
db.close()
}
suspend fun initPlaceholder(
zatoshiValue: Long,
toAddress: String,
memo: String
): PendingTransactionEntity? = withContext(IO) {
twig("constructing a placeholder transaction")
val tx = initTransaction(zatoshiValue, toAddress, memo)
twig("done constructing a placeholder transaction")
try {
twig("inserting tx into DB: $tx")
val insertId = dao.insert(tx)
twig("insert returned id of $insertId")
tx.copy(id = insertId)
} catch (t: Throwable) {
val message = "failed initialize a placeholder transaction due to : ${t.message} caused by: ${t.cause}"
twig(message)
null
} finally {
twig("done constructing a placeholder transaction")
}
}
override suspend fun manageCreation(
encoder: RawTransactionEncoder,
zatoshiValue: Long,
toAddress: String,
memo: String,
currentHeight: Int
): PendingTransactionEntity = manageCreation(encoder, initTransaction(zatoshiValue, toAddress, memo), currentHeight)
suspend fun manageCreation(
encoder: RawTransactionEncoder,
transaction: PendingTransactionEntity,
currentHeight: Int
): PendingTransactionEntity = withContext(IO){
twig("managing the creation of a transaction")
var tx = transaction.copy(expiryHeight = if (currentHeight == -1) -1 else currentHeight + EXPIRY_OFFSET)
try {
twig("beginning to encode transaction with : $encoder")
val encodedTx = encoder.create(tx.value, tx.address, tx.memo)
twig("successfully encoded transaction for ${tx.memo}!!")
tx = tx.copy(raw = encodedTx.raw, txId = encodedTx.txId)
tx
} catch (t: Throwable) {
val message = "failed to encode transaction due to : ${t.message} caused by: ${t.cause}"
twig(message)
message
tx = tx.copy(errorMessage = message)
tx
} finally {
tx = tx.copy(encodeAttempts = Math.max(1, tx.encodeAttempts + 1))
twig("inserting tx into DB: $tx")
dao.insert(tx)
twig("successfully inserted TX into DB")
tx
}
}
override suspend fun manageSubmission(service: LightWalletService, pendingTransaction: RawTransaction) {
var tx = pendingTransaction as PendingTransactionEntity
try {
twig("managing the preparation to submit transaction memo: ${tx.memo} amount: ${tx.value}")
val response = service.submitTransaction(pendingTransaction.raw!!)
twig("management of submit transaction completed with response: ${response.errorCode}: ${response.errorMessage}")
if (response.errorCode < 0) {
tx = tx.copy(errorMessage = response.errorMessage, errorCode = response.errorCode)
} else {
tx = tx.copy(errorMessage = null, errorCode = response.errorCode)
}
} catch (t: Throwable) {
twig("error while managing submitting transaction: ${t.message} caused by: ${t.cause}")
} finally {
tx = tx.copy(submitAttempts = Math.max(1, tx.submitAttempts + 1))
dao.insert(tx)
}
}
override suspend fun getAll(): List<PendingTransactionEntity> = withContext(IO) {
dao.getAll()
}
private fun initTransaction(
value: Long,
toAddress: String,
memo: String,
currentHeight: Int = -1
): PendingTransactionEntity {
return PendingTransactionEntity(
value = value,
address = toAddress,
memo = memo,
expiryHeight = if (currentHeight == -1) -1 else currentHeight + EXPIRY_OFFSET
)
}
suspend fun manageMined(pendingTx: PendingTransactionEntity, matchingMinedTx: PendingTransactionEntity) = withContext(IO) {
twig("a pending transaction has been mined!")
val tx = pendingTx.copy(minedHeight = matchingMinedTx.minedHeight)
dao.insert(tx)
}
/**
* Remove a transaction and pretend it never existed.
*/
suspend fun abortTransaction(existingTransaction: PendingTransactionEntity) = withContext(IO) {
dao.delete(existingTransaction)
}
}

View File

@ -0,0 +1,299 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.dao.WalletTransaction
import cash.z.wallet.sdk.db.PendingTransactionEntity
import cash.z.wallet.sdk.db.isMined
import cash.z.wallet.sdk.db.isPending
import cash.z.wallet.sdk.db.isSameTxId
import cash.z.wallet.sdk.service.LightWalletService
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
class PersistentTransactionSender (
private val manager: TransactionManager,
private val service: LightWalletService,
private val clearedTxProvider: ClearedTransactionProvider
) : TransactionSender {
private lateinit var channel: SendChannel<TransactionUpdateRequest>
private var monitoringJob: Job? = null
private val initialMonitorDelay = 45_000L
private var listenerChannel: SendChannel<List<PendingTransactionEntity>>? = null
override var onSubmissionError: ((Throwable) -> Unit)? = null
fun CoroutineScope.requestUpdate(triggerSend: Boolean) = launch {
twig("requesting update: $triggerSend")
if (!channel.isClosedForSend) {
twig("submitting request")
channel.send(if (triggerSend) SubmitPendingTx else RefreshSentTx)
twig("done submitting request")
} else {
twig("request ignored because the channel is closed for send!!!")
}
}
/**
* Start an actor that listens for signals about what to do with transactions. This actor's lifespan is within the
* provided [scope] and it will live until the scope is cancelled.
*/
private fun CoroutineScope.startActor() = actor<TransactionUpdateRequest> {
var pendingTransactionDao = 0 // actor state:
for (msg in channel) { // iterate over incoming messages
twig("actor received message: ${msg.javaClass.simpleName}")
when (msg) {
is SubmitPendingTx -> updatePendingTransactions()
is RefreshSentTx -> refreshSentTransactions()
}
}
}
private fun CoroutineScope.startMonitor() = launch {
delay(5000) // todo see if we need a formal initial delay
while (!channel.isClosedForSend && isActive) {
requestUpdate(true)
delay(calculateDelay())
}
twig("TransactionMonitor stopping!")
}
private fun calculateDelay(): Long {
return initialMonitorDelay
}
override fun start(scope: CoroutineScope) {
twig("TransactionMonitor starting!")
channel = scope.startActor()
monitoringJob?.cancel()
monitoringJob = scope.startMonitor()
}
override fun stop() {
channel.close()
monitoringJob?.cancel()?.also { monitoringJob = null }
manager.stop()
}
override fun notifyOnChange(channel: SendChannel<List<PendingTransactionEntity>>) {
if (channel != null) twig("warning: listener channel was not null but it probably should have been. Something else was listening with $channel!")
listenerChannel = channel
}
/**
* Generates newly persisted information about a transaction so that other processes can send.
*/
override suspend fun sendToAddress(
encoder: RawTransactionEncoder,
zatoshi: Long,
toAddress: String,
memo: String,
fromAccountId: Int
): PendingTransactionEntity = withContext(IO) {
val currentHeight = service.safeLatestBlockHeight()
(manager as PersistentTransactionManager).manageCreation(encoder, zatoshi, toAddress, memo, currentHeight).also {
requestUpdate(true)
}
}
override suspend fun prepareTransaction(
zatoshiValue: Long,
address: String,
memo: String
): PendingTransactionEntity? = withContext(IO) {
(manager as PersistentTransactionManager).initPlaceholder(zatoshiValue, address, memo).also {
// update UI to show what we've just created. No need to submit, it has no raw data yet!
requestUpdate(false)
}
}
override suspend fun sendPreparedTransaction(
encoder: RawTransactionEncoder,
tx: PendingTransactionEntity
): PendingTransactionEntity = withContext(IO) {
val currentHeight = service.safeLatestBlockHeight()
(manager as PersistentTransactionManager).manageCreation(encoder, tx, currentHeight).also {
// submit what we've just created
requestUpdate(true)
}
}
override suspend fun cleanupPreparedTransaction(tx: PendingTransactionEntity) {
if (tx.raw == null) {
(manager as PersistentTransactionManager).abortTransaction(tx)
}
}
// TODO: get this from the channel instead
var previousSentTxs: List<PendingTransactionEntity>? = null
private suspend fun notifyIfChanged(currentSentTxs: List<PendingTransactionEntity>) = withContext(IO) {
twig("notifyIfChanged: listener null? ${listenerChannel == null} closed? ${listenerChannel?.isClosedForSend}")
if (hasChanged(previousSentTxs, currentSentTxs) && listenerChannel?.isClosedForSend != true) {
twig("START notifying listenerChannel of changed txs")
listenerChannel?.send(currentSentTxs)
twig("DONE notifying listenerChannel of changed txs")
previousSentTxs = currentSentTxs
} else {
twig("notifyIfChanged: did nothing because ${if(listenerChannel?.isClosedForSend == true) "the channel is closed." else "nothing changed."}")
}
}
override suspend fun cancel(existingTransaction: PendingTransactionEntity) = withContext(IO) {
(manager as PersistentTransactionManager).abortTransaction(existingTransaction). also {
requestUpdate(false)
}
}
private fun hasChanged(
previousSents: List<PendingTransactionEntity>?,
currentSents: List<PendingTransactionEntity>
): Boolean {
// shortcuts first
if (currentSents.isEmpty() && previousSents == null) return false.also { twig("checking pending txs: detected nothing happened yet") } // if nothing has happened, that doesn't count as a change
if (previousSents == null) return true.also { twig("checking pending txs: detected first set of txs!") } // the first set of transactions is automatically a change
if (previousSents.size != currentSents.size) return true.also { twig("checking pending txs: detected size change from ${previousSents.size} to ${currentSents.size}") } // can't be the same and have different sizes, duh
for (tx in currentSents) {
if (!previousSents.contains(tx)) return true.also { twig("checking pending txs: detected change for $tx") }
}
return false.also { twig("checking pending txs: detected no changes in pending txs") }
}
/**
* Check on all sent transactions and if they've changed, notify listeners. This method can be called proactively
* when anything interesting has occurred with a transaction (via [requestUpdate]).
*/
private suspend fun refreshSentTransactions(): List<PendingTransactionEntity> = withContext(IO) {
twig("refreshing all sent transactions")
val allSentTransactions = (manager as PersistentTransactionManager).getAll() // TODO: make this crash and catch error gracefully
notifyIfChanged(allSentTransactions)
allSentTransactions
}
/**
* Submit all pending transactions that have not expired.
*/
private suspend fun updatePendingTransactions() = withContext(IO) {
try {
twig("received request to submit pending transactions")
val allTransactions = refreshSentTransactions()
var pendingCount = 0
val currentHeight = service.safeLatestBlockHeight()
allTransactions.filter { !it.isMined() }.forEach { tx ->
if (tx.isPending(currentHeight)) {
pendingCount++
try {
manager.manageSubmission(service, tx)
} catch (t: Throwable) {
twig("Warning: manageSubmission failed")
onSubmissionError?.invoke(t)
}
} else {
findMatchingClearedTx(tx)?.let {
twig("matching cleared transaction found!")
(manager as PersistentTransactionManager).manageMined(tx, it)
refreshSentTransactions()
}
}
}
twig("given current height $currentHeight, we found $pendingCount pending txs to submit")
} catch (t: Throwable) {
twig("Error during updatePendingTransactions: $t caused by ${t.cause}")
}
}
private fun findMatchingClearedTx(tx: PendingTransactionEntity): PendingTransactionEntity? {
return clearedTxProvider.getCleared().firstOrNull { clearedTx ->
// TODO: remove this troubleshooting code
if (tx.isSameTxId(clearedTx)) {
twig("found a matching cleared transaction with id: ${clearedTx.id}...")
if (clearedTx.height.let { it ?: 0 } <= 0) {
twig("...but it didn't have a mined height. That probably shouldn't happen so investigate this.")
false
} else {
true
}
} else false
}.toPendingTransactionEntity()
}
}
private fun WalletTransaction?.toPendingTransactionEntity(): PendingTransactionEntity? {
if(this == null) return null
return PendingTransactionEntity(
address = address ?: "",
value = value,
memo = memo ?: "",
minedHeight = height ?: -1,
txId = rawTransactionId
)
}
private fun LightWalletService.safeLatestBlockHeight(): Int {
return try {
getLatestBlockHeight()
} catch (t: Throwable) {
twig("Warning: LightWalletService failed to return the latest height and we are returning -1 instead.")
-1
}
}
sealed class TransactionUpdateRequest
object SubmitPendingTx : TransactionUpdateRequest()
object RefreshSentTx : TransactionUpdateRequest()
private fun String?.toTxError(): TransactionError {
return FailedTransaction("$this")
}
data class FailedTransaction(override val message: String) : TransactionError
/*
states:
** creating
** failed to create
CREATED
EXPIRED
MINED
SUBMITTED
INVALID
** attempting submission
** attempted submission
bookkeeper, register, treasurer, mint, ledger
private fun checkTx(transactionId: Long) {
if (transactionId < 0) {
throw SweepException.Creation
} else {
twig("successfully created transaction!")
}
}
private fun checkRawTx(transactionRaw: ByteArray?) {
if (transactionRaw == null) {
throw SweepException.Disappeared
} else {
twig("found raw transaction in the dataDb")
}
}
private fun checkResponse(response: Service.SendResponse) {
if (response.errorCode < 0) {
throw SweepException.IncompletePass(response)
} else {
twig("successfully submitted. error code: ${response.errorCode}")
}
}
sealed class SweepException(val errorMessage: String) : RuntimeException(errorMessage) {
object Creation : SweepException("failed to create raw transaction")
object Disappeared : SweepException("unable to find a matching raw transaction. This means the rust backend said it created a TX but when we looked for it in the DB it was missing!")
class IncompletePass(response: Service.SendResponse) : SweepException("submit failed with error code: ${response.errorCode} and message ${response.errorMessage}")
}
*/

View File

@ -0,0 +1,26 @@
package cash.z.wallet.sdk.data
interface RawTransactionEncoder {
/**
* Creates a raw transaction that is unsigned.
*/
suspend fun create(zatoshi: Long, toAddress: String, memo: String = ""): EncodedTransaction
}
data class EncodedTransaction(val txId: ByteArray, val raw: ByteArray) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is EncodedTransaction) return false
if (!txId.contentEquals(other.txId)) return false
if (!raw.contentEquals(other.raw)) return false
return true
}
override fun hashCode(): Int {
var result = txId.contentHashCode()
result = 31 * result + raw.contentHashCode()
return result
}
}

View File

@ -0,0 +1,275 @@
package cash.z.wallet.sdk.data
import cash.z.android.wallet.data.ChannelListValueProvider
import cash.z.wallet.sdk.block.CompactBlockProcessor
import cash.z.wallet.sdk.dao.WalletTransaction
import cash.z.wallet.sdk.db.PendingTransactionEntity
import cash.z.wallet.sdk.exception.WalletException
import cash.z.wallet.sdk.secure.Wallet
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlin.coroutines.CoroutineContext
/**
* A synchronizer that attempts to remain operational, despite any number of errors that can occur.
*/
@ExperimentalCoroutinesApi
class StableSynchronizer (
private val wallet: Wallet,
private val ledger: PollingTransactionRepository,
private val sender: TransactionSender,
private val processor: CompactBlockProcessor,
private val encoder: RawTransactionEncoder,
private val clearedTransactionProvider: ChannelListValueProvider<WalletTransaction>
) : DataSyncronizer {
/** This listener will not be called on the main thread. So it will need to switch to do anything with UI, like dialogs */
override var onCriticalErrorListener: ((Throwable) -> Boolean)? = null
private var syncJob: Job? = null
private var clearedJob: Job? = null
private var pendingJob: Job? = null
private var progressJob: Job? = null
private val balanceChannel = ConflatedBroadcastChannel(Wallet.WalletBalance())
private val progressChannel = ConflatedBroadcastChannel(0)
private val pendingChannel = ConflatedBroadcastChannel<List<PendingTransactionEntity>>(listOf())
private val clearedChannel = clearedTransactionProvider.channel
// TODO: clean these up and turn them into delegates
internal val pendingProvider = ChannelListValueProvider(pendingChannel)
override val isConnected: Boolean get() = processor.isConnected
override val isSyncing: Boolean get() = processor.isSyncing
override val isScanning: Boolean get() = processor.isScanning
// TODO: find a better way to expose the lifecycle of this synchronizer (right now this is only used by the zcon1 app's SendReceiver)
lateinit var internalScope: CoroutineScope
override fun start(scope: CoroutineScope) {
internalScope = scope
twig("Starting sender!")
try {
wallet.initialize()
} catch (e: WalletException.AlreadyInitializedException) {
twig("Warning: wallet already initialized but this is safe to ignore " +
"because the SDK now automatically detects where to start downloading.")
} catch (f: WalletException.FalseStart) {
if (recoverFrom(f)) {
twig("Warning: had a wallet init error but we recovered!")
} else {
twig("Error: false start while initializing wallet!")
}
}
sender.onSubmissionError = ::onFailedSend
sender.start(scope)
progressJob = scope.launchProgressMonitor()
pendingJob = scope.launchPendingMonitor()
clearedJob = scope.launchClearedMonitor()
syncJob = scope.onReady()
}
private fun recoverFrom(error: WalletException.FalseStart): Boolean {
if (error.message?.contains("unable to open database file") == true
|| error.message?.contains("table blocks has no column named") == true) {
//TODO: these errors are fatal and we need to delete the database and start over
twig("Database should be deleted and we should start over")
}
return true
}
// TODO: consider removing the need for stopping by wrapping everything in a job that gets cancelled
// probably just returning the job from start
override fun stop() {
sender.stop()
// TODO: consider wrapping these in another object that helps with cleanup like job.toScopedJob()
// it would keep a reference to the job and then clear that reference when the scope ends
syncJob?.cancel().also { syncJob = null }
pendingJob?.cancel().also { pendingJob = null }
clearedJob?.cancel().also { clearedJob = null }
progressJob?.cancel().also { progressJob = null }
}
//
// Monitors
//
// begin the monitor that will update the balance proactively whenever we're done a large scan
private fun CoroutineScope.launchProgressMonitor(): Job? = launch {
twig("launching progress monitor")
val progressUpdates = progress()
for (progress in progressUpdates) {
if (progress == 100) {
twig("triggering a balance update because progress is complete")
refreshBalance()
}
}
twig("done monitoring for progress changes")
}
// begin the monitor that will output pending transactions into the pending channel
private fun CoroutineScope.launchPendingMonitor(): Job? = launch {
twig("launching pending monitor")
// ask to be notified when the sender notices anything new, while attempting to send
sender.notifyOnChange(pendingChannel)
// when those notifications come in, also update the balance
val channel = pendingChannel.openSubscription()
for (pending in channel) {
if(balanceChannel.isClosedForSend) break
twig("triggering a balance update because pending transactions have changed")
refreshBalance()
}
twig("done monitoring for pending changes and balance changes")
}
// begin the monitor that will output cleared transactions into the cleared channel
private fun CoroutineScope.launchClearedMonitor(): Job? = launch {
twig("launching cleared monitor")
// poll for modifications and send them into the cleared channel
ledger.poll(clearedChannel, 10_000L)
// when those notifications come in, also update the balance
val channel = clearedChannel.openSubscription()
for (cleared in channel) {
if(!balanceChannel.isClosedForSend) {
twig("triggering a balance update because cleared transactions have changed")
refreshBalance()
} else {
twig("WARNING: noticed new cleared transactions but the balance channel was closed for send so ignoring!")
}
}
twig("done monitoring for cleared changes and balance changes")
}
suspend fun refreshBalance() = withContext(IO) {
balanceChannel.send(wallet.getBalanceInfo())
}
private fun CoroutineScope.onReady() = launch(CoroutineExceptionHandler(::onCriticalError)) {
twig("Synchronizer Ready. Starting processor!")
processor.onErrorListener = ::onProcessorError
processor.start()
twig("Synchronizer onReady complete. Processor start has exited!")
}
private fun onCriticalError(unused: CoroutineContext, error: Throwable) {
twig("********")
twig("******** ERROR: $error")
if (error.cause != null) twig("******** caused by ${error.cause}")
if (error.cause?.cause != null) twig("******** caused by ${error.cause?.cause}")
twig("********")
onCriticalErrorListener?.invoke(error)
}
var sameErrorCount = 1
var processorErrorMessage: String? = ""
private fun onProcessorError(error: Throwable): Boolean {
val dummyContext = CoroutineName("bob")
if (processorErrorMessage == error.message) sameErrorCount++
val isFrequent = sameErrorCount.rem(25) == 0
when {
sameErrorCount == 5 -> onCriticalError(dummyContext, error)
// isFrequent -> trackError(ProcessorRepeatedFailure(error, sameErrorCount))
sameErrorCount == 120 -> {
// trackError(ProcessorMaxFailureReached(error))
Thread.sleep(500)
throw error
}
}
processorErrorMessage = error.message
twig("synchronizer sees your error and ignores it, willfully! Keep retrying ($sameErrorCount), processor!")
return true
}
fun onFailedSend(throwable: Throwable) {
// trackError(ErrorSubmitting(throwable))
}
//
// Channels
//
override fun balances(): ReceiveChannel<Wallet.WalletBalance> {
return balanceChannel.openSubscription()
}
override fun progress(): ReceiveChannel<Int> {
return progressChannel.openSubscription()
}
override fun pendingTransactions(): ReceiveChannel<List<PendingTransactionEntity>> {
return pendingChannel.openSubscription()
}
override fun clearedTransactions(): ReceiveChannel<List<WalletTransaction>> {
return clearedChannel.openSubscription()
}
override fun getPending(): List<PendingTransactionEntity> {
return pendingProvider.getLatestValue()
}
override fun getCleared(): List<WalletTransaction> {
return clearedTransactionProvider.getLatestValue()
}
override fun getBalance(): Wallet.WalletBalance {
return balanceChannel.value
}
//
// Send / Receive
//
override suspend fun getAddress(accountId: Int): String = withContext(IO) { wallet.getAddress() }
override suspend fun sendToAddress(
zatoshi: Long,
toAddress: String,
memo: String,
fromAccountId: Int
): PendingTransactionEntity = withContext(IO) {
sender.sendToAddress(encoder, zatoshi, toAddress, memo, fromAccountId)
}
}
interface DataSyncronizer : ClearedTransactionProvider, PendingTransactionProvider {
fun start(scope: CoroutineScope)
fun stop()
suspend fun getAddress(accountId: Int = 0): String
suspend fun sendToAddress(zatoshi: Long, toAddress: String, memo: String = "", fromAccountId: Int = 0): PendingTransactionEntity
fun balances(): ReceiveChannel<Wallet.WalletBalance>
fun progress(): ReceiveChannel<Int>
fun pendingTransactions(): ReceiveChannel<List<PendingTransactionEntity>>
fun clearedTransactions(): ReceiveChannel<List<WalletTransaction>>
val isConnected: Boolean
val isSyncing: Boolean
val isScanning: Boolean
var onCriticalErrorListener: ((Throwable) -> Boolean)?
override fun getPending(): List<PendingTransactionEntity>
override fun getCleared(): List<WalletTransaction>
fun getBalance(): Wallet.WalletBalance
}
interface ClearedTransactionProvider {
fun getCleared(): List<WalletTransaction>
}
interface PendingTransactionProvider {
fun getPending(): List<PendingTransactionEntity>
}

View File

@ -0,0 +1,22 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.service.LightWalletService
/**
* Manage transactions with the main purpose of reporting which ones are still pending, particularly after failed
* attempts or dropped connectivity. The intent is to help see transactions through to completion.
*/
interface TransactionManager {
fun start()
fun stop()
suspend fun manageCreation(encoder: RawTransactionEncoder, zatoshiValue: Long, toAddress: String, memo: String, currentHeight: Int): RawTransaction
suspend fun manageSubmission(service: LightWalletService, pendingTransaction: RawTransaction)
suspend fun getAll(): List<RawTransaction>
}
interface RawTransaction {
val raw: ByteArray?
}
interface TransactionError {
val message: String
}

View File

@ -0,0 +1,19 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.db.PendingTransactionEntity
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.SendChannel
interface TransactionSender {
fun start(scope: CoroutineScope)
fun stop()
fun notifyOnChange(channel: SendChannel<List<PendingTransactionEntity>>)
/** only necessary when there is a long delay between starting a transaction and beginning to create it. Like when sweeping a wallet that first needs to be scanned. */
suspend fun prepareTransaction(amount: Long, address: String, memo: String): PendingTransactionEntity?
suspend fun sendPreparedTransaction(encoder: RawTransactionEncoder, tx: PendingTransactionEntity): PendingTransactionEntity
suspend fun cleanupPreparedTransaction(tx: PendingTransactionEntity)
suspend fun sendToAddress(encoder: RawTransactionEncoder, zatoshi: Long, toAddress: String, memo: String = "", fromAccountId: Int = 0): PendingTransactionEntity
suspend fun cancel(existingTransaction: PendingTransactionEntity): Unit?
var onSubmissionError: ((Throwable) -> Unit)?
}

View File

@ -0,0 +1,33 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.secure.Wallet
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.withContext
class WalletTransactionEncoder(
private val wallet: Wallet,
private val repository: TransactionRepository
) : RawTransactionEncoder {
/**
* Creates a transaction, throwing an exception whenever things are missing. When the provided wallet implementation
* doesn't throw an exception, we wrap the issue into a descriptive exception ourselves (rather than using
* double-bangs for things).
*/
override suspend fun create(zatoshi: Long, toAddress: String, memo: String): EncodedTransaction = withContext(IO) {
val transactionId = wallet.createRawSendTransaction(zatoshi, toAddress, memo)
val transaction = repository.findTransactionById(transactionId)
?: throw TransactionNotFoundException(transactionId)
EncodedTransaction(transaction.transactionId, transaction.raw
?: throw TransactionNotEncodedException(transactionId))
}
}
class TransactionNotFoundException(transactionId: Long) : RuntimeException("Unable to find transactionId " +
"$transactionId in the repository. This means the wallet created a transaction and then returned a row ID " +
"that does not actually exist. This is a scenario where the wallet should have thrown an exception but failed " +
"to do so.")
class TransactionNotEncodedException(transactionId: Long) : RuntimeException("The transaction returned by the wallet," +
" with id $transactionId, does not have any raw data. This is a scenario where the wallet should have thrown" +
" an exception but failed to do so.")

View File

@ -0,0 +1,179 @@
package cash.z.wallet.sdk.db
import androidx.room.*
import cash.z.wallet.sdk.dao.WalletTransaction
import cash.z.wallet.sdk.data.RawTransaction
import cash.z.wallet.sdk.ext.masked
@Database(
entities = [
PendingTransactionEntity::class
],
version = 1,
exportSchema = false
)
abstract class PendingTransactionDb : RoomDatabase() {
abstract fun pendingTransactionDao(): PendingTransactionDao
}
@Dao
interface PendingTransactionDao {
@Insert(onConflict = OnConflictStrategy.REPLACE)
fun insert(transaction: PendingTransactionEntity): Long
@Delete
fun delete(transaction: PendingTransactionEntity)
//
// /**
// * Query all blocks that are not mined and not expired.
// */
// @Query(
// """
// SELECT id,
// address,
// value,
// memo,
// minedheight,
// expiryheight,
// submitcount,
// encodecount,
// errormessage,
// createtime,
// raw
// FROM pending_transactions
// WHERE minedHeight = -1 and (expiryHeight >= :currentHeight or expiryHeight = -1) and (raw IS NOT NULL)
// ORDER BY createtime
// """
// )
// fun getAllPending(currentHeight: Int): List<PendingTransactionEntity>
@Query("SELECT * from pending_transactions ORDER BY createTime")
fun getAll(): List<PendingTransactionEntity>
}
@Entity(tableName = "pending_transactions")
data class PendingTransactionEntity(
@PrimaryKey(autoGenerate = true)
val id: Long = 0,
val address: String = "",
val value: Long = -1,
val memo: String = "",
val minedHeight: Int = -1,
val expiryHeight: Int = -1,
val submitAttempts: Int = -1,
/** the number of times there was an attempt to encode this transaction */
val encodeAttempts: Int = -1,
val errorMessage: String? = null,
val errorCode: Int? = null,
val createTime: Long = System.currentTimeMillis(),
@ColumnInfo(typeAffinity = ColumnInfo.BLOB)
override val raw: ByteArray? = null,
@ColumnInfo(typeAffinity = ColumnInfo.BLOB)
val txId: ByteArray? = null
) : RawTransaction {
override fun toString(): String {
return if ((raw != null && raw.size > 1) || !address.contains("**mask")) {
copy(
raw = byteArrayOf(1),
address = address.masked()
).toString()
} else {
super.toString()
}
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is PendingTransactionEntity) return false
if (id != other.id) return false
if (address != other.address) return false
if (value != other.value) return false
if (memo != other.memo) return false
if (minedHeight != other.minedHeight) return false
if (expiryHeight != other.expiryHeight) return false
if (submitAttempts != other.submitAttempts) return false
if (encodeAttempts != other.encodeAttempts) return false
if (errorMessage != other.errorMessage) return false
if (errorCode != other.errorCode) return false
if (createTime != other.createTime) return false
if (raw != null) {
if (other.raw == null) return false
if (!raw.contentEquals(other.raw)) return false
} else if (other.raw != null) return false
if (txId != null) {
if (other.txId == null) return false
if (!txId.contentEquals(other.txId)) return false
} else if (other.txId != null) return false
return true
}
override fun hashCode(): Int {
var result = id.hashCode()
result = 31 * result + address.hashCode()
result = 31 * result + value.hashCode()
result = 31 * result + memo.hashCode()
result = 31 * result + minedHeight
result = 31 * result + expiryHeight
result = 31 * result + submitAttempts
result = 31 * result + encodeAttempts
result = 31 * result + (errorMessage?.hashCode() ?: 0)
result = 31 * result + (errorCode ?: 0)
result = 31 * result + createTime.hashCode()
result = 31 * result + (raw?.contentHashCode() ?: 0)
result = 31 * result + (txId?.contentHashCode() ?: 0)
return result
}
}
fun PendingTransactionEntity.isSameTxId(other: WalletTransaction): Boolean {
return txId != null && other.rawTransactionId != null && txId.contentEquals(other.rawTransactionId!!)
}
fun PendingTransactionEntity.isSameTxId(other: PendingTransactionEntity): Boolean {
return txId != null && other.txId != null && txId.contentEquals(other.txId)
}
fun PendingTransactionEntity.isCreating(): Boolean {
return raw == null && submitAttempts <= 0 && !isFailedSubmit() && !isFailedEncoding()
}
fun PendingTransactionEntity.isFailedEncoding(): Boolean {
return raw == null && encodeAttempts > 0
}
fun PendingTransactionEntity.isFailedSubmit(): Boolean {
return errorMessage != null || (errorCode != null && errorCode < 0)
}
fun PendingTransactionEntity.isFailure(): Boolean {
return isFailedEncoding() || isFailedSubmit()
}
fun PendingTransactionEntity.isSubmitted(): Boolean {
return submitAttempts > 0
}
fun PendingTransactionEntity.isMined(): Boolean {
return minedHeight > 0
}
fun PendingTransactionEntity.isPending(currentHeight: Int = -1): Boolean {
// not mined and not expired and successfully created
return !isSubmitSuccess() && minedHeight == -1 && (expiryHeight == -1 || expiryHeight > currentHeight) && raw != null
}
fun PendingTransactionEntity.isSubmitSuccess(): Boolean {
return submitAttempts > 0 && (errorCode != null && errorCode >= 0) && errorMessage == null
}
/**
* The amount of time remaining until this transaction is stale
*/
fun PendingTransactionEntity.ttl(): Long {
return (60L * 2L) - (System.currentTimeMillis()/1000 - createTime)
}

View File

@ -11,7 +11,7 @@ import java.util.*
//TODO: provide a dynamic way to configure this globally for the SDK
// For now, just make these vars so at least they could be modified in one place
object Conversions {
var ONE_ZEC_IN_ZATOSHI = BigDecimal(ZATOSHI, MathContext.DECIMAL128)
var ONE_ZEC_IN_ZATOSHI = BigDecimal(ZATOSHI_PER_ZEC, MathContext.DECIMAL128)
var ZEC_FORMATTER = NumberFormat.getInstance(Locale.getDefault()).apply {
roundingMode = RoundingMode.HALF_EVEN
maximumFractionDigits = 6

View File

@ -12,7 +12,7 @@ const val MINERS_FEE_ZATOSHI = 10_000L
/**
* The number of zatoshi that equal 1 ZEC.
*/
const val ZATOSHI = 100_000_000L
const val ZATOSHI_PER_ZEC = 100_000_000L
/**
* The height of the first sapling block. When it comes to shielded transactions, we do not need to consider any blocks