Move traversal of cached blocks to CacheOps

This commit is contained in:
Kris Nuttycombe 2020-08-18 16:33:34 -06:00
parent 604294dd9f
commit 06c1772692
3 changed files with 224 additions and 184 deletions

View File

@ -121,4 +121,13 @@ pub trait CacheOps {
) -> Result<Option<BlockHash>, Self::Error>
where
F: Fn(&CompactBlock, &CompactBlock) -> Result<(), Self::Error>;
fn with_cached_blocks<F>(
&self,
from_height: BlockHeight,
limit: Option<u32>,
with_row: F,
) -> Result<(), Self::Error>
where
F: FnMut(BlockHeight, CompactBlock) -> Result<(), Self::Error>;
}

View File

@ -195,6 +195,18 @@ impl CacheOps for CacheConnection {
{
chain::validate_chain(self, from_height, validate)
}
fn with_cached_blocks<F>(
&self,
from_height: BlockHeight,
limit: Option<u32>,
with_row: F,
) -> Result<(), Self::Error>
where
F: FnMut(BlockHeight, CompactBlock) -> Result<(), Self::Error>,
{
scan::with_cached_blocks(self, from_height, limit, with_row)
}
}
fn address_from_extfvk<P: consensus::Parameters>(

View File

@ -9,7 +9,7 @@ use zcash_client_backend::{
address::RecipientAddress,
data_api::{
error::{ChainInvalid, Error},
DBOps,
CacheOps, DBOps,
},
decrypt_transaction,
encoding::decode_extended_full_viewing_key,
@ -107,6 +107,7 @@ pub fn scan_cached_blocks<P: consensus::Parameters>(
// Get most recent incremental witnesses for the notes we are tracking
let mut witnesses = data.get_witnesses(last_height)?;
// Get the nullifiers for the notes we are tracking
let mut nullifiers = data.get_nullifiers()?;
// Prepare per-block SQL statements
@ -154,13 +155,210 @@ pub fn scan_cached_blocks<P: consensus::Parameters>(
)",
)?;
cache.with_cached_blocks(
last_height,
limit,
|height: BlockHeight, block: CompactBlock| {
// Start an SQL transaction for this block.
data.0.execute("BEGIN IMMEDIATE", NO_PARAMS)?;
// Scanned blocks MUST be height-sequential.
if height != (last_height + 1) {
return Err(SqliteClientError(ChainInvalid::block_height_mismatch(
last_height + 1,
height,
)));
}
last_height = height;
let block_hash = block.hash.clone();
let block_time = block.time;
let txs = {
let nf_refs: Vec<_> = nullifiers
.iter()
.map(|(nf, acc)| (&nf[..], acc.0 as usize))
.collect();
let mut witness_refs: Vec<_> = witnesses.iter_mut().map(|w| &mut w.1).collect();
scan_block(
params,
block,
&extfvks[..],
&nf_refs,
&mut tree,
&mut witness_refs[..],
)
};
// Enforce that all roots match. This is slow, so only include in debug builds.
#[cfg(debug_assertions)]
{
let cur_root = tree.root();
for row in &witnesses {
if row.1.root() != cur_root {
return Err(SqliteClientError(Error::InvalidWitnessAnchor(
row.0,
last_height,
)));
}
}
for tx in &txs {
for output in tx.shielded_outputs.iter() {
if output.witness.root() != cur_root {
return Err(Error::InvalidNewWitnessAnchor(
output.index,
tx.txid,
last_height,
output.witness.root(),
)
.into());
}
}
}
}
// Insert the block into the database.
let mut encoded_tree = Vec::new();
tree.write(&mut encoded_tree)
.expect("Should be able to write to a Vec");
stmt_insert_block.execute(&[
u32::from(height).to_sql()?,
block_hash.to_sql()?,
block_time.to_sql()?,
encoded_tree.to_sql()?,
])?;
for tx in txs {
// First try update an existing transaction in the database.
let txid = tx.txid.0.to_vec();
let tx_row = if stmt_update_tx.execute(&[
u32::from(height).to_sql()?,
(tx.index as i64).to_sql()?,
txid.to_sql()?,
])? == 0
{
// It isn't there, so insert our transaction into the database.
stmt_insert_tx.execute(&[
txid.to_sql()?,
u32::from(height).to_sql()?,
(tx.index as i64).to_sql()?,
])?;
data.0.last_insert_rowid()
} else {
// It was there, so grab its row number.
stmt_select_tx.query_row(&[txid], |row| row.get(0))?
};
// Mark notes as spent and remove them from the scanning cache
for spend in &tx.shielded_spends {
stmt_mark_spent_note.execute(&[tx_row.to_sql()?, spend.nf.to_sql()?])?;
}
nullifiers.retain(|(nf, _acc)| {
tx.shielded_spends
.iter()
.find(|spend| &spend.nf == nf)
.is_none()
});
for output in tx.shielded_outputs {
let rcm = output.note.rcm().to_repr();
let nf = output.note.nf(
&extfvks[output.account].fvk.vk,
output.witness.position() as u64,
);
// Assumptions:
// - A transaction will not contain more than 2^63 shielded outputs.
// - A note value will never exceed 2^63 zatoshis.
// First try updating an existing received note into the database.
let note_id = if stmt_update_note.execute(&[
(output.account as i64).to_sql()?,
output.to.diversifier().0.to_sql()?,
(output.note.value as i64).to_sql()?,
rcm.as_ref().to_sql()?,
nf.to_sql()?,
output.is_change.to_sql()?,
tx_row.to_sql()?,
(output.index as i64).to_sql()?,
])? == 0
{
// It isn't there, so insert our note into the database.
stmt_insert_note.execute(&[
tx_row.to_sql()?,
(output.index as i64).to_sql()?,
(output.account as i64).to_sql()?,
output.to.diversifier().0.to_sql()?,
(output.note.value as i64).to_sql()?,
rcm.as_ref().to_sql()?,
nf.to_sql()?,
output.is_change.to_sql()?,
])?;
NoteId(data.0.last_insert_rowid())
} else {
// It was there, so grab its row number.
stmt_select_note.query_row(
&[tx_row.to_sql()?, (output.index as i64).to_sql()?],
|row| row.get(0).map(NoteId),
)?
};
// Save witness for note.
witnesses.push((note_id, output.witness));
// Cache nullifier for note (to detect subsequent spends in this scan).
nullifiers.push((nf, AccountId(output.account as u32)));
}
}
// Insert current witnesses into the database.
let mut encoded = Vec::new();
for witness_row in witnesses.iter() {
encoded.clear();
witness_row
.1
.write(&mut encoded)
.expect("Should be able to write to a Vec");
stmt_insert_witness.execute(&[
(witness_row.0).0.to_sql()?,
u32::from(last_height).to_sql()?,
encoded.to_sql()?,
])?;
}
// Prune the stored witnesses (we only expect rollbacks of at most 100 blocks).
stmt_prune_witnesses.execute(&[u32::from(last_height - 100)])?;
// Update now-expired transactions that didn't get mined.
stmt_update_expired.execute(&[u32::from(last_height)])?;
// Commit the SQL transaction, writing this block's data atomically.
data.0.execute("COMMIT", NO_PARAMS)?;
Ok(())
},
)?;
Ok(())
}
pub fn with_cached_blocks<F>(
cache: &CacheConnection,
from_height: BlockHeight,
limit: Option<u32>,
mut with_row: F,
) -> Result<(), SqliteClientError>
where
F: FnMut(BlockHeight, CompactBlock) -> Result<(), SqliteClientError>,
{
// Fetch the CompactBlocks we need to scan
let mut stmt_blocks = cache.0.prepare(
"SELECT height, data FROM compactblocks WHERE height > ? ORDER BY height ASC LIMIT ?",
)?;
let rows = stmt_blocks.query_map(
&[
u32::from(last_height).to_sql()?,
u32::from(from_height).to_sql()?,
limit.unwrap_or(u32::max_value()).to_sql()?,
],
|row| {
@ -171,188 +369,9 @@ pub fn scan_cached_blocks<P: consensus::Parameters>(
},
)?;
for row in rows {
let row = row?;
// Start an SQL transaction for this block.
data.0.execute("BEGIN IMMEDIATE", NO_PARAMS)?;
// Scanned blocks MUST be height-sequential.
if row.height != (last_height + 1) {
return Err(SqliteClientError(ChainInvalid::block_height_mismatch(
last_height + 1,
row.height,
)));
}
last_height = row.height;
let block: CompactBlock = parse_from_bytes(&row.data)?;
let block_hash = block.hash.clone();
let block_time = block.time;
let txs = {
let nf_refs: Vec<_> = nullifiers
.iter()
.map(|(nf, acc)| (&nf[..], acc.0 as usize))
.collect();
let mut witness_refs: Vec<_> = witnesses.iter_mut().map(|w| &mut w.1).collect();
scan_block(
params,
block,
&extfvks[..],
&nf_refs,
&mut tree,
&mut witness_refs[..],
)
};
// Enforce that all roots match. This is slow, so only include in debug builds.
#[cfg(debug_assertions)]
{
let cur_root = tree.root();
for row in &witnesses {
if row.1.root() != cur_root {
return Err(SqliteClientError(Error::InvalidWitnessAnchor(
row.0,
last_height,
)));
}
}
for tx in &txs {
for output in tx.shielded_outputs.iter() {
if output.witness.root() != cur_root {
return Err(Error::InvalidNewWitnessAnchor(
output.index,
tx.txid,
last_height,
output.witness.root(),
)
.into());
}
}
}
}
// Insert the block into the database.
let mut encoded_tree = Vec::new();
tree.write(&mut encoded_tree)
.expect("Should be able to write to a Vec");
stmt_insert_block.execute(&[
u32::from(row.height).to_sql()?,
block_hash.to_sql()?,
block_time.to_sql()?,
encoded_tree.to_sql()?,
])?;
for tx in txs {
// First try update an existing transaction in the database.
let txid = tx.txid.0.to_vec();
let tx_row = if stmt_update_tx.execute(&[
u32::from(row.height).to_sql()?,
(tx.index as i64).to_sql()?,
txid.to_sql()?,
])? == 0
{
// It isn't there, so insert our transaction into the database.
stmt_insert_tx.execute(&[
txid.to_sql()?,
u32::from(row.height).to_sql()?,
(tx.index as i64).to_sql()?,
])?;
data.0.last_insert_rowid()
} else {
// It was there, so grab its row number.
stmt_select_tx.query_row(&[txid], |row| row.get(0))?
};
// Mark notes as spent and remove them from the scanning cache
for spend in &tx.shielded_spends {
stmt_mark_spent_note.execute(&[tx_row.to_sql()?, spend.nf.to_sql()?])?;
}
nullifiers = nullifiers
.into_iter()
.filter(|(nf, _acc)| {
tx.shielded_spends
.iter()
.find(|spend| &spend.nf == nf)
.is_none()
})
.collect();
for output in tx.shielded_outputs {
let rcm = output.note.rcm().to_repr();
let nf = output.note.nf(
&extfvks[output.account].fvk.vk,
output.witness.position() as u64,
);
// Assumptions:
// - A transaction will not contain more than 2^63 shielded outputs.
// - A note value will never exceed 2^63 zatoshis.
// First try updating an existing received note into the database.
let note_id = if stmt_update_note.execute(&[
(output.account as i64).to_sql()?,
output.to.diversifier().0.to_sql()?,
(output.note.value as i64).to_sql()?,
rcm.as_ref().to_sql()?,
nf.to_sql()?,
output.is_change.to_sql()?,
tx_row.to_sql()?,
(output.index as i64).to_sql()?,
])? == 0
{
// It isn't there, so insert our note into the database.
stmt_insert_note.execute(&[
tx_row.to_sql()?,
(output.index as i64).to_sql()?,
(output.account as i64).to_sql()?,
output.to.diversifier().0.to_sql()?,
(output.note.value as i64).to_sql()?,
rcm.as_ref().to_sql()?,
nf.to_sql()?,
output.is_change.to_sql()?,
])?;
NoteId(data.0.last_insert_rowid())
} else {
// It was there, so grab its row number.
stmt_select_note.query_row(
&[tx_row.to_sql()?, (output.index as i64).to_sql()?],
|row| row.get(0).map(NoteId),
)?
};
// Save witness for note.
witnesses.push((note_id, output.witness));
// Cache nullifier for note (to detect subsequent spends in this scan).
nullifiers.push((nf, AccountId(output.account as u32)));
}
}
// Insert current witnesses into the database.
let mut encoded = Vec::new();
for witness_row in witnesses.iter() {
encoded.clear();
witness_row
.1
.write(&mut encoded)
.expect("Should be able to write to a Vec");
stmt_insert_witness.execute(&[
(witness_row.0).0.to_sql()?,
u32::from(last_height).to_sql()?,
encoded.to_sql()?,
])?;
}
// Prune the stored witnesses (we only expect rollbacks of at most 100 blocks).
stmt_prune_witnesses.execute(&[u32::from(last_height - 100)])?;
// Update now-expired transactions that didn't get mined.
stmt_update_expired.execute(&[u32::from(last_height)])?;
// Commit the SQL transaction, writing this block's data atomically.
data.0.execute("COMMIT", NO_PARAMS)?;
for row_result in rows {
let row = row_result?;
with_row(row.height, parse_from_bytes(&row.data)?)?;
}
Ok(())