Batch data store writes of `put_block`

Instead of calling `put_block` for each block scanned,
`scan_cached_blocks` will now defer the block writes until the scan of a
batch is complete and will perform the block writes and note commitment
tree updates all within a single transaction.

This should ordinarily be fine in terms of memory consumption, because
the block data being saved is pruned to only that spend an output
information that is related to transactions in the wallet, which will
normally be sparse enough that the block range size that is appropriate
for a given platform to run within a batch scanner adequately bounds the
memory consumption of this pruned representation.
This commit is contained in:
Kris Nuttycombe 2023-07-05 23:08:00 -06:00
parent 82705a4ae4
commit 1b5a24a655
5 changed files with 62 additions and 46 deletions

View File

@ -15,9 +15,9 @@ and this library adheres to Rust's notion of
- `NullifierQuery` for use with `WalletRead::get_sapling_nullifiers`
- `ScannedBlock`
- `ShieldedProtocol`
- `WalletRead::{block_metadata, block_fully_scanned, suggest_scan_ranges}`
- `WalletWrite::put_block`
- `WalletCommitmentTrees`
- `WalletRead::{block_metadata, block_fully_scanned, suggest_scan_ranges}`
- `WalletWrite::put_blocks`
- `chain::CommitmentTreeRoot`
- `testing::MockWalletDb::new`
- `wallet::input_sellection::Proposal::{min_target_height, min_anchor_height}`:
@ -75,7 +75,7 @@ and this library adheres to Rust's notion of
- `WalletRead::{get_commitment_tree, get_witnesses}` have been removed
without replacement. The utility of these methods is now subsumed
by those available from the `WalletCommitmentTrees` trait.
- `WalletWrite::advance_by_block` (use `WalletWrite::put_block` instead).
- `WalletWrite::advance_by_block` (use `WalletWrite::put_blocks` instead).
- `PrunedBlock` has been replaced by `ScannedBlock`
- `testing::MockWalletDb`, which is available under the `test-dependencies`
feature flag, has been modified by the addition of a `sapling_tree` property.

View File

@ -494,9 +494,9 @@ pub trait WalletWrite: WalletRead {
/// Updates the state of the wallet database by persisting the provided block information,
/// along with the note commitments that were detected when scanning the block for transactions
/// pertaining to this wallet.
fn put_block(
fn put_blocks(
&mut self,
block: ScannedBlock<sapling::Nullifier>,
block: Vec<ScannedBlock<sapling::Nullifier>>,
) -> Result<Vec<Self::NoteRef>, Self::Error>;
/// Caches a decrypted transaction in the persistent wallet store.
@ -771,9 +771,9 @@ pub mod testing {
}
#[allow(clippy::type_complexity)]
fn put_block(
fn put_blocks(
&mut self,
_block: ScannedBlock<sapling::Nullifier>,
_blocks: Vec<ScannedBlock<sapling::Nullifier>>,
) -> Result<Vec<Self::NoteRef>, Self::Error> {
Ok(vec![])
}

View File

@ -237,6 +237,7 @@ where
batch_runner.flush();
let mut scanned_blocks = vec![];
block_source.with_blocks::<_, DbT::Error>(
Some(from_height),
Some(limit),
@ -265,11 +266,12 @@ where
}));
prior_block_metadata = Some(*scanned_block.metadata());
data_db.put_block(scanned_block).map_err(Error::Wallet)?;
scanned_blocks.push(scanned_block);
Ok(())
},
)?;
data_db.put_blocks(scanned_blocks).map_err(Error::Wallet)?;
Ok(())
}

View File

@ -394,13 +394,24 @@ impl<P: consensus::Parameters> WalletWrite for WalletDb<rusqlite::Connection, P>
)
}
#[tracing::instrument(skip_all, fields(height = u32::from(block.height())))]
#[tracing::instrument(skip_all, fields(height = blocks.first().map(|b| u32::from(b.height()))))]
#[allow(clippy::type_complexity)]
fn put_block(
fn put_blocks(
&mut self,
block: ScannedBlock<sapling::Nullifier>,
blocks: Vec<ScannedBlock<sapling::Nullifier>>,
) -> Result<Vec<Self::NoteRef>, Self::Error> {
self.transactionally(|wdb| {
let start_position = blocks.first().map(|block| {
Position::from(
u64::from(block.metadata().sapling_tree_size())
- u64::try_from(block.sapling_commitments().len()).unwrap(),
)
});
let mut wallet_note_ids = vec![];
let mut sapling_commitments = vec![];
let mut end_height = None;
for block in blocks.into_iter() {
// Insert the block into the database.
wallet::put_block(
wdb.conn.0,
@ -410,7 +421,6 @@ impl<P: consensus::Parameters> WalletWrite for WalletDb<rusqlite::Connection, P>
block.metadata().sapling_tree_size(),
)?;
let mut wallet_note_ids = vec![];
for tx in block.transactions() {
let tx_row = wallet::put_tx_meta(wdb.conn.0, tx, block.height())?;
@ -428,19 +438,23 @@ impl<P: consensus::Parameters> WalletWrite for WalletDb<rusqlite::Connection, P>
}
}
let block_height = block.height();
let sapling_tree_size = block.metadata().sapling_tree_size();
let sapling_commitments_len = block.sapling_commitments().len();
let mut sapling_commitments = block.into_sapling_commitments().into_iter();
end_height = Some(block.height());
sapling_commitments.extend(block.into_sapling_commitments().into_iter());
}
// We will have a start position and an end height in all cases where `blocks` is
// non-empty.
if let Some((start_position, end_height)) = start_position.zip(end_height) {
// Update the Sapling note commitment tree with all newly read note commitments
let mut sapling_commitments = sapling_commitments.into_iter();
wdb.with_sapling_tree_mut::<_, _, SqliteClientError>(move |sapling_tree| {
let start_position = Position::from(u64::from(sapling_tree_size))
- u64::try_from(sapling_commitments_len).unwrap();
sapling_tree.batch_insert(start_position, &mut sapling_commitments)?;
Ok(())
})?;
// Update now-expired transactions that didn't get mined.
wallet::update_expired_notes(wdb.conn.0, block_height)?;
wallet::update_expired_notes(wdb.conn.0, end_height)?;
}
Ok(wallet_note_ids)
})

View File

@ -1100,7 +1100,7 @@ pub(crate) fn put_legacy_transparent_utxo<P: consensus::Parameters>(
/// as expired, up to the given block height.
pub(crate) fn update_expired_notes(
conn: &rusqlite::Connection,
height: BlockHeight,
expiry_height: BlockHeight,
) -> Result<(), SqliteClientError> {
let mut stmt_update_expired = conn.prepare_cached(
"UPDATE sapling_received_notes SET spent = NULL WHERE EXISTS (
@ -1108,7 +1108,7 @@ pub(crate) fn update_expired_notes(
WHERE id_tx = sapling_received_notes.spent AND block IS NULL AND expiry_height < ?
)",
)?;
stmt_update_expired.execute([u32::from(height)])?;
stmt_update_expired.execute([u32::from(expiry_height)])?;
Ok(())
}