[#629] Stream blocks during download

This change leverages the gRPC streaming API along with the elimination of bulk inserts to SQLite to reduce the amount of memory required to write blocks.
This commit is contained in:
Carter Jernigan 2022-07-28 08:39:48 -04:00 committed by Carter Jernigan
parent 98e22f6029
commit 3535ba905e
10 changed files with 44 additions and 19 deletions

View File

@ -97,7 +97,7 @@ class SampleCodeTest {
val blockRange = BlockHeight.new(ZcashNetwork.Mainnet, 500_000)..BlockHeight.new(ZcashNetwork.Mainnet, 500_009)
val lightwalletService = LightWalletGrpcService(context, lightwalletdHost)
val blocks = lightwalletService.getBlockRange(blockRange)
assertEquals(blockRange.endInclusive.value - blockRange.start.value, blocks.size)
assertEquals(blockRange.endInclusive.value - blockRange.start.value, blocks.count())
blocks.forEachIndexed { i, block ->
log("Block #$i: height:${block.height} hash:${block.hash.toByteArray().toHex()}")

View File

@ -31,9 +31,10 @@ class GetBlockRangeFragment : BaseDemoFragment<FragmentGetBlockRangeBinding>() {
val fetchDelta = System.currentTimeMillis() - start
// Note: This is a demo so we won't worry about iterating efficiently over these blocks
// Note: Converting the blocks sequence to a list can consume a lot of memory and may
// cause OOM.
binding.textInfo.text = Html.fromHtml(
blocks?.run {
blocks?.toList()?.run {
val count = size
val emptyCount = count { it.vtxCount == 0 }
val maxTxs = maxByOrNull { it.vtxCount }
@ -45,9 +46,9 @@ class GetBlockRangeFragment : BaseDemoFragment<FragmentGetBlockRangeBinding>() {
block.vtxList.maxOfOrNull { it.outputsCount } ?: -1
}
val maxOutTx = maxOuts?.vtxList?.maxByOrNull { it.outputsCount }
val txCount = sumBy { it.vtxCount }
val outCount = sumBy { block -> block.vtxList.sumBy { it.outputsCount } }
val inCount = sumBy { block -> block.vtxList.sumBy { it.spendsCount } }
val txCount = sumOf { it.vtxCount }
val outCount = sumOf { block -> block.vtxList.sumOf { it.outputsCount } }
val inCount = sumOf { block -> block.vtxList.sumOf { it.spendsCount } }
val processTime = System.currentTimeMillis() - start - fetchDelta
@Suppress("MaxLineLength")

View File

@ -107,7 +107,7 @@ class SanityTest(
fun testSingleBlockDownload() = runBlocking {
// fetch block directly because the synchronizer hasn't started, yet
val height = BlockHeight.new(wallet.network, 1_000_000)
val block = wallet.service.getBlockRange(height..height)[0]
val block = wallet.service.getBlockRange(height..height).first()
assertTrue("$networkName failed to return a proper block. Height was ${block.height} but we expected $height", block.height == height.value)
}

View File

@ -5,7 +5,7 @@ import androidx.room.Entity
@Entity(primaryKeys = ["height"], tableName = "compactblocks")
data class CompactBlockEntity(
val height: Int,
val height: Long,
@ColumnInfo(typeAffinity = ColumnInfo.BLOB)
val data: ByteArray
) {
@ -20,7 +20,7 @@ data class CompactBlockEntity(
}
override fun hashCode(): Int {
var result = height
var result = height.hashCode()
result = 31 * result + data.contentHashCode()
return result
}

View File

@ -30,8 +30,8 @@ class CompactBlockDbStore private constructor(
override suspend fun findCompactBlock(height: BlockHeight): CompactFormats.CompactBlock? =
cacheDao.findCompactBlock(height.value)?.let { CompactFormats.CompactBlock.parseFrom(it) }
override suspend fun write(result: List<CompactFormats.CompactBlock>) =
cacheDao.insert(result.map { CompactBlockEntity(it.height.toInt(), it.toByteArray()) })
override suspend fun write(result: Sequence<CompactFormats.CompactBlock>) =
cacheDao.insert(result.map { CompactBlockEntity(it.height, it.toByteArray()) })
override suspend fun rewindTo(height: BlockHeight) =
cacheDao.rewindTo(height.value)
@ -47,7 +47,11 @@ class CompactBlockDbStore private constructor(
* @param appContext the application context. This is used for creating the database.
* @property dbPath the absolute path to the database.
*/
fun new(appContext: Context, zcashNetwork: ZcashNetwork, dbPath: String): CompactBlockDbStore {
fun new(
appContext: Context,
zcashNetwork: ZcashNetwork,
dbPath: String
): CompactBlockDbStore {
val cacheDb = createCompactBlockCacheDb(appContext.applicationContext, dbPath)
return CompactBlockDbStore(zcashNetwork, cacheDb)

View File

@ -47,7 +47,6 @@ open class CompactBlockDownloader private constructor(val compactBlockStore: Com
suspend fun downloadBlockRange(heightRange: ClosedRange<BlockHeight>): Int = withContext(IO) {
val result = lightWalletService.getBlockRange(heightRange)
compactBlockStore.write(result)
result.size
}
/**

View File

@ -25,8 +25,9 @@ interface CompactBlockStore {
* Write the given blocks to this store, which may be anything from an in-memory cache to a DB.
*
* @param result the list of compact blocks to persist.
* @return Number of blocks that were written.
*/
suspend fun write(result: List<CompactFormats.CompactBlock>)
suspend fun write(result: Sequence<CompactFormats.CompactBlock>): Int
/**
* Remove every block above the given height.

View File

@ -6,6 +6,7 @@ import androidx.room.Insert
import androidx.room.OnConflictStrategy
import androidx.room.Query
import androidx.room.RoomDatabase
import androidx.room.Transaction
import cash.z.ecc.android.sdk.db.entity.CompactBlockEntity
//
@ -42,6 +43,18 @@ interface CompactBlockDao {
@Insert(onConflict = OnConflictStrategy.REPLACE)
suspend fun insert(block: List<CompactBlockEntity>)
@Transaction
suspend fun insert(blocks: Sequence<CompactBlockEntity>): Int {
var count = 0
blocks.forEach {
insert(it)
count++
}
return count
}
@Query("DELETE FROM compactblocks WHERE height > :height")
suspend fun rewindTo(height: Long)

View File

@ -67,11 +67,13 @@ class LightWalletGrpcService private constructor(
/* LightWalletService implementation */
override fun getBlockRange(heightRange: ClosedRange<BlockHeight>): List<CompactFormats.CompactBlock> {
if (heightRange.isEmpty()) return listOf()
override fun getBlockRange(heightRange: ClosedRange<BlockHeight>): Sequence<CompactFormats.CompactBlock> {
if (heightRange.isEmpty()) {
return emptySequence()
}
return requireChannel().createStub(streamingRequestTimeout)
.getBlockRange(heightRange.toBlockRange()).toList()
.getBlockRange(heightRange.toBlockRange()).iterator().asSequence()
}
override fun getLatestBlockHeight(): BlockHeight {
@ -161,7 +163,12 @@ class LightWalletGrpcService private constructor(
new
}
channel.resetConnectBackoff()
twig("getting channel isShutdown: ${channel.isShutdown} isTerminated: ${channel.isTerminated} getState: $state stateCount: $stateCount", -1)
twig(
"getting channel isShutdown: ${channel.isShutdown} " +
"isTerminated: ${channel.isTerminated} " +
"getState: $state stateCount: $stateCount",
-1
)
return channel
}

View File

@ -36,7 +36,7 @@ interface LightWalletService {
* @return a list of compact blocks for the given range
*
*/
fun getBlockRange(heightRange: ClosedRange<BlockHeight>): List<CompactFormats.CompactBlock>
fun getBlockRange(heightRange: ClosedRange<BlockHeight>): Sequence<CompactFormats.CompactBlock>
/**
* Return the latest block height known to the service.