Refactor logic for awaiting confirmations.
Leverage the new flow-centric approach to sending transactions, rather than polling the database, which leads to concurrent modification errors and corrupted data.
This commit is contained in:
parent
bebe0cf4e7
commit
93d4114848
|
@ -27,6 +27,9 @@ android {
|
|||
sourceCompatibility 1.8
|
||||
targetCompatibility 1.8
|
||||
}
|
||||
kotlinOptions {
|
||||
jvmTarget = "1.8"
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
|
@ -42,12 +45,12 @@ dependencies {
|
|||
implementation "io.grpc:grpc-android:1.21.0"
|
||||
implementation 'javax.annotation:javax.annotation-api:1.3.2'
|
||||
// SDK: Room
|
||||
implementation "androidx.room:room-runtime:2.2.0"
|
||||
implementation "androidx.room:room-common:2.2.0"
|
||||
implementation "androidx.room:room-ktx:2.2.0"
|
||||
implementation "androidx.room:room-runtime:2.2.2"
|
||||
implementation "androidx.room:room-common:2.2.2"
|
||||
implementation "androidx.room:room-ktx:2.2.2"
|
||||
implementation "androidx.paging:paging-runtime-ktx:2.1.0"
|
||||
implementation 'com.google.guava:guava:27.0.1-android'
|
||||
kapt "androidx.room:room-compiler:2.2.0"
|
||||
kapt "androidx.room:room-compiler:2.2.2"
|
||||
// SDK: Other
|
||||
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2"
|
||||
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.2"
|
||||
|
@ -59,15 +62,15 @@ dependencies {
|
|||
implementation 'androidx.legacy:legacy-support-v4:1.0.0'
|
||||
implementation 'com.google.android.material:material:1.0.0'
|
||||
implementation 'androidx.constraintlayout:constraintlayout:1.1.3'
|
||||
implementation 'androidx.navigation:navigation-fragment:2.0.0'
|
||||
implementation 'androidx.navigation:navigation-ui:2.0.0'
|
||||
implementation 'androidx.navigation:navigation-fragment:2.1.0'
|
||||
implementation 'androidx.navigation:navigation-ui:2.1.0'
|
||||
implementation 'androidx.lifecycle:lifecycle-extensions:2.1.0'
|
||||
implementation 'androidx.navigation:navigation-fragment-ktx:2.0.0'
|
||||
implementation 'androidx.navigation:navigation-ui-ktx:2.0.0'
|
||||
implementation 'androidx.navigation:navigation-fragment-ktx:2.1.0'
|
||||
implementation 'androidx.navigation:navigation-ui-ktx:2.1.0'
|
||||
testImplementation 'junit:junit:4.12'
|
||||
androidTestImplementation 'androidx.test.ext:junit:1.1.1'
|
||||
androidTestImplementation 'androidx.test.espresso:espresso-core:3.2.0'
|
||||
implementation "androidx.lifecycle:lifecycle-common-java8:2.2.0-beta01"
|
||||
implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-beta01" // provides lifecycleScope!
|
||||
implementation "androidx.lifecycle:lifecycle-common-java8:2.2.0-rc02"
|
||||
implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-rc02" // provides lifecycleScope!
|
||||
|
||||
}
|
||||
|
|
|
@ -90,15 +90,18 @@ class SendFragment : BaseDemoFragment<FragmentSendBinding>() {
|
|||
).collectWith(lifecycleScope, ::onPendingTxUpdated)
|
||||
}
|
||||
|
||||
private fun onPendingTxUpdated(pendingTransaction: PendingTransaction) {
|
||||
private fun onPendingTxUpdated(pendingTransaction: PendingTransaction?) {
|
||||
val message = when {
|
||||
pendingTransaction.isSubmitted() -> "Successfully submitted transaction!"
|
||||
pendingTransaction == null -> "Transaction not found"
|
||||
pendingTransaction.isMined() -> "Transaction Mined!"
|
||||
pendingTransaction.isSubmitted() -> "Successfully submitted transaction!"
|
||||
pendingTransaction.isFailedEncoding() -> "ERROR: failed to encode transaction!"
|
||||
pendingTransaction.isFailedSubmit() -> "ERROR: failed to submit transaction!"
|
||||
pendingTransaction.isCreated() -> "Transaction creation complete!"
|
||||
pendingTransaction.isCreating() -> "Creating transaction!"
|
||||
else -> "Transaction updated!".also { twig("Unhandled TX state: $pendingTransaction") }
|
||||
}
|
||||
twig("PENDING TX: $message")
|
||||
Toast.makeText(App.instance, message, Toast.LENGTH_SHORT).show()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@ buildscript {
|
|||
jcenter()
|
||||
}
|
||||
dependencies {
|
||||
classpath 'com.android.tools.build:gradle:3.6.0-beta02'
|
||||
classpath 'com.android.tools.build:gradle:3.6.0-beta04'
|
||||
classpath"org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
|
||||
// NOTE: Do not place your application dependencies here; they belong
|
||||
// in the individual module build.gradle files
|
||||
|
|
|
@ -37,11 +37,12 @@ import kotlin.coroutines.CoroutineContext
|
|||
*/
|
||||
@ExperimentalCoroutinesApi
|
||||
class SdkSynchronizer internal constructor(
|
||||
val ledger: TransactionRepository,
|
||||
private val ledger: TransactionRepository,
|
||||
private val manager: OutboundTransactionManager,
|
||||
private val processor: CompactBlockProcessor
|
||||
) : Synchronizer {
|
||||
private val balanceChannel = ConflatedBroadcastChannel<WalletBalance>()
|
||||
private val _balances = ConflatedBroadcastChannel<WalletBalance>()
|
||||
private val _status = ConflatedBroadcastChannel<Synchronizer.Status>()
|
||||
|
||||
/**
|
||||
* The lifespan of this Synchronizer. This scope is initialized once the Synchronizer starts
|
||||
|
@ -57,22 +58,11 @@ class SdkSynchronizer internal constructor(
|
|||
// Transactions
|
||||
//
|
||||
|
||||
// TODO: implement this stuff
|
||||
override val balances: Flow<WalletBalance> = balanceChannel.asFlow()
|
||||
override val allTransactions: Flow<PagedList<Transaction>> = flow{}
|
||||
override val pendingTransactions: Flow<PagedList<PendingTransaction>> = flow{}
|
||||
override val clearedTransactions: Flow<PagedList<ConfirmedTransaction>> = flow{}
|
||||
|
||||
override val sentTransactions: Flow<PagedList<ConfirmedTransaction>> = ledger.sentTransactions
|
||||
override val receivedTransactions: Flow<PagedList<ConfirmedTransaction>> = ledger.receivedTransactions
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
override val balances: Flow<WalletBalance> = _balances.asFlow()
|
||||
override val clearedTransactions = ledger.allTransactions
|
||||
override val pendingTransactions = manager.getAll()
|
||||
override val sentTransactions = ledger.sentTransactions
|
||||
override val receivedTransactions = ledger.receivedTransactions
|
||||
|
||||
|
||||
//
|
||||
|
@ -81,16 +71,11 @@ class SdkSynchronizer internal constructor(
|
|||
|
||||
/**
|
||||
* Indicates the status of this Synchronizer. This implementation basically simplifies the
|
||||
* status of the processor to focus only on the high level states that matter most.
|
||||
* status of the processor to focus only on the high level states that matter most. Whenever the
|
||||
* processor is finished scanning, the synchronizer updates transaction and balance info and
|
||||
* then emits a [SYNCED] status.
|
||||
*/
|
||||
override val status = processor.state.map {
|
||||
when (it) {
|
||||
is Synced -> SYNCED
|
||||
is Stopped -> STOPPED
|
||||
is Disconnected -> DISCONNECTED
|
||||
else -> SYNCING
|
||||
}
|
||||
}
|
||||
override val status = _status.asFlow()
|
||||
|
||||
/**
|
||||
* Indicates the download progress of the Synchronizer. When progress reaches 100, that
|
||||
|
@ -147,8 +132,9 @@ class SdkSynchronizer internal constructor(
|
|||
override fun start(parentScope: CoroutineScope): Synchronizer {
|
||||
if (::coroutineScope.isInitialized) throw SynchronizerException.FalseStart
|
||||
|
||||
// base this scope on the parent so that when the parent's job cancels, everything here cancels as well
|
||||
// also use a supervisor job so that one failure doesn't bring down the whole synchronizer
|
||||
// base this scope on the parent so that when the parent's job cancels, everything here
|
||||
// cancels as well also use a supervisor job so that one failure doesn't bring down the
|
||||
// whole synchronizer
|
||||
coroutineScope =
|
||||
CoroutineScope(SupervisorJob(parentScope.coroutineContext[Job]!!) + Dispatchers.Main)
|
||||
coroutineScope.onReady()
|
||||
|
@ -164,7 +150,8 @@ class SdkSynchronizer internal constructor(
|
|||
coroutineScope.launch {
|
||||
processor.stop()
|
||||
coroutineScope.cancel()
|
||||
balanceChannel.cancel()
|
||||
_balances.cancel()
|
||||
_status.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -178,30 +165,23 @@ class SdkSynchronizer internal constructor(
|
|||
}
|
||||
|
||||
private suspend fun refreshBalance() {
|
||||
balanceChannel.send(processor.getBalanceInfo())
|
||||
_balances.send(processor.getBalanceInfo())
|
||||
}
|
||||
|
||||
private fun CoroutineScope.onReady() = launch(CoroutineExceptionHandler(::onCriticalError)) {
|
||||
twig("Synchronizer Ready. Starting processor!")
|
||||
processor.onErrorListener = ::onProcessorError
|
||||
status.filter { it == SYNCED }.onEach {
|
||||
// TRICKY:
|
||||
// Keep an eye on this section because there is a potential for concurrent DB
|
||||
// modification. A change in transactions means a change in balance. Calculating the
|
||||
// balance requires touching transactions. If both are done in separate threads, the
|
||||
// database can have issues. On Android, would manifest as a false positive for a
|
||||
// "malformed database" exception when the database is not actually corrupt but rather
|
||||
// locked (i.e. it's a bad error message).
|
||||
// The balance refresh is done first because it is coroutine-based and will fully
|
||||
// complete by the time the function returns.
|
||||
// Ultimately, refreshing the transactions just invalidates views of data that
|
||||
// already exists and it completes on another thread so it should come after the
|
||||
// balance refresh is complete.
|
||||
twigTask("Triggering an automatic balance refresh since the processor is synced!") {
|
||||
refreshBalance()
|
||||
}
|
||||
twigTask("Triggering an automatic transaction refresh since the processor is synced!") {
|
||||
refreshTransactions()
|
||||
processor.state.onEach {
|
||||
when (it) {
|
||||
is Scanned -> {
|
||||
onScanComplete(it.scannedRange)
|
||||
SYNCED
|
||||
}
|
||||
is Stopped -> STOPPED
|
||||
is Disconnected -> DISCONNECTED
|
||||
else -> SYNCING
|
||||
}.let { synchronizerStatus ->
|
||||
_status.send(synchronizerStatus)
|
||||
}
|
||||
}.launchIn(this)
|
||||
processor.start()
|
||||
|
@ -243,6 +223,49 @@ class SdkSynchronizer internal constructor(
|
|||
} == true
|
||||
}
|
||||
|
||||
private suspend fun onScanComplete(scannedRange: IntRange) {
|
||||
// TODO: optimize to skip logic here if there are no new transactions with a block height
|
||||
// within the given range
|
||||
|
||||
// TRICKY:
|
||||
// Keep an eye on this section because there is a potential for concurrent DB
|
||||
// modification. A change in transactions means a change in balance. Calculating the
|
||||
// balance requires touching transactions. If both are done in separate threads, the
|
||||
// database can have issues. On Android, this would manifest as a false positive for a
|
||||
// "malformed database" exception when the database is not actually corrupt but rather
|
||||
// locked (i.e. it's a bad error message).
|
||||
// The balance refresh is done first because it is coroutine-based and will fully
|
||||
// complete by the time the function returns.
|
||||
// Ultimately, refreshing the transactions just invalidates views of data that
|
||||
// already exists and it completes on another thread so it should come after the
|
||||
// balance refresh is complete.
|
||||
twigTask("Triggering balance refresh since the processor is synced!") {
|
||||
refreshBalance()
|
||||
}
|
||||
twigTask("Triggering pending transaction refresh!") {
|
||||
refreshPendingTransactions()
|
||||
}
|
||||
twigTask("Triggering transaction refresh since the processor is synced!") {
|
||||
refreshTransactions()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun refreshPendingTransactions() {
|
||||
// TODO: this would be the place to clear out any stale pending transactions. Remove filter
|
||||
// logic and then delete any pending transaction with sufficient confirmations (all in one
|
||||
// db transaction).
|
||||
manager.getAll().first().filter { !it.isMined() }.forEach { pendingTx ->
|
||||
twig("checking for updates on pendingTx id: ${pendingTx.id}")
|
||||
pendingTx.rawTransactionId?.let { rawId ->
|
||||
ledger.findMinedHeight(rawId)?.let { minedHeight ->
|
||||
twig("found matching transaction for pending transaction with id" +
|
||||
" ${pendingTx.id} mined at height ${minedHeight}!")
|
||||
manager.applyMinedHeight(pendingTx, minedHeight)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//
|
||||
// Send / Receive
|
||||
|
@ -258,13 +281,18 @@ class SdkSynchronizer internal constructor(
|
|||
toAddress: String,
|
||||
memo: String,
|
||||
fromAccountIndex: Int
|
||||
): Flow<PendingTransaction> {
|
||||
twig("beginning sendToAddress")
|
||||
return manager.initSpend(zatoshi, toAddress, memo, fromAccountIndex).flatMapConcat {
|
||||
manager.encode(spendingKey, it)
|
||||
}.flatMapConcat {
|
||||
manager.submit(it)
|
||||
): Flow<PendingTransaction> = flow {
|
||||
twig("Initializing pending transaction")
|
||||
// Emit the placeholder transaction, then switch to monitoring the database
|
||||
manager.initSpend(zatoshi, toAddress, memo, fromAccountIndex).let { placeHolderTx ->
|
||||
emit(placeHolderTx)
|
||||
manager.encode(spendingKey, placeHolderTx).let { encodedTx ->
|
||||
manager.submit(encodedTx)
|
||||
}
|
||||
}
|
||||
}.flatMapLatest {
|
||||
twig("Monitoring pending transaction for updates...")
|
||||
manager.monitorById(it.id)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -278,13 +306,16 @@ fun Synchronizer(
|
|||
appContext: Context,
|
||||
lightwalletdHost: String,
|
||||
rustBackend: RustBackend,
|
||||
ledger: TransactionRepository = PagedTransactionRepository(appContext, 10, rustBackend.dbDataPath),
|
||||
ledger: TransactionRepository =
|
||||
PagedTransactionRepository(appContext, 10, rustBackend.dbDataPath),
|
||||
blockStore: CompactBlockStore = CompactBlockDbStore(appContext, rustBackend.dbCachePath),
|
||||
service: LightWalletService = LightWalletGrpcService(appContext, lightwalletdHost),
|
||||
encoder: TransactionEncoder = WalletTransactionEncoder(rustBackend, ledger),
|
||||
downloader: CompactBlockDownloader = CompactBlockDownloader(service, blockStore),
|
||||
manager: OutboundTransactionManager = PersistentTransactionManager(appContext, encoder, service),
|
||||
processor: CompactBlockProcessor = CompactBlockProcessor(downloader, ledger, rustBackend, rustBackend.birthdayHeight)
|
||||
manager: OutboundTransactionManager =
|
||||
PersistentTransactionManager(appContext, encoder, service),
|
||||
processor: CompactBlockProcessor =
|
||||
CompactBlockProcessor(downloader, ledger, rustBackend, rustBackend.birthdayHeight)
|
||||
): Synchronizer {
|
||||
// ties everything together
|
||||
return SdkSynchronizer(
|
||||
|
|
|
@ -60,16 +60,11 @@ interface Synchronizer {
|
|||
|
||||
/* Transactions */
|
||||
|
||||
/**
|
||||
* All transactions of every type.
|
||||
*/
|
||||
val allTransactions: Flow<PagedList<Transaction>>
|
||||
|
||||
/**
|
||||
* A flow of all the outbound pending transaction that have been sent but are awaiting
|
||||
* confirmations.
|
||||
*/
|
||||
val pendingTransactions: Flow<PagedList<PendingTransaction>>
|
||||
val pendingTransactions: Flow<List<PendingTransaction>>
|
||||
|
||||
/**
|
||||
* A flow of all the transactions that are on the blockchain.
|
||||
|
@ -114,7 +109,7 @@ interface Synchronizer {
|
|||
toAddress: String,
|
||||
memo: String = "",
|
||||
fromAccountIndex: Int = 0
|
||||
): Flow<PendingTransaction>
|
||||
): Flow<PendingTransaction?>
|
||||
|
||||
/**
|
||||
* Attempts to cancel a transaction that is about to be sent. Typically, cancellation is only
|
||||
|
|
|
@ -18,7 +18,6 @@ import cash.z.wallet.sdk.jni.RustBackendWelding
|
|||
import cash.z.wallet.sdk.transaction.TransactionRepository
|
||||
import kotlinx.coroutines.Dispatchers.IO
|
||||
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
|
||||
import kotlinx.coroutines.channels.ReceiveChannel
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.isActive
|
||||
|
@ -129,7 +128,9 @@ class CompactBlockProcessor(
|
|||
setState(Scanning)
|
||||
val success = scanNewBlocks(rangeToScan)
|
||||
if (!success) throw CompactBlockProcessorException.FailedScan
|
||||
else setState(Synced)
|
||||
else {
|
||||
setState(Scanned(rangeToScan))
|
||||
}
|
||||
-1
|
||||
} else {
|
||||
error
|
||||
|
@ -137,6 +138,7 @@ class CompactBlockProcessor(
|
|||
}
|
||||
|
||||
|
||||
|
||||
@VisibleForTesting //allow mocks to verify how this is called, rather than the downloader, which is more complex
|
||||
internal suspend fun downloadNewBlocks(range: IntRange) = withContext<Unit>(IO) {
|
||||
if (range.isEmpty()) {
|
||||
|
@ -155,7 +157,7 @@ class CompactBlockProcessor(
|
|||
for (i in 1..batches) {
|
||||
retryUpTo(RETRIES) {
|
||||
val end = min(range.first + (i * DOWNLOAD_BATCH_SIZE), range.last + 1)
|
||||
twig("downloaded $downloadedBlockHeight..${(end - 1)} (batch $i of $batches) into : ${(rustBackend as RustBackend).dbCachePath}") {
|
||||
twig("downloaded $downloadedBlockHeight..${(end - 1)} (batch $i of $batches)") {
|
||||
downloader.downloadBlockRange(downloadedBlockHeight until end)
|
||||
}
|
||||
progress = (i / batches.toFloat() * 100).roundToInt()
|
||||
|
@ -252,7 +254,7 @@ class CompactBlockProcessor(
|
|||
object Downloading : Connected, Syncing, State()
|
||||
object Validating : Connected, Syncing, State()
|
||||
object Scanning : Connected, Syncing, State()
|
||||
object Synced : Connected, State()
|
||||
class Scanned(val scannedRange:IntRange) : Connected, Syncing, State()
|
||||
object Disconnected : State()
|
||||
object Stopped : State()
|
||||
object Initialized : State()
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
package cash.z.wallet.sdk.db
|
||||
|
||||
import androidx.paging.DataSource
|
||||
import androidx.paging.PositionalDataSource
|
||||
import androidx.room.*
|
||||
import androidx.room.Dao
|
||||
import androidx.room.Database
|
||||
import androidx.room.Query
|
||||
import androidx.room.RoomDatabase
|
||||
import cash.z.wallet.sdk.entity.*
|
||||
import cash.z.wallet.sdk.entity.Transaction
|
||||
|
||||
//
|
||||
// Database
|
||||
|
@ -62,11 +63,21 @@ interface TransactionDao {
|
|||
@Query("SELECT COUNT(block) FROM transactions WHERE block IS NULL")
|
||||
fun countUnmined(): Int
|
||||
|
||||
@Query("SELECT * FROM transactions WHERE id_tx = :id")
|
||||
fun findById(id: Long): TransactionEntity?
|
||||
@Query("""
|
||||
SELECT transactions.txid AS txId,
|
||||
transactions.raw AS raw
|
||||
FROM transactions
|
||||
WHERE id_tx = :id AND raw is not null
|
||||
""")
|
||||
fun findEncodedTransactionById(id: Long): EncodedTransaction?
|
||||
|
||||
@Query("SELECT * FROM transactions WHERE txid = :rawTransactionId LIMIT 1")
|
||||
fun findByRawId(rawTransactionId: ByteArray): TransactionEntity?
|
||||
@Query("""
|
||||
SELECT transactions.block
|
||||
FROM transactions
|
||||
WHERE txid = :rawTransactionId
|
||||
LIMIT 1
|
||||
""")
|
||||
fun findMinedHeight(rawTransactionId: ByteArray): Int?
|
||||
|
||||
// @Delete
|
||||
// fun delete(transaction: Transaction)
|
||||
|
|
|
@ -2,7 +2,7 @@ package cash.z.wallet.sdk.db
|
|||
|
||||
import androidx.room.*
|
||||
import cash.z.wallet.sdk.entity.PendingTransactionEntity
|
||||
import cash.z.wallet.sdk.entity.PendingTransaction
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
|
||||
|
||||
//
|
||||
|
@ -43,7 +43,10 @@ interface PendingTransactionDao {
|
|||
suspend fun findById(id: Long): PendingTransactionEntity?
|
||||
|
||||
@Query("SELECT * FROM pending_transactions ORDER BY createTime")
|
||||
suspend fun getAll(): List<PendingTransactionEntity>
|
||||
fun getAll(): Flow<List<PendingTransactionEntity>>
|
||||
|
||||
@Query("SELECT * FROM pending_transactions WHERE id = :id")
|
||||
fun monitorById(id: Long): Flow<PendingTransactionEntity>
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -297,6 +297,10 @@ fun PendingTransaction.isCreating(): Boolean {
|
|||
return (raw?.isEmpty() != false) && submitAttempts <= 0 && !isFailedSubmit() && !isFailedEncoding()
|
||||
}
|
||||
|
||||
fun PendingTransaction.isCreated(): Boolean {
|
||||
return (raw?.isEmpty() == false) && submitAttempts <= 0 && !isFailedSubmit() && !isFailedEncoding()
|
||||
}
|
||||
|
||||
fun PendingTransaction.isFailedEncoding(): Boolean {
|
||||
return (raw?.isEmpty() != false) && encodeAttempts > 0
|
||||
}
|
||||
|
|
|
@ -7,6 +7,8 @@ import androidx.room.RoomDatabase
|
|||
import cash.z.wallet.sdk.db.BlockDao
|
||||
import cash.z.wallet.sdk.db.DerivedDataDb
|
||||
import cash.z.wallet.sdk.db.TransactionDao
|
||||
import cash.z.wallet.sdk.entity.ConfirmedTransaction
|
||||
import cash.z.wallet.sdk.entity.EncodedTransaction
|
||||
import cash.z.wallet.sdk.entity.TransactionEntity
|
||||
import cash.z.wallet.sdk.ext.ZcashSdk
|
||||
import cash.z.wallet.sdk.ext.android.toFlowPagedList
|
||||
|
@ -66,15 +68,12 @@ open class PagedTransactionRepository(
|
|||
return blocks.count() > 0
|
||||
}
|
||||
|
||||
override suspend fun findTransactionById(txId: Long): TransactionEntity? = withContext(IO) {
|
||||
twig("finding transaction with id $txId on thread ${Thread.currentThread().name}")
|
||||
val transaction = transactions.findById(txId)
|
||||
twig("found ${transaction?.id}")
|
||||
transaction
|
||||
override suspend fun findEncodedTransactionById(txId: Long) = withContext(IO) {
|
||||
transactions.findEncodedTransactionById(txId)
|
||||
}
|
||||
|
||||
override suspend fun findTransactionByRawId(rawTxId: ByteArray): TransactionEntity? = withContext(IO) {
|
||||
transactions.findByRawId(rawTxId)
|
||||
override suspend fun findMinedHeight(rawTransactionId: ByteArray) = withContext(IO) {
|
||||
transactions.findMinedHeight(rawTransactionId)
|
||||
}
|
||||
|
||||
fun close() {
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package cash.z.wallet.sdk.transaction
|
||||
|
||||
import android.content.Context
|
||||
import androidx.lifecycle.LifecycleOwner
|
||||
import androidx.lifecycle.Observer
|
||||
import androidx.room.Room
|
||||
import androidx.room.RoomDatabase
|
||||
import cash.z.wallet.sdk.db.PendingTransactionDao
|
||||
|
@ -8,10 +10,11 @@ import cash.z.wallet.sdk.db.PendingTransactionDb
|
|||
import cash.z.wallet.sdk.entity.*
|
||||
import cash.z.wallet.sdk.ext.twig
|
||||
import cash.z.wallet.sdk.service.LightWalletService
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlin.math.max
|
||||
|
||||
/**
|
||||
|
@ -54,12 +57,12 @@ class PersistentTransactionManager(
|
|||
* Initialize a [PendingTransaction] and then insert it in the database for monitoring and
|
||||
* follow-up.
|
||||
*/
|
||||
override fun initSpend(
|
||||
override suspend fun initSpend(
|
||||
zatoshiValue: Long,
|
||||
toAddress: String,
|
||||
memo: String,
|
||||
fromAccountIndex: Int
|
||||
): Flow<PendingTransaction> = flow {
|
||||
): PendingTransaction = withContext(Dispatchers.IO) {
|
||||
twig("constructing a placeholder transaction")
|
||||
var tx = PendingTransactionEntity(
|
||||
value = zatoshiValue,
|
||||
|
@ -80,12 +83,14 @@ class PersistentTransactionManager(
|
|||
twig("Unknown error while attempting to create pending transaction: ${t.message} caused by: ${t.cause}")
|
||||
}
|
||||
|
||||
emit(tx)
|
||||
tx
|
||||
}
|
||||
|
||||
suspend fun manageMined(pendingTx: PendingTransactionEntity, matchingMinedTx: TransactionEntity) {
|
||||
twig("a pending transaction has been mined!")
|
||||
safeUpdate(pendingTx.copy(minedHeight = matchingMinedTx.minedHeight!!))
|
||||
override suspend fun applyMinedHeight(pendingTx: PendingTransaction, minedHeight: Int) {
|
||||
(pendingTx as? PendingTransactionEntity)?.let {
|
||||
twig("a pending transaction has been mined!")
|
||||
safeUpdate(pendingTx.copy(minedHeight = minedHeight))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -97,10 +102,10 @@ class PersistentTransactionManager(
|
|||
}
|
||||
}
|
||||
|
||||
override fun encode(
|
||||
override suspend fun encode(
|
||||
spendingKey: String,
|
||||
pendingTx: PendingTransaction
|
||||
): Flow<PendingTransaction> = flow {
|
||||
): PendingTransaction = withContext(Dispatchers.IO) {
|
||||
twig("managing the creation of a transaction")
|
||||
//var tx = transaction.copy(expiryHeight = if (currentHeight == -1) -1 else currentHeight + EXPIRY_OFFSET)
|
||||
var tx = pendingTx as PendingTransactionEntity
|
||||
|
@ -125,10 +130,10 @@ class PersistentTransactionManager(
|
|||
}
|
||||
safeUpdate(tx)
|
||||
|
||||
emit(tx)
|
||||
tx
|
||||
}
|
||||
|
||||
override fun submit(pendingTx: PendingTransaction): Flow<PendingTransaction> = flow {
|
||||
override suspend fun submit(pendingTx: PendingTransaction): PendingTransaction = withContext(Dispatchers.IO) {
|
||||
var tx1 = pendingTransactionDao { findById(pendingTx.id) }
|
||||
if(tx1 == null) twig("unable to find transaction for id: ${pendingTx.id}")
|
||||
var tx = tx1!!
|
||||
|
@ -157,7 +162,11 @@ class PersistentTransactionManager(
|
|||
safeUpdate(tx)
|
||||
}
|
||||
|
||||
emit(tx)
|
||||
tx
|
||||
}
|
||||
|
||||
override suspend fun monitorById(id: Long): Flow<PendingTransaction> {
|
||||
return pendingTransactionDao { monitorById(id) }
|
||||
}
|
||||
|
||||
override suspend fun cancel(pendingTx: PendingTransaction): Boolean {
|
||||
|
@ -172,7 +181,7 @@ class PersistentTransactionManager(
|
|||
}
|
||||
}
|
||||
|
||||
override suspend fun getAll(): List<PendingTransaction> = pendingTransactionDao { getAll() }
|
||||
override fun getAll() = _dao.getAll()
|
||||
|
||||
/**
|
||||
* Updating the pending transaction is often done at the end of a function but still should
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package cash.z.wallet.sdk.transaction
|
||||
|
||||
import cash.z.wallet.sdk.entity.ConfirmedTransaction
|
||||
import cash.z.wallet.sdk.entity.PendingTransaction
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
|
||||
|
@ -9,14 +10,16 @@ import kotlinx.coroutines.flow.Flow
|
|||
* transactions through to completion.
|
||||
*/
|
||||
interface OutboundTransactionManager {
|
||||
fun initSpend(
|
||||
suspend fun initSpend(
|
||||
zatoshi: Long,
|
||||
toAddress: String,
|
||||
memo: String,
|
||||
fromAccountIndex: Int
|
||||
): Flow<PendingTransaction>
|
||||
fun encode(spendingKey: String, pendingTx: PendingTransaction): Flow<PendingTransaction>
|
||||
fun submit(pendingTx: PendingTransaction): Flow<PendingTransaction>
|
||||
): PendingTransaction
|
||||
suspend fun encode(spendingKey: String, pendingTx: PendingTransaction): PendingTransaction
|
||||
suspend fun submit(pendingTx: PendingTransaction): PendingTransaction
|
||||
suspend fun applyMinedHeight(pendingTx: PendingTransaction, minedHeight: Int)
|
||||
suspend fun monitorById(id: Long): Flow<PendingTransaction>
|
||||
|
||||
/**
|
||||
* Attempt to cancel a transaction.
|
||||
|
@ -24,7 +27,7 @@ interface OutboundTransactionManager {
|
|||
* @return true when the transaction was able to be cancelled.
|
||||
*/
|
||||
suspend fun cancel(pendingTx: PendingTransaction): Boolean
|
||||
suspend fun getAll(): List<PendingTransaction>
|
||||
fun getAll(): Flow<List<PendingTransaction>>
|
||||
}
|
||||
|
||||
interface TransactionError {
|
||||
|
|
|
@ -7,8 +7,8 @@ import kotlinx.coroutines.flow.Flow
|
|||
interface TransactionRepository {
|
||||
fun lastScannedHeight(): Int
|
||||
fun isInitialized(): Boolean
|
||||
suspend fun findTransactionById(txId: Long): TransactionEntity?
|
||||
suspend fun findTransactionByRawId(rawTransactionId: ByteArray): TransactionEntity?
|
||||
suspend fun findEncodedTransactionById(txId: Long): EncodedTransaction?
|
||||
suspend fun findMinedHeight(rawTransactionId: ByteArray): Int?
|
||||
|
||||
/**
|
||||
* Provides a way for other components to signal that the underlying data has been modified.
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package cash.z.wallet.sdk.transaction
|
||||
|
||||
import cash.z.wallet.sdk.Initializer
|
||||
import cash.z.wallet.sdk.entity.EncodedTransaction
|
||||
import cash.z.wallet.sdk.exception.TransactionEncoderException
|
||||
import cash.z.wallet.sdk.ext.*
|
||||
|
@ -31,11 +30,8 @@ class WalletTransactionEncoder(
|
|||
fromAccountIndex: Int
|
||||
): EncodedTransaction = withContext(IO) {
|
||||
val transactionId = createSpend(spendingKey, zatoshi, toAddress, memo)
|
||||
val transaction = repository.findTransactionById(transactionId)
|
||||
repository.findEncodedTransactionById(transactionId)
|
||||
?: throw TransactionEncoderException.TransactionNotFoundException(transactionId)
|
||||
EncodedTransaction(transaction.transactionId, transaction.raw
|
||||
?: throw TransactionEncoderException.TransactionNotEncodedException(transactionId)
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue