2019-07-10 11:12:32 -07:00
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.block.CompactBlockProcessor
2019-07-14 15:13:12 -07:00
import cash.z.wallet.sdk.entity.ClearedTransaction
import cash.z.wallet.sdk.entity.PendingTransaction
import cash.z.wallet.sdk.entity.SentTransaction
2019-08-30 10:05:02 -07:00
import cash.z.wallet.sdk.exception.SynchronizerException
2019-07-10 11:12:32 -07:00
import cash.z.wallet.sdk.exception.WalletException
import cash.z.wallet.sdk.secure.Wallet
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlin.coroutines.CoroutineContext
/**
 * A synchronizer that attempts to remain operational, despite any number of errors that can occur. It acts as the glue
 * that ties all the pieces of the SDK together. Each component of the SDK is designed for the potential of stand-alone
 * usage but coordinating all the interactions is non-trivial. So the synchronizer facilitates this, acting as reference
 * that demonstrates how all the pieces can be tied together. Its goal is to allow a developer to focus on their app
 * rather than the nuances of how Zcash works.
 *
 * @param wallet the component that wraps the JNI layer that interacts with the rust backend and manages wallet config.
 * @param ledger the component that exposes streams of wallet transaction information.
 * @param sender the component responsible for sending transactions to lightwalletd in order to spend funds.
 * @param processor the component that saves the downloaded compact blocks to the cache and then scans those blocks for
 * data related to this wallet.
 * @param encoder the component that creates a signed transaction, used for spending funds.
 */
@ExperimentalCoroutinesApi
class SdkSynchronizer(
    private val wallet: Wallet,
    private val ledger: TransactionRepository,
    private val sender: TransactionSender,
    private val processor: CompactBlockProcessor,
    private val encoder: TransactionEncoder
) : Synchronizer {

    /**
     * The lifespan of this Synchronizer. This scope is initialized once the Synchronizer starts because it will be a
     * child of the parentScope that gets passed into the [start] function. Everything launched by this Synchronizer
     * will be cancelled once the Synchronizer or its parentScope stops. This is a lateinit rather than nullable
     * property so that it fails early rather than silently, whenever the scope is used before the Synchronizer has been
     * started.
     */
    lateinit var coroutineScope: CoroutineScope


    //
    // Status
    //

    /**
     * A property that is true while a connection to the lightwalletd server exists.
     */
    override val isConnected: Boolean get() = processor.isConnected

    /**
     * A property that is true while actively downloading blocks from lightwalletd.
     */
    override val isSyncing: Boolean get() = processor.isSyncing

    /**
     * A property that is true while actively scanning the cache of compact blocks for transactions.
     */
    override val isScanning: Boolean get() = processor.isScanning


    //
    // Communication Primitives
    //

    /**
     * Channel of balance information. Seeded with a default [Wallet.WalletBalance].
     */
    private val balanceChannel = ConflatedBroadcastChannel(Wallet.WalletBalance())

    /**
     * Channel of data representing transactions that are pending. Seeded with an empty list.
     */
    private val pendingChannel = ConflatedBroadcastChannel<List<PendingTransaction>>(listOf())

    /**
     * Channel of data representing transactions that have been mined. Seeded with an empty list.
     */
    private val clearedChannel = ConflatedBroadcastChannel<List<ClearedTransaction>>(listOf())


    //
    // Error Handling
    //

    /**
     * A callback to invoke whenever an uncaught error is encountered. By definition, the return value of the function
     * is ignored because this error is unrecoverable. The only reason the function has a return value is so that all
     * error handlers work with the same signature which allows one function to handle all errors in simple apps. This
     * callback is not called on the main thread so any UI work would need to switch context to the main thread.
     */
    override var onCriticalErrorHandler: ((Throwable?) -> Boolean)? = null

    /**
     * A callback to invoke whenever a processor error is encountered. Returning true signals that the error was handled
     * and a retry attempt should be made, if possible. This callback is not called on the main thread so any UI work
     * would need to switch context to the main thread.
     */
    override var onProcessorErrorHandler: ((Throwable?) -> Boolean)? = null

    /**
     * A callback to invoke whenever a server error is encountered while submitting a transaction to lightwalletd.
     * Returning true signals that the error was handled and a retry attempt should be made, if possible. This callback
     * is not called on the main thread so any UI work would need to switch context to the main thread.
     */
    override var onSubmissionErrorHandler: ((Throwable?) -> Boolean)? = null


    /**
     * Starts this synchronizer within the given scope. For simplicity, attempting to start an instance that has already
     * been started will throw a [SynchronizerException.FalseStart] exception. This reduces the complexity of managing
     * resources that must be recycled. Instead, each synchronizer is designed to have a long lifespan and should be
     * started from an activity, application or session.
     *
     * @param parentScope the scope to use for this synchronizer, typically something with a lifecycle such as an
     * Activity for single-activity apps or a logged in user session. This scope is only used for launching this
     * synchronizer's job as a child.
     *
     * @return this synchronizer, so that calls can be chained.
     *
     * @throws SynchronizerException.FalseStart when this instance has already been started.
     * @throws IllegalArgumentException when [parentScope] does not contain a [Job] to attach to.
     */
    override fun start(parentScope: CoroutineScope): Synchronizer {
        if (::coroutineScope.isInitialized) throw SynchronizerException.FalseStart

        // fail with an actionable message, rather than a bare NPE, when there is no parent job to attach to
        val parentJob = requireNotNull(parentScope.coroutineContext[Job]) {
            "The parentScope used to start a Synchronizer must contain a Job so that the Synchronizer can be " +
                "cancelled along with it."
        }

        // base this scope on the parent so that when the parent's job cancels, everything here cancels as well
        // also use a supervisor job so that one failure doesn't bring down the whole synchronizer
        coroutineScope = CoroutineScope(SupervisorJob(parentJob) + Dispatchers.Main)

        // TODO: this doesn't work as intended. Refactor to improve the cancellation behavior (i.e. what happens when
        //  one job fails) by making launchTransactionMonitor throw an exception
        coroutineScope.launch {
            initWallet()
            startSender(this)
            launchProgressMonitor()
            launchPendingMonitor()
            launchTransactionMonitor()
            onReady()
        }
        return this
    }

    /**
     * Initializes the sender such that it can initiate requests within the scope of this synchronizer.
     *
     * @param parentScope the scope that is handed to the sender when it is started.
     */
    private fun startSender(parentScope: CoroutineScope) {
        sender.onSubmissionError = ::onFailedSend
        sender.start(parentScope)
    }

    /**
     * Initialize the wallet, which involves populating data tables based on the latest state of the wallet. The work
     * runs on the IO dispatcher. An already-initialized wallet and a false start are both logged rather than rethrown.
     */
    private suspend fun initWallet() = withContext(IO) {
        try {
            wallet.initialize()
        } catch (e: WalletException.AlreadyInitializedException) {
            twig(" Warning: wallet already initialized but this is safe to ignore " +
                    " because the SDK automatically detects where to start downloading. ")
        } catch (f: WalletException.FalseStart) {
            if (recoverFrom(f)) {
                twig(" Warning: had a wallet init error but we recovered! ")
            } else {
                twig(" Error: false start while initializing wallet! ")
            }
        }
    }

    // TODO: this is a work in progress. We could take drastic measures to automatically recover from certain critical
    //  errors and alert the user but this might be better to do at the app level, rather than SDK level.
    /**
     * Attempts to recover from a false start. Currently, known fatal database errors are only logged.
     *
     * @param error the false start that occurred while initializing the wallet.
     *
     * @return true when recovery succeeded. At present, this always returns false.
     */
    private fun recoverFrom(error: WalletException.FalseStart): Boolean {
        if (error.message?.contains(" unable to open database file ") == true
            || error.message?.contains(" table blocks has no column named ") == true) {
            //TODO: these errors are fatal and we need to delete the database and start over
            twig(" Database should be deleted and we should start over ")
        }
        return false
    }

    /**
     * Stop this synchronizer and all of its child jobs. Once a synchronizer has been stopped it should not be restarted
     * and attempting to do so will result in an error. Also, this function will throw an exception if the synchronizer
     * was never previously started because [coroutineScope] is a lateinit property.
     */
    override fun stop() {
        coroutineScope.cancel()
    }


    //
    // Monitors
    //

    /**
     * Begin the monitor that will update the balance proactively whenever we finish a large scan. The balance refresh
     * itself is currently commented out, so reaching 100 only results in a log statement.
     */
    private fun CoroutineScope.launchProgressMonitor(): Job = launch {
        twig(" launching progress monitor ")
        val progressUpdates = progress()
        for (progress in progressUpdates) {
            if (progress == 100) {
                twig(" triggering a balance update because progress is complete (j/k) ")
                //refreshBalance()
            }
        }
        twig(" done monitoring for progress changes ")
    }

    /**
     * Begin the monitor that will output pending transactions into the pending channel. The balance refresh itself is
     * currently commented out, so each change only results in a log statement.
     */
    private fun CoroutineScope.launchPendingMonitor(): Job = launch {
        twig(" launching pending monitor ")
        // ask to be notified when the sender notices anything new, while attempting to send
        sender.notifyOnChange(pendingChannel)

        // when those notifications come in, also update the balance
        val channel = pendingChannel.openSubscription()
        try {
            for (pending in channel) {
                if (balanceChannel.isClosedForSend) break
                twig(" triggering a balance update because pending transactions have changed (j/kk) ")
                // refreshBalance()
            }
        } finally {
            // always release the subscription, even when we break out early or this coroutine is cancelled, so that
            // it does not stay registered with the broadcast channel
            channel.cancel()
        }
        twig(" done monitoring for pending changes and balance changes ")
    }

    /**
     * Begin the monitor that calls [onTransactionsChanged] whenever the ledger reports a change.
     */
    private fun CoroutineScope.launchTransactionMonitor(): Job = launch {
        ledger.monitorChanges(::onTransactionsChanged)
    }

    /**
     * Callback for transaction changes. Launches a coroutine in [coroutineScope] that refreshes the balance and then
     * publishes the latest cleared transactions to the cleared channel.
     */
    fun onTransactionsChanged() {
        coroutineScope.launch {
            twig(" triggering a balance update because transactions have changed ")
            refreshBalance()
            clearedChannel.send(ledger.getClearedTransactions())
            // log completion only after the work has actually finished, rather than right after scheduling it
            twig(" done handling changed transactions ")
        }
    }

    /**
     * Publishes the wallet's current balance info to the balance channel, using the IO dispatcher. When the balance
     * channel is already closed for send, a warning is logged instead.
     */
    suspend fun refreshBalance() = withContext(IO) {
        if (!balanceChannel.isClosedForSend) {
            balanceChannel.send(wallet.getBalanceInfo())
        } else {
            twig(" WARNING: noticed new transactions but the balance channel was closed for send so ignoring! ")
        }
    }

    /**
     * Launches the processor, routing processor errors to [onProcessorError] and any uncaught exception from this job
     * to [onCriticalError].
     */
    private fun CoroutineScope.onReady() = launch(CoroutineExceptionHandler(::onCriticalError)) {
        twig(" Synchronizer Ready. Starting processor! ")
        processor.onErrorListener = ::onProcessorError
        processor.start()
        twig(" Synchronizer onReady complete. Processor start has exited! ")
    }

    /**
     * Logs the given error, along with up to two levels of its causes, and then notifies [onCriticalErrorHandler].
     *
     * @param unused the context in which the error occurred. It is not used.
     * @param error the uncaught error.
     */
    private fun onCriticalError(unused: CoroutineContext, error: Throwable) {
        twig(" ******** ")
        twig(" ******** ERROR: $error ")
        if (error.cause != null) twig(" ******** caused by ${error.cause} ")
        if (error.cause?.cause != null) twig(" ******** caused by ${error.cause?.cause} ")
        twig(" ******** ")
        onCriticalErrorHandler?.invoke(error)
    }

    /**
     * Logs a submission error and forwards it to [onSubmissionErrorHandler].
     *
     * @param error the error that occurred while submitting a transaction.
     *
     * @return true only when a handler is set and it returned true. Otherwise, false.
     */
    private fun onFailedSend(error: Throwable): Boolean {
        twig(" ERROR while submitting transaction: $error ")
        return onSubmissionErrorHandler?.invoke(error)?.also {
            if (it) twig(" submission error handler signaled that we should try again! ")
        } == true
    }

    /**
     * Logs a processor error and forwards it to [onProcessorErrorHandler].
     *
     * @param error the error that occurred while processing data.
     *
     * @return true only when a handler is set and it returned true. Otherwise, false.
     */
    private fun onProcessorError(error: Throwable): Boolean {
        twig(" ERROR while processing data: $error ")
        return onProcessorErrorHandler?.invoke(error)?.also {
            if (it) twig(" processor error handler signaled that we should try again! ")
        } == true
    }


    //
    // Channels
    //

    /**
     * Opens a new subscription to the channel of balance information.
     */
    override fun balances(): ReceiveChannel<Wallet.WalletBalance> {
        return balanceChannel.openSubscription()
    }

    /**
     * The channel of progress values, as provided by the processor.
     */
    override fun progress(): ReceiveChannel<Int> = processor.progress()

    /**
     * Opens a new subscription to the channel of pending transactions.
     */
    override fun pendingTransactions(): ReceiveChannel<List<PendingTransaction>> {
        return pendingChannel.openSubscription()
    }

    /**
     * Opens a new subscription to the channel of cleared transactions.
     */
    override fun clearedTransactions(): ReceiveChannel<List<ClearedTransaction>> {
        return clearedChannel.openSubscription()
    }

    /**
     * The most recent list of pending transactions or an empty list when the pending channel is closed for send.
     */
    override fun lastPending(): List<PendingTransaction> {
        return if (pendingChannel.isClosedForSend) listOf() else pendingChannel.value
    }

    /**
     * The most recent list of cleared transactions or an empty list when the cleared channel is closed for send.
     */
    override fun lastCleared(): List<ClearedTransaction> {
        return if (clearedChannel.isClosedForSend) listOf() else clearedChannel.value
    }

    /**
     * The most recent balance or a default [Wallet.WalletBalance] when the balance channel is closed for send, which
     * mirrors the behavior of [lastPending] and [lastCleared] rather than throwing on a closed channel.
     */
    override fun lastBalance(): Wallet.WalletBalance {
        return if (balanceChannel.isClosedForSend) Wallet.WalletBalance() else balanceChannel.value
    }


    //
    // Send / Receive
    //

    /**
     * Not yet implemented.
     *
     * @param transaction the transaction to cancel.
     *
     * @throws NotImplementedError always.
     */
    override fun cancelSend(transaction: SentTransaction): Boolean {
        // not implemented
        throw NotImplementedError(" Cancellation is not yet implemented " +
                " but should be pretty straight forward, using th PersistentTransactionManager ")
    }

    /**
     * Gets the wallet's address, using the IO dispatcher.
     *
     * NOTE(review): [accountId] is not forwarded to the wallet, so the result does not depend on it. Confirm whether
     * the wallet exposes a per-account lookup that should be used here.
     *
     * @param accountId the id of the account. Currently unused.
     */
    override suspend fun getAddress(accountId: Int): String = withContext(IO) { wallet.getAddress() }

    /**
     * Asks the sender to send funds to the given address, using the IO dispatcher and this synchronizer's encoder.
     *
     * @param zatoshi the amount to send, presumably in zatoshi as the name suggests.
     * @param toAddress the recipient's address.
     * @param memo the memo to include with the transaction.
     * @param fromAccountId the id of the account from which to send.
     *
     * @return the pending transaction returned by the sender.
     */
    override suspend fun sendToAddress(
        zatoshi: Long,
        toAddress: String,
        memo: String,
        fromAccountId: Int
    ): PendingTransaction = withContext(IO) {
        sender.sendToAddress(encoder, zatoshi, toAddress, memo, fromAccountId)
    }
}