Create CompactBlockProcessor and refine responsibilities of collaborators

The synchronizer now primarily collaborates with a downloader, processor and repository; each with a more focused set of responsibilities.
The downloader streams blocks into a channel, the processor saves blocks from that channel and scans for transactions, the repository
exposes transaction change events.
This commit is contained in:
Kevin Gorham 2019-01-23 05:45:26 -05:00
parent a871c5e476
commit 4d226a8c5e
31 changed files with 1268 additions and 396 deletions

1
.gitignore vendored
View File

@ -12,6 +12,7 @@
.cargo/
bin/
gen/
generated/
out/
target/
jniLibs/

View File

@ -17,6 +17,7 @@ buildscript {
dependencies {
classpath 'com.android.tools.build:gradle:3.4.0-alpha05'
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:${versions.kotlin}"
classpath "org.jetbrains.kotlin:kotlin-allopen:${versions.kotlin}"
classpath "com.github.ben-manes:gradle-versions-plugin:0.20.0"
classpath 'com.github.dcendents:android-maven-gradle-plugin:2.1'
classpath "com.google.protobuf:protobuf-gradle-plugin:0.8.7"
@ -28,12 +29,13 @@ apply plugin: 'com.android.library'
apply plugin: "kotlin-android-extensions"
apply plugin: "kotlin-android"
apply plugin: 'kotlin-kapt'
apply plugin: 'kotlin-allopen'
apply plugin: 'com.google.protobuf'
apply plugin: 'com.github.ben-manes.versions'
apply plugin: 'com.github.dcendents.android-maven'
group = 'cash.z.android.wallet'
version = '1.2.4'
version = '1.4.0'
repositories {
google()
@ -46,10 +48,11 @@ android {
defaultConfig {
minSdkVersion 16
targetSdkVersion 28
versionCode = 1_03_00
versionName = "1.3.0"
versionCode = 1_04_00
versionName = version
testInstrumentationRunner = "androidx.test.runner.AndroidJUnitRunner"
multiDexEnabled false
archivesBaseName = "zcash-android-wallet-sdk-$versionName"
}
buildTypes {
@ -90,11 +93,15 @@ android {
}
}
allOpen {
// marker for classes that we want to be able to extend in debug builds for testing purposes
annotation 'cash.z.wallet.sdk.annotation.OpenClass'
}
clean {
delete "$project.projectDir/src/generated/source/grpc"
}
protobuf {
generatedFilesBaseDir = "$projectDir/src/generated/source/grpc"
protoc { artifact = 'com.google.protobuf:protoc:3.6.1' }
@ -136,10 +143,15 @@ dependencies {
implementation "io.grpc:grpc-stub:${versions.grpc}"
implementation 'javax.annotation:javax.annotation-api:1.2'
// other
implementation "com.jakewharton.timber:timber:4.7.1"
// Tests
testImplementation 'org.mockito:mockito-junit-jupiter:2.23.0'
testImplementation 'com.nhaarman.mockitokotlin2:mockito-kotlin:2.0.0'
// testImplementation 'org.mockito:mockito-junit-jupiter:2.23.0'
// testImplementation 'com.nhaarman.mockitokotlin2:mockito-kotlin:2.1.0'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.3.2'
androidTestImplementation 'com.nhaarman.mockitokotlin2:mockito-kotlin:2.1.0'
androidTestImplementation 'org.mockito:mockito-android:2.23.4'
androidTestImplementation 'org.junit.jupiter:junit-jupiter-api:5.3.2'
androidTestImplementation "androidx.test:runner:1.1.1"
androidTestImplementation "androidx.test.espresso:espresso-core:3.1.1"

View File

@ -45,7 +45,7 @@ class TransactionDaoTest {
@Test
fun testDaoInsert() {
Transaction(4, "sample".toByteArray(), 356418, null).let { transaction ->
Transaction(4L, "sample".toByteArray(), 356418, null).let { transaction ->
dao.insert(transaction)
val result = dao.findById(transaction.id)
assertEquals(transaction.id, result?.id)

View File

@ -0,0 +1,155 @@
package cash.z.wallet.sdk.data
import android.content.Context
import android.util.Log
import androidx.arch.core.executor.testing.InstantTaskExecutorRule
import androidx.test.core.app.ApplicationProvider
import cash.z.wallet.sdk.dao.BlockDao
import cash.z.wallet.sdk.dao.NoteDao
import cash.z.wallet.sdk.dao.TransactionDao
import cash.z.wallet.sdk.jni.JniConverter
import cash.z.wallet.sdk.vo.Block
import cash.z.wallet.sdk.vo.Note
import cash.z.wallet.sdk.vo.Transaction
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.atLeast
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.verify
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.After
import org.junit.Assert.*
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.mockito.ArgumentMatchers.anyString
import kotlin.random.Random
internal class PollingTransactionRepositoryTest {
@get:Rule
var instantTaskExecutorRule = InstantTaskExecutorRule()
private lateinit var repository: TransactionRepository
private lateinit var noteDao: NoteDao
private lateinit var transactionDao: TransactionDao
private lateinit var blockDao: BlockDao
private val twig = TestLogTwig()
private var ids = 0L
private var heights: Int = 123_456
private lateinit var balanceProvider: Iterator<Long>
private val pollFrequency = 100L
private lateinit var converter: JniConverter
@Before
fun setUp() {
val dbName = "polling-test.db"
val context = ApplicationProvider.getApplicationContext<Context>()
converter = mock {
on { getBalance(any()) }.thenAnswer { balanceProvider.next() }
}
repository = PollingTransactionRepository(context, dbName, pollFrequency, converter, twig) { db ->
blockDao = db.blockDao()
transactionDao = db.transactionDao()
noteDao = db.noteDao()
}
}
@After
fun tearDown() {
repository.stop()
blockDao.deleteAll()
// just verify the cascading deletes are working, for sanity
assertEquals(0, blockDao.count())
assertEquals(0, transactionDao.count())
assertEquals(0, noteDao.count())
}
@Test
fun testBalancesAreDistinct() = runBlocking<Unit> {
val balanceList = listOf(1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L)
val iterations = balanceList.size
balanceProvider = balanceList.iterator()
insert(6) {
repository.stop()
}
var distinctBalances = 0
val balances = repository.balance()
twig.twigTask("waiting for balance changes") {
for (balance in balances) {
twig.twig("found balance of $balance")
distinctBalances++
}
}
assertEquals(iterations, blockDao.count())
assertEquals(balanceList.distinct().size, distinctBalances)
// we at least requested the balance more times from the rust library than we got it in the channel
// (meaning the duplicates were ignored)
verify(converter, atLeast(distinctBalances + 1)).getBalance(anyString())
}
@Test
fun testTransactionsAreNotLost() = runBlocking<Unit> {
val iterations = 10
balanceProvider = List(iterations + 1) { it.toLong() }.iterator()
val transactionChannel = repository.transactions()
repository.start(this)
insert(iterations) {
repeat(iterations) {
assertNotNull("unexpected null for transaction number $it", transactionChannel.poll())
}
assertNull("transactions shouldn't remain", transactionChannel.poll())
assertEquals("incorrect number of items in DB", iterations, blockDao.count())
repository.stop()
}
}
/**
* insert [count] items, then run the code block.
*/
private fun CoroutineScope.insert(count: Int, block: suspend () -> Unit = {}) {
repeat(count) {
launch { insertItemDelayed(it * pollFrequency) }
}
launch { delay(pollFrequency * count * 2); block() }
}
private suspend fun insertItemDelayed(duration: Long) {
twig.twig("delaying $duration")
delay(duration)
val block = createBlock()
val transaction = createTransaction(block.height)
val note = createNote(transaction.id)
twig.twig("inserting note with value ${note.value}")
blockDao.insert(block)
transactionDao.insert(transaction)
noteDao.insert(note)
}
private fun createBlock(): Block {
return Block(heights++, System.currentTimeMillis().toInt(), byteArrayOf(heights.toByte()))
}
private fun createTransaction(blockId: Int): Transaction {
return Transaction(ids++, byteArrayOf(ids.toByte()), blockId, null)
}
private fun createNote(id: Long): Note {
return Note(
id.toInt(),
id.toInt(),
value = Random.nextInt(0, 10)
)
}
}
class TestLogTwig : TroubleshootingTwig(printer = { msg: String -> Log.e("TEST_LOG", msg) })

View File

@ -1,56 +0,0 @@
package cash.z.wallet.sdk.data
import androidx.arch.core.executor.testing.InstantTaskExecutorRule
import androidx.test.core.app.ApplicationProvider
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import org.junit.AfterClass
import org.junit.Assert.assertNotNull
import org.junit.BeforeClass
import org.junit.Rule
import org.junit.Test
import cash.z.wallet.sdk.rpc.CompactFormats
class SynchronizerTest {
@get:Rule
var instantTaskExecutorRule = InstantTaskExecutorRule()
@Test
fun testSynchronizerExists() {
assertNotNull(synchronizer)
}
@Test
fun testBlockSaving() {
// synchronizer.saveBlocks()
}
@Test
fun testBlockScanning() {
Thread.sleep(180000L)
}
private fun printFailure(result: Result<CompactFormats.CompactBlock>): String {
return if (result.isFailure) "result failed due to: ${result.exceptionOrNull()!!.let { "$it caused by: ${it.cause}" }}}"
else "success"
}
companion object {
val job = Job()
val testScope = CoroutineScope(Dispatchers.IO + job)
val synchronizer = Synchronizer(ApplicationProvider.getApplicationContext(), testScope)
@BeforeClass
@JvmStatic
fun setup() {
synchronizer.start()
}
@AfterClass
@JvmStatic
fun close() {
synchronizer.stop()
testScope.cancel()
}
}
}

View File

@ -50,6 +50,13 @@ class DerivedDbIntegrationTest {
fun testCount_Note() {
assertEquals(5, notes.count())
}
@Test
fun testNoteQuery() {
val all = notes.getAll()
assertEquals(3, all.size)
}
@Test
fun testTransactionDaoPrepopulated() {
val tran = transactions.findById(1)
@ -75,7 +82,7 @@ class DerivedDbIntegrationTest {
fun setup() {
// TODO: put this database in the assets directory and open it from there via .openHelperFactory(new AssetSQLiteOpenHelperFactory()) seen here https://github.com/albertogiunta/sqliteAsset
db = Room
.databaseBuilder(ApplicationProvider.getApplicationContext(), DerivedDataDb::class.java, "dummy-data2.db")
.databaseBuilder(ApplicationProvider.getApplicationContext(), DerivedDataDb::class.java, "new-data-glue2.db")
.setJournalMode(RoomDatabase.JournalMode.TRUNCATE)
.fallbackToDestructiveMigration()
.build()

View File

@ -0,0 +1,72 @@
package cash.z.wallet.sdk.db
import androidx.test.platform.app.InstrumentationRegistry
import cash.z.wallet.sdk.data.*
import cash.z.wallet.sdk.jni.JniConverter
import cash.z.wallet.sdk.secure.Wallet
import kotlinx.coroutines.runBlocking
import org.junit.Before
import org.junit.Test
import org.junit.jupiter.api.AfterEach
/*
TODO:
setup a test that we can run and just watch things happen, to give confidence that logging is expressive enough to
verify that the SDK is behaving as expected.
*/
class IntegrationTest {
private val dataDbName = "IntegrationData.db"
private val cacheDdName = "IntegrationCache.db"
private val context = InstrumentationRegistry.getInstrumentation().context
private lateinit var downloader: CompactBlockStream
private lateinit var processor: CompactBlockProcessor
private lateinit var synchronizer: Synchronizer
private lateinit var repository: TransactionRepository
private lateinit var wallet: Wallet
@Before
fun setup() {
deleteDbs()
}
private fun deleteDbs() {
// prior to each run, delete the DBs for sanity
listOf(cacheDdName, dataDbName).map { context.getDatabasePath(it).absoluteFile }.forEach {
println("Deleting ${it.name}")
it.delete()
}
}
@Test
fun testSync() = runBlocking<Unit> {
val converter = JniConverter()
converter.initLogs()
val logger = TroubleshootingTwig()
downloader = CompactBlockStream("10.0.2.2", 9067, logger)
processor = CompactBlockProcessor(context, converter, cacheDdName, dataDbName, logger = logger)
repository = PollingTransactionRepository(context, dataDbName, 10_000L, converter, logger)
wallet = Wallet(converter, context.getDatabasePath(dataDbName).absolutePath, context.cacheDir.absolutePath, arrayOf(0), SampleSeedProvider("dummyseed"))
// repository.start(this)
synchronizer = Synchronizer(
downloader,
processor,
repository,
wallet,
logger
).start(this)
for(i in synchronizer.downloader.progress()) {
logger.twig("made progress: $i")
}
}
@AfterEach
fun tearDown() {
repository.stop()
synchronizer.stop()
}
}

View File

@ -0,0 +1,15 @@
package cash.z.wallet.sdk.annotation
@Target(AnnotationTarget.CLASS)
annotation class OpenClass
/**
* Used in conjunction with the kotlin-allopen plugin to make any class with this annotation open for extension.
* Typically, we apply this to classes that we want to mock in androidTests because unit tests don't have this problem,
* it's only an issue with JUnit4 Instrumentation tests.
*
* Note: the counterpart to this annotation in the release buildType does not apply the OpenClass annotation
*/
@OpenClass
@Target(AnnotationTarget.CLASS)
annotation class OpenForTesting

View File

@ -2,9 +2,6 @@ package cash.z.wallet.sdk.dao
import androidx.room.*
import cash.z.wallet.sdk.vo.Block
import androidx.lifecycle.LiveData
@Dao
interface BlockDao {
@ -23,4 +20,12 @@ interface BlockDao {
@Query("SELECT COUNT(height) FROM blocks")
fun count(): Int
@Query("DELETE FROM blocks")
fun deleteAll()
@Query("SELECT MAX(height) FROM blocks")
fun lastScannedHeight(): Long
@Query("UPDATE blocks SET time=:time WHERE height = :height")
fun updateTime(height: Int, time: Int)
}

View File

@ -2,6 +2,7 @@ package cash.z.wallet.sdk.dao
import androidx.room.*
import cash.z.wallet.sdk.vo.Note
import cash.z.wallet.sdk.vo.NoteQuery
@Dao
interface NoteDao {
@ -14,8 +15,23 @@ interface NoteDao {
@Query("DELETE FROM received_notes WHERE id_note = :id")
fun deleteById(id: Int)
@Query("SELECT * FROM received_notes WHERE 1")
fun getAll(): List<Note>
/**
* Query blocks, transactions and received_notes to aggregate information on send/receive
*/
@Query("""
SELECT received_notes.tx AS txId,
received_notes.value,
transactions.block AS height,
transactions.raw IS NOT NULL AS sent,
blocks.time
FROM received_notes,
transactions,
blocks
WHERE received_notes.tx = transactions.id_tx
AND blocks.height = transactions.block
ORDER BY height DESC;
""")
fun getAll(): List<NoteQuery>
@Delete
fun delete(block: Note)

View File

@ -6,19 +6,19 @@ import cash.z.wallet.sdk.vo.Transaction
@Dao
interface TransactionDao {
@Insert(onConflict = OnConflictStrategy.REPLACE)
fun insert(block: Transaction)
fun insert(transaction: Transaction)
@Query("SELECT * FROM transactions WHERE id_tx = :id")
fun findById(id: Int): Transaction?
fun findById(id: Long): Transaction?
@Query("DELETE FROM transactions WHERE id_tx = :id")
fun deleteById(id: Int)
fun deleteById(id: Long)
@Query("SELECT * FROM transactions WHERE 1")
fun getAll(): List<Transaction>
@Delete
fun delete(block: Transaction)
fun delete(transaction: Transaction)
@Query("SELECT COUNT(id_tx) FROM transactions")
fun count(): Int

View File

@ -1,141 +0,0 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.ext.debug
import cash.z.wallet.sdk.ext.toBlockRange
import cash.z.wallet.sdk.rpc.CompactFormats.CompactBlock
import cash.z.wallet.sdk.rpc.CompactTxStreamerGrpc
import cash.z.wallet.sdk.rpc.CompactTxStreamerGrpc.CompactTxStreamerBlockingStub
import cash.z.wallet.sdk.rpc.Service
import io.grpc.ManagedChannelBuilder
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import java.io.Closeable
/**
* Serves as a source of compact blocks received from the light wallet server. Once started, it will
* request all the appropriate blocks and then stream them into the channel returned when calling [start].
*/
class CompactBlockDownloader private constructor() {
private lateinit var connection: Connection
constructor(host: String, port: Int) : this() {
// TODO: improve the creation of this channel (tweak its settings to use mobile device responsibly) and make sure it is properly cleaned up
val channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build()
connection = Connection(CompactTxStreamerGrpc.newBlockingStub(channel))
}
constructor(connection: Connection) : this() {
this.connection = connection
}
fun start(
scope: CoroutineScope,
startingBlockHeight: Long = Long.MAX_VALUE,
batchSize: Int = DEFAULT_BATCH_SIZE,
pollFrequencyMillis: Long = DEFAULT_POLL_INTERVAL
): ReceiveChannel<CompactBlock> {
if(connection.isClosed()) throw IllegalStateException("Cannot start downloader when connection is closed.")
scope.launch {
delay(1000L)
connection.use {
var latestBlockHeight = it.getLatestBlockHeight()
if (startingBlockHeight < latestBlockHeight) {
latestBlockHeight = it.downloadMissingBlocks(startingBlockHeight, batchSize)
}
it.streamBlocks(pollFrequencyMillis, latestBlockHeight)
}
}
return connection.subscribe()
}
fun stop() {
connection.close()
}
companion object {
const val DEFAULT_BATCH_SIZE = 10_000
const val DEFAULT_POLL_INTERVAL = 75_000L
}
class Connection(private val blockingStub: CompactTxStreamerBlockingStub): Closeable {
private var job: Job? = null
private var syncJob: Job? = null
private val compactBlockChannel = BroadcastChannel<CompactBlock>(100)
fun subscribe() = compactBlockChannel.openSubscription()
/**
* Download all the missing blocks and return the height of the last block downloaded, which can be used to
* calculate the total number of blocks downloaded.
*/
suspend fun downloadMissingBlocks(startingBlockHeight: Long, batchSize: Int = DEFAULT_BATCH_SIZE): Long {
debug("[Downloader:${System.currentTimeMillis()}] downloadingMissingBlocks starting at $startingBlockHeight")
val latestBlockHeight = getLatestBlockHeight()
var downloadedBlockHeight = startingBlockHeight
// if blocks are missing then download them
if (startingBlockHeight < latestBlockHeight) {
val missingBlockCount = latestBlockHeight - startingBlockHeight + 1
val batches = missingBlockCount / batchSize + (if (missingBlockCount.rem(batchSize) == 0L) 0 else 1)
debug("[Downloader:${System.currentTimeMillis()}] found $missingBlockCount missing blocks, downloading in $batches batches...")
for (i in 1..batches) {
val end = Math.min(startingBlockHeight + (i * batchSize), latestBlockHeight + 1)
loadBlockRange(downloadedBlockHeight..(end-1))
downloadedBlockHeight = end
}
} else {
debug("[Downloader:${System.currentTimeMillis()}] no missing blocks to download!")
}
return downloadedBlockHeight
}
suspend fun getLatestBlockHeight(): Long = withContext(IO) {
blockingStub.getLatestBlock(Service.ChainSpec.newBuilder().build()).height
}
suspend fun streamBlocks(pollFrequencyMillis: Long = DEFAULT_POLL_INTERVAL, startingBlockHeight: Long = Long.MAX_VALUE) = withContext(IO) {
debug("[Downloader:${System.currentTimeMillis()}] streamBlocks started at $startingBlockHeight with interval $pollFrequencyMillis")
// start with the next block, unless we were asked to start before then
var nextBlockHeight = Math.min(startingBlockHeight, getLatestBlockHeight() + 1)
while (isActive && !compactBlockChannel.isClosedForSend) {
debug("[Downloader:${System.currentTimeMillis()}] polling on thread ${Thread.currentThread().name} . . .")
val latestBlockHeight = getLatestBlockHeight()
if (latestBlockHeight >= nextBlockHeight) {
debug("[Downloader:${System.currentTimeMillis()}] found a new block! (latest: $latestBlockHeight) on thread ${Thread.currentThread().name}")
loadBlockRange(nextBlockHeight..latestBlockHeight)
nextBlockHeight = latestBlockHeight + 1
} else {
debug("[Downloader:${System.currentTimeMillis()}] no new block yet (latest: $latestBlockHeight) on thread ${Thread.currentThread().name}")
}
delay(pollFrequencyMillis)
}
}
suspend fun loadBlockRange(range: LongRange): Int = withContext(IO) {
debug("[Downloader:${System.currentTimeMillis()}] requesting block range $range on thread ${Thread.currentThread().name}")
val result = blockingStub.getBlockRange(range.toBlockRange())
var resultCount = 0
while (result.hasNext()) { //hasNext blocks
resultCount++
val nextBlock = result.next()
debug("[Downloader:${System.currentTimeMillis()}] received new block: ${nextBlock.height} on thread ${Thread.currentThread().name}")
compactBlockChannel.send(nextBlock)
}
resultCount
}
fun isClosed(): Boolean {
return compactBlockChannel.isClosedForSend
}
override fun close() {
compactBlockChannel.cancel()
syncJob?.cancel()
syncJob = null
job?.cancel()
job = null
}
}
}

View File

@ -0,0 +1,135 @@
package cash.z.wallet.sdk.data
import android.content.Context
import androidx.room.Room
import androidx.room.RoomDatabase
import cash.z.wallet.sdk.dao.CompactBlockDao
import cash.z.wallet.sdk.db.CompactBlockDb
import cash.z.wallet.sdk.exception.CompactBlockProcessorException
import cash.z.wallet.sdk.jni.JniConverter
import cash.z.wallet.sdk.rpc.CompactFormats
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.isActive
import kotlinx.coroutines.withContext
import java.io.File
import kotlin.properties.ReadOnlyProperty
import kotlin.properties.ReadWriteProperty
/**
* Responsible for processing the blocks on the stream. Saves them to the cacheDb and periodically scans for transactions.
*
* @property applicationContext used to connect to the DB on the device. No reference is kept beyond construction.
* @property seedProvider used for scanning. Later, this will be replaced by a viewing key so we don't pass the seed around.
*/
class CompactBlockProcessor(
applicationContext: Context,
val converter: JniConverter = JniConverter(),
cacheDbName: String = CACHE_DB_NAME,
dataDbName: String = DATA_DB_NAME,
seedProvider: ReadOnlyProperty<Any?, ByteArray> = SampleSeedProvider("dummyseed"),
logger: Twig = SilentTwig()
) : Twig by logger {
internal val cacheDao: CompactBlockDao
private val cacheDb: CompactBlockDb
private val cacheDbPath: String
private val dataDbPath: String
private val seed by seedProvider
var birthdayHeight = Long.MAX_VALUE
internal val dataDbExists get() = File(dataDbPath).exists()
init {
cacheDb = createCompactBlockCacheDb(applicationContext, cacheDbName)
cacheDao = cacheDb.complactBlockDao()
cacheDbPath = applicationContext.getDatabasePath(cacheDbName).absolutePath
dataDbPath = applicationContext.getDatabasePath(dataDbName).absolutePath
}
fun onFirstRun() {
twigTask("executing compactblock processor for first run: initializing data db") {
converter.initDataDb(dataDbPath)
}
// TODO: add precomputed sapling tree to DB and this will be the basis for the birthday
// val birthday = 373070L
val birthday = 394925L
birthdayHeight = birthday
twig("compactblock processor birthday set to $birthdayHeight")
}
private fun createCompactBlockCacheDb(applicationContext: Context, cacheDbName: String): CompactBlockDb {
return Room.databaseBuilder(applicationContext, CompactBlockDb::class.java, cacheDbName)
.setJournalMode(RoomDatabase.JournalMode.TRUNCATE)
// this is a simple cache of blocks. destroying the db should be benign
.fallbackToDestructiveMigration()
.build()
}
/**
* Save blocks and periodically scan them.
*/
suspend fun processBlocks(incomingBlocks: ReceiveChannel<CompactFormats.CompactBlock>) = withContext(IO) {
ensureDataDb()
twigTask("processing blocks") {
var lastScanTime = System.currentTimeMillis()
var hasScanned = false
while (isActive && !incomingBlocks.isClosedForReceive) {
twig("awaiting next block")
val nextBlock = incomingBlocks.receive()
val nextBlockHeight = nextBlock.height
twig("received block with height ${nextBlockHeight} on thread ${Thread.currentThread().name}")
if (birthdayHeight > nextBlockHeight) {
birthdayHeight = nextBlockHeight
twig("birthday initialized to $birthdayHeight")
}
cacheDao.insert(cash.z.wallet.sdk.vo.CompactBlock(nextBlockHeight.toInt(), nextBlock.toByteArray()))
if (shouldScanBlocks(lastScanTime, hasScanned)) {
twig("last block prior to scan ${nextBlockHeight}")
scanBlocks()
lastScanTime = System.currentTimeMillis()
hasScanned = true
}
}
cacheDb.close()
}
}
private fun ensureDataDb() {
if (!dataDbExists) throw CompactBlockProcessorException.DataDbMissing(dataDbPath)
}
private fun shouldScanBlocks(lastScanTime: Long, hasScanned: Boolean): Boolean {
val deltaTime = System.currentTimeMillis() - lastScanTime
twig("${deltaTime}ms since last scan. Have we ever scanned? $hasScanned")
return (!hasScanned && deltaTime > INITIAL_SCAN_DELAY)
|| deltaTime > SCAN_FREQUENCY
}
suspend fun scanBlocks() = withContext(IO) {
twigTask("scanning blocks") {
if (isActive) {
try {
converter.scanBlocks(
cacheDbPath,
dataDbPath,
seed,
birthdayHeight.toInt()
)
} catch (t: Throwable) {
twig("error while scanning blocks: $t")
}
}
}
}
companion object {
/** Default amount of time to synchronize before initiating the first scan. This allows time to download a few blocks. */
const val INITIAL_SCAN_DELAY = 3000L
/** Minimum amount of time between scans. The frequency with which we check whether the block height has changed and, if so, trigger a scan */
const val SCAN_FREQUENCY = 75_000L
const val CACHE_DB_NAME = "DownloadedCompactBlocks.db"
const val DATA_DB_NAME = "CompactBlockScanResults.db"
}
}

View File

@ -0,0 +1,207 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.exception.CompactBlockStreamException
import cash.z.wallet.sdk.ext.toBlockRange
import cash.z.wallet.sdk.rpc.CompactFormats.CompactBlock
import cash.z.wallet.sdk.rpc.CompactTxStreamerGrpc
import cash.z.wallet.sdk.rpc.CompactTxStreamerGrpc.CompactTxStreamerBlockingStub
import cash.z.wallet.sdk.rpc.Service
import com.google.protobuf.ByteString
import io.grpc.Channel
import io.grpc.ManagedChannelBuilder
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.distinct
import java.io.Closeable
import java.util.concurrent.TimeUnit
/**
* Serves as a source of compact blocks received from the light wallet server. Once started, it will
* request all the appropriate blocks and then stream them into the channel returned when calling [start].
*/
class CompactBlockStream private constructor(logger: Twig = SilentTwig()) : Twig by logger {
lateinit var connection: Connection
// TODO: improve the creation of this channel (tweak its settings to use mobile device responsibly) and make sure it is properly cleaned up
constructor(host: String, port: Int, logger: Twig = SilentTwig()) : this(
ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(), logger
)
constructor(channel: Channel, logger: Twig = SilentTwig()) : this(logger) {
connection = Connection(channel)
}
fun start(
scope: CoroutineScope,
startingBlockHeight: Long = Long.MAX_VALUE,
batchSize: Int = DEFAULT_BATCH_SIZE,
pollFrequencyMillis: Long = DEFAULT_POLL_INTERVAL
): ReceiveChannel<CompactBlock> {
if(connection.isClosed()) throw CompactBlockStreamException.ConnectionClosed
twig("starting")
scope.launch {
twig("preparing to stream blocks...")
delay(1000L) // TODO: we can probably get rid of this delay.
try {
connection.use {
twig("requesting latest block height")
var latestBlockHeight = it.getLatestBlockHeight()
twig("responded with latest block height of $latestBlockHeight")
if (startingBlockHeight < latestBlockHeight) {
twig("downloading missing blocks from $startingBlockHeight to $latestBlockHeight")
latestBlockHeight = it.downloadMissingBlocks(startingBlockHeight, batchSize)
twig("done downloading missing blocks")
}
it.streamBlocks(pollFrequencyMillis, latestBlockHeight)
}
} finally {
stop()
}
}
return connection.subscribe()
}
fun progress() = connection.progress().distinct()
fun stop() {
twig("stopping")
connection.close()
}
companion object {
const val DEFAULT_BATCH_SIZE = 10_000
const val DEFAULT_POLL_INTERVAL = 75_000L
const val DEFAULT_RETRIES = 5
}
inner class Connection(private val channel: Channel): Closeable {
private var job: Job? = null
private var syncJob: Job? = null
private val compactBlockChannel = BroadcastChannel<CompactBlock>(100)
private val progressChannel = BroadcastChannel<Int>(100)
fun createStub(timeoutMillis: Long = 60_000L): CompactTxStreamerBlockingStub {
return CompactTxStreamerGrpc.newBlockingStub(channel).withDeadlineAfter(timeoutMillis, TimeUnit.MILLISECONDS)
}
fun subscribe() = compactBlockChannel.openSubscription()
fun progress() = progressChannel.openSubscription()
/**
* Download all the missing blocks and return the height of the last block downloaded, which can be used to
* calculate the total number of blocks downloaded.
*/
suspend fun downloadMissingBlocks(startingBlockHeight: Long, batchSize: Int = DEFAULT_BATCH_SIZE): Long {
twig("downloadingMissingBlocks starting at $startingBlockHeight")
val latestBlockHeight = getLatestBlockHeight()
var downloadedBlockHeight = startingBlockHeight
// if blocks are missing then download them
if (startingBlockHeight < latestBlockHeight) {
val missingBlockCount = latestBlockHeight - startingBlockHeight + 1
val batches = missingBlockCount / batchSize + (if (missingBlockCount.rem(batchSize) == 0L) 0 else 1)
var progress: Int
twig("found $missingBlockCount missing blocks, downloading in $batches batches...")
for (i in 1..batches) {
retryUpTo(DEFAULT_RETRIES) {
twig("beginning batch $i")
val end = Math.min(startingBlockHeight + (i * batchSize), latestBlockHeight + 1)
loadBlockRange(downloadedBlockHeight..(end-1))
progress = Math.round(i/batches.toFloat() * 100)
progressChannel.send(progress)
downloadedBlockHeight = end
twig("finished batch $i\n")
}
}
progressChannel.cancel()
} else {
twig("no missing blocks to download!")
}
return downloadedBlockHeight
}
suspend fun getLatestBlockHeight(): Long = withContext(IO) {
createStub().getLatestBlock(Service.ChainSpec.newBuilder().build()).height
}
suspend fun submitTransaction(raw: ByteArray) = withContext(IO) {
val request = Service.RawTransaction.newBuilder().setData(ByteString.copyFrom(raw)).build()
createStub().sendTransaction(request)
}
suspend fun streamBlocks(pollFrequencyMillis: Long = DEFAULT_POLL_INTERVAL, startingBlockHeight: Long = Long.MAX_VALUE) = withContext(IO) {
twig("streamBlocks started at $startingBlockHeight with interval $pollFrequencyMillis")
// start with the next block, unless we were asked to start before then
var nextBlockHeight = Math.min(startingBlockHeight, getLatestBlockHeight() + 1)
while (isActive && !compactBlockChannel.isClosedForSend) {
retryUpTo(DEFAULT_RETRIES) {
twig("polling for next block in stream on thread ${Thread.currentThread().name} . . .")
val latestBlockHeight = getLatestBlockHeight()
if (latestBlockHeight >= nextBlockHeight) {
twig("found a new block! (latest: $latestBlockHeight) on thread ${Thread.currentThread().name}")
loadBlockRange(nextBlockHeight..latestBlockHeight)
nextBlockHeight = latestBlockHeight + 1
} else {
twig("no new block yet (latest: $latestBlockHeight) on thread ${Thread.currentThread().name}")
}
twig("delaying $pollFrequencyMillis before polling for next block in stream")
delay(pollFrequencyMillis)
}
}
}
private suspend fun retryUpTo(retries: Int, initialDelay:Int = 10, block: suspend () -> Unit) {
var failedAttempts = 0
while (failedAttempts < retries) {
try {
block()
return
} catch (t: Throwable) {
failedAttempts++
if (failedAttempts >= retries) throw t
val duration = Math.pow(initialDelay.toDouble(), failedAttempts.toDouble()).toLong()
twig("failed due to $t retrying (${failedAttempts+1}/$retries) in ${duration}s...")
delay(duration)
}
}
}
suspend fun loadBlockRange(range: LongRange): Int = withContext(IO) {
twig("requesting block range $range on thread ${Thread.currentThread().name}")
val result = createStub(90_000L).getBlockRange(range.toBlockRange())
twig("done requesting block range")
var resultCount = 0
while (checkNextBlock(result)) { //calls result.hasNext, which blocks because we use a blockingStub
resultCount++
val nextBlock = result.next()
twig("...while loading block range $range, received new block ${nextBlock.height} on thread ${Thread.currentThread().name}. Sending...")
compactBlockChannel.send(nextBlock)
twig("...done sending block ${nextBlock.height}")
}
twig("done loading block range $range")
resultCount
}
/* this helper method is used to allow for logic (like logging) before blocking on the current thread */
private fun checkNextBlock(result: MutableIterator<CompactBlock>): Boolean {
twig("awaiting next block...")
return result.hasNext()
}
fun isClosed(): Boolean {
return compactBlockChannel.isClosedForSend
}
override fun close() {
compactBlockChannel.cancel()
progressChannel.cancel()
syncJob?.cancel()
syncJob = null
job?.cancel()
job = null
}
}
}

View File

@ -0,0 +1,191 @@
package cash.z.wallet.sdk.data
import android.content.Context
import androidx.room.Room
import androidx.room.RoomDatabase
import cash.z.wallet.sdk.dao.BlockDao
import cash.z.wallet.sdk.dao.NoteDao
import cash.z.wallet.sdk.dao.TransactionDao
import cash.z.wallet.sdk.db.DerivedDataDb
import cash.z.wallet.sdk.exception.RepositoryException
import cash.z.wallet.sdk.exception.RustLayerException
import cash.z.wallet.sdk.jni.JniConverter
import cash.z.wallet.sdk.vo.NoteQuery
import cash.z.wallet.sdk.vo.Transaction
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.distinct
/**
* Repository that does polling for simplicity. We will implement an alternative version that uses live data as well as
* one that creates triggers and then reference them here. For now this is the most basic example of keeping track of
* changes.
*/
open class PollingTransactionRepository(
private val derivedDataDb: DerivedDataDb,
private val derivedDataDbPath: String,
private val converter: JniConverter,
private val pollFrequencyMillis: Long = 2000L,
logger: Twig = SilentTwig()
) : TransactionRepository, Twig by logger {
/**
* Constructor that creates the database and then executes a callback on it.
*/
constructor(
context: Context,
dataDbName: String,
pollFrequencyMillis: Long = 2000L,
converter: JniConverter = JniConverter(),
logger: Twig = SilentTwig(),
dbCallback: (DerivedDataDb) -> Unit = {}
) : this(
Room.databaseBuilder(context, DerivedDataDb::class.java, dataDbName)
.setJournalMode(RoomDatabase.JournalMode.TRUNCATE)
.fallbackToDestructiveMigration()
.build(),
context.getDatabasePath(dataDbName).absolutePath,
converter,
pollFrequencyMillis,
logger
) {
dbCallback(derivedDataDb)
}
private val notes: NoteDao = derivedDataDb.noteDao()
internal val blocks: BlockDao = derivedDataDb.blockDao()
private val transactions: TransactionDao = derivedDataDb.transactionDao()
private lateinit var pollingJob: Job
private val balanceChannel = ConflatedBroadcastChannel<Long>()
private val allTransactionsChannel = ConflatedBroadcastChannel<List<NoteQuery>>()
val existingTransactions = listOf<NoteQuery>()
private val wasPreviouslyStarted
get() = !existingTransactions.isEmpty() || balanceChannel.isClosedForSend || allTransactionsChannel.isClosedForSend
override fun start(parentScope: CoroutineScope) {
// prevent restarts so the behavior of this class is easier to reason about
if (wasPreviouslyStarted) throw RepositoryException.FalseStart
twig("starting")
pollingJob = parentScope.launch {
poll()
}
}
override fun stop() {
twig("stopping")
balanceChannel.cancel()
allTransactionsChannel.cancel()
pollingJob.cancel()
}
override fun balance(): ReceiveChannel<Long> {
return balanceChannel.openSubscription().distinct()
}
override fun allTransactions(): ReceiveChannel<List<NoteQuery>> {
return allTransactionsChannel.openSubscription()
}
override fun lastScannedHeight(): Long {
return blocks.lastScannedHeight()
}
override suspend fun findTransactionById(txId: Long): Transaction? = withContext(IO) {
twig("finding transaction with id $txId on thread ${Thread.currentThread().name}")
val transaction = transactions.findById(txId)
twig("found ${transaction?.id}")
transaction
}
override suspend fun deleteTransactionById(txId: Long) = withContext(IO) {
twigTask("deleting transaction with id ${txId}") {
transactions.deleteById(txId)
}
}
private suspend fun poll() = withContext(IO) {
try {
var previousNotes: List<NoteQuery>? = null
while (isActive
&& !balanceChannel.isClosedForSend
&& !allTransactionsChannel.isClosedForSend
) {
twigTask("polling for transactions") {
val newNotes = notes.getAll()
if (hasChanged(previousNotes, newNotes)) {
twig("loaded ${notes.count()} transactions and changes were detected!")
allTransactionsChannel.send(newNotes)
sendLatestBalance()
previousNotes = newNotes
} else {
twig("loaded ${notes.count()} transactions but no changes detected.")
}
}
delay(pollFrequencyMillis)
}
} finally {
stop()
}
}
private fun hasChanged(oldNotes: List<NoteQuery>?, newNotes: List<NoteQuery>): Boolean {
// shortcuts first
if (newNotes.isEmpty() && oldNotes == null) return false // if nothing has happened, that doesn't count as a change
if (oldNotes == null) return true
if (oldNotes.size != newNotes.size) return true
for (note in newNotes) {
if (!oldNotes.contains(note)) return true
}
return false
}
// private suspend fun poll() = withContext(IO) {
// try {
// while (isActive && !transactionChannel.isClosedForSend && !balanceChannel.isClosedForSend && !allTransactionsChannel.isClosedForSend) {
// twigTask("polling for transactions") {
// val newTransactions = checkForNewTransactions()
// newTransactions?.takeUnless { it.isEmpty() }?.forEach {
// existingTransactions.union(listOf(it))
// transactionChannel.send(it)
// allTransactionsChannel.send(existingTransactions)
// }?.also {
// twig("discovered ${newTransactions?.size} transactions!")
// // only update the balance when we've had some new transactions
// sendLatestBalance()
// }
// }
// delay(pollFrequencyMillis)
// }
// } finally {
// // if the job is cancelled, it should be the same as the repository stopping.
// // otherwise, it over-complicates things and makes it harder to reason about the behavior of this class.
// stop()
// }
// }
//
// protected open fun checkForNewTransactions(): Set<NoteQuery>? {
// val notes = notes.getAll()
// twig("object $this : checking for new transactions. previousCount: ${existingTransactions.size} currentCount: ${notes.size}")
// return notes.subtract(existingTransactions)
// }
private suspend fun sendLatestBalance() = withContext(IO) {
twigTask("sending balance") {
try {
val balance = converter.getBalance(derivedDataDbPath)
twig("balance: $balance")
balanceChannel.send(balance)
} catch (t: Throwable) {
twig("failed to get balance due to $t")
throw RustLayerException.BalanceException(t)
}
}
}
}

View File

@ -0,0 +1,10 @@
package cash.z.wallet.sdk.data
import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.KProperty
class SampleSeedProvider(val seedValue: String) : ReadOnlyProperty<Any?, ByteArray> {
override fun getValue(thisRef: Any?, property: KProperty<*>): ByteArray {
return seedValue.toByteArray()
}
}

View File

@ -1,68 +0,0 @@
package cash.z.wallet.sdk.data
import android.content.Context
import android.database.sqlite.SQLiteDatabase
import android.database.sqlite.SQLiteOpenHelper
class ScanResultDbCreator(context: Context) : SQLiteOpenHelper(context, DB_NAME, null, DB_VERSION) {
override fun onCreate(db: SQLiteDatabase) {
SQL_CREATE_DB.split(";").forEach { db.execSQL(it.trim()) }
}
override fun onUpgrade(db: SQLiteDatabase, oldVersion: Int, newVersion: Int) {
onCreate(db)
}
override fun onDowngrade(db: SQLiteDatabase, oldVersion: Int, newVersion: Int) {
onUpgrade(db, oldVersion, newVersion)
}
companion object {
const val DB_NAME = "ScannedBlockResults.db"
const val DB_VERSION = 1
val SQL_CREATE_DB: String = """
CREATE TABLE IF NOT EXISTS blocks (
height INTEGER PRIMARY KEY,
time INTEGER,
sapling_tree BLOB
);
CREATE TABLE IF NOT EXISTS transactions (
id_tx INTEGER PRIMARY KEY,
txid BLOB NOT NULL UNIQUE,
block INTEGER,
raw BLOB,
FOREIGN KEY (block) REFERENCES blocks(height)
);
CREATE TABLE IF NOT EXISTS received_notes (
id_note INTEGER PRIMARY KEY,
tx INTEGER NOT NULL,
output_index INTEGER NOT NULL,
account INTEGER NOT NULL,
diversifier BLOB NOT NULL,
value INTEGER NOT NULL,
rcm BLOB NOT NULL,
nf BLOB NOT NULL UNIQUE,
memo BLOB,
spent INTEGER,
FOREIGN KEY (tx) REFERENCES transactions(id_tx),
FOREIGN KEY (spent) REFERENCES transactions(id_tx),
CONSTRAINT tx_output UNIQUE (tx, output_index)
);
CREATE TABLE IF NOT EXISTS sapling_witnesses (
id_witness INTEGER PRIMARY KEY,
note INTEGER NOT NULL,
block INTEGER NOT NULL,
witness BLOB NOT NULL,
FOREIGN KEY (note) REFERENCES received_notes(id_note),
FOREIGN KEY (block) REFERENCES blocks(height),
CONSTRAINT witness_height UNIQUE (note, block)
)
""".trimIndent()
fun create(context: Context) {
val db = ScanResultDbCreator(context).writableDatabase
db.close()
}
}
}

View File

@ -0,0 +1,14 @@
package cash.z.wallet.sdk.data
import kotlin.properties.ReadWriteProperty
import kotlin.reflect.KProperty
class SimpleProvider<T>(var value: T) : ReadWriteProperty<Any?, T> {
override fun getValue(thisRef: Any?, property: KProperty<*>): T {
return value
}
override fun setValue(thisRef: Any?, property: KProperty<*>, value: T) {
this.value = value
}
}

View File

@ -1,104 +1,115 @@
package cash.z.wallet.sdk.data
import android.content.Context
import androidx.room.Room
import androidx.room.RoomDatabase
import cash.z.wallet.sdk.dao.CompactBlockDao
import cash.z.wallet.sdk.db.CompactBlockDb
import cash.z.wallet.sdk.ext.debug
import cash.z.wallet.sdk.jni.JniConverter
import cash.z.wallet.sdk.data.Synchronizer.SyncState.FirstRun
import cash.z.wallet.sdk.data.Synchronizer.SyncState.ReadyToProcess
import cash.z.wallet.sdk.exception.SynchronizerException
import cash.z.wallet.sdk.rpc.CompactFormats
import cash.z.wallet.sdk.secure.Wallet
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import cash.z.wallet.sdk.rpc.CompactFormats
/**
* Downloads compact blocks to the database and then scans them for transactions
* The glue. Downloads compact blocks to the database and then scans them for transactions. In order to serve that
* purpose, this class glues together a variety of key components. Each component contributes to the team effort of
* providing a simple source of truth to interact with.
*
* Another way of thinking about this class is the reference that demonstrates how all the pieces can be tied
* together.
*/
class Synchronizer(val applicationContext: Context, val scope: CoroutineScope, val birthday: Long = 373070L) {
class Synchronizer(
val downloader: CompactBlockStream,
val processor: CompactBlockProcessor,
val repository: TransactionRepository,
val wallet: Wallet,
val batchSize: Int = 1000,
logger: Twig = SilentTwig()
) : Twig by logger {
// TODO: convert to CompactBlockSource that just has a stream and then have the downloader operate on the stream
private val downloader = CompactBlockDownloader("10.0.2.2", 9067)
// private val downloader = CompactBlockDownloader("10.0.2.2", 9067)
private val savedBlockChannel = ConflatedBroadcastChannel<CompactFormats.CompactBlock>()
private lateinit var cacheDao: CompactBlockDao
private lateinit var cacheDb: CompactBlockDb
private lateinit var saveJob: Job
private lateinit var scanJob: Job
private lateinit var blockJob: Job
private val wasPreviouslyStarted
get() = savedBlockChannel.isClosedForSend || ::blockJob.isInitialized
fun blocks(): ReceiveChannel<CompactFormats.CompactBlock> = savedBlockChannel.openSubscription()
fun start() {
createDb()
downloader.start(scope, birthday)
saveJob = saveBlocks()
scanJob = scanBlocks()
fun start(parentScope: CoroutineScope): Synchronizer {
// prevent restarts so the behavior of this class is easier to reason about
if (wasPreviouslyStarted) throw SynchronizerException.FalseStart
twig("starting")
blockJob = parentScope.launch {
continueWithState(determineState())
}
return this
}
fun stop() {
scanJob.cancel()
saveJob.cancel()
downloader.stop()
cacheDb.close()
}
private fun createDb() {
// TODO: inject the db and dao
cacheDb = Room.databaseBuilder(
applicationContext,
CompactBlockDb::class.java,
CACHEDB_NAME
)
.setJournalMode(RoomDatabase.JournalMode.TRUNCATE)
.fallbackToDestructiveMigration()
.build()
.apply { cacheDao = complactBlockDao() }
}
private fun saveBlocks(): Job = scope.launch {
// val downloadedBlockChannel = downloader.blocks()
// while (isActive) {
// try {
// val nextBlock = downloadedBlockChannel.receive()
// cacheDao.insert(cash.z.wallet.sdk.vo.CompactBlock(nextBlock.height.toInt(), nextBlock.toByteArray()))
// async {
// savedBlockChannel.send(Result.success(nextBlock))
// debug("stored block at height: ${nextBlock.height}")
// }
// } catch (t: Throwable) {
// debug("failed to store block due to $t")
// async {
// savedBlockChannel.send(Result.failure(t))
// }
// }
//
// }
}
private fun scanBlocks(): Job = scope.launch {
val savedBlocks = blocks()
val converter = JniConverter()
converter.initLogs()
ScanResultDbCreator.create(applicationContext)
while (isActive) {
try {
debug("scanning blocks from $birthday onward...")
val nextBlock = savedBlocks.receive()
debug("...scanner observed a block (${nextBlock.height}) without crashing!")
delay(5000L)
val result = converter.scanBlocks(
applicationContext.getDatabasePath(CACHEDB_NAME).absolutePath,
applicationContext.getDatabasePath(ScanResultDbCreator.DB_NAME).absolutePath,
"dummyseed".toByteArray(),
birthday.toInt()
)
debug("scan complete")
} catch (t: Throwable) {
debug("error while scanning blocks: $t")
}
fun CoroutineScope.continueWithState(syncState: SyncState): Job {
return when (syncState) {
FirstRun -> onFirstRun()
is ReadyToProcess -> onReady(syncState)
}
}
companion object {
const val CACHEDB_NAME = "DownloadedCompactBlocks.db"
private fun CoroutineScope.onFirstRun(): Job {
twig("this appears to be a fresh install, beginning first run of application")
processor.onFirstRun()
return continueWithState(ReadyToProcess(processor.birthdayHeight))
}
private fun CoroutineScope.onReady(syncState: ReadyToProcess) = launch {
twig("synchronization is ready to begin at height ${syncState.startingBlockHeight}")
try {
// TODO: for PIR concerns, introduce some jitter here for where, exactly, the downloader starts
val blockChannel =
downloader.start(this, syncState.startingBlockHeight, batchSize)
repository.start(this)
processor.processBlocks(blockChannel)
} finally {
stop()
}
}
// TODO: get rid of this temporary helper function after syncing with the latest rust code
suspend fun updateTimeStamp(height: Int): Long? = withContext(IO) {
val originalBlock = processor.cacheDao.findById(height)
twig("TMP: found block at height ${height}")
if (originalBlock != null) {
val ogBlock = CompactFormats.CompactBlock.parseFrom(originalBlock.data)
twig("TMP: parsed block! ${ogBlock.height} ${ogBlock.time}")
(repository as PollingTransactionRepository).blocks.updateTime(height, ogBlock.time)
ogBlock.time
}
null
}
private suspend fun determineState(): SyncState = withContext(IO) {
twig("determining state (has the app run before, what block did we last see, etc.)")
val state = if (processor.dataDbExists) {
// this call blocks because it does IO
val startingBlockHeight = repository.lastScannedHeight()
twig("dataDb exists with last height of $startingBlockHeight")
if (startingBlockHeight == 0L) FirstRun else ReadyToProcess(startingBlockHeight)
} else {
FirstRun
}
twig("determined ${state::class.java.simpleName}")
state
}
fun stop() {
twig("stopping")
blockJob.cancel()
downloader.stop()
repository.stop()
}
sealed class SyncState {
object FirstRun : SyncState()
class ReadyToProcess(val startingBlockHeight: Long = Long.MAX_VALUE) : SyncState()
}
}

View File

@ -0,0 +1,17 @@
package cash.z.wallet.sdk.data
import cash.z.wallet.sdk.vo.NoteQuery
import cash.z.wallet.sdk.vo.Transaction
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import java.math.BigDecimal
interface TransactionRepository {
fun start(parentScope: CoroutineScope)
fun stop()
fun balance(): ReceiveChannel<Long>
fun allTransactions(): ReceiveChannel<List<NoteQuery>>
fun lastScannedHeight(): Long
suspend fun findTransactionById(txId: Long): Transaction?
suspend fun deleteTransactionById(txId: Long)
}

View File

@ -0,0 +1,61 @@
package cash.z.wallet.sdk.data
import kotlin.system.measureTimeMillis
/**
* A tiny log.
*/
interface Twig {
fun twig(logMessage: String = "")
}
/**
* A tiny log that does nothing. No one hears this twig fall in the woods.
*/
class SilentTwig : Twig {
override fun twig(logMessage: String) {
// shh
}
}
/**
* A tiny log for detecting troubles. Aim at your troubles and pull the twigger.
*
* @param formatter a formatter for the twigs. The default one is pretty spiffy.
* @param printer a printer for the twigs. The default is System.err.println.
*/
open class TroubleshootingTwig(
val formatter: (String) -> String = spiffy(5),
val printer: (String) -> Any = System.err::println
) : Twig {
override fun twig(logMessage: String) {
printer(formatter(logMessage))
}
}
/**
* A tiny log task. Execute the block of code with some twigging around the outside.
*/
// for silent twigs, this adds a small amount of overhead at the call site but still avoids logging
//
// note: being an extension function (i.e. static rather than a member of the Twig interface) allows this function to be
// inlined and simplifies its use with suspend functions
// (otherwise the function and its "block" param would have to suspend)
inline fun Twig.twigTask(logMessage: String, block: () -> Unit) {
twig("$logMessage - started | on thread ${Thread.currentThread().name})")
val time = measureTimeMillis(block)
twig("$logMessage - completed | in ${time}ms on thread ${Thread.currentThread().name}")
}
/**
* A tiny log formatter that makes twigs pretty spiffy.
*
* @param stackFrame the stack frame from which we try to derive the class. This can vary depending on how the code is
* called so we expose it for flexibility. Jiggle the handle on this whenever the line numbers appear incorrect.
*/
inline fun spiffy(stackFrame: Int = 4, tag: String = "@TWIG"): (String) -> String = { logMessage: String ->
val stack = Thread.currentThread().stackTrace[stackFrame]
val time = String.format("${tag} %1\$tD %1\$tI:%1\$tM:%1\$tS.%1\$tN", System.currentTimeMillis())
val className = stack.className.split(".").lastOrNull()?.split("\$")?.firstOrNull()
"$time[$className:${stack.lineNumber}] $logMessage"
}

View File

@ -0,0 +1,40 @@
package cash.z.wallet.sdk.exception
import java.lang.RuntimeException
//TODO: rename things in here when we know what we're calling the Rust layer (librustzcash?)
/**
* Exceptions thrown in the Rust layer of the SDK. We may not always be able to surface details about this
* exception so it's important for the SDK to provide helpful messages whenever these errors are encountered.
*/
sealed class RustLayerException(message: String, cause: Throwable? = null) : RuntimeException(message, cause) {
class BalanceException(cause: Throwable) : RustLayerException("Error while requesting the current balance over " +
"JNI. This might mean that the database has been corrupted and needs to be rebuilt. Verify that " +
"blocks are not missing or have not been scanned out of order.", cause)
}
sealed class RepositoryException(message: String, cause: Throwable? = null) : RuntimeException(message, cause) {
object FalseStart: RepositoryException( "The channel is closed. Note that once a repository has stopped it " +
"cannot be restarted. Verify that the repository is not being restarted.")
}
sealed class SynchronizerException(message: String, cause: Throwable? = null) : RuntimeException(message, cause) {
object FalseStart: SynchronizerException("Once a synchronizer has stopped it cannotbe restarted. Instead, a new " +
"instance should be created.")
}
sealed class CompactBlockProcessorException(message: String, cause: Throwable? = null) : RuntimeException(message, cause) {
class DataDbMissing(path: String): CompactBlockProcessorException("No data db file found at path $path. Verify " +
"that the data DB has been initialized via `converter.initDataDb(path)`")
}
sealed class CompactBlockStreamException(message: String, cause: Throwable? = null) : RuntimeException(message, cause) {
object ConnectionClosed: CompactBlockStreamException("Cannot start stream when connection is closed.")
}
sealed class WalletException(message: String, cause: Throwable? = null) : RuntimeException(message, cause) {
object MissingParamsException : WalletException("Cannot send funds due to missing spend or output params and " +
"attempting to download them failed.")
class FetchParamsException(message: String) : WalletException("Failed to fetch params due to: $message")
}

View File

@ -1,5 +1,8 @@
package cash.z.wallet.sdk.jni
import cash.z.wallet.sdk.annotation.OpenForTesting
@OpenForTesting
class JniConverter {
external fun initDataDb(dbData: String): Boolean

View File

@ -0,0 +1,142 @@
package cash.z.wallet.sdk.secure
import cash.z.wallet.sdk.data.SilentTwig
import cash.z.wallet.sdk.data.Twig
import cash.z.wallet.sdk.data.twigTask
import cash.z.wallet.sdk.exception.WalletException
import cash.z.wallet.sdk.jni.JniConverter
import com.squareup.okhttp.OkHttpClient
import com.squareup.okhttp.Request
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.withContext
import okio.Okio
import java.io.File
import kotlin.properties.ReadOnlyProperty
/**
* Wrapper for the converter. This class basically represents all the Rust-wallet capabilities and the supporting data
* required to exercise those abilities.
*/
class Wallet(
private val converter: JniConverter,
private val dbDataPath: String,
private val paramDestinationDir: String,
/** indexes of accounts ids. In the reference wallet, we only work with account 0 */
private val accountIds: Array<Int> = arrayOf(0),
seedProvider: ReadOnlyProperty<Any?, ByteArray>,
logger: Twig = SilentTwig()
) : Twig by logger {
val seed by seedProvider
init {
// initialize data db for this wallet and its accounts
// initialize extended viewing keys for this wallet's seed and store them in the dataDb
// initialize spending keys
// call converter.initializeForSeed(seed, n) where n is the number of accounts
// get back an array of spending keys for each account. store them super securely
}
fun getBalance(accountId: Int = accountIds[0]) {
// TODO: modify request to factor in account Ids
converter.getBalance(dbDataPath)
}
// TODO: modify request to factor in account Ids
// TODO: once initializeForSeed exists then we won't need to hang onto it and use it here
suspend fun sendToAddress(value: Long, toAddress: String, fromAccountId: Int = accountIds[0]): Long =
withContext(IO) {
var result = -1L
twigTask("sending $value zatoshi to ${toAddress.masked()}") {
result = runCatching {
ensureParams(paramDestinationDir)
twig("params exist at $paramDestinationDir! attempting to send...")
converter.sendToAddress(
dbDataPath,
seed,
toAddress,
value,
// using names here so it's easier to avoid transposing them, if the function signature changes
spendParams = SPEND_PARAM_FILE_NAME.toPath(),
outputParams = OUTPUT_PARAM_FILE_NAME.toPath()
)
}.getOrDefault(result)
}
twig("result of sendToAddress: $result")
result
}
suspend fun fetchParams(destinationDir: String) = withContext(IO) {
val client = createHttpClient()
var failureMessage = ""
arrayOf(SPEND_PARAM_FILE_NAME, OUTPUT_PARAM_FILE_NAME).forEach { paramFileName ->
val url = "$CLOUD_PARAM_DIR_URL/$paramFileName"
val request = Request.Builder().url(url).build()
val response = client.newCall(request).execute()
if (response.isSuccessful) {
twig("fetch succeeded")
val file = File(destinationDir, paramFileName)
if(file.parentFile.exists()) {
twig("directory exists!")
} else {
twig("directory did not exist attempting to make it")
file.parentFile.mkdirs()
}
Okio.buffer(Okio.sink(file)).use {
twig("writing to $file")
it.writeAll(response.body().source())
}
twig("fetch succeeded, done writing $paramFileName")
} else {
failureMessage += "Error while fetching $paramFileName : $response\n"
twig(failureMessage)
}
}
if (failureMessage.isNotEmpty()) throw WalletException.FetchParamsException(failureMessage)
}
private suspend fun ensureParams(destinationDir: String) {
var hadError = false
arrayOf(SPEND_PARAM_FILE_NAME, OUTPUT_PARAM_FILE_NAME).forEach { paramFileName ->
if (!File(destinationDir, paramFileName).exists()) {
twig("ERROR: $paramFileName not found at location: $destinationDir")
hadError = true
}
}
if (hadError) {
try {
twigTask("attempting to download missing params") {
fetchParams(destinationDir)
}
} catch (e: Throwable) {
twig("failed to fetch params due to: $e")
throw WalletException.MissingParamsException
}
}
}
//
// Helpers
//
private fun createHttpClient(): OkHttpClient {
//TODO: add logging and timeouts
return OkHttpClient()
}
private fun String.masked(): String = if (startsWith("ztest")) "****${takeLast(4)}" else "***masked***"
private fun String.toPath(): String = "$paramDestinationDir/$this"
companion object {
/**
* The Url that is used by default in zcashd.
* We'll want to make this externally configurable, rather than baking it into the SDK but this will do for now,
* since we're using a cloudfront URL that already redirects.
*/
const val CLOUD_PARAM_DIR_URL = "https://z.cash/downloads/"
const val SPEND_PARAM_FILE_NAME = "sapling-spend.params"
const val OUTPUT_PARAM_FILE_NAME = "sapling-output.params"
}
}

View File

@ -3,6 +3,7 @@ package cash.z.wallet.sdk.vo
import androidx.room.ColumnInfo
import androidx.room.Entity
import androidx.room.ForeignKey
import androidx.room.Ignore
@Entity(
tableName = "received_notes",
@ -23,29 +24,29 @@ import androidx.room.ForeignKey
)
data class Note(
@ColumnInfo(name = "id_note")
val id: Int,
val id: Int = 0,
@ColumnInfo(name = "tx")
val transaction: Int,
val transaction: Int = 0,
@ColumnInfo(name = "output_index")
val outputIndex: Int,
val outputIndex: Int = 0,
val account: Int,
val value: Int,
val spent: Int?,
val account: Int = 0,
val value: Int = 0,
val spent: Int? = 0,
@ColumnInfo(typeAffinity = ColumnInfo.BLOB)
val diversifier: ByteArray,
val diversifier: ByteArray = byteArrayOf(),
@ColumnInfo(typeAffinity = ColumnInfo.BLOB)
val rcm: ByteArray,
val rcm: ByteArray = byteArrayOf(),
@ColumnInfo(typeAffinity = ColumnInfo.BLOB)
val nf: ByteArray,
val nf: ByteArray = byteArrayOf(),
@ColumnInfo(typeAffinity = ColumnInfo.BLOB)
val memo: ByteArray?
val memo: ByteArray? = byteArrayOf()
) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
@ -77,4 +78,6 @@ data class Note(
return result
}
}
}
data class NoteQuery(val txId: Int, val value: Int, val height: Int, val sent: Boolean, val time: Long)

View File

@ -16,7 +16,7 @@ import org.jetbrains.annotations.NotNull
)
data class Transaction(
@ColumnInfo(name = "id_tx")
val id: Int,
val id: Long,
@ColumnInfo(typeAffinity = ColumnInfo.BLOB, name = "txid")
@NotNull
@ -38,7 +38,7 @@ data class Transaction(
}
override fun hashCode(): Int {
var result = id
var result = id.toInt()
result = 31 * result + transactionId.contentHashCode()
result = 31 * result + block
result = 31 * result + (raw?.contentHashCode() ?: 0)

View File

@ -0,0 +1,11 @@
package cash.z.wallet.sdk.annotation
/**
* Used in conjunction with the kotlin-allopen plugin to make any class with this annotation open for extension.
* Typically, we apply this to classes that we want to mock in androidTests because unit tests don't have this problem,
* it's only an issue with JUnit4 Instrumentation tests. This annotation is only leveraged in debug builds.
*
* Note: the counterpart to this annotation in the debug buildType applies the OpenClass annotation but here we do not.
*/
@Target(AnnotationTarget.CLASS)
annotation class OpenForTesting

View File

@ -6,6 +6,6 @@ import org.mockito.Mockito
* Use in place of `any()` to fix the issue with mockito `any` returning null (so you can't pass it to functions that
* take a non-null param)
*
* TODO: perhaps submit this to the mockito kotlin project
* TODO: perhaps submit this function to the mockito kotlin project because it allows the use of non-null 'any()'
*/
internal fun <T> anyNotNull() = Mockito.any<T>() as T

View File

@ -6,10 +6,10 @@ import io.grpc.ManagedChannelBuilder
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import rpc.CompactTxStreamerGrpc
import rpc.Service
import rpc.Service.BlockID
import rpc.Service.BlockRange
import cash.z.wallet.sdk.rpc.CompactTxStreamerGrpc
import cash.z.wallet.sdk.rpc.Service
import cash.z.wallet.sdk.rpc.Service.BlockID
import cash.z.wallet.sdk.rpc.Service.BlockRange
import java.util.concurrent.TimeUnit
class GlueTest {

View File

@ -8,9 +8,9 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import rpc.CompactTxStreamerGrpc
import rpc.Service
import rpc.Service.*
import cash.z.wallet.sdk.rpc.CompactTxStreamerGrpc
import cash.z.wallet.sdk.rpc.Service
import cash.z.wallet.sdk.rpc.Service.*
import rpc.WalletDataOuterClass
import java.util.concurrent.TimeUnit

View File

@ -2,31 +2,39 @@ package cash.z.wallet.sdk.data
import cash.z.wallet.anyNotNull
import cash.z.wallet.sdk.ext.toBlockHeight
import cash.z.wallet.sdk.rpc.CompactFormats
import cash.z.wallet.sdk.rpc.CompactTxStreamerGrpc.CompactTxStreamerBlockingStub
import cash.z.wallet.sdk.rpc.Service
import com.nhaarman.mockitokotlin2.*
import kotlinx.coroutines.*
import org.junit.jupiter.api.*
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.ArgumentMatchers.any
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.junit.jupiter.MockitoSettings
import org.mockito.quality.Strictness
import rpc.CompactFormats
import rpc.CompactTxStreamerGrpc.CompactTxStreamerBlockingStub
import rpc.Service
import kotlin.system.measureTimeMillis
import org.junit.Rule
@ExtendWith(MockitoExtension::class)
@MockitoSettings(strictness = Strictness.LENIENT) // allows us to setup the blockingStub once, with everything, rather than using custom stubs for each test
class CompactBlockDownloaderTest {
lateinit var downloader: CompactBlockDownloader
lateinit var connection: CompactBlockDownloader.Connection
lateinit var downloader: CompactBlockStream
lateinit var connection: CompactBlockStream.Connection
val job = Job()
val io = CoroutineScope(Dispatchers.IO + job)
@Rule
var grpcServerRule = GrpcServerRule()
@BeforeEach
fun setUp(@Mock blockingStub: CompactTxStreamerBlockingStub) {
whenever(blockingStub.getLatestBlock(any())).doAnswer {
@ -53,8 +61,9 @@ class CompactBlockDownloaderTest {
}
delayedIterator
}
connection = spy(CompactBlockDownloader.Connection(blockingStub))
downloader = CompactBlockDownloader(connection)
downloader = CompactBlockStream(grpcServerRule.channel, TroubleshootingTwig())
connection = spy(downloader.connection)
whenever(connection.createStub(any())).thenReturn(blockingStub)
}
@AfterEach