zcash_client_sqlite: Maintain a nullifier map from out-of-order scanning

Closes zcash/librustzcash#876.
This commit is contained in:
Jack Grigg 2023-07-21 18:01:52 +00:00
parent 4d2abd5c3a
commit 0f2689b9c3
8 changed files with 340 additions and 31 deletions

View File

@ -300,6 +300,7 @@ pub struct ScannedBlock<Nf> {
metadata: BlockMetadata,
block_time: u32,
transactions: Vec<WalletTx<Nf>>,
sapling_nullifier_map: Vec<(TxId, u16, Vec<sapling::Nullifier>)>,
sapling_commitments: Vec<(sapling::Node, Retention<BlockHeight>)>,
}
@ -308,12 +309,14 @@ impl<Nf> ScannedBlock<Nf> {
metadata: BlockMetadata,
block_time: u32,
transactions: Vec<WalletTx<Nf>>,
sapling_nullifier_map: Vec<(TxId, u16, Vec<sapling::Nullifier>)>,
sapling_commitments: Vec<(sapling::Node, Retention<BlockHeight>)>,
) -> Self {
Self {
metadata,
block_time,
transactions,
sapling_nullifier_map,
sapling_commitments,
}
}
@ -338,6 +341,10 @@ impl<Nf> ScannedBlock<Nf> {
&self.transactions
}
/// Returns, for each transaction in this block, the Sapling nullifiers revealed by that
/// transaction that did not match any note currently tracked by the wallet.
///
/// Each entry is `(txid, tx_index, nullifiers)`, where `tx_index` is the transaction's
/// position within the block (bounded to `u16` by the scanner).
pub fn sapling_nullifier_map(&self) -> &[(TxId, u16, Vec<sapling::Nullifier>)] {
    &self.sapling_nullifier_map
}
/// Returns the ordered list of Sapling note commitments observed in this block, each
/// paired with the retention policy for its position in the note commitment tree.
pub fn sapling_commitments(&self) -> &[(sapling::Node, Retention<BlockHeight>)] {
    &self.sapling_commitments
}
@ -498,7 +505,7 @@ pub trait WalletWrite: WalletRead {
/// `blocks` must be sequential, in order of increasing block height
fn put_blocks(
&mut self,
block: Vec<ScannedBlock<sapling::Nullifier>>,
blocks: Vec<ScannedBlock<sapling::Nullifier>>,
) -> Result<Vec<Self::NoteRef>, Self::Error>;
/// Updates the wallet's view of the blockchain.

View File

@ -355,36 +355,44 @@ pub(crate) fn scan_block_with_runner<
let compact_block_tx_count = block.vtx.len();
let mut wtxs: Vec<WalletTx<K::Nf>> = vec![];
let mut sapling_nullifier_map = Vec::with_capacity(block.vtx.len());
let mut sapling_note_commitments: Vec<(sapling::Node, Retention<BlockHeight>)> = vec![];
for (tx_idx, tx) in block.vtx.into_iter().enumerate() {
let txid = tx.txid();
let tx_index =
u16::try_from(tx.index).expect("Cannot fit more than 2^16 transactions in a block");
// Check for spent notes. The only step that is not constant-time is
// the filter() at the end.
// Check for spent notes. The comparison against known-unspent nullifiers is done
// in constant time.
// TODO: However, this is O(|nullifiers| * |notes|); does using
// constant-time operations here really make sense?
let shielded_spends: Vec<_> = tx
.spends
.into_iter()
.enumerate()
.map(|(index, spend)| {
let spend_nf = spend.nf().expect(
"Could not deserialize nullifier for spend from protobuf representation.",
);
// Find the first tracked nullifier that matches this spend, and produce
// a WalletShieldedSpend if there is a match, in constant time.
nullifiers
.iter()
.map(|&(account, nf)| CtOption::new(account, nf.ct_eq(&spend_nf)))
.fold(
CtOption::new(AccountId::from(0), 0.into()),
|first, next| CtOption::conditional_select(&next, &first, first.is_some()),
)
.map(|account| WalletSaplingSpend::from_parts(index, spend_nf, account))
})
.filter(|spend| spend.is_some().into())
.map(|spend| spend.unwrap())
.collect();
let mut shielded_spends = vec![];
let mut sapling_unlinked_nullifiers = Vec::with_capacity(tx.spends.len());
for (index, spend) in tx.spends.into_iter().enumerate() {
let spend_nf = spend
.nf()
.expect("Could not deserialize nullifier for spend from protobuf representation.");
// Find the first tracked nullifier that matches this spend, and produce
// a WalletShieldedSpend if there is a match, in constant time.
let spend = nullifiers
.iter()
.map(|&(account, nf)| CtOption::new(account, nf.ct_eq(&spend_nf)))
.fold(
CtOption::new(AccountId::from(0), 0.into()),
|first, next| CtOption::conditional_select(&next, &first, first.is_some()),
)
.map(|account| WalletSaplingSpend::from_parts(index, spend_nf, account));
if spend.is_some().into() {
shielded_spends.push(spend.unwrap());
} else {
// This nullifier didn't match any we are currently tracking; save it in
// case it matches an earlier block range we haven't scanned yet.
sapling_unlinked_nullifiers.push(spend_nf);
}
}
sapling_nullifier_map.push((txid, tx_index, sapling_unlinked_nullifiers));
// Collect the set of accounts that were spent from in this transaction
let spent_from_accounts: HashSet<_> = shielded_spends
@ -505,7 +513,7 @@ pub(crate) fn scan_block_with_runner<
if !(shielded_spends.is_empty() && shielded_outputs.is_empty()) {
wtxs.push(WalletTx {
txid,
index: tx.index as usize,
index: tx_index as usize,
sapling_spends: shielded_spends,
sapling_outputs: shielded_outputs,
});
@ -518,6 +526,7 @@ pub(crate) fn scan_block_with_runner<
BlockMetadata::from_parts(cur_height, cur_hash, sapling_commitment_tree_size),
block.time,
wtxs,
sapling_nullifier_map,
sapling_note_commitments,
))
}

View File

@ -442,14 +442,31 @@ impl<P: consensus::Parameters> WalletWrite for WalletDb<rusqlite::Connection, P>
}
for output in &tx.sapling_outputs {
let received_note_id =
wallet::sapling::put_received_note(wdb.conn.0, output, tx_row)?;
// Check whether this note was spent in a later block range that
// we previously scanned.
let spent_in = wallet::query_nullifier_map(
wdb.conn.0,
ShieldedProtocol::Sapling,
output.nf(),
)?;
let received_note_id = wallet::sapling::put_received_note(
wdb.conn.0, output, tx_row, spent_in,
)?;
// Save witness for note.
wallet_note_ids.push(received_note_id);
}
}
// Insert the new nullifiers from this block into the nullifier map.
wallet::insert_nullifier_map(
wdb.conn.0,
block.height(),
ShieldedProtocol::Sapling,
block.sapling_nullifier_map(),
)?;
note_positions.extend(block.transactions().iter().flat_map(|wtx| {
wtx.sapling_outputs
.iter()
@ -460,6 +477,14 @@ impl<P: consensus::Parameters> WalletWrite for WalletDb<rusqlite::Connection, P>
sapling_commitments.extend(block.into_sapling_commitments().into_iter());
}
// Prune the nullifier map of entries we no longer need.
if let Some(meta) = wdb.block_fully_scanned()? {
wallet::prune_nullifier_map(
wdb.conn.0,
meta.block_height().saturating_sub(PRUNING_DEPTH),
)?;
}
// We will have a start position and a last scanned height in all cases where
// `blocks` is non-empty.
if let Some(((start_height, start_position), last_scanned_height)) =
@ -533,7 +558,7 @@ impl<P: consensus::Parameters> WalletWrite for WalletDb<rusqlite::Connection, P>
)?;
if matches!(recipient, Recipient::InternalAccount(_, _)) {
wallet::sapling::put_received_note(wdb.conn.0, output, tx_ref)?;
wallet::sapling::put_received_note(wdb.conn.0, output, tx_ref, None)?;
}
}
TransferType::Incoming => {
@ -548,7 +573,7 @@ impl<P: consensus::Parameters> WalletWrite for WalletDb<rusqlite::Connection, P>
}
}
wallet::sapling::put_received_note(wdb.conn.0, output, tx_ref)?;
wallet::sapling::put_received_note(wdb.conn.0, output, tx_ref, None)?;
}
}
}
@ -645,6 +670,7 @@ impl<P: consensus::Parameters> WalletWrite for WalletDb<rusqlite::Connection, P>
transfer_type: TransferType::WalletInternal,
},
tx_ref,
None,
)?;
}
}

View File

@ -839,6 +839,14 @@ pub(crate) fn truncate_to_height<P: consensus::Parameters>(
[u32::from(block_height)],
)?;
// Delete from the nullifier map any entries with a locator referencing a block
// height greater than the truncation height.
conn.execute(
"DELETE FROM tx_locator_map
WHERE block_height > :block_height",
named_params![":block_height": u32::from(block_height)],
)?;
// Delete from the scanning queue any range with a start height greater than the
// truncation height, and then truncate any remaining range by setting the end
// equal to the truncation height + 1.
@ -1316,6 +1324,165 @@ pub(crate) fn put_sent_output<P: consensus::Parameters>(
Ok(())
}
/// Inserts the given entries into the nullifier map.
///
/// Each entry in `new_entries` is `(txid, tx_index, nullifiers)`: the unlinked nullifiers
/// revealed by the transaction at `tx_index` within the block at `block_height`.
///
/// Returns an error if the new entries conflict with existing ones. This indicates either
/// corrupted data, or that a reorg has occurred and the caller needs to repair the wallet
/// state with [`truncate_to_height`].
pub(crate) fn insert_nullifier_map<N: AsRef<[u8]>>(
    conn: &rusqlite::Transaction<'_>,
    block_height: BlockHeight,
    spend_pool: ShieldedProtocol,
    new_entries: &[(TxId, u16, Vec<N>)],
) -> Result<(), SqliteClientError> {
    // Selects every existing locator row that would collide with the one being inserted,
    // either on the (block_height, tx_index) primary key or on the unique txid column.
    let mut stmt_select_tx_locators = conn.prepare_cached(
        "SELECT block_height, tx_index, txid
        FROM tx_locator_map
        WHERE (block_height = :block_height AND tx_index = :tx_index) OR txid = :txid",
    )?;
    let mut stmt_insert_tx_locator = conn.prepare_cached(
        "INSERT INTO tx_locator_map
        (block_height, tx_index, txid)
        VALUES (:block_height, :tx_index, :txid)",
    )?;
    // Upsert keyed on (spend_pool, nf): a nullifier re-observed at a new locator replaces
    // the old mapping. This is only sound because we verify below that the locator row
    // itself does not silently change meaning.
    let mut stmt_insert_nullifier_mapping = conn.prepare_cached(
        "INSERT INTO nullifier_map
        (spend_pool, nf, block_height, tx_index)
        VALUES (:spend_pool, :nf, :block_height, :tx_index)
        ON CONFLICT (spend_pool, nf) DO UPDATE
        SET block_height = :block_height,
            tx_index = :tx_index",
    )?;
    for (txid, tx_index, nullifiers) in new_entries {
        let tx_args = named_params![
            ":block_height": u32::from(block_height),
            ":tx_index": tx_index,
            ":txid": txid.as_ref(),
        ];

        // We cannot use an upsert here, because we use the tx locator as the foreign key
        // in `nullifier_map` instead of `txid` for database size efficiency. If an insert
        // into `tx_locator_map` were to conflict, we would need the resulting update to
        // cascade into `nullifier_map` as either:
        // - an update (if a transaction moved within a block), or
        // - a deletion (if the locator now points to a different transaction).
        //
        // `ON UPDATE` has `CASCADE` to always update, but has no deletion option. So we
        // instead set `ON UPDATE RESTRICT` on the foreign key relation, and require the
        // caller to manually rewind the database in this situation.
        //
        // The fold collapses the query result into a three-way outcome:
        // - Ok(None):            no existing row collides with this locator.
        // - Ok(Some(Some(row))): exactly one row collides (it may be an exact match).
        // - Ok(Some(None)):      multiple rows collide, which (given the table's
        //                        uniqueness constraints) can only mean the new locator
        //                        conflicts with more than one existing transaction.
        let locator = stmt_select_tx_locators
            .query_map(tx_args, |row| {
                Ok((
                    BlockHeight::from_u32(row.get(0)?),
                    row.get::<_, u16>(1)?,
                    TxId::from_bytes(row.get(2)?),
                ))
            })?
            .fold(Ok(None), |acc: Result<_, SqliteClientError>, row| {
                match (acc?, row?) {
                    (None, rhs) => Ok(Some(Some(rhs))),
                    // If there was more than one row, then due to the uniqueness
                    // constraints on the `tx_locator_map` table, all of the rows conflict
                    // with the locator being inserted.
                    (Some(_), _) => Ok(Some(None)),
                }
            })?;

        match locator {
            // If the locator in the table matches the one being inserted, do nothing.
            Some(Some(loc)) if loc == (block_height, *tx_index, *txid) => (),
            // If the locator being inserted would conflict, report it. We surface this as
            // a synthetic SQLITE_CONSTRAINT failure so callers handle it like any other
            // uniqueness violation.
            Some(_) => Err(SqliteClientError::DbError(rusqlite::Error::SqliteFailure(
                rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CONSTRAINT),
                Some("UNIQUE constraint failed: tx_locator_map.block_height, tx_locator_map.tx_index".into()),
            )))?,
            // If the locator doesn't exist, insert it.
            None => stmt_insert_tx_locator.execute(tx_args).map(|_| ())?,
        }

        for nf in nullifiers {
            // Here it is okay to use an upsert, because per above we've confirmed that
            // the locator points to the same transaction.
            let nf_args = named_params![
                ":spend_pool": pool_code(PoolType::Shielded(spend_pool)),
                ":nf": nf.as_ref(),
                ":block_height": u32::from(block_height),
                ":tx_index": tx_index,
            ];
            stmt_insert_nullifier_mapping.execute(nf_args)?;
        }
    }

    Ok(())
}
/// Returns the row of the `transactions` table corresponding to the transaction in which
/// this nullifier is revealed, if any.
///
/// Returns `Ok(None)` if the nullifier is not present in the nullifier map. If a locator
/// is found, a `transactions` row is created on demand (via `put_tx_meta`) when one does
/// not already exist.
pub(crate) fn query_nullifier_map<N: AsRef<[u8]>>(
    conn: &rusqlite::Transaction<'_>,
    spend_pool: ShieldedProtocol,
    nf: &N,
) -> Result<Option<i64>, SqliteClientError> {
    // Join through `tx_locator_map` to recover the (height, index, txid) locator for this
    // nullifier, keyed on (spend_pool, nf).
    let mut stmt_select_locator = conn.prepare_cached(
        "SELECT block_height, tx_index, txid
        FROM nullifier_map
        LEFT JOIN tx_locator_map USING (block_height, tx_index)
        WHERE spend_pool = :spend_pool AND nf = :nf",
    )?;

    let sql_args = named_params![
        ":spend_pool": pool_code(PoolType::Shielded(spend_pool)),
        ":nf": nf.as_ref(),
    ];

    // Find the locator corresponding to this nullifier, if any.
    let locator = stmt_select_locator
        .query_row(sql_args, |row| {
            Ok((
                BlockHeight::from_u32(row.get(0)?),
                row.get(1)?,
                TxId::from_bytes(row.get(2)?),
            ))
        })
        .optional()?;
    let (height, index, txid) = match locator {
        Some(res) => res,
        None => return Ok(None),
    };

    // Find or create a corresponding row in the `transactions` table. Usually a row will
    // have been created during the same scan that the locator was added to the nullifier
    // map, but it would not happen if the transaction in question spent the note with no
    // change or explicit in-wallet recipient.
    //
    // NOTE: the generic `N` (nullifier byte container) is reused here as `WalletTx`'s
    // note type parameter; this is sound because both vectors are empty, so no value of
    // type `N` is ever constructed.
    put_tx_meta(
        conn,
        &WalletTx::<N> {
            txid,
            index,
            sapling_spends: vec![],
            sapling_outputs: vec![],
        },
        height,
    )
    .map(Some)
}
/// Deletes from the nullifier map any entries with a locator referencing a block height
/// lower than the pruning height.
///
/// Only `tx_locator_map` rows are deleted directly; the corresponding `nullifier_map`
/// rows are removed via the `ON DELETE CASCADE` foreign key on the locator relation.
pub(crate) fn prune_nullifier_map(
    conn: &rusqlite::Transaction<'_>,
    block_height: BlockHeight,
) -> Result<(), SqliteClientError> {
    conn.execute(
        "DELETE FROM tx_locator_map
        WHERE block_height < :block_height",
        named_params![":block_height": u32::from(block_height)],
    )?;

    Ok(())
}
#[cfg(test)]
mod tests {
use std::num::NonZeroU32;

View File

@ -414,6 +414,18 @@ mod tests {
sapling_tree BLOB NOT NULL ,
sapling_commitment_tree_size INTEGER,
orchard_commitment_tree_size INTEGER)",
"CREATE TABLE nullifier_map (
spend_pool INTEGER NOT NULL,
nf BLOB NOT NULL,
block_height INTEGER NOT NULL,
tx_index INTEGER NOT NULL,
CONSTRAINT tx_locator
FOREIGN KEY (block_height, tx_index)
REFERENCES tx_locator_map(block_height, tx_index)
ON DELETE CASCADE
ON UPDATE RESTRICT,
CONSTRAINT nf_uniq UNIQUE (spend_pool, nf)
)",
"CREATE TABLE sapling_received_notes (
id_note INTEGER PRIMARY KEY,
tx INTEGER NOT NULL,
@ -507,6 +519,12 @@ mod tests {
fee INTEGER,
FOREIGN KEY (block) REFERENCES blocks(height)
)",
"CREATE TABLE tx_locator_map (
block_height INTEGER NOT NULL,
tx_index INTEGER NOT NULL,
txid BLOB NOT NULL UNIQUE,
PRIMARY KEY (block_height, tx_index)
)",
"CREATE TABLE \"utxos\" (
id_utxo INTEGER PRIMARY KEY,
received_by_account INTEGER NOT NULL,

View File

@ -2,6 +2,7 @@ mod add_transaction_views;
mod add_utxo_account;
mod addresses_table;
mod initial_setup;
mod nullifier_map;
mod received_notes_nullable_nf;
mod sent_notes_to_internal;
mod shardtree_support;
@ -30,6 +31,10 @@ pub(super) fn all_migrations<P: consensus::Parameters + 'static>(
// add_transaction_views
// /
// v_transactions_net
// /
// received_notes_nullable_nf
// / \
// shardtree_support nullifier_map
vec![
Box::new(initial_setup::Migration {}),
Box::new(utxos_table::Migration {}),
@ -48,5 +53,6 @@ pub(super) fn all_migrations<P: consensus::Parameters + 'static>(
Box::new(v_transactions_net::Migration),
Box::new(received_notes_nullable_nf::Migration),
Box::new(shardtree_support::Migration),
Box::new(nullifier_map::Migration),
]
}

View File

@ -0,0 +1,72 @@
//! This migration adds a table for storing mappings from nullifiers to the transaction
//! they are revealed in.
use std::collections::HashSet;
use schemer_rusqlite::RusqliteMigration;
use tracing::debug;
use uuid::Uuid;
use crate::wallet::init::WalletMigrationError;
use super::received_notes_nullable_nf;
/// Unique identifier for this migration, referenced by migrations that depend on it.
pub(super) const MIGRATION_ID: Uuid = Uuid::from_u128(0xe2d71ac5_6a44_4c6b_a9a0_6d0a79d355f1);

/// Migration that adds the `tx_locator_map` and `nullifier_map` tables.
pub(super) struct Migration;
impl schemer::Migration for Migration {
    /// The stable UUID identifying this migration.
    fn id(&self) -> Uuid {
        MIGRATION_ID
    }

    /// This migration must be applied after `received_notes_nullable_nf`.
    fn dependencies(&self) -> HashSet<Uuid> {
        let mut deps = HashSet::new();
        deps.insert(received_notes_nullable_nf::MIGRATION_ID);
        deps
    }

    /// Human-readable summary shown in migration tooling.
    fn description(&self) -> &'static str {
        "Adds a lookup table for nullifiers we've observed on-chain that we haven't confirmed are not ours."
    }
}
impl RusqliteMigration for Migration {
    type Error = WalletMigrationError;

    /// Creates the `tx_locator_map` and `nullifier_map` tables.
    ///
    /// `tx_locator_map` assigns each observed transaction a compact
    /// `(block_height, tx_index)` locator; `nullifier_map` references that locator rather
    /// than the 32-byte txid, for database size efficiency.
    fn up(&self, transaction: &rusqlite::Transaction) -> Result<(), Self::Error> {
        // We don't enforce any foreign key constraint to the blocks table, to allow
        // loading the nullifier map separately from block scanning.
        debug!("Creating tables for nullifier map");
        transaction.execute_batch(
            // `ON UPDATE RESTRICT` is deliberate: a locator row changing meaning cannot
            // be cascaded correctly (it might require deleting mappings, not updating
            // them), so conflicting locator updates must be resolved by rewinding.
            "CREATE TABLE tx_locator_map (
                block_height INTEGER NOT NULL,
                tx_index INTEGER NOT NULL,
                txid BLOB NOT NULL UNIQUE,
                PRIMARY KEY (block_height, tx_index)
            );
            CREATE TABLE nullifier_map (
                spend_pool INTEGER NOT NULL,
                nf BLOB NOT NULL,
                block_height INTEGER NOT NULL,
                tx_index INTEGER NOT NULL,
                CONSTRAINT tx_locator
                    FOREIGN KEY (block_height, tx_index)
                    REFERENCES tx_locator_map(block_height, tx_index)
                    ON DELETE CASCADE
                    ON UPDATE RESTRICT,
                CONSTRAINT nf_uniq UNIQUE (spend_pool, nf)
            );",
        )?;
        Ok(())
    }

    /// Reverses [`Self::up`] by dropping both tables (dependent table first, to satisfy
    /// the foreign key relation).
    fn down(&self, transaction: &rusqlite::Transaction) -> Result<(), Self::Error> {
        transaction.execute_batch(
            "DROP TABLE nullifier_map;
            DROP TABLE tx_locator_map;",
        )?;
        Ok(())
    }
}

View File

@ -312,10 +312,11 @@ pub(crate) fn put_received_note<T: ReceivedSaplingOutput>(
conn: &Connection,
output: &T,
tx_ref: i64,
spent_in: Option<i64>,
) -> Result<NoteId, SqliteClientError> {
let mut stmt_upsert_received_note = conn.prepare_cached(
"INSERT INTO sapling_received_notes
(tx, output_index, account, diversifier, value, rcm, memo, nf, is_change, commitment_tree_position)
(tx, output_index, account, diversifier, value, rcm, memo, nf, is_change, spent, commitment_tree_position)
VALUES (
:tx,
:output_index,
@ -326,6 +327,7 @@ pub(crate) fn put_received_note<T: ReceivedSaplingOutput>(
:memo,
:nf,
:is_change,
:spent,
:commitment_tree_position
)
ON CONFLICT (tx, output_index) DO UPDATE
@ -336,6 +338,7 @@ pub(crate) fn put_received_note<T: ReceivedSaplingOutput>(
nf = IFNULL(:nf, nf),
memo = IFNULL(:memo, memo),
is_change = IFNULL(:is_change, is_change),
spent = IFNULL(:spent, spent),
commitment_tree_position = IFNULL(:commitment_tree_position, commitment_tree_position)
RETURNING id_note",
)?;
@ -354,6 +357,7 @@ pub(crate) fn put_received_note<T: ReceivedSaplingOutput>(
":nf": output.nullifier().map(|nf| nf.0.as_ref()),
":memo": memo_repr(output.memo()),
":is_change": output.is_change(),
":spent": spent_in,
":commitment_tree_position": output.note_commitment_tree_position().map(u64::from),
];