Merge pull request #878 from zcash/876-nullifier-map

zcash_client_sqlite: Maintain a nullifier map from out-of-order scanning
This commit is contained in:
str4d 2023-07-21 21:57:12 +01:00 committed by GitHub
commit 2ed0747b67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 448 additions and 36 deletions

View File

@ -300,6 +300,7 @@ pub struct ScannedBlock<Nf> {
metadata: BlockMetadata,
block_time: u32,
transactions: Vec<WalletTx<Nf>>,
sapling_nullifier_map: Vec<(TxId, u16, Vec<sapling::Nullifier>)>,
sapling_commitments: Vec<(sapling::Node, Retention<BlockHeight>)>,
}
@ -308,12 +309,14 @@ impl<Nf> ScannedBlock<Nf> {
metadata: BlockMetadata,
block_time: u32,
transactions: Vec<WalletTx<Nf>>,
sapling_nullifier_map: Vec<(TxId, u16, Vec<sapling::Nullifier>)>,
sapling_commitments: Vec<(sapling::Node, Retention<BlockHeight>)>,
) -> Self {
Self {
metadata,
block_time,
transactions,
sapling_nullifier_map,
sapling_commitments,
}
}
@ -338,6 +341,10 @@ impl<Nf> ScannedBlock<Nf> {
&self.transactions
}
/// Returns, for each transaction in this block, the Sapling nullifiers revealed by
/// that transaction which did not match any note we are currently tracking, keyed by
/// txid and the transaction's index within the block.
pub fn sapling_nullifier_map(&self) -> &[(TxId, u16, Vec<sapling::Nullifier>)] {
    self.sapling_nullifier_map.as_slice()
}
/// Returns the Sapling note commitments discovered in this block, each paired with
/// the retention policy to apply when it is inserted into the note commitment tree.
pub fn sapling_commitments(&self) -> &[(sapling::Node, Retention<BlockHeight>)] {
    self.sapling_commitments.as_slice()
}
@ -498,7 +505,7 @@ pub trait WalletWrite: WalletRead {
/// `blocks` must be sequential, in order of increasing block height
fn put_blocks(
&mut self,
block: Vec<ScannedBlock<sapling::Nullifier>>,
blocks: Vec<ScannedBlock<sapling::Nullifier>>,
) -> Result<Vec<Self::NoteRef>, Self::Error>;
/// Updates the wallet's view of the blockchain.

View File

@ -355,36 +355,44 @@ pub(crate) fn scan_block_with_runner<
let compact_block_tx_count = block.vtx.len();
let mut wtxs: Vec<WalletTx<K::Nf>> = vec![];
let mut sapling_nullifier_map = Vec::with_capacity(block.vtx.len());
let mut sapling_note_commitments: Vec<(sapling::Node, Retention<BlockHeight>)> = vec![];
for (tx_idx, tx) in block.vtx.into_iter().enumerate() {
let txid = tx.txid();
let tx_index =
u16::try_from(tx.index).expect("Cannot fit more than 2^16 transactions in a block");
// Check for spent notes. The only step that is not constant-time is
// the filter() at the end.
// Check for spent notes. The comparison against known-unspent nullifiers is done
// in constant time.
// TODO: However, this is O(|nullifiers| * |notes|); does using
// constant-time operations here really make sense?
let shielded_spends: Vec<_> = tx
.spends
.into_iter()
.enumerate()
.map(|(index, spend)| {
let spend_nf = spend.nf().expect(
"Could not deserialize nullifier for spend from protobuf representation.",
);
// Find the first tracked nullifier that matches this spend, and produce
// a WalletShieldedSpend if there is a match, in constant time.
nullifiers
.iter()
.map(|&(account, nf)| CtOption::new(account, nf.ct_eq(&spend_nf)))
.fold(
CtOption::new(AccountId::from(0), 0.into()),
|first, next| CtOption::conditional_select(&next, &first, first.is_some()),
)
.map(|account| WalletSaplingSpend::from_parts(index, spend_nf, account))
})
.filter(|spend| spend.is_some().into())
.map(|spend| spend.unwrap())
.collect();
let mut shielded_spends = vec![];
let mut sapling_unlinked_nullifiers = Vec::with_capacity(tx.spends.len());
for (index, spend) in tx.spends.into_iter().enumerate() {
let spend_nf = spend
.nf()
.expect("Could not deserialize nullifier for spend from protobuf representation.");
// Find the first tracked nullifier that matches this spend, and produce
// a WalletShieldedSpend if there is a match, in constant time.
let spend = nullifiers
.iter()
.map(|&(account, nf)| CtOption::new(account, nf.ct_eq(&spend_nf)))
.fold(
CtOption::new(AccountId::from(0), 0.into()),
|first, next| CtOption::conditional_select(&next, &first, first.is_some()),
)
.map(|account| WalletSaplingSpend::from_parts(index, spend_nf, account));
if spend.is_some().into() {
shielded_spends.push(spend.unwrap());
} else {
// This nullifier didn't match any we are currently tracking; save it in
// case it matches an earlier block range we haven't scanned yet.
sapling_unlinked_nullifiers.push(spend_nf);
}
}
sapling_nullifier_map.push((txid, tx_index, sapling_unlinked_nullifiers));
// Collect the set of accounts that were spent from in this transaction
let spent_from_accounts: HashSet<_> = shielded_spends
@ -505,7 +513,7 @@ pub(crate) fn scan_block_with_runner<
if !(shielded_spends.is_empty() && shielded_outputs.is_empty()) {
wtxs.push(WalletTx {
txid,
index: tx.index as usize,
index: tx_index as usize,
sapling_spends: shielded_spends,
sapling_outputs: shielded_outputs,
});
@ -518,6 +526,7 @@ pub(crate) fn scan_block_with_runner<
BlockMetadata::from_parts(cur_height, cur_hash, sapling_commitment_tree_size),
block.time,
wtxs,
sapling_nullifier_map,
sapling_note_commitments,
))
}

View File

@ -835,4 +835,85 @@ mod tests {
(value - value2).unwrap()
);
}
#[test]
fn scan_cached_blocks_detects_spends_out_of_order() {
    let cache_file = NamedTempFile::new().unwrap();
    let db_cache = BlockDb::for_path(cache_file.path()).unwrap();
    init_cache_database(&db_cache).unwrap();

    let data_file = NamedTempFile::new().unwrap();
    let mut db_data = WalletDb::for_path(data_file.path(), tests::network()).unwrap();
    init_wallet_db(&mut db_data, Some(Secret::new(vec![]))).unwrap();

    // Add an account to the wallet.
    let (dfvk, _taddr) = init_test_accounts_table(&mut db_data);

    // A fresh account has no funds.
    assert_eq!(
        get_balance(&db_data.conn, AccountId::from(0)).unwrap(),
        Amount::zero()
    );

    // Cache a block at Sapling activation that sends `received_value` to the wallet.
    let received_value = Amount::from_u64(5).unwrap();
    let (cb, nf) = fake_compact_block(
        sapling_activation_height(),
        BlockHash([0; 32]),
        &dfvk,
        AddressType::DefaultExternal,
        received_value,
        0,
    );
    insert_into_cache(&db_cache, &cb);

    // Cache a subsequent block that spends the received note, sending `sent_value`
    // to an external address.
    let extsk2 = ExtendedSpendingKey::master(&[0]);
    let to2 = extsk2.default_address().1;
    let sent_value = Amount::from_u64(2).unwrap();
    insert_into_cache(
        &db_cache,
        &fake_compact_block_spending(
            sapling_activation_height() + 1,
            cb.hash(),
            (nf, received_value),
            &dfvk,
            to2,
            sent_value,
            1,
        ),
    );

    // Scan the spending block first, i.e. out of order.
    scan_cached_blocks(
        &tests::network(),
        &db_cache,
        &mut db_data,
        sapling_activation_height() + 1,
        1,
    )
    .unwrap();

    // The balance should reflect only the change from the spend.
    assert_eq!(
        get_balance(&db_data.conn, AccountId::from(0)).unwrap(),
        (received_value - sent_value).unwrap()
    );

    // Now scan the earlier block in which the spent note was received.
    scan_cached_blocks(
        &tests::network(),
        &db_cache,
        &mut db_data,
        sapling_activation_height(),
        1,
    )
    .unwrap();

    // The receipt must be linked to the previously-scanned spend, so the balance
    // should be unchanged.
    assert_eq!(
        get_balance(&db_data.conn, AccountId::from(0)).unwrap(),
        (received_value - sent_value).unwrap()
    );
}
}

View File

@ -442,14 +442,31 @@ impl<P: consensus::Parameters> WalletWrite for WalletDb<rusqlite::Connection, P>
}
for output in &tx.sapling_outputs {
let received_note_id =
wallet::sapling::put_received_note(wdb.conn.0, output, tx_row)?;
// Check whether this note was spent in a later block range that
// we previously scanned.
let spent_in = wallet::query_nullifier_map(
wdb.conn.0,
ShieldedProtocol::Sapling,
output.nf(),
)?;
let received_note_id = wallet::sapling::put_received_note(
wdb.conn.0, output, tx_row, spent_in,
)?;
// Save witness for note.
wallet_note_ids.push(received_note_id);
}
}
// Insert the new nullifiers from this block into the nullifier map.
wallet::insert_nullifier_map(
wdb.conn.0,
block.height(),
ShieldedProtocol::Sapling,
block.sapling_nullifier_map(),
)?;
note_positions.extend(block.transactions().iter().flat_map(|wtx| {
wtx.sapling_outputs
.iter()
@ -460,6 +477,14 @@ impl<P: consensus::Parameters> WalletWrite for WalletDb<rusqlite::Connection, P>
sapling_commitments.extend(block.into_sapling_commitments().into_iter());
}
// Prune the nullifier map of entries we no longer need.
if let Some(meta) = wdb.block_fully_scanned()? {
wallet::prune_nullifier_map(
wdb.conn.0,
meta.block_height().saturating_sub(PRUNING_DEPTH),
)?;
}
// We will have a start position and a last scanned height in all cases where
// `blocks` is non-empty.
if let Some(((start_height, start_position), last_scanned_height)) =
@ -533,7 +558,7 @@ impl<P: consensus::Parameters> WalletWrite for WalletDb<rusqlite::Connection, P>
)?;
if matches!(recipient, Recipient::InternalAccount(_, _)) {
wallet::sapling::put_received_note(wdb.conn.0, output, tx_ref)?;
wallet::sapling::put_received_note(wdb.conn.0, output, tx_ref, None)?;
}
}
TransferType::Incoming => {
@ -548,7 +573,7 @@ impl<P: consensus::Parameters> WalletWrite for WalletDb<rusqlite::Connection, P>
}
}
wallet::sapling::put_received_note(wdb.conn.0, output, tx_ref)?;
wallet::sapling::put_received_note(wdb.conn.0, output, tx_ref, None)?;
}
}
}
@ -645,6 +670,7 @@ impl<P: consensus::Parameters> WalletWrite for WalletDb<rusqlite::Connection, P>
transfer_type: TransferType::WalletInternal,
},
tx_ref,
None,
)?;
}
}

View File

@ -659,13 +659,35 @@ pub(crate) fn block_metadata(
pub(crate) fn block_fully_scanned(
conn: &rusqlite::Connection,
) -> Result<Option<BlockMetadata>, SqliteClientError> {
// FIXME: this will need to be rewritten once out-of-order scan range suggestion
// is implemented.
// We assume here that the wallet was either initialized via `init_blocks_table`, or
// its birthday is Sapling activation, so the earliest block in the `blocks` table is
// the first fully-scanned block (because it occurs before any wallet activity).
//
// We further assume that the only way we get a contiguous range of block heights in
// the `blocks` table starting with this earliest block, is if all scanning operations
// have been performed on those blocks. This holds because the `blocks` table is only
// altered by `WalletDb::put_blocks` via `put_block`, and the effective combination of
// intra-range linear scanning and the nullifier map ensures that we discover all
// wallet-related information within the contiguous range.
//
// The fully-scanned height is therefore the greatest height in the first contiguous
// range of block rows, which is a combined case of the "gaps and islands" and
// "greatest N per group" SQL query problems.
conn.query_row(
"SELECT height, hash, sapling_commitment_tree_size, sapling_tree
FROM blocks
ORDER BY height DESC
LIMIT 1",
FROM blocks
INNER JOIN (
WITH contiguous AS (
SELECT height, ROW_NUMBER() OVER (ORDER BY height) - height AS grp
FROM blocks
)
SELECT MAX(height) AS [fully_scanned_height]
FROM contiguous
GROUP BY grp
ORDER BY height
LIMIT 1
)
ON height = fully_scanned_height",
[],
|row| {
let height: u32 = row.get(0)?;
@ -817,6 +839,14 @@ pub(crate) fn truncate_to_height<P: consensus::Parameters>(
[u32::from(block_height)],
)?;
// Delete from the nullifier map any entries with a locator referencing a block
// height greater than the truncation height.
conn.execute(
"DELETE FROM tx_locator_map
WHERE block_height > :block_height",
named_params![":block_height": u32::from(block_height)],
)?;
// Delete from the scanning queue any range with a start height greater than the
// truncation height, and then truncate any remaining range by setting the end
// equal to the truncation height + 1.
@ -1294,6 +1324,165 @@ pub(crate) fn put_sent_output<P: consensus::Parameters>(
Ok(())
}
/// Inserts the given entries into the nullifier map.
///
/// Returns an error if the new entries conflict with existing ones. This indicates either
/// corrupted data, or that a reorg has occurred and the caller needs to repair the wallet
/// state with [`truncate_to_height`].
pub(crate) fn insert_nullifier_map<N: AsRef<[u8]>>(
    conn: &rusqlite::Transaction<'_>,
    block_height: BlockHeight,
    spend_pool: ShieldedProtocol,
    new_entries: &[(TxId, u16, Vec<N>)],
) -> Result<(), SqliteClientError> {
    // Finds any existing locator rows that could conflict with the one being inserted,
    // matching on either the (block_height, tx_index) position or the txid.
    let mut stmt_select_tx_locators = conn.prepare_cached(
        "SELECT block_height, tx_index, txid
        FROM tx_locator_map
        WHERE (block_height = :block_height AND tx_index = :tx_index) OR txid = :txid",
    )?;
    let mut stmt_insert_tx_locator = conn.prepare_cached(
        "INSERT INTO tx_locator_map
        (block_height, tx_index, txid)
        VALUES (:block_height, :tx_index, :txid)",
    )?;
    // Upsert: if this nullifier is already mapped, repoint it at the new locator.
    let mut stmt_insert_nullifier_mapping = conn.prepare_cached(
        "INSERT INTO nullifier_map
        (spend_pool, nf, block_height, tx_index)
        VALUES (:spend_pool, :nf, :block_height, :tx_index)
        ON CONFLICT (spend_pool, nf) DO UPDATE
        SET block_height = :block_height,
            tx_index = :tx_index",
    )?;
    for (txid, tx_index, nullifiers) in new_entries {
        let tx_args = named_params![
            ":block_height": u32::from(block_height),
            ":tx_index": tx_index,
            ":txid": txid.as_ref(),
        ];
        // We cannot use an upsert here, because we use the tx locator as the foreign key
        // in `nullifier_map` instead of `txid` for database size efficiency. If an insert
        // into `tx_locator_map` were to conflict, we would need the resulting update to
        // cascade into `nullifier_map` as either:
        // - an update (if a transaction moved within a block), or
        // - a deletion (if the locator now points to a different transaction).
        //
        // `ON UPDATE` has `CASCADE` to always update, but has no deletion option. So we
        // instead set `ON UPDATE RESTRICT` on the foreign key relation, and require the
        // caller to manually rewind the database in this situation.
        //
        // The fold below collapses the query results into three states:
        // - `None`: no existing locator rows conflict with the new locator.
        // - `Some(Some(loc))`: exactly one row matched; `loc` may or may not be
        //   identical to the locator being inserted.
        // - `Some(None)`: more than one row matched, which is always a conflict.
        let locator = stmt_select_tx_locators
            .query_map(tx_args, |row| {
                Ok((
                    BlockHeight::from_u32(row.get(0)?),
                    row.get::<_, u16>(1)?,
                    TxId::from_bytes(row.get(2)?),
                ))
            })?
            .fold(Ok(None), |acc: Result<_, SqliteClientError>, row| {
                match (acc?, row?) {
                    (None, rhs) => Ok(Some(Some(rhs))),
                    // If there was more than one row, then due to the uniqueness
                    // constraints on the `tx_locator_map` table, all of the rows conflict
                    // with the locator being inserted.
                    (Some(_), _) => Ok(Some(None)),
                }
            })?;
        match locator {
            // If the locator in the table matches the one being inserted, do nothing.
            Some(Some(loc)) if loc == (block_height, *tx_index, *txid) => (),
            // If the locator being inserted would conflict, report it.
            Some(_) => Err(SqliteClientError::DbError(rusqlite::Error::SqliteFailure(
                rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CONSTRAINT),
                Some("UNIQUE constraint failed: tx_locator_map.block_height, tx_locator_map.tx_index".into()),
            )))?,
            // If the locator doesn't exist, insert it.
            None => stmt_insert_tx_locator.execute(tx_args).map(|_| ())?,
        }
        for nf in nullifiers {
            // Here it is okay to use an upsert, because per above we've confirmed that
            // the locator points to the same transaction.
            let nf_args = named_params![
                ":spend_pool": pool_code(PoolType::Shielded(spend_pool)),
                ":nf": nf.as_ref(),
                ":block_height": u32::from(block_height),
                ":tx_index": tx_index,
            ];
            stmt_insert_nullifier_mapping.execute(nf_args)?;
        }
    }
    Ok(())
}
/// Returns the row of the `transactions` table corresponding to the transaction in which
/// this nullifier is revealed, if any.
pub(crate) fn query_nullifier_map<N: AsRef<[u8]>>(
    conn: &rusqlite::Transaction<'_>,
    spend_pool: ShieldedProtocol,
    nf: &N,
) -> Result<Option<i64>, SqliteClientError> {
    let mut stmt_select_locator = conn.prepare_cached(
        "SELECT block_height, tx_index, txid
        FROM nullifier_map
        LEFT JOIN tx_locator_map USING (block_height, tx_index)
        WHERE spend_pool = :spend_pool AND nf = :nf",
    )?;

    // Look up the locator (height, index within block, txid) recorded for this
    // nullifier, if we have one.
    let locator = stmt_select_locator
        .query_row(
            named_params![
                ":spend_pool": pool_code(PoolType::Shielded(spend_pool)),
                ":nf": nf.as_ref(),
            ],
            |row| {
                Ok((
                    BlockHeight::from_u32(row.get(0)?),
                    row.get(1)?,
                    TxId::from_bytes(row.get(2)?),
                ))
            },
        )
        .optional()?;

    match locator {
        None => Ok(None),
        Some((height, index, txid)) => {
            // Find or create a corresponding row in the `transactions` table. Usually a
            // row will have been created during the same scan that added the locator to
            // the nullifier map, but it would not have been if the transaction in
            // question spent the note with no change or explicit in-wallet recipient.
            put_tx_meta(
                conn,
                &WalletTx::<N> {
                    txid,
                    index,
                    sapling_spends: vec![],
                    sapling_outputs: vec![],
                },
                height,
            )
            .map(Some)
        }
    }
}
/// Deletes from the nullifier map any entries with a locator referencing a block height
/// lower than the pruning height.
pub(crate) fn prune_nullifier_map(
    conn: &rusqlite::Transaction<'_>,
    block_height: BlockHeight,
) -> Result<(), SqliteClientError> {
    // Removing locator rows also removes the `nullifier_map` rows that reference them,
    // via the `ON DELETE CASCADE` foreign key constraint on that table.
    let mut stmt_delete_locators = conn.prepare_cached(
        "DELETE FROM tx_locator_map
        WHERE block_height < :block_height",
    )?;
    stmt_delete_locators.execute(named_params![":block_height": u32::from(block_height)])?;
    Ok(())
}
#[cfg(test)]
mod tests {
use std::num::NonZeroU32;

View File

@ -414,6 +414,18 @@ mod tests {
sapling_tree BLOB NOT NULL ,
sapling_commitment_tree_size INTEGER,
orchard_commitment_tree_size INTEGER)",
"CREATE TABLE nullifier_map (
spend_pool INTEGER NOT NULL,
nf BLOB NOT NULL,
block_height INTEGER NOT NULL,
tx_index INTEGER NOT NULL,
CONSTRAINT tx_locator
FOREIGN KEY (block_height, tx_index)
REFERENCES tx_locator_map(block_height, tx_index)
ON DELETE CASCADE
ON UPDATE RESTRICT,
CONSTRAINT nf_uniq UNIQUE (spend_pool, nf)
)",
"CREATE TABLE sapling_received_notes (
id_note INTEGER PRIMARY KEY,
tx INTEGER NOT NULL,
@ -507,6 +519,12 @@ mod tests {
fee INTEGER,
FOREIGN KEY (block) REFERENCES blocks(height)
)",
"CREATE TABLE tx_locator_map (
block_height INTEGER NOT NULL,
tx_index INTEGER NOT NULL,
txid BLOB NOT NULL UNIQUE,
PRIMARY KEY (block_height, tx_index)
)",
"CREATE TABLE \"utxos\" (
id_utxo INTEGER PRIMARY KEY,
received_by_account INTEGER NOT NULL,

View File

@ -2,6 +2,7 @@ mod add_transaction_views;
mod add_utxo_account;
mod addresses_table;
mod initial_setup;
mod nullifier_map;
mod received_notes_nullable_nf;
mod sent_notes_to_internal;
mod shardtree_support;
@ -30,6 +31,10 @@ pub(super) fn all_migrations<P: consensus::Parameters + 'static>(
// add_transaction_views
// /
// v_transactions_net
// /
// received_notes_nullable_nf
// / \
// shardtree_support nullifier_map
vec![
Box::new(initial_setup::Migration {}),
Box::new(utxos_table::Migration {}),
@ -48,5 +53,6 @@ pub(super) fn all_migrations<P: consensus::Parameters + 'static>(
Box::new(v_transactions_net::Migration),
Box::new(received_notes_nullable_nf::Migration),
Box::new(shardtree_support::Migration),
Box::new(nullifier_map::Migration),
]
}

View File

@ -0,0 +1,72 @@
//! This migration adds a table for storing mappings from nullifiers to the transaction
//! they are revealed in.
use std::collections::HashSet;
use schemer_rusqlite::RusqliteMigration;
use tracing::debug;
use uuid::Uuid;
use crate::wallet::init::WalletMigrationError;
use super::received_notes_nullable_nf;
pub(super) const MIGRATION_ID: Uuid = Uuid::from_u128(0xe2d71ac5_6a44_4c6b_a9a0_6d0a79d355f1);
pub(super) struct Migration;
impl schemer::Migration for Migration {
    fn id(&self) -> Uuid {
        MIGRATION_ID
    }

    fn dependencies(&self) -> HashSet<Uuid> {
        // Builds on the migration that made `sapling_received_notes.nf` nullable.
        std::iter::once(received_notes_nullable_nf::MIGRATION_ID).collect()
    }

    fn description(&self) -> &'static str {
        "Adds a lookup table for nullifiers we've observed on-chain that we haven't confirmed are not ours."
    }
}
impl RusqliteMigration for Migration {
    type Error = WalletMigrationError;

    fn up(&self, transaction: &rusqlite::Transaction) -> Result<(), Self::Error> {
        // We don't enforce any foreign key constraint to the blocks table, to allow
        // loading the nullifier map separately from block scanning.
        debug!("Creating tables for nullifier map");
        // `tx_locator_map` stores one row per transaction position; `nullifier_map`
        // references it via the composite (block_height, tx_index) key instead of
        // repeating the txid per nullifier. `ON UPDATE RESTRICT` is deliberate: a
        // locator update cannot express a required deletion of dependent rows, so
        // conflicting updates must instead be handled by rewinding the wallet.
        transaction.execute_batch(
            "CREATE TABLE tx_locator_map (
                block_height INTEGER NOT NULL,
                tx_index INTEGER NOT NULL,
                txid BLOB NOT NULL UNIQUE,
                PRIMARY KEY (block_height, tx_index)
            );
            CREATE TABLE nullifier_map (
                spend_pool INTEGER NOT NULL,
                nf BLOB NOT NULL,
                block_height INTEGER NOT NULL,
                tx_index INTEGER NOT NULL,
                CONSTRAINT tx_locator
                    FOREIGN KEY (block_height, tx_index)
                    REFERENCES tx_locator_map(block_height, tx_index)
                    ON DELETE CASCADE
                    ON UPDATE RESTRICT,
                CONSTRAINT nf_uniq UNIQUE (spend_pool, nf)
            );",
        )?;
        Ok(())
    }

    fn down(&self, transaction: &rusqlite::Transaction) -> Result<(), Self::Error> {
        // Drop in reverse dependency order: `nullifier_map` references `tx_locator_map`.
        transaction.execute_batch(
            "DROP TABLE nullifier_map;
            DROP TABLE tx_locator_map;",
        )?;
        Ok(())
    }
}

View File

@ -312,10 +312,11 @@ pub(crate) fn put_received_note<T: ReceivedSaplingOutput>(
conn: &Connection,
output: &T,
tx_ref: i64,
spent_in: Option<i64>,
) -> Result<NoteId, SqliteClientError> {
let mut stmt_upsert_received_note = conn.prepare_cached(
"INSERT INTO sapling_received_notes
(tx, output_index, account, diversifier, value, rcm, memo, nf, is_change, commitment_tree_position)
(tx, output_index, account, diversifier, value, rcm, memo, nf, is_change, spent, commitment_tree_position)
VALUES (
:tx,
:output_index,
@ -326,6 +327,7 @@ pub(crate) fn put_received_note<T: ReceivedSaplingOutput>(
:memo,
:nf,
:is_change,
:spent,
:commitment_tree_position
)
ON CONFLICT (tx, output_index) DO UPDATE
@ -336,6 +338,7 @@ pub(crate) fn put_received_note<T: ReceivedSaplingOutput>(
nf = IFNULL(:nf, nf),
memo = IFNULL(:memo, memo),
is_change = IFNULL(:is_change, is_change),
spent = IFNULL(:spent, spent),
commitment_tree_position = IFNULL(:commitment_tree_position, commitment_tree_position)
RETURNING id_note",
)?;
@ -354,6 +357,7 @@ pub(crate) fn put_received_note<T: ReceivedSaplingOutput>(
":nf": output.nullifier().map(|nf| nf.0.as_ref()),
":memo": memo_repr(output.memo()),
":is_change": output.is_change(),
":spent": spent_in,
":commitment_tree_position": output.note_commitment_tree_position().map(u64::from),
];