From 3535ba905ed4b0e73143ae37996dbef1ee40bf6b Mon Sep 17 00:00:00 2001 From: Carter Jernigan Date: Thu, 28 Jul 2022 08:39:48 -0400 Subject: [PATCH] [#629] Stream blocks during download This change leverages the gRPC streaming API along with the elimination of bulk inserts to SQLite to reduce the amount of memory required to write blocks. --- .../z/wallet/sdk/sample/demoapp/SampleCodeTest.kt | 2 +- .../demos/getblockrange/GetBlockRangeFragment.kt | 11 ++++++----- .../z/ecc/android/sdk/integration/SanityTest.kt | 2 +- .../android/sdk/db/entity/CompactBlockEntity.kt | 4 ++-- .../sdk/internal/block/CompactBlockDbStore.kt | 10 +++++++--- .../sdk/internal/block/CompactBlockDownloader.kt | 1 - .../sdk/internal/block/CompactBlockStore.kt | 3 ++- .../ecc/android/sdk/internal/db/CompactBlockDb.kt | 13 +++++++++++++ .../internal/service/LightWalletGrpcService.kt | 15 +++++++++++---- .../sdk/internal/service/LightWalletService.kt | 2 +- 10 files changed, 44 insertions(+), 19 deletions(-) diff --git a/demo-app/src/androidTest/java/cash/z/wallet/sdk/sample/demoapp/SampleCodeTest.kt b/demo-app/src/androidTest/java/cash/z/wallet/sdk/sample/demoapp/SampleCodeTest.kt index 01489821..55a2e5eb 100644 --- a/demo-app/src/androidTest/java/cash/z/wallet/sdk/sample/demoapp/SampleCodeTest.kt +++ b/demo-app/src/androidTest/java/cash/z/wallet/sdk/sample/demoapp/SampleCodeTest.kt @@ -97,7 +97,7 @@ class SampleCodeTest { val blockRange = BlockHeight.new(ZcashNetwork.Mainnet, 500_000)..BlockHeight.new(ZcashNetwork.Mainnet, 500_009) val lightwalletService = LightWalletGrpcService(context, lightwalletdHost) val blocks = lightwalletService.getBlockRange(blockRange) - assertEquals(blockRange.endInclusive.value - blockRange.start.value, blocks.size) + assertEquals(blockRange.endInclusive.value - blockRange.start.value, blocks.count()) blocks.forEachIndexed { i, block -> log("Block #$i: height:${block.height} hash:${block.hash.toByteArray().toHex()}") diff --git a/demo-app/src/main/java/cash/z/ecc/android/sdk/demoapp/demos/getblockrange/GetBlockRangeFragment.kt b/demo-app/src/main/java/cash/z/ecc/android/sdk/demoapp/demos/getblockrange/GetBlockRangeFragment.kt index 99bd570a..5e3dd1eb 100644 --- a/demo-app/src/main/java/cash/z/ecc/android/sdk/demoapp/demos/getblockrange/GetBlockRangeFragment.kt +++ b/demo-app/src/main/java/cash/z/ecc/android/sdk/demoapp/demos/getblockrange/GetBlockRangeFragment.kt @@ -31,9 +31,10 @@ class GetBlockRangeFragment : BaseDemoFragment() { val fetchDelta = System.currentTimeMillis() - start // Note: This is a demo so we won't worry about iterating efficiently over these blocks - + // Note: Converting the blocks sequence to a list can consume a lot of memory and may + // cause OOM. binding.textInfo.text = Html.fromHtml( - blocks?.run { + blocks?.toList()?.run { val count = size val emptyCount = count { it.vtxCount == 0 } val maxTxs = maxByOrNull { it.vtxCount } @@ -45,9 +46,9 @@ class GetBlockRangeFragment : BaseDemoFragment() { block.vtxList.maxOfOrNull { it.outputsCount } ?: -1 } val maxOutTx = maxOuts?.vtxList?.maxByOrNull { it.outputsCount } - val txCount = sumBy { it.vtxCount } - val outCount = sumBy { block -> block.vtxList.sumBy { it.outputsCount } } - val inCount = sumBy { block -> block.vtxList.sumBy { it.spendsCount } } + val txCount = sumOf { it.vtxCount } + val outCount = sumOf { block -> block.vtxList.sumOf { it.outputsCount } } + val inCount = sumOf { block -> block.vtxList.sumOf { it.spendsCount } } val processTime = System.currentTimeMillis() - start - fetchDelta @Suppress("MaxLineLength") diff --git a/sdk-lib/src/androidTest/java/cash/z/ecc/android/sdk/integration/SanityTest.kt b/sdk-lib/src/androidTest/java/cash/z/ecc/android/sdk/integration/SanityTest.kt index 3f9865c3..dd8ba2f7 100644 --- a/sdk-lib/src/androidTest/java/cash/z/ecc/android/sdk/integration/SanityTest.kt +++ b/sdk-lib/src/androidTest/java/cash/z/ecc/android/sdk/integration/SanityTest.kt @@ -107,7 +107,7 @@ class SanityTest( fun testSingleBlockDownload() = runBlocking { // fetch block directly because the synchronizer hasn't started, yet val height = BlockHeight.new(wallet.network, 1_000_000) - val block = wallet.service.getBlockRange(height..height)[0] + val block = wallet.service.getBlockRange(height..height).first() assertTrue("$networkName failed to return a proper block. Height was ${block.height} but we expected $height", block.height == height.value) } diff --git a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/db/entity/CompactBlockEntity.kt b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/db/entity/CompactBlockEntity.kt index e22e4410..50fd9495 100644 --- a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/db/entity/CompactBlockEntity.kt +++ b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/db/entity/CompactBlockEntity.kt @@ -5,7 +5,7 @@ import androidx.room.Entity @Entity(primaryKeys = ["height"], tableName = "compactblocks") data class CompactBlockEntity( - val height: Int, + val height: Long, @ColumnInfo(typeAffinity = ColumnInfo.BLOB) val data: ByteArray ) { @@ -20,7 +20,7 @@ data class CompactBlockEntity( } override fun hashCode(): Int { - var result = height + var result = height.hashCode() result = 31 * result + data.contentHashCode() return result } diff --git a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/block/CompactBlockDbStore.kt b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/block/CompactBlockDbStore.kt index 812cdc43..4c5d7560 100644 --- a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/block/CompactBlockDbStore.kt +++ b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/block/CompactBlockDbStore.kt @@ -30,8 +30,8 @@ class CompactBlockDbStore private constructor( override suspend fun findCompactBlock(height: BlockHeight): CompactFormats.CompactBlock? = cacheDao.findCompactBlock(height.value)?.let { CompactFormats.CompactBlock.parseFrom(it) } - override suspend fun write(result: List) = - cacheDao.insert(result.map { CompactBlockEntity(it.height.toInt(), it.toByteArray()) }) + override suspend fun write(result: Sequence) = + cacheDao.insert(result.map { CompactBlockEntity(it.height, it.toByteArray()) }) override suspend fun rewindTo(height: BlockHeight) = cacheDao.rewindTo(height.value) @@ -47,7 +47,11 @@ class CompactBlockDbStore private constructor( * @param appContext the application context. This is used for creating the database. * @property dbPath the absolute path to the database. */ - fun new(appContext: Context, zcashNetwork: ZcashNetwork, dbPath: String): CompactBlockDbStore { + fun new( + appContext: Context, + zcashNetwork: ZcashNetwork, + dbPath: String + ): CompactBlockDbStore { val cacheDb = createCompactBlockCacheDb(appContext.applicationContext, dbPath) return CompactBlockDbStore(zcashNetwork, cacheDb) diff --git a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/block/CompactBlockDownloader.kt b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/block/CompactBlockDownloader.kt index bdcdce18..92fdc519 100644 --- a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/block/CompactBlockDownloader.kt +++ b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/block/CompactBlockDownloader.kt @@ -47,7 +47,6 @@ open class CompactBlockDownloader private constructor(val compactBlockStore: Com suspend fun downloadBlockRange(heightRange: ClosedRange): Int = withContext(IO) { val result = lightWalletService.getBlockRange(heightRange) compactBlockStore.write(result) - result.size } /** diff --git a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/block/CompactBlockStore.kt b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/block/CompactBlockStore.kt index f0be0166..ba7123e2 100644 --- a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/block/CompactBlockStore.kt +++ b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/block/CompactBlockStore.kt @@ -25,8 +25,9 @@ interface CompactBlockStore { * Write the given blocks to this store, which may be anything from an in-memory cache to a DB. * * @param result the list of compact blocks to persist. + * @return Number of blocks that were written. */ - suspend fun write(result: List) + suspend fun write(result: Sequence): Int /** * Remove every block above the given height. diff --git a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/db/CompactBlockDb.kt b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/db/CompactBlockDb.kt index fedccf80..6fcdbc97 100644 --- a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/db/CompactBlockDb.kt +++ b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/db/CompactBlockDb.kt @@ -6,6 +6,7 @@ import androidx.room.Insert import androidx.room.OnConflictStrategy import androidx.room.Query import androidx.room.RoomDatabase +import androidx.room.Transaction import cash.z.ecc.android.sdk.db.entity.CompactBlockEntity // @@ -42,6 +43,18 @@ interface CompactBlockDao { @Insert(onConflict = OnConflictStrategy.REPLACE) suspend fun insert(block: List) + @Transaction + suspend fun insert(blocks: Sequence): Int { + var count = 0 + + blocks.forEach { + insert(it) + count++ + } + + return count + } + @Query("DELETE FROM compactblocks WHERE height > :height") suspend fun rewindTo(height: Long) diff --git a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/service/LightWalletGrpcService.kt b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/service/LightWalletGrpcService.kt index 5768b781..8241e412 100644 --- a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/service/LightWalletGrpcService.kt +++ b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/service/LightWalletGrpcService.kt @@ -67,11 +67,13 @@ class LightWalletGrpcService private constructor( /* LightWalletService implementation */ - override fun getBlockRange(heightRange: ClosedRange): List { - if (heightRange.isEmpty()) return listOf() + override fun getBlockRange(heightRange: ClosedRange): Sequence { + if (heightRange.isEmpty()) { + return emptySequence() + } return requireChannel().createStub(streamingRequestTimeout) - .getBlockRange(heightRange.toBlockRange()).toList() + .getBlockRange(heightRange.toBlockRange()).iterator().asSequence() } override fun getLatestBlockHeight(): BlockHeight { @@ -161,7 +163,12 @@ class LightWalletGrpcService private constructor( new } channel.resetConnectBackoff() - twig("getting channel isShutdown: ${channel.isShutdown} isTerminated: ${channel.isTerminated} getState: $state stateCount: $stateCount", -1) + twig( + "getting channel isShutdown: ${channel.isShutdown} " + + "isTerminated: ${channel.isTerminated} " + + "getState: $state stateCount: $stateCount", + -1 + ) return channel } diff --git a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/service/LightWalletService.kt b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/service/LightWalletService.kt index bb188856..d8a48e77 100644 --- a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/service/LightWalletService.kt +++ b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/internal/service/LightWalletService.kt @@ -36,7 +36,7 @@ interface LightWalletService { * @return a list of compact blocks for the given range * */ - fun getBlockRange(heightRange: ClosedRange): List + fun getBlockRange(heightRange: ClosedRange): Sequence /** * Return the latest block height known to the service.