diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 97b9718b2e..03a51e400b 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -47,8 +47,7 @@ macro_rules! db_imports { mod $mod; use $mod::$db; - use db::columns as cf; - + use db::{columns as cf, IteratorMode, IteratorDirection}; pub use db::columns; pub type Database = db::Database<$db>; @@ -96,6 +95,7 @@ pub struct Blocktree { data_shred_cf: LedgerColumn, code_shred_cf: LedgerColumn, batch_processor: Arc>, + last_root: Arc>, pub new_blobs_signals: Vec>, pub completed_slots_senders: Vec>>, } @@ -156,6 +156,14 @@ impl Blocktree { let db = Arc::new(db); + // Get max root or 0 if it doesn't exist + let max_root = db + .iter::(IteratorMode::End)? + .next() + .map(|(slot, _)| slot) + .unwrap_or(0); + let last_root = Arc::new(RwLock::new(max_root)); + Ok(Blocktree { db, meta_cf, @@ -170,6 +178,7 @@ impl Blocktree { new_blobs_signals: vec![], batch_processor, completed_slots_senders: vec![], + last_root, }) } @@ -308,7 +317,9 @@ impl Blocktree { } pub fn slot_meta_iterator(&self, slot: u64) -> Result> { - let meta_iter = self.db.iter::(Some(slot))?; + let meta_iter = self + .db + .iter::(IteratorMode::From(slot, IteratorDirection::Forward))?; Ok(meta_iter.map(|(slot, slot_meta_bytes)| { ( slot, @@ -322,7 +333,9 @@ impl Blocktree { &self, slot: u64, ) -> Result)>> { - let slot_iterator = self.db.iter::(Some((slot, 0)))?; + let slot_iterator = self + .db + .iter::(IteratorMode::From((slot, 0), IteratorDirection::Forward))?; Ok(slot_iterator.take_while(move |((blob_slot, _), _)| *blob_slot == slot)) } @@ -463,14 +476,14 @@ impl Blocktree { &mut write_batch, ) } - } else if Blocktree::insert_data_shred( - db, - &mut slot_meta_working_set, - &mut index_working_set, - &shred, - &mut write_batch, - ) - .unwrap_or(false) + } else if self + .insert_data_shred( + &mut slot_meta_working_set, + &mut index_working_set, + &shred, + &mut write_batch, + ) + .unwrap_or(false) { just_inserted_data_shreds.insert((slot, shred_index), shred); } @@ -485,8 +498,7 @@ impl Blocktree { ); recovered_data.into_iter().for_each(|shred| { - let _ = Blocktree::insert_data_shred( - db, + let _ = self.insert_data_shred( &mut slot_meta_working_set, &mut index_working_set, &shred, @@ -495,7 +507,7 @@ impl Blocktree { }); // Handle chaining for the working set - handle_chaining(&db, &mut write_batch, &slot_meta_working_set)?; + handle_chaining(&self.db, &mut write_batch, &slot_meta_working_set)?; let (should_signal, newly_completed_slots) = prepare_signals( &slot_meta_working_set, @@ -575,7 +587,7 @@ impl Blocktree { } fn insert_data_shred( - db: &Database, + &self, mut slot_meta_working_set: &mut HashMap>, Option)>, index_working_set: &mut HashMap, shred: &Shred, @@ -592,14 +604,14 @@ impl Blocktree { false }; - let entry = get_slot_meta_entry(db, &mut slot_meta_working_set, slot, parent); + let entry = get_slot_meta_entry(&self.db, &mut slot_meta_working_set, slot, parent); let slot_meta = &mut entry.0.borrow_mut(); if is_orphan(slot_meta) { slot_meta.parent_slot = parent; } - let data_cf = db.column::(); + let data_cf = self.db.column::(); let check_data_cf = |slot, index| { data_cf @@ -614,7 +626,14 @@ impl Blocktree { .data_mut(); if !index_meta.is_present(index) - && should_insert(slot_meta, index, slot, last_in_slot, check_data_cf) + && should_insert( + slot_meta, + index, + slot, + last_in_slot, + check_data_cf, + *self.last_root.read().unwrap(), + ) { let new_consumed = if slot_meta.consumed == index { let mut current_index = index + 1; @@ -628,8 
+647,10 @@ impl Blocktree { }; let serialized_shred = bincode::serialize(shred).unwrap(); - write_batch.put_bytes::((slot, index), &serialized_shred)?; + // Commit step: commit all changes to the mutable structures at once, or none at all. + // We don't want only a subset of these changes going through. + write_batch.put_bytes::((slot, index), &serialized_shred)?; update_slot_meta(last_in_slot, slot_meta, index, new_consumed); index_meta.set_present(index, true); trace!("inserted shred into slot {:?} and index {:?}", slot, index); @@ -884,6 +905,7 @@ impl Blocktree { &mut index_working_set, &mut prev_inserted_blob_datas, &mut write_batch, + *self.last_root.read().unwrap(), )?; } else { insert_data_blob_batch( @@ -893,6 +915,7 @@ impl Blocktree { &mut index_working_set, &mut prev_inserted_blob_datas, &mut write_batch, + *self.last_root.read().unwrap(), )?; } @@ -1134,6 +1157,7 @@ impl Blocktree { &mut index_working_set, &mut prev_inserted_blob_datas, &mut writebatch, + *self.last_root.read().unwrap(), )?; // Handle chaining for the working set @@ -1433,6 +1457,12 @@ impl Blocktree { batch_processor.write(write_batch)?; } + + let mut last_root = self.last_root.write().unwrap(); + if *last_root == std::u64::MAX { + *last_root = 0; + } + *last_root = cmp::max(*rooted_slots.iter().max().unwrap(), *last_root); Ok(()) } @@ -1469,29 +1499,6 @@ impl Blocktree { results } - // Handle special case of writing genesis blobs. For instance, the first two entries - // don't count as ticks, even if they're empty entries - fn write_genesis_blobs(&self, blobs: &[Blob]) -> Result<()> { - // TODO: change bootstrap height to number of slots - let mut bootstrap_meta = SlotMeta::new(0, 1); - let last = blobs.last().unwrap(); - - let mut batch_processor = self.batch_processor.write().unwrap(); - - bootstrap_meta.consumed = last.index() + 1; - bootstrap_meta.received = last.index() + 1; - bootstrap_meta.is_connected = true; - - let mut batch = batch_processor.batch()?; - batch.put::(0, &bootstrap_meta)?; - for blob in blobs { - let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()]; - batch.put_bytes::((blob.slot(), blob.index()), serialized_blob_datas)?; - } - batch_processor.write(batch)?; - Ok(()) - } - /// Prune blocktree such that slots higher than `target_slot` are deleted and all references to /// higher slots are removed pub fn prune(&self, target_slot: u64) { @@ -1524,6 +1531,10 @@ impl Blocktree { .expect("couldn't update meta"); } } + + pub fn last_root(&self) -> u64 { + *self.last_root.read().unwrap() + } } fn insert_data_blob_batch<'a, I>( @@ -1533,6 +1544,7 @@ fn insert_data_blob_batch<'a, I>( index_working_set: &mut HashMap, prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>, write_batch: &mut WriteBatch, + last_root: u64, ) -> Result<()> where I: IntoIterator, @@ -1544,6 +1556,7 @@ where slot_meta_working_set, prev_inserted_blob_datas, write_batch, + last_root, ); if inserted { @@ -1589,7 +1602,7 @@ fn insert_data_blob<'a>( let serialized_blob_data = &blob_to_insert.data[..BLOB_HEADER_SIZE + blob_size]; // Commit step: commit all changes to the mutable structures at once, or none at all. - // We don't want only some of these changes going through. + // We don't want only a subset of these changes going through. 
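
To make the `last_root` bookkeeping introduced above easier to follow, here is a minimal, self-contained sketch of the same two steps with a `BTreeMap` standing in for the RocksDB `Root` column: seed the watermark from the highest stored root (or 0 for an empty ledger), then only ever advance it after a batch of slots has been rooted. The helper names are illustrative, and the `u64::MAX` normalization done in `set_roots` is omitted.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

/// Seed the watermark the way `Blocktree::open` does above: take the highest
/// key in the roots column (IteratorMode::End + `.next()`), defaulting to 0.
fn seed_last_root(root_column: &BTreeMap<u64, bool>) -> Arc<RwLock<u64>> {
    let max_root = root_column
        .iter()
        .next_back()
        .map(|(slot, _)| *slot)
        .unwrap_or(0);
    Arc::new(RwLock::new(max_root))
}

/// Advance the watermark the way the end of `set_roots` does above: it only
/// ever moves forward.
fn advance_last_root(last_root: &RwLock<u64>, rooted_slots: &[u64]) {
    if let Some(&new_max) = rooted_slots.iter().max() {
        let mut guard = last_root.write().unwrap();
        *guard = (*guard).max(new_max);
    }
}

fn main() {
    let mut roots = BTreeMap::new();
    let last_root = seed_last_root(&roots);
    assert_eq!(*last_root.read().unwrap(), 0);

    roots.insert(7u64, true);
    roots.insert(15, true);
    assert_eq!(*seed_last_root(&roots).read().unwrap(), 15);

    advance_last_root(&last_root, &[0, 2, 4]);
    assert_eq!(*last_root.read().unwrap(), 4);
    // Re-rooting older slots never moves the watermark backwards.
    advance_last_root(&last_root, &[1, 3]);
    assert_eq!(*last_root.read().unwrap(), 4);
}
```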
write_batch.put_bytes::((blob_slot, blob_index), serialized_blob_data)?; prev_inserted_blob_datas.insert((blob_slot, blob_index), serialized_blob_data); update_slot_meta( @@ -1660,6 +1673,7 @@ fn check_insert_data_blob<'a>( slot_meta_working_set: &mut HashMap>, Option)>, prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>, write_batch: &mut WriteBatch, + last_root: u64, ) -> bool { let entry = get_slot_meta_entry(db, slot_meta_working_set, blob.slot(), blob.parent()); @@ -1670,7 +1684,7 @@ fn check_insert_data_blob<'a>( // This slot is full, skip the bogus blob // Check if this blob should be inserted - if !should_insert_blob(&slot_meta, db, &prev_inserted_blob_datas, blob) { + if !should_insert_blob(&slot_meta, db, &prev_inserted_blob_datas, blob, last_root) { false } else { let _ = insert_data_blob(blob, db, prev_inserted_blob_datas, slot_meta, write_batch); @@ -1714,6 +1728,7 @@ fn should_insert_blob( db: &Database, prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, blob: &Blob, + last_slot: u64, ) -> bool { let blob_index = blob.index(); let blob_slot = blob.slot(); @@ -1728,7 +1743,14 @@ fn should_insert_blob( }; !prev_inserted_blob_datas.contains_key(&(blob_slot, blob_index)) - && should_insert(slot, blob_index, blob_slot, last_in_slot, check_data_cf) + && should_insert( + slot, + blob_index, + blob_slot, + last_in_slot, + check_data_cf, + last_slot, + ) } fn should_insert( @@ -1737,6 +1759,7 @@ fn should_insert( slot: u64, last_in_slot: bool, db_check: F, + last_root: u64, ) -> bool where F: Fn(u64, u64) -> bool, @@ -1775,9 +1798,42 @@ where ); return false; } + + if !is_valid_write_to_slot_0(slot, slot_meta.parent_slot, last_root) { + // Check that the parent_slot < slot + if slot_meta.parent_slot >= slot { + datapoint_error!( + "blocktree_error", + ( + "error", + format!( + "Received blob with parent_slot {} >= slot {}", + slot_meta.parent_slot, slot + ), + String + ) + ); + return false; + } + + // Check that the blob is for a slot that is past the root + if slot <= last_root { + return false; + } + + // Ignore blobs that chain to slots before the last root + if slot_meta.parent_slot < last_root { + return false; + } + } + true } +fn is_valid_write_to_slot_0(slot_to_write: u64, parent_slot: u64, last_root: u64) -> bool { + slot_to_write == 0 && last_root == 0 && parent_slot == 0 +} + fn send_signals( new_blobs_signals: &[SyncSender], completed_slots_senders: &[SyncSender>], @@ -2464,33 +2520,11 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Re .collect(); blocktree.insert_shreds(shreds)?; + blocktree.set_roots(&[0])?; Ok(last_hash) } -pub fn genesis<'a, I>(ledger_path: &Path, keypair: &Keypair, entries: I) -> Result<()> -where - I: IntoIterator, -{ - let blocktree = Blocktree::open(ledger_path)?; - - // TODO sign these blobs with keypair - let blobs: Vec<_> = entries - .into_iter() - .enumerate() - .map(|(idx, entry)| { - let mut b = entry.borrow().to_blob(); - b.set_index(idx as u64); - b.set_id(&keypair.pubkey()); - b.set_slot(0); - b - }) - .collect(); - - blocktree.write_genesis_blobs(&blobs[..])?; - Ok(()) -} - #[macro_export] macro_rules! 
tmp_ledger_name { () => { @@ -3909,7 +3943,8 @@ pub mod tests { &slot_meta, &blocktree.db, &HashMap::new(), - &blobs[4].clone() + &blobs[4].clone(), + 0 )); // Trying to insert the same blob again should fail @@ -3919,7 +3954,8 @@ pub mod tests { &slot_meta, &blocktree.db, &HashMap::new(), - &blobs[7].clone() + &blobs[7].clone(), + 0 )); // Trying to insert another "is_last" blob with index < the received index @@ -3932,7 +3968,8 @@ pub mod tests { &slot_meta, &blocktree.db, &HashMap::new(), - &blobs[8].clone() + &blobs[8].clone(), + 0 )); // Insert the 10th blob, which is marked as "is_last" @@ -3945,7 +3982,8 @@ pub mod tests { &slot_meta, &blocktree.db, &HashMap::new(), - &blobs[10].clone() + &blobs[10].clone(), + 0 )); drop(blocktree); @@ -4009,9 +4047,12 @@ pub mod tests { let blocktree_path = get_tmp_ledger_path!(); let blocktree = Blocktree::open(&blocktree_path).unwrap(); let chained_slots = vec![0, 2, 4, 7, 12, 15]; + assert_eq!(blocktree.last_root(), 0); blocktree.set_roots(&chained_slots).unwrap(); + assert_eq!(blocktree.last_root(), 15); + for i in chained_slots { assert!(blocktree.is_root(i)); } @@ -4041,7 +4082,10 @@ pub mod tests { assert_eq!(meta.last_index, 5) }); - let data_iter = blocktree.data_cf.iter(Some((0, 0))).unwrap(); + let data_iter = blocktree + .data_cf + .iter(IteratorMode::From((0, 0), IteratorDirection::Forward)) + .unwrap(); for ((slot, _), _) in data_iter { if slot > 5 { assert!(false); @@ -4855,10 +4899,8 @@ pub mod tests { let mut slots_shreds_and_entries = vec![]; for (i, slot) in chain.iter().enumerate() { let parent_slot = { - if *slot == 0 { + if *slot == 0 || i == 0 { 0 - } else if i == 0 { - std::u64::MAX } else { chain[i - 1] } @@ -4907,10 +4949,8 @@ pub mod tests { let mut slots_blobs_and_entries = vec![]; for (i, slot) in chain.iter().enumerate() { let parent_slot = { - if *slot == 0 { + if *slot == 0 || i == 0 { 0 - } else if i == 0 { - std::u64::MAX } else { chain[i - 1] } diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs index 7982a56ac7..2ca67f03bc 100644 --- a/core/src/blocktree/db.rs +++ b/core/src/blocktree/db.rs @@ -12,6 +12,17 @@ use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; +pub enum IteratorMode { + Start, + End, + From(Index, IteratorDirection), +} + +pub enum IteratorDirection { + Forward, + Reverse, +} + pub mod columns { #[derive(Debug)] /// SlotMeta Column @@ -77,7 +88,11 @@ pub trait Backend: Sized + Send + Sync { fn delete_cf(&self, cf: Self::ColumnFamily, key: &Self::Key) -> Result<()>; - fn iterator_cf(&self, cf: Self::ColumnFamily, from: Option<&Self::Key>) -> Result; + fn iterator_cf( + &self, + cf: Self::ColumnFamily, + iterator_mode: IteratorMode<&Self::Key>, + ) -> Result; fn raw_iterator_cf(&self, cf: Self::ColumnFamily) -> Result; @@ -262,18 +277,26 @@ where pub fn iter( &self, - start_from: Option, + iterator_mode: IteratorMode, ) -> Result)>> where C: Column, { let iter = { - if let Some(index) = start_from { - let key = C::key(index); - self.backend - .iterator_cf(self.cf_handle::(), Some(key.borrow()))? - } else { - self.backend.iterator_cf(self.cf_handle::(), None)? + match iterator_mode { + IteratorMode::From(start_from, direction) => { + let key = C::key(start_from); + self.backend.iterator_cf( + self.cf_handle::(), + IteratorMode::From(key.borrow(), direction), + )? 
+ } + IteratorMode::Start => self + .backend + .iterator_cf(self.cf_handle::(), IteratorMode::Start)?, + IteratorMode::End => self + .backend + .iterator_cf(self.cf_handle::(), IteratorMode::End)?, } }; @@ -405,15 +428,19 @@ where pub fn iter( &self, - start_from: Option, + iterator_mode: IteratorMode, ) -> Result)>> { let iter = { - if let Some(index) = start_from { - let key = C::key(index); - self.backend - .iterator_cf(self.handle(), Some(key.borrow()))? - } else { - self.backend.iterator_cf(self.handle(), None)? + match iterator_mode { + IteratorMode::From(start_from, direction) => { + let key = C::key(start_from); + self.backend + .iterator_cf(self.handle(), IteratorMode::From(key.borrow(), direction))? + } + IteratorMode::Start => self + .backend + .iterator_cf(self.handle(), IteratorMode::Start)?, + IteratorMode::End => self.backend.iterator_cf(self.handle(), IteratorMode::End)?, } }; @@ -430,7 +457,11 @@ where C::Index: PartialOrd + Copy, { let mut end = true; - let iter = self.iter(from.map(C::as_index))?; + let iter_config = match from { + Some(s) => IteratorMode::From(C::as_index(s), IteratorDirection::Forward), + None => IteratorMode::Start, + }; + let iter = self.iter(iter_config)?; for (index, _) in iter { if let Some(to) = to { if C::slot(index) > to { diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs index 71c37f04d7..3ee4ea7c33 100644 --- a/core/src/blocktree/rocks.rs +++ b/core/src/blocktree/rocks.rs @@ -1,5 +1,5 @@ use crate::blocktree::db::columns as cf; -use crate::blocktree::db::{Backend, Column, DbCursor, IWriteBatch, TypedColumn}; +use crate::blocktree::db::{Backend, Column, DbCursor, IWriteBatch, TypedColumn, IteratorMode, IteratorDirection}; use crate::blocktree::BlocktreeError; use crate::result::{Error, Result}; use solana_sdk::timing::Slot; @@ -7,7 +7,7 @@ use solana_sdk::timing::Slot; use byteorder::{BigEndian, ByteOrder}; use rocksdb::{ - self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, Direction, IteratorMode, + self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, Direction, IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB, }; @@ -130,14 +130,28 @@ impl Backend for Rocks { Ok(()) } - fn iterator_cf(&self, cf: ColumnFamily, start_from: Option<&[u8]>) -> Result { + fn iterator_cf(&self, cf: ColumnFamily, iterator_mode: IteratorMode<&[u8]>,) -> Result { let iter = { - if let Some(start_from) = start_from { - self.0 - .iterator_cf(cf, IteratorMode::From(start_from, Direction::Forward))? - } else { - self.0.iterator_cf(cf, IteratorMode::Start)? - } + match iterator_mode { + IteratorMode::Start => { + self.0.iterator_cf(cf, RocksIteratorMode::Start)? + } + IteratorMode::End => { + self.0.iterator_cf(cf, RocksIteratorMode::End)? + } + IteratorMode::From(start_from, direction) => { + let rocks_direction = match direction { + IteratorDirection::Forward => { + Direction::Forward + } + IteratorDirection::Reverse => { + Direction::Reverse + } + }; + self.0 + .iterator_cf(cf, RocksIteratorMode::From(start_from, rocks_direction))? 
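
For readers unfamiliar with the new iterator API in db.rs above, here is a self-contained sketch of the same shape, with a `BTreeMap` standing in for a column family keyed by slot. `iter_column` is a hypothetical helper, not the crate's API; the real `Database::iter` dispatches the same way before handing the mode to the backend.

```rust
use std::collections::BTreeMap;

/// Simplified copies of the iterator-mode enums introduced in db.rs above.
enum IteratorDirection {
    Forward,
    Reverse,
}

enum IteratorMode<I> {
    Start,
    End,
    From(I, IteratorDirection),
}

/// Dispatch over the mode the way `Database::iter` does, with a BTreeMap
/// standing in for a RocksDB column family.
fn iter_column<'a>(
    column: &'a BTreeMap<u64, String>,
    mode: IteratorMode<u64>,
) -> Box<dyn Iterator<Item = (u64, &'a String)> + 'a> {
    match mode {
        IteratorMode::Start => Box::new(column.iter().map(|(k, v)| (*k, v))),
        IteratorMode::End => Box::new(column.iter().rev().map(|(k, v)| (*k, v))),
        IteratorMode::From(key, IteratorDirection::Forward) => {
            Box::new(column.range(key..).map(|(k, v)| (*k, v)))
        }
        IteratorMode::From(key, IteratorDirection::Reverse) => {
            Box::new(column.range(..=key).rev().map(|(k, v)| (*k, v)))
        }
    }
}

fn main() {
    let mut column = BTreeMap::new();
    for slot in [1u64, 3, 5, 7].iter() {
        column.insert(*slot, format!("meta-{}", slot));
    }
    let from_3: Vec<u64> = iter_column(&column, IteratorMode::From(3, IteratorDirection::Forward))
        .map(|(slot, _)| slot)
        .collect();
    assert_eq!(from_3, vec![3, 5, 7]);
}
```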
+ } + } }; Ok(iter) diff --git a/core/src/blocktree_processor.rs b/core/src/blocktree_processor.rs index 8ca0724ae1..ae37f6cc96 100644 --- a/core/src/blocktree_processor.rs +++ b/core/src/blocktree_processor.rs @@ -544,7 +544,7 @@ pub mod tests { info!("last_fork1_entry.hash: {:?}", last_fork1_entry_hash); info!("last_fork2_entry.hash: {:?}", last_fork2_entry_hash); - blocktree.set_roots(&[4, 1, 0]).unwrap(); + blocktree.set_roots(&[0, 1, 4]).unwrap(); let (bank_forks, bank_forks_info, _) = process_blocktree(&genesis_block, &blocktree, None, true, None).unwrap(); diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 966edd5617..0508ab7a1d 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -1,6 +1,5 @@ //! The `repair_service` module implements the tools necessary to generate a thread which //! regularly finds missing blobs in the ledger and sends repair requests for those blobs - use crate::bank_forks::BankForks; use crate::blocktree::{Blocktree, CompletedSlotsReceiver, SlotMeta}; use crate::cluster_info::ClusterInfo; @@ -111,12 +110,10 @@ impl RepairService { let id = cluster_info.read().unwrap().id(); let mut current_root = 0; if let RepairStrategy::RepairAll { - ref bank_forks, - ref epoch_schedule, - .. + ref epoch_schedule, .. } = repair_strategy { - current_root = bank_forks.read().unwrap().root(); + current_root = blocktree.last_root(); Self::initialize_epoch_slots( id, blocktree, @@ -143,11 +140,10 @@ impl RepairService { } RepairStrategy::RepairAll { - ref bank_forks, ref completed_slots_receiver, .. } => { - let new_root = bank_forks.read().unwrap().root(); + let new_root = blocktree.last_root(); Self::update_epoch_slots( id, new_root, @@ -239,7 +235,8 @@ impl RepairService { // TODO: Incorporate gossip to determine priorities for repair? // Try to resolve orphans in blocktree - let orphans = blocktree.get_orphans(Some(MAX_ORPHANS)); + let mut orphans = blocktree.get_orphans(Some(MAX_ORPHANS)); + orphans.retain(|x| *x > root); Self::generate_repairs_for_orphans(&orphans[..], &mut repairs); Ok(repairs) @@ -430,11 +427,7 @@ mod test { blocktree.write_blobs(&blobs).unwrap(); assert_eq!( RepairService::generate_repairs(&blocktree, 0, 2).unwrap(), - vec![ - RepairType::HighestBlob(0, 0), - RepairType::Orphan(0), - RepairType::Orphan(2) - ] + vec![RepairType::HighestBlob(0, 0), RepairType::Orphan(2)] ); } @@ -456,7 +449,7 @@ mod test { // Check that repair tries to patch the empty slot assert_eq!( RepairService::generate_repairs(&blocktree, 0, 2).unwrap(), - vec![RepairType::HighestBlob(0, 0), RepairType::Orphan(0)] + vec![RepairType::HighestBlob(0, 0)] ); } Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index cfacbde4f1..10b3ee9ec2 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -420,13 +420,14 @@ impl ReplayStage { let mut rooted_banks = root_bank.parents(); rooted_banks.push(root_bank); let rooted_slots: Vec<_> = rooted_banks.iter().map(|bank| bank.slot()).collect(); + // Call leader schedule_cache.set_root() before blocktree.set_root() because + // bank_forks.root is consumed by repair_service to update gossip, so we don't want to + // get blobs for repair on gossip before we update leader schedule, otherwise they may + // get dropped. 
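
The rocks.rs change above is essentially an enum-to-enum translation. Below is a minimal sketch of that shape, with local stand-ins replacing `rocksdb::IteratorMode` and `rocksdb::Direction` so the example compiles without the rocksdb crate; only the translation pattern is meant to mirror `Rocks::iterator_cf`.

```rust
/// Crate-level iterator modes, as introduced in db.rs above.
enum IteratorDirection {
    Forward,
    Reverse,
}

enum IteratorMode<K> {
    Start,
    End,
    From(K, IteratorDirection),
}

/// Local stand-ins for the backend's own types (rocksdb::Direction and
/// rocksdb::IteratorMode in the real code), so this sketch is self-contained.
enum BackendDirection {
    Forward,
    Reverse,
}

enum BackendIteratorMode<'a> {
    Start,
    End,
    From(&'a [u8], BackendDirection),
}

/// The translation `Rocks::iterator_cf` performs above: map the abstract mode
/// onto the backend's equivalent, converting the direction along the way.
fn to_backend_mode<'a>(mode: IteratorMode<&'a [u8]>) -> BackendIteratorMode<'a> {
    match mode {
        IteratorMode::Start => BackendIteratorMode::Start,
        IteratorMode::End => BackendIteratorMode::End,
        IteratorMode::From(key, direction) => {
            let backend_direction = match direction {
                IteratorDirection::Forward => BackendDirection::Forward,
                IteratorDirection::Reverse => BackendDirection::Reverse,
            };
            BackendIteratorMode::From(key, backend_direction)
        }
    }
}

fn main() {
    let key: &[u8] = &[0u8; 8];
    match to_backend_mode(IteratorMode::From(key, IteratorDirection::Reverse)) {
        BackendIteratorMode::From(_, BackendDirection::Reverse) => {
            println!("reverse seek from a key mapped correctly");
        }
        _ => unreachable!(),
    }
}
```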
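
The repair_service change above filters orphan requests against the blocktree root instead of asking the cluster for slots that are already final. A tiny sketch of that filter, with an illustrative helper name:

```rust
/// Mirror of the orphan filtering added to `generate_repairs` above: orphan
/// slots at or below the current root are already finalized (or about to be
/// purged) and should not be re-requested from the cluster.
fn retain_repairable_orphans(orphans: &mut Vec<u64>, root: u64) {
    orphans.retain(|&slot| slot > root);
}

fn main() {
    let mut orphans = vec![0u64, 2, 9, 15];
    retain_repairable_orphans(&mut orphans, 8);
    assert_eq!(orphans, vec![9, 15]);
}
```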
+ leader_schedule_cache.set_root(rooted_banks.last().unwrap()); blocktree .set_roots(&rooted_slots) .expect("Ledger set roots failed"); - // Set root first in leader schedule_cache before bank_forks because bank_forks.root - // is consumed by repair_service to update gossip, so we don't want to get blobs for - // repair on gossip before we update leader schedule, otherwise they may get dropped. - leader_schedule_cache.set_root(rooted_banks.last().unwrap()); bank_forks .write() .unwrap() diff --git a/core/src/tvu.rs b/core/src/tvu.rs index bf36d69e0b..c66adb15ff 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -224,7 +224,7 @@ impl Service for Tvu { pub mod tests { use super::*; use crate::banking_stage::create_test_recorder; - use crate::blocktree::get_tmp_ledger_path; + use crate::blocktree::create_new_tmp_ledger; use crate::cluster_info::{ClusterInfo, Node}; use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo}; use solana_runtime::bank::Bank; @@ -247,7 +247,7 @@ pub mod tests { cluster_info1.insert_info(leader.info.clone()); let cref1 = Arc::new(RwLock::new(cluster_info1)); - let blocktree_path = get_tmp_ledger_path!(); + let (blocktree_path, _) = create_new_tmp_ledger!(&genesis_block); let (blocktree, l_receiver, completed_slots_receiver) = Blocktree::open_with_signal(&blocktree_path) .expect("Expected to successfully open ledger"); diff --git a/local_cluster/tests/local_cluster.rs b/local_cluster/tests/local_cluster.rs index 6d814350f1..c76eee65f2 100644 --- a/local_cluster/tests/local_cluster.rs +++ b/local_cluster/tests/local_cluster.rs @@ -15,6 +15,7 @@ use solana_runtime::{ epoch_schedule::{EpochSchedule, MINIMUM_SLOTS_PER_EPOCH}, }; use solana_sdk::{client::SyncClient, poh_config::PohConfig, timing}; +use std::path::PathBuf; use std::{ collections::{HashMap, HashSet}, fs, @@ -299,37 +300,116 @@ fn test_listener_startup() { assert_eq!(cluster_nodes.len(), 4); } +#[allow(unused_attributes)] #[test] #[serial] -fn test_snapshots_restart_validity() { - solana_logger::setup(); - let temp_dir = TempDir::new().unwrap(); - let snapshot_path = temp_dir.path().join("bank_states"); - let snapshot_package_output_path = temp_dir.path().join("tar"); +#[ignore] +fn test_snapshots_blocktree_floor() { + // First set up the cluster with 1 snapshotting leader let snapshot_interval_slots = 10; - - // Create the snapshot directories - fs::create_dir_all(&snapshot_path).expect("Failed to create snapshots bank state directory"); - fs::create_dir_all(&snapshot_package_output_path) - .expect("Failed to create snapshots tar directory"); - - // Set up the cluster with 1 snapshotting validator - let mut snapshot_validator_config = ValidatorConfig::default(); - snapshot_validator_config.rpc_config.enable_fullnode_exit = true; - snapshot_validator_config.snapshot_config = Some(SnapshotConfig { - snapshot_interval_slots, - snapshot_package_output_path: snapshot_package_output_path.clone(), - snapshot_path, - }); let num_account_paths = 4; - let (account_storage_dirs, account_storage_paths) = generate_account_paths(num_account_paths); - let mut all_account_storage_dirs = vec![account_storage_dirs]; - snapshot_validator_config.account_paths = Some(account_storage_paths); + + let leader_snapshot_test_config = + setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths); + let validator_snapshot_test_config = + setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths); + + let snapshot_package_output_path = &leader_snapshot_test_config + .validator_config + 
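
The reordered root update above is about publication order: the leader-schedule cache must learn the new root before the value that `repair_service` now reads via `last_root()` moves, otherwise repair could fetch blobs for slots the schedule cannot yet validate. A toy sketch of that ordering with stand-in types (not the real `LeaderScheduleCache` or `Blocktree`):

```rust
use std::sync::RwLock;

/// Toy stand-ins for the two consumers of a new root mentioned in the
/// comment above: the leader-schedule cache and the blocktree root that the
/// repair path reads.
struct LeaderScheduleCache {
    root: RwLock<u64>,
}

struct BlocktreeRoots {
    last_root: RwLock<u64>,
}

/// Publish a new root in the order the comment above calls for: update the
/// leader schedule first, and only then advance the root that repair/gossip
/// reads, so repair never works from a root the schedule has not caught up to.
fn publish_root(schedule: &LeaderScheduleCache, blocktree: &BlocktreeRoots, new_root: u64) {
    *schedule.root.write().unwrap() = new_root;
    *blocktree.last_root.write().unwrap() = new_root;
}

fn main() {
    let schedule = LeaderScheduleCache { root: RwLock::new(0) };
    let blocktree = BlocktreeRoots { last_root: RwLock::new(0) };
    publish_root(&schedule, &blocktree, 4);
    assert_eq!(*schedule.root.read().unwrap(), 4);
    assert_eq!(*blocktree.last_root.read().unwrap(), 4);
}
```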
.snapshot_config + .as_ref() + .unwrap() + .snapshot_package_output_path; let config = ClusterConfig { node_stakes: vec![10000], cluster_lamports: 100000, - validator_configs: vec![snapshot_validator_config.clone()], + validator_configs: vec![leader_snapshot_test_config.validator_config.clone()], + ..ClusterConfig::default() + }; + + let mut cluster = LocalCluster::new(&config); + + trace!("Waiting for snapshot tar to be generated with slot",); + + let tar = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path); + loop { + if tar.exists() { + trace!("snapshot tar exists"); + break; + } + sleep(Duration::from_millis(5000)); + } + + // Copy tar to validator's snapshot output directory + let validator_tar_path = + snapshot_utils::get_snapshot_tar_path(&validator_snapshot_test_config.snapshot_output_path); + fs::hard_link(tar, &validator_tar_path).unwrap(); + let slot_floor = snapshot_utils::bank_slot_from_archive(&validator_tar_path).unwrap(); + + // Start up a new node from a snapshot, wait for it to catchup with the leader + let validator_stake = 5; + cluster.add_validator( + &validator_snapshot_test_config.validator_config, + validator_stake, + ); + let all_pubkeys = cluster.get_node_pubkeys(); + let validator_id = all_pubkeys + .into_iter() + .find(|x| *x != cluster.entry_point_info.id) + .unwrap(); + let validator_client = cluster.get_validator_client(&validator_id).unwrap(); + let mut current_slot = 0; + + // Make sure this validator can get repaired past the first few warmup epochs + let target_slot = slot_floor + 40; + while current_slot <= target_slot { + trace!("current_slot: {}", current_slot); + if let Ok(slot) = validator_client.get_slot() { + current_slot = slot; + } else { + continue; + } + sleep(Duration::from_secs(1)); + } + + // Check the validator ledger doesn't contain any slots < slot_floor + cluster.close_preserve_ledgers(); + let validator_ledger_path = &cluster.fullnode_infos[&validator_id]; + let blocktree = Blocktree::open(&validator_ledger_path.info.ledger_path).unwrap(); + + // Skip the zeroth slot in blocktree that the ledger is initialized with + let (first_slot, _) = blocktree.slot_meta_iterator(1).unwrap().next().unwrap(); + assert_eq!(first_slot, slot_floor); +} + +#[test] +#[serial] +fn test_snapshots_restart_validity() { + solana_logger::setup(); + let snapshot_interval_slots = 10; + let num_account_paths = 4; + let mut snapshot_test_config = + setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths); + let snapshot_package_output_path = &snapshot_test_config + .validator_config + .snapshot_config + .as_ref() + .unwrap() + .snapshot_package_output_path; + + // Set up the cluster with 1 snapshotting validator + let mut all_account_storage_dirs = vec![vec![]]; + + std::mem::swap( + &mut all_account_storage_dirs[0], + &mut snapshot_test_config.account_storage_dirs, + ); + + let config = ClusterConfig { + node_stakes: vec![10000], + cluster_lamports: 100000, + validator_configs: vec![snapshot_test_config.validator_config.clone()], ..ClusterConfig::default() }; @@ -381,12 +461,12 @@ fn test_snapshots_restart_validity() { let (new_account_storage_dirs, new_account_storage_paths) = generate_account_paths(num_account_paths); all_account_storage_dirs.push(new_account_storage_dirs); - snapshot_validator_config.account_paths = Some(new_account_storage_paths); + snapshot_test_config.validator_config.account_paths = Some(new_account_storage_paths); // Restart a node trace!("Restarting cluster from snapshot"); let nodes = 
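
The snapshot-tar wait loop in the test above polls forever if the tar never appears. Here is a small sketch of the same loop written as a reusable helper with a timeout; `wait_for_file` is an illustrative name, not something this patch adds.

```rust
use std::path::Path;
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Poll until a file exists or a timeout elapses, mirroring the wait loop in
/// the test above but failing fast instead of hanging.
fn wait_for_file(path: &Path, timeout: Duration, poll_interval: Duration) -> bool {
    let start = Instant::now();
    while start.elapsed() < timeout {
        if path.exists() {
            return true;
        }
        sleep(poll_interval);
    }
    false
}

fn main() {
    // A path that already exists is detected on the first poll.
    assert!(wait_for_file(
        Path::new("."),
        Duration::from_secs(1),
        Duration::from_millis(10),
    ));
}
```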
cluster.get_node_pubkeys();
-        cluster.restart_node(nodes[0], &snapshot_validator_config);
+        cluster.restart_node(nodes[0], &snapshot_test_config.validator_config);
 
         // Verify account balances on validator
         trace!("Verifying balances");
@@ -586,3 +666,40 @@ fn generate_account_paths(num_account_paths: usize) -> (Vec<TempDir>, String) {
     let account_storage_paths = AccountsDB::format_paths(account_storage_paths);
     (account_storage_dirs, account_storage_paths)
 }
+
+struct SnapshotValidatorConfig {
+    _snapshot_dir: TempDir,
+    snapshot_output_path: TempDir,
+    account_storage_dirs: Vec<TempDir>,
+    validator_config: ValidatorConfig,
+}
+
+fn setup_snapshot_validator_config(
+    snapshot_interval_slots: usize,
+    num_account_paths: usize,
+) -> SnapshotValidatorConfig {
+    // Create the snapshot config
+    let snapshot_dir = TempDir::new().unwrap();
+    let snapshot_output_path = TempDir::new().unwrap();
+    let snapshot_config = SnapshotConfig {
+        snapshot_interval_slots,
+        snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()),
+        snapshot_path: PathBuf::from(snapshot_dir.path()),
+    };
+
+    // Create the account paths
+    let (account_storage_dirs, account_storage_paths) = generate_account_paths(num_account_paths);
+
+    // Create the validator config
+    let mut validator_config = ValidatorConfig::default();
+    validator_config.rpc_config.enable_fullnode_exit = true;
+    validator_config.snapshot_config = Some(snapshot_config);
+    validator_config.account_paths = Some(account_storage_paths);
+
+    SnapshotValidatorConfig {
+        _snapshot_dir: snapshot_dir,
+        snapshot_output_path,
+        account_storage_dirs,
+        validator_config,
+    }
+}
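
One design note on `SnapshotValidatorConfig` above: the `TempDir` guards are returned inside the struct so the snapshot and account directories are not deleted while the test is still using them. Below is a self-contained sketch of that ownership pattern using only the standard library; `DirGuard` and the other names are illustrative, the real code relies on `TempDir`.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;

/// Minimal stand-in for a TempDir guard: the directory lives exactly as long
/// as the guard, so returning the guard inside the config keeps the paths
/// valid for the whole test.
struct DirGuard {
    path: PathBuf,
}

impl DirGuard {
    fn create(path: &str) -> io::Result<Self> {
        fs::create_dir_all(path)?;
        Ok(Self { path: PathBuf::from(path) })
    }
}

impl Drop for DirGuard {
    fn drop(&mut self) {
        let _ = fs::remove_dir_all(&self.path);
    }
}

struct TestConfig {
    // Held only so the directory outlives the config, the same role
    // `_snapshot_dir` plays in the struct above.
    _snapshot_dir: DirGuard,
    snapshot_path: PathBuf,
}

fn setup_test_config() -> io::Result<TestConfig> {
    let snapshot_dir = DirGuard::create("example-snapshot-dir")?;
    let snapshot_path = snapshot_dir.path.clone();
    Ok(TestConfig {
        _snapshot_dir: snapshot_dir,
        snapshot_path,
    })
}

fn main() -> io::Result<()> {
    let config = setup_test_config()?;
    assert!(config.snapshot_path.exists());
    Ok(())
    // `config` is dropped here and the directory is removed again.
}
```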