diff --git a/zebra-state/src/constants.rs b/zebra-state/src/constants.rs index b5060d104..6c454b6d8 100644 --- a/zebra-state/src/constants.rs +++ b/zebra-state/src/constants.rs @@ -52,7 +52,7 @@ pub(crate) const DATABASE_FORMAT_MINOR_VERSION: u64 = 1; /// The database format patch version, incremented each time the on-disk database format has a /// significant format compatibility fix. -pub(crate) const DATABASE_FORMAT_PATCH_VERSION: u64 = 0; +pub(crate) const DATABASE_FORMAT_PATCH_VERSION: u64 = 1; /// The name of the file containing the minor and patch database versions. /// diff --git a/zebra-state/src/service/finalized_state/disk_db.rs b/zebra-state/src/service/finalized_state/disk_db.rs index 8f7e2956b..b1d23227b 100644 --- a/zebra-state/src/service/finalized_state/disk_db.rs +++ b/zebra-state/src/service/finalized_state/disk_db.rs @@ -119,7 +119,8 @@ pub trait WriteDisk { C: rocksdb::AsColumnFamilyRef, K: IntoDisk + Debug; - /// Remove the given key range from rocksdb column family if it exists. + /// Deletes the given key range from rocksdb column family if it exists, including `from` and + /// excluding `to`. fn zs_delete_range(&mut self, cf: &C, from: K, to: K) where C: rocksdb::AsColumnFamilyRef, @@ -147,6 +148,8 @@ impl WriteDisk for DiskWriteBatch { self.batch.delete_cf(cf, key_bytes); } + // TODO: convert zs_delete_range() to take std::ops::RangeBounds + // see zs_range_iter() for an example of the edge cases fn zs_delete_range(&mut self, cf: &C, from: K, to: K) where C: rocksdb::AsColumnFamilyRef, @@ -464,7 +467,13 @@ impl DiskDb { .iterator_cf(cf, start_mode) .map(|result| result.expect("unexpected database failure")) .map(|(key, value)| (key.to_vec(), value)) - // Handle Excluded start and the end bound + // Skip excluded start bound and empty ranges. The `start_mode` already skips keys + // before the start bound. + .skip_while({ + let range = range.clone(); + move |(key, _value)| !range.contains(key) + }) + // Take until the excluded end bound is reached, or we're after the included end bound. .take_while(move |(key, _value)| range.contains(key)) .map(|(key, value)| (K::from_bytes(key), V::from_bytes(value))) } diff --git a/zebra-state/src/service/finalized_state/disk_format/upgrade.rs b/zebra-state/src/service/finalized_state/disk_format/upgrade.rs index 929b02992..a693ac174 100644 --- a/zebra-state/src/service/finalized_state/disk_format/upgrade.rs +++ b/zebra-state/src/service/finalized_state/disk_format/upgrade.rs @@ -2,10 +2,7 @@ use std::{ cmp::Ordering, - sync::{ - atomic::{self, AtomicBool}, - mpsc, Arc, - }, + sync::{mpsc, Arc}, thread::{self, JoinHandle}, }; @@ -15,7 +12,7 @@ use tracing::Span; use zebra_chain::{ block::Height, diagnostic::task::{CheckForPanics, WaitForPanics}, - parameters::{Network, NetworkUpgrade}, + parameters::Network, }; use DbFormatChange::*; @@ -72,7 +69,8 @@ pub struct DbFormatChangeThreadHandle { update_task: Option>>, /// A channel that tells the running format thread to finish early. - should_cancel_format_change: Arc, + /// A channel that tells the running format thread to finish early. + cancel_handle: mpsc::SyncSender, } /// Marker for cancelling a format upgrade. @@ -150,27 +148,24 @@ impl DbFormatChange { // // Cancel handles must use try_send() to avoid blocking waiting for the format change // thread to shut down. - let should_cancel_format_change = Arc::new(AtomicBool::new(false)); + let (cancel_handle, cancel_receiver) = mpsc::sync_channel(1); let span = Span::current(); - let update_task = { - let should_cancel_format_change = should_cancel_format_change.clone(); - thread::spawn(move || { - span.in_scope(move || { - self.apply_format_change( - config, - network, - initial_tip_height, - upgrade_db, - should_cancel_format_change, - ); - }) + let update_task = thread::spawn(move || { + span.in_scope(move || { + self.apply_format_change( + config, + network, + initial_tip_height, + upgrade_db, + cancel_receiver, + ); }) - }; + }); let mut handle = DbFormatChangeThreadHandle { update_task: Some(Arc::new(update_task)), - should_cancel_format_change, + cancel_handle, }; handle.check_for_panics(); @@ -180,8 +175,8 @@ impl DbFormatChange { /// Apply this format change to the database. /// - /// Format changes should be launched in an independent `std::thread`, which runs until the - /// upgrade is finished. + /// Format changes are launched in an independent `std::thread` by `apply_format_upgrade()`. + /// This thread runs until the upgrade is finished. /// /// See `apply_format_upgrade()` for details. fn apply_format_change( @@ -190,21 +185,22 @@ impl DbFormatChange { network: Network, initial_tip_height: Option, upgrade_db: ZebraDb, - should_cancel_format_change: Arc, + cancel_receiver: mpsc::Receiver, ) { match self { - // Handled in the rest of this function. + // Perform any required upgrades, then mark the state as upgraded. Upgrade { .. } => self.apply_format_upgrade( config, network, initial_tip_height, - upgrade_db, - should_cancel_format_change, + upgrade_db.clone(), + cancel_receiver, ), NewlyCreated { .. } => { Self::mark_as_newly_created(&config, network); } + Downgrade { .. } => { // # Correctness // @@ -220,6 +216,13 @@ impl DbFormatChange { // We do this on a best-effort basis for versions that are still supported. } } + + // This check should pass for all format changes: + // - upgrades should de-duplicate trees if needed (and they already do this check) + // - an empty state doesn't have any trees, so it can't have duplicate trees + // - since this Zebra code knows how to de-duplicate trees, downgrades using this code + // still know how to make sure trees are unique + Self::check_for_duplicate_trees(upgrade_db); } /// Apply any required format updates to the database. @@ -237,8 +240,8 @@ impl DbFormatChange { config: Config, network: Network, initial_tip_height: Option, - upgrade_db: ZebraDb, - should_cancel_format_change: Arc, + db: ZebraDb, + cancel_receiver: mpsc::Receiver, ) { let Upgrade { newer_running_version, @@ -273,180 +276,73 @@ impl DbFormatChange { // Start of a database upgrade task. let version_for_pruning_trees = - Version::parse("25.1.0").expect("Hardcoded version string should be valid."); + Version::parse("25.1.1").expect("Hardcoded version string should be valid."); // Check if we need to prune the note commitment trees in the database. if older_disk_version < version_for_pruning_trees { - // Get network upgrade heights - let (&sapling_height, _) = NetworkUpgrade::activation_list(network) - .iter() - .find(|(_, upgrade)| **upgrade == NetworkUpgrade::Sapling) - .expect("there should be sapling upgrade"); - let (&orchard_height, _) = NetworkUpgrade::activation_list(network) - .iter() - .find(|(_, upgrade)| **upgrade == NetworkUpgrade::Nu5) - .expect("there should be Nu5 upgrade"); + // Prune duplicate Sapling note commitment trees. - // Delete duplicates before sapling and orchard heights - let (mut prev_sapling_tree, mut prev_orchard_tree) = { - let height = Height(1); - let mut batch = DiskWriteBatch::new(); + // The last tree we checked. + let mut last_tree = db + .sapling_tree_by_height(&Height(0)) + .expect("Checked above that the genesis block is in the database."); - batch.delete_range_sapling_tree(&upgrade_db, &height, &sapling_height); - batch.delete_range_orchard_tree(&upgrade_db, &height, &orchard_height); - upgrade_db - .write_batch(batch) - .expect("Deleting note commitment trees should always succeed."); - - ( - upgrade_db.sapling_tree_by_height(&Height(0)), - upgrade_db.orchard_tree_by_height(&Height(0)), - ) - }; - - // Create an unbounded channel for reading note commitment trees - let (sapling_tree_tx, sapling_tree_rx) = mpsc::channel(); - - // Set up task for reading sapling note commitment trees - let db = upgrade_db.clone(); - let should_cancel_flag = should_cancel_format_change.clone(); - let sapling_read_task = std::thread::spawn(move || { - for (height, tree) in - db.sapling_tree_by_height_range(sapling_height..initial_tip_height) - { - // Breaking from this loop and dropping the sapling_tree channel - // will cause the sapling compare and delete tasks to finish. - if should_cancel_flag.load(atomic::Ordering::Relaxed) { - break; - } - - if let Err(error) = sapling_tree_tx.send((height, tree)) { - warn!(?error, "unexpected send error") - } + // Run through all the possible duplicate trees in the finalized chain. + // The block after genesis is the first possible duplicate. + for (height, tree) in db.sapling_tree_by_height_range(Height(1)..=initial_tip_height) { + // Return early if there is a cancel signal. + if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + return; } - }); - // Create an unbounded channel for duplicate sapling note commitment tree heights - let (unique_sapling_tree_height_tx, unique_sapling_tree_height_rx) = mpsc::channel(); - - // Set up task for reading sapling note commitment trees - let sapling_compare_task = std::thread::spawn(move || { - while let Ok((height, tree)) = sapling_tree_rx.recv() { - let tree = Some(tree); - if prev_sapling_tree != tree { - if let Err(error) = unique_sapling_tree_height_tx.send(height) { - warn!(?error, "unexpected send error") - } - prev_sapling_tree = tree; - } + // Delete any duplicate trees. + if tree == last_tree { + let mut batch = DiskWriteBatch::new(); + batch.delete_sapling_tree(&db, &height); + db.write_batch(batch) + .expect("Deleting Sapling note commitment trees should always succeed."); } - }); - // Set up task for deleting sapling note commitment trees - let db = upgrade_db.clone(); - let sapling_delete_task = std::thread::spawn(move || { - let mut delete_from = sapling_height.next(); - while let Ok(delete_to) = unique_sapling_tree_height_rx.recv() { - let num_entries = delete_to - delete_from; - if num_entries > 0 { - let mut batch = DiskWriteBatch::new(); - - if num_entries == 1 { - batch.delete_sapling_tree(&db, &delete_from); - } else { - batch.delete_range_sapling_tree(&db, &delete_from, &delete_to); - } - - db.write_batch(batch).expect( - "Deleting Sapling note commitment trees should always succeed.", - ); - } - - delete_from = delete_to.next(); - } - }); - - // Create an unbounded channel for reading note commitment trees - let (orchard_tree_tx, orchard_tree_rx) = mpsc::channel(); - - // Set up task for reading orchard note commitment trees - let db = upgrade_db.clone(); - let should_cancel_flag = should_cancel_format_change; - let orchard_read_task = std::thread::spawn(move || { - for (height, tree) in - db.orchard_tree_by_height_range(orchard_height..initial_tip_height) - { - // Breaking from this loop and dropping the orchard_tree channel - // will cause the orchard compare and delete tasks to finish. - if should_cancel_flag.load(atomic::Ordering::Relaxed) { - break; - } - - if let Err(error) = orchard_tree_tx.send((height, tree)) { - warn!(?error, "unexpected send error") - } - } - }); - - // Create an unbounded channel for duplicate orchard note commitment tree heights - let (unique_orchard_tree_height_tx, unique_orchard_tree_height_rx) = mpsc::channel(); - - // Set up task for reading orchard note commitment trees - let orchard_compare_task = std::thread::spawn(move || { - while let Ok((height, tree)) = orchard_tree_rx.recv() { - let tree = Some(tree); - if prev_orchard_tree != tree { - if let Err(error) = unique_orchard_tree_height_tx.send(height) { - warn!(?error, "unexpected send error") - } - - prev_orchard_tree = tree; - } - } - }); - - // Set up task for deleting orchard note commitment trees - let db = upgrade_db; - let orchard_delete_task = std::thread::spawn(move || { - let mut delete_from = orchard_height.next(); - while let Ok(delete_to) = unique_orchard_tree_height_rx.recv() { - let num_entries = delete_to - delete_from; - if num_entries > 0 { - let mut batch = DiskWriteBatch::new(); - - if num_entries == 1 { - batch.delete_orchard_tree(&db, &delete_from); - } else { - batch.delete_range_orchard_tree(&db, &delete_from, &delete_to); - } - - db.write_batch(batch).expect( - "Deleting Orchard note commitment trees should always succeed.", - ); - } - - delete_from = delete_to.next(); - } - }); - - for task in [ - sapling_read_task, - sapling_compare_task, - sapling_delete_task, - orchard_read_task, - orchard_compare_task, - orchard_delete_task, - ] { - if let Err(error) = task.join() { - warn!(?error, "unexpected join error") - } + // Compare against the last tree to find unique trees. + last_tree = tree; } - // At the end of each format upgrade, we mark the database as upgraded to that version. - // We don't mark the database if `height` didn't reach the `initial_tip_height` because - // Zebra wouldn't run the upgrade anymore, and the part of the database above `height` - // wouldn't be upgraded. - info!(?newer_running_version, "Database has been upgraded to:"); + // Prune duplicate Orchard note commitment trees. + + // The last tree we checked. + let mut last_tree = db + .orchard_tree_by_height(&Height(0)) + .expect("Checked above that the genesis block is in the database."); + + // Run through all the possible duplicate trees in the finalized chain. + // The block after genesis is the first possible duplicate. + for (height, tree) in db.orchard_tree_by_height_range(Height(1)..=initial_tip_height) { + // Return early if there is a cancel signal. + if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + return; + } + + // Delete any duplicate trees. + if tree == last_tree { + let mut batch = DiskWriteBatch::new(); + batch.delete_orchard_tree(&db, &height); + db.write_batch(batch) + .expect("Deleting Orchard note commitment trees should always succeed."); + } + + // Compare against the last tree to find unique trees. + last_tree = tree; + } + + // Before marking the state as upgraded, check that the upgrade completed successfully. + Self::check_for_duplicate_trees(db); + + // Mark the database as upgraded. Zebra won't repeat the upgrade anymore once the + // database is marked, so the upgrade MUST be complete at this point. + info!( + ?newer_running_version, + "Zebra automatically upgraded the database format to:" + ); Self::mark_as_upgraded_to(&version_for_pruning_trees, &config, network); } @@ -461,6 +357,64 @@ impl DbFormatChange { // every time it runs its inner update loop. } + /// Check that note commitment trees were correctly de-duplicated. + /// + /// # Panics + /// + /// If a duplicate tree is found. + pub fn check_for_duplicate_trees(upgrade_db: ZebraDb) { + // Runtime test: make sure we removed all duplicates. + // We always run this test, even if the state has supposedly been upgraded. + let mut duplicate_found = false; + + let mut prev_height = None; + let mut prev_tree = None; + for (height, tree) in upgrade_db.sapling_tree_by_height_range(..) { + if prev_tree == Some(tree.clone()) { + // TODO: replace this with a panic because it indicates an unrecoverable + // bug, which should fail the tests immediately + error!( + height = ?height, + prev_height = ?prev_height.unwrap(), + tree_root = ?tree.root(), + "found duplicate sapling trees after running de-duplicate tree upgrade" + ); + + duplicate_found = true; + } + + prev_height = Some(height); + prev_tree = Some(tree); + } + + let mut prev_height = None; + let mut prev_tree = None; + for (height, tree) in upgrade_db.orchard_tree_by_height_range(..) { + if prev_tree == Some(tree.clone()) { + // TODO: replace this with a panic because it indicates an unrecoverable + // bug, which should fail the tests immediately + error!( + height = ?height, + prev_height = ?prev_height.unwrap(), + tree_root = ?tree.root(), + "found duplicate orchard trees after running de-duplicate tree upgrade" + ); + + duplicate_found = true; + } + + prev_height = Some(height); + prev_tree = Some(tree); + } + + if duplicate_found { + panic!( + "found duplicate sapling or orchard trees \ + after running de-duplicate tree upgrade" + ); + } + } + /// Mark a newly created database with the current format version. /// /// This should be called when a newly created database is opened. @@ -626,8 +580,7 @@ impl DbFormatChangeThreadHandle { // There's nothing we can do about errors here. // If the channel is disconnected, the task has exited. // If it's full, it's already been cancelled. - self.should_cancel_format_change - .store(true, atomic::Ordering::Relaxed); + let _ = self.cancel_handle.try_send(CancelFormatChange); } /// Check for panics in the code running in the spawned thread. diff --git a/zebra-state/src/service/finalized_state/zebra_db.rs b/zebra-state/src/service/finalized_state/zebra_db.rs index 859ab80a6..479f337b0 100644 --- a/zebra-state/src/service/finalized_state/zebra_db.rs +++ b/zebra-state/src/service/finalized_state/zebra_db.rs @@ -106,6 +106,11 @@ impl ZebraDb { ); db.format_change_handle = Some(format_change_handle); + } else { + // If we're re-opening a previously upgraded or newly created database, + // the trees should already be de-duplicated. + // (There's no format change here, so the format change checks won't run.) + DbFormatChange::check_for_duplicate_trees(db.clone()); } db diff --git a/zebra-state/src/service/finalized_state/zebra_db/shielded.rs b/zebra-state/src/service/finalized_state/zebra_db/shielded.rs index 305fbaea6..57f9ae0ba 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/shielded.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/shielded.rs @@ -72,6 +72,8 @@ impl ZebraDb { self.db.zs_contains(&orchard_anchors, &orchard_anchor) } + // # Sprout trees + /// Returns the Sprout note commitment tree of the finalized tip /// or the empty tree if the state is empty. pub fn sprout_tree(&self) -> Arc { @@ -116,7 +118,10 @@ impl ZebraDb { .zs_items_in_range_unordered(&sprout_anchors_handle, ..) } - /// Returns the Sapling note commitment trees starting from the given block height up to the chain tip + // # Sapling trees + + /// Returns the Sapling note commitment tree of the finalized tip or the empty tree if the state + /// is empty. pub fn sapling_tree(&self) -> Arc { let height = match self.finalized_tip_height() { Some(h) => h, @@ -127,35 +132,8 @@ impl ZebraDb { .expect("Sapling note commitment tree must exist if there is a finalized tip") } - /// Returns the Orchard note commitment trees starting from the given block height up to the chain tip - #[allow(clippy::unwrap_in_result)] - pub fn orchard_tree_by_height_range( - &self, - range: R, - ) -> impl Iterator)> + '_ - where - R: std::ops::RangeBounds, - { - let orchard_trees = self.db.cf_handle("orchard_note_commitment_tree").unwrap(); - self.db.zs_range_iter(&orchard_trees, range) - } - - /// Returns the Sapling note commitment tree matching the given block height, - /// or `None` if the height is above the finalized tip. - #[allow(clippy::unwrap_in_result)] - pub fn sapling_tree_by_height_range( - &self, - range: R, - ) -> impl Iterator)> + '_ - where - R: std::ops::RangeBounds, - { - let sapling_trees = self.db.cf_handle("sapling_note_commitment_tree").unwrap(); - self.db.zs_range_iter(&sapling_trees, range) - } - - /// Returns the Sapling note commitment tree matching the given block height, - /// or `None` if the height is above the finalized tip. + /// Returns the Sapling note commitment tree matching the given block height, or `None` if the + /// height is above the finalized tip. #[allow(clippy::unwrap_in_result)] pub fn sapling_tree_by_height( &self, @@ -182,6 +160,19 @@ impl ZebraDb { Some(Arc::new(tree)) } + /// Returns the Sapling note commitment trees in the supplied range. + #[allow(clippy::unwrap_in_result)] + pub fn sapling_tree_by_height_range( + &self, + range: R, + ) -> impl Iterator)> + '_ + where + R: std::ops::RangeBounds, + { + let sapling_trees = self.db.cf_handle("sapling_note_commitment_tree").unwrap(); + self.db.zs_range_iter(&sapling_trees, range) + } + /// Returns the Sapling note commitment subtree at this index #[allow(clippy::unwrap_in_result)] pub fn sapling_subtree_by_index( @@ -198,8 +189,10 @@ impl ZebraDb { Some(subtree_data.with_index(index)) } - /// Returns the Orchard note commitment tree of the finalized tip - /// or the empty tree if the state is empty. + // Orchard trees + + /// Returns the Orchard note commitment tree of the finalized tip or the empty tree if the state + /// is empty. pub fn orchard_tree(&self) -> Arc { let height = match self.finalized_tip_height() { Some(h) => h, @@ -254,6 +247,19 @@ impl ZebraDb { Some(Arc::new(tree)) } + /// Returns the Orchard note commitment trees in the supplied range. + #[allow(clippy::unwrap_in_result)] + pub fn orchard_tree_by_height_range( + &self, + range: R, + ) -> impl Iterator)> + '_ + where + R: std::ops::RangeBounds, + { + let orchard_trees = self.db.cf_handle("orchard_note_commitment_tree").unwrap(); + self.db.zs_range_iter(&orchard_trees, range) + } + /// Returns the shielded note commitment trees of the finalized tip /// or the empty trees if the state is empty. pub fn note_commitment_trees(&self) -> NoteCommitmentTrees { @@ -419,11 +425,14 @@ impl DiskWriteBatch { } /// Deletes the range of Sapling note commitment trees at the given [`Height`]s. Doesn't delete the upper bound. + #[allow(dead_code)] pub fn delete_range_sapling_tree(&mut self, zebra_db: &ZebraDb, from: &Height, to: &Height) { let sapling_tree_cf = zebra_db .db .cf_handle("sapling_note_commitment_tree") .unwrap(); + + // TODO: convert zs_delete_range() to take std::ops::RangeBounds self.zs_delete_range(&sapling_tree_cf, from, to); } @@ -437,11 +446,14 @@ impl DiskWriteBatch { } /// Deletes the range of Orchard note commitment trees at the given [`Height`]s. Doesn't delete the upper bound. + #[allow(dead_code)] pub fn delete_range_orchard_tree(&mut self, zebra_db: &ZebraDb, from: &Height, to: &Height) { let orchard_tree_cf = zebra_db .db .cf_handle("orchard_note_commitment_tree") .unwrap(); + + // TODO: convert zs_delete_range() to take std::ops::RangeBounds self.zs_delete_range(&orchard_tree_cf, from, to); } }