diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs
index 5da4c4f5db..d8b3fc9066 100644
--- a/core/src/crds_gossip_pull.rs
+++ b/core/src/crds_gossip_pull.rs
@@ -238,7 +238,7 @@ impl CrdsGossipPull {
             if now > r.wallclock().checked_add(timeout).unwrap_or_else(|| 0)
                 || now + timeout < r.wallclock()
             {
-                inc_new_counter_warn!(
+                inc_new_counter_info!(
                     "cluster_info-gossip_pull_response_value_timeout",
                     1
                 );
@@ -250,7 +250,7 @@ impl CrdsGossipPull {
                 // Before discarding this value, check if a ContactInfo for the owner
                 // exists in the table. If it doesn't, that implies that this value can be discarded
                 if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() {
-                    inc_new_counter_warn!(
+                    inc_new_counter_info!(
                         "cluster_info-gossip_pull_response_value_timeout",
                         1
                     );
diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs
index 3f72f5ae62..74c9fc4081 100644
--- a/core/src/ledger_cleanup_service.rs
+++ b/core/src/ledger_cleanup_service.rs
@@ -131,7 +131,6 @@ impl LedgerCleanupService {
         while let Ok(new_root) = new_root_receiver.try_recv() {
             root = new_root;
         }
-
         if root - *last_purge_slot > purge_interval {
             let disk_utilization_pre = blockstore.storage_size();
             info!(
diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs
index 06e3993ed0..23f612f025 100644
--- a/core/src/poh_recorder.rs
+++ b/core/src/poh_recorder.rs
@@ -354,7 +354,7 @@ impl PohRecorder {
     pub fn tick(&mut self) {
         let now = Instant::now();
         let poh_entry = self.poh.lock().unwrap().tick();
-        inc_new_counter_warn!(
+        inc_new_counter_info!(
            "poh_recorder-tick_lock_contention",
            timing::duration_as_us(&now.elapsed()) as usize
        );
@@ -364,7 +364,7 @@ impl PohRecorder {
            trace!("tick_height {}", self.tick_height);

            if self.leader_first_tick_height.is_none() {
-                inc_new_counter_warn!(
+                inc_new_counter_info!(
                    "poh_recorder-tick_overhead",
                    timing::duration_as_us(&now.elapsed()) as usize
                );
@@ -380,7 +380,7 @@ impl PohRecorder {
            self.tick_cache.push((entry, self.tick_height));
            let _ = self.flush_cache(true);
        }
-        inc_new_counter_warn!(
+        inc_new_counter_info!(
            "poh_recorder-tick_overhead",
            timing::duration_as_us(&now.elapsed()) as usize
        );
@@ -409,13 +409,13 @@ impl PohRecorder {
        {
            let now = Instant::now();
            let mut poh_lock = self.poh.lock().unwrap();
-            inc_new_counter_warn!(
+            inc_new_counter_info!(
                "poh_recorder-record_lock_contention",
                timing::duration_as_us(&now.elapsed()) as usize
            );
            let now = Instant::now();
            let res = poh_lock.record(mixin);
-            inc_new_counter_warn!(
+            inc_new_counter_info!(
                "poh_recorder-record_ms",
                timing::duration_as_us(&now.elapsed()) as usize
            );
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index 9350e0e88a..08c31aae1d 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -262,7 +262,7 @@ impl RepairService {
        } else if slot_meta.consumed == slot_meta.received {
            vec![RepairType::HighestShred(slot, slot_meta.received)]
        } else {
-            let reqs = blockstore.find_missing_data_indexes(
+            let reqs = blockstore.find_missing_data_indexes_ts(
                slot,
                slot_meta.first_shred_timestamp,
                slot_meta.consumed,
diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs
index 57875d98e9..cedd304882 100644
--- a/core/src/snapshot_packager_service.rs
+++ b/core/src/snapshot_packager_service.rs
@@ -1,4 +1,5 @@
 use crate::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES};
+use solana_ledger::blockstore::Blockstore;
 use solana_ledger::{snapshot_package::AccountsPackageReceiver, snapshot_utils};
 use solana_sdk::{clock::Slot, hash::Hash};
 use std::{
@@ -21,6 +22,7 @@ impl SnapshotPackagerService {
        starting_snapshot_hash: Option<(Slot, Hash)>,
        exit: &Arc<AtomicBool>,
        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        blockstore: Option<Arc<Blockstore>>,
    ) -> Self {
        let exit = exit.clone();
        let cluster_info = cluster_info.clone();
@@ -63,6 +65,9 @@ impl SnapshotPackagerService {
                            .unwrap()
                            .push_snapshot_hashes(hashes.clone());
                    }
+                    if let Some(ref blockstore) = blockstore {
+                        let _ = blockstore.tar_shreds(snapshot_package.root);
+                    }
                }
                Err(RecvTimeoutError::Disconnected) => break,
                Err(RecvTimeoutError::Timeout) => (),
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 424f254158..1adfa6d9b7 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -293,6 +293,7 @@ pub mod tests {
        Blockstore::open_with_signal(&blockstore_path)
            .expect("Expected to successfully open ledger");
        let blockstore = Arc::new(blockstore);
+        let bank = bank_forks.working_bank();
        let (exit, poh_recorder, poh_service, _entry_receiver) =
            create_test_recorder(&bank, &blockstore, None);
diff --git a/core/src/validator.rs b/core/src/validator.rs
index 9a4bc6798a..9062ddb1c2 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -198,6 +198,10 @@ impl Validator {
        let bank_info = &bank_forks_info[0];
        let bank = bank_forks[bank_info.bank_slot].clone();
+        blockstore
+            .reconcile_shreds(Some(&leader_schedule_cache))
+            .expect("Expected to successfully reconcile shreds");
+
        info!("Starting validator from slot {}", bank.slot());
        {
            let hard_forks: Vec<_> = bank.hard_forks().read().unwrap().iter().copied().collect();
@@ -376,8 +380,13 @@ impl Validator {
        if config.snapshot_config.is_some() {
            // Start a snapshot packaging service
            let (sender, receiver) = channel();
-            let snapshot_packager_service =
-                SnapshotPackagerService::new(receiver, snapshot_hash, &exit, &cluster_info);
+            let snapshot_packager_service = SnapshotPackagerService::new(
+                receiver,
+                snapshot_hash,
+                &exit,
+                &cluster_info,
+                Some(blockstore.clone()),
+            );
            (Some(snapshot_packager_service), Some(sender))
        } else {
            (None, None)
diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs
index 40bf9d420b..01ebfaece3 100644
--- a/core/tests/bank_forks.rs
+++ b/core/tests/bank_forks.rs
@@ -324,7 +324,7 @@ mod tests {
        )));

        let snapshot_packager_service =
-            SnapshotPackagerService::new(receiver, None, &exit, &cluster_info);
+            SnapshotPackagerService::new(receiver, None, &exit, &cluster_info, None);

        // Close the channel so that the package service will exit after reading all the
        // packages off the channel
diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs
index 9fbc5ff22e..0220fda8e0 100644
--- a/ledger-tool/src/main.rs
+++ b/ledger-tool/src/main.rs
@@ -494,8 +494,6 @@ fn analyze_storage(database: &Database) -> Result<(), String> {
    analyze_column::<ErasureMeta>(database, "ErasureMeta", ErasureMeta::key_size())?;
    analyze_column::<Root>(database, "Root", Root::key_size())?;
    analyze_column::<Index>(database, "Index", Index::key_size())?;
-    analyze_column::<ShredData>(database, "ShredData", ShredData::key_size())?;
-    analyze_column::<ShredCode>(database, "ShredCode", ShredCode::key_size())?;
    analyze_column::<TransactionStatus>(
        database,
        "TransactionStatus",
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 86331586da..25d22bc518 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -20,19 +20,19 @@ use rayon::{
    iter::{IntoParallelRefIterator, ParallelIterator},
    ThreadPool,
 };
-use rocksdb::DBRawIterator;
 use solana_measure::measure::Measure;
 use solana_metrics::{datapoint_debug, datapoint_error};
 use solana_rayon_threadlimit::get_thread_count;
 use solana_sdk::{
    account::Account,
+    clock::DEFAULT_MS_PER_SLOT,
    clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK},
    genesis_config::GenesisConfig,
    hash::Hash,
    program_utils::limited_deserialize,
    pubkey::Pubkey,
    signature::{Keypair, Signature, Signer},
-    timing::timestamp,
+    timing::{duration_as_ms, timestamp},
    transaction::Transaction,
 };
 use solana_transaction_status::{
@@ -43,8 +43,10 @@ use solana_vote_program::{vote_instruction::VoteInstruction, vote_state::TIMESTA
 use std::{
    cell::RefCell,
    cmp,
-    collections::HashMap,
+    collections::{HashMap, HashSet},
    fs,
+    io::Read,
+    io::Write,
    path::{Path, PathBuf},
    rc::Rc,
    sync::{
@@ -55,6 +57,7 @@ use std::{
 };

 pub const BLOCKSTORE_DIRECTORY: &str = "rocksdb";
+pub const SHREDS_DIRECTORY: &str = "shreds";

 thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
                    .num_threads(get_thread_count())
@@ -83,8 +86,6 @@ pub struct Blockstore {
    erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
    orphans_cf: LedgerColumn<cf::Orphans>,
    index_cf: LedgerColumn<cf::Index>,
-    data_shred_cf: LedgerColumn<cf::ShredData>,
-    code_shred_cf: LedgerColumn<cf::ShredCode>,
    transaction_status_cf: LedgerColumn<cf::TransactionStatus>,
    address_signatures_cf: LedgerColumn<cf::AddressSignatures>,
    transaction_status_index_cf: LedgerColumn<cf::TransactionStatusIndex>,
@@ -96,6 +97,7 @@ pub struct Blockstore {
    pub completed_slots_senders: Vec<SyncSender<Vec<Slot>>>,
    pub lowest_cleanup_slot: Arc<RwLock<Slot>>,
    no_compaction: bool,
+    shreds_dir: String,
 }

 pub struct IndexMetaWorkingSetEntry {
@@ -177,6 +179,11 @@ impl Blockstore {
    pub fn open(ledger_path: &Path) -> Result<Blockstore> {
        fs::create_dir_all(&ledger_path)?;
        let blockstore_path = ledger_path.join(BLOCKSTORE_DIRECTORY);
+        let shreds_dir = ledger_path
+            .join(SHREDS_DIRECTORY)
+            .to_str()
+            .unwrap()
+            .to_string();

        adjust_ulimit_nofile();

@@ -199,8 +206,6 @@ impl Blockstore {
        let orphans_cf = db.column();
        let index_cf = db.column();
-        let data_shred_cf = db.column();
-        let code_shred_cf = db.column();
        let transaction_status_cf = db.column();
        let address_signatures_cf = db.column();
        let transaction_status_index_cf = db.column();
@@ -242,8 +247,6 @@ impl Blockstore {
            erasure_meta_cf,
            orphans_cf,
            index_cf,
-            data_shred_cf,
-            code_shred_cf,
            transaction_status_cf,
            address_signatures_cf,
            transaction_status_index_cf,
@@ -255,6 +258,7 @@ impl Blockstore {
            last_root,
            lowest_cleanup_slot: Arc::new(RwLock::new(0)),
            no_compaction: false,
+            shreds_dir,
        };
        if initialize_transaction_status_index {
            blockstore.initialize_transaction_status_index()?;
@@ -339,6 +343,49 @@ impl Blockstore {
        }
    }

+    fn tar_dir(dir: String, archive: String) -> Result<()> {
+        let args = ["cfz", &archive, &dir];
+        let output = std::process::Command::new("tar").args(&args).output()?;
+        if !output.status.success() {
+            warn!(
+                "tar shreds {} command failed with exit code: {}",
+                dir, output.status,
+            );
+            use std::str::from_utf8;
+            info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?"));
+            info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?"));
+        } else {
+            let _ = fs::remove_dir_all(dir);
+        }
+        Ok(())
+    }
+
+    pub fn tar_shreds(&self, max_slot: Slot) -> Result<()> {
+        fs::create_dir_all(Path::new(&self.shreds_dir).join("data").join("tars"))?;
+        fs::create_dir_all(Path::new(&self.shreds_dir).join("coding").join("tars"))?;
+        let dir = fs::read_dir(Path::new(&self.shreds_dir).join("data"))?;
+        let slots = dir
+            .filter_map(|e| {
+                let e = e.ok()?;
+                let path = e.path();
+                if !path.is_dir() {
+                    return None;
+                }
+                let ix: Slot = std::str::FromStr::from_str(e.file_name().to_str()?).ok()?;
+                Some(ix)
+            })
+            .filter(|ix| *ix < max_slot);
+        for slot in slots {
+            let dir = self.slot_data_dir(slot);
+            let archive = self.slot_data_tar_path(slot);
+            let _ = Self::tar_dir(dir, archive);
+            let dir = self.slot_coding_dir(slot);
+            let archive = self.slot_coding_tar_path(slot);
+            let _ = Self::tar_dir(dir, archive);
+        }
+        Ok(())
+    }
+
    // Returns whether or not all columns have been purged until their end
    fn run_purge(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
        let mut write_batch = self
@@ -347,6 +394,12 @@ impl Blockstore {
            .expect("Database Error: Failed to get write batch");
        // delete range cf is not inclusive
        let to_slot = to_slot.checked_add(1).unwrap_or_else(|| std::u64::MAX);
+        for s in from_slot..to_slot {
+            let _ = fs::remove_dir_all(self.slot_data_dir(s));
+            let _ = fs::remove_file(self.slot_data_tar_path(s));
+            let _ = fs::remove_dir_all(self.slot_coding_dir(s));
+            let _ = fs::remove_file(self.slot_coding_tar_path(s));
+        }
        let mut columns_empty = self
            .db
            .delete_range_cf::(&mut write_batch, from_slot, to_slot)
            .unwrap_or(false)
@@ -355,14 +408,6 @@ impl Blockstore {
            .db
            .delete_range_cf::(&mut write_batch, from_slot, to_slot)
            .unwrap_or(false)
-            & self
-                .db
-                .delete_range_cf::<cf::ShredData>(&mut write_batch, from_slot, to_slot)
-                .unwrap_or(false)
-            & self
-                .db
-                .delete_range_cf::<cf::ShredCode>(&mut write_batch, from_slot, to_slot)
-                .unwrap_or(false)
            & self
                .db
                .delete_range_cf::(&mut write_batch, from_slot, to_slot)
                .unwrap_or(false)
@@ -429,14 +474,6 @@ impl Blockstore {
            .column::()
            .compact_range(from_slot, to_slot)
            .unwrap_or(false)
-            && self
-                .data_shred_cf
-                .compact_range(from_slot, to_slot)
-                .unwrap_or(false)
-            && self
-                .code_shred_cf
-                .compact_range(from_slot, to_slot)
-                .unwrap_or(false)
            && self
                .dead_slots_cf
                .compact_range(from_slot, to_slot)
                .unwrap_or(false)
@@ -510,29 +547,40 @@ impl Blockstore {
        let orphans_iter = self.orphans_iterator(root + 1).unwrap();
        root_forks.chain(orphans_iter.flat_map(move |orphan| NextSlotsIterator::new(orphan, self)))
    }
-
-    pub fn slot_data_iterator<'a>(
-        &'a self,
-        slot: Slot,
+    pub fn dir_iterator(
+        slot: u64,
+        path: String,
        index: u64,
-    ) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)> + 'a> {
-        let slot_iterator = self.db.iter::<cf::ShredData>(IteratorMode::From(
-            (slot, index),
-            IteratorDirection::Forward,
-        ))?;
-        Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot))
+    ) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)>> {
+        let dir = fs::read_dir(path)?;
+        Ok(dir.filter_map(move |e| {
+            let e = e.ok()?;
+            let ix: u64 = std::str::FromStr::from_str(e.file_name().to_str()?).ok()?;
+            if ix >= index {
+                let buf = Self::get_data(&e.path().to_str().unwrap().to_string(), None).ok()??;
+                Some(((slot, ix), buf.into_boxed_slice()))
+            } else {
+                None
+            }
+        }))
    }

-    pub fn slot_coding_iterator<'a>(
-        &'a self,
+    pub fn slot_data_iterator(
+        &self,
        slot: Slot,
        index: u64,
-    ) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)> + 'a> {
-        let slot_iterator = self.db.iter::<cf::ShredCode>(IteratorMode::From(
-            (slot, index),
-            IteratorDirection::Forward,
-        ))?;
-        Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot))
+    ) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)>> {
+        let dir = self.slot_data_dir(slot);
+        Self::dir_iterator(slot, dir, index)
+    }
+
+    pub fn slot_coding_iterator(
+        &self,
+        slot: Slot,
+        index: u64,
+    ) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)>> {
+        let dir = self.slot_coding_dir(slot);
+        Self::dir_iterator(slot, dir, index)
    }

    pub fn rooted_slot_iterator<'a>(
@@ -546,14 +594,12 @@ impl Blockstore {
    }

    fn try_shred_recovery(
-        db: &Database,
+        &self,
        erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
        index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
        prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
        prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
    ) -> Vec<Shred> {
-        let data_cf = db.column::<cf::ShredData>();
-        let code_cf = db.column::<cf::ShredCode>();
        let mut recovered_data_shreds = vec![];
        // Recovery rules:
        // 1. Only try recovery around indexes for which new data or coding shreds are received
@@ -588,8 +634,8 @@ impl Blockstore {

                    if index.data().is_present(i) {
                        if let Some(shred) = prev_inserted_datas.remove(&(slot, i)).or_else(|| {
-                            let some_data = data_cf
-                                .get_bytes((slot, i))
+                            let some_data = self
+                                .get_data_shred(slot, i)
                                .expect("Database failure, could not fetch data shred");
                            if let Some(data) = some_data {
                                Shred::new_from_serialized_shred(data).ok()
@@ -619,8 +665,8 @@ impl Blockstore {
                            })
                            .or_else(|| {
                                if index.coding().is_present(i) {
-                                    let some_code = code_cf
-                                        .get_bytes((slot, i))
+                                    let some_code = self
+                                        .get_coding_shred(slot, i)
                                        .expect("Database failure, could not fetch code shred");
                                    if let Some(code) = some_code {
                                        Shred::new_from_serialized_shred(code).ok()
@@ -710,7 +756,6 @@ impl Blockstore {
                    &mut erasure_metas,
                    &mut index_working_set,
                    &mut slot_meta_working_set,
-                    &mut write_batch,
                    &mut just_inserted_data_shreds,
                    &mut index_meta_time,
                    is_trusted,
@@ -737,8 +782,7 @@ impl Blockstore {
        let mut start = Measure::start("Shred recovery");
        let mut num_recovered = 0;
        if let Some(leader_schedule_cache) = leader_schedule {
-            let recovered_data = Self::try_shred_recovery(
-                &db,
+            let recovered_data = self.try_shred_recovery(
                &erasure_metas,
                &mut index_working_set,
                &mut just_inserted_data_shreds,
@@ -754,7 +798,6 @@ impl Blockstore {
                        &mut erasure_metas,
                        &mut index_working_set,
                        &mut slot_meta_working_set,
-                        &mut write_batch,
                        &mut just_inserted_data_shreds,
                        &mut index_meta_time,
                        is_trusted,
@@ -770,12 +813,7 @@ impl Blockstore {
        just_inserted_coding_shreds
            .into_iter()
            .for_each(|((_, _), shred)| {
-                self.check_insert_coding_shred(
-                    shred,
-                    &mut index_working_set,
-                    &mut write_batch,
-                    &mut index_meta_time,
-                );
+                self.check_insert_coding_shred(shred, &mut index_working_set, &mut index_meta_time);
                num_inserted += 1;
            });
@@ -853,7 +891,6 @@ impl Blockstore {
        &self,
        shred: Shred,
        index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
-        write_batch: &mut WriteBatch,
        index_meta_time: &mut u64,
    ) -> bool {
        let slot = shred.slot();
@@ -864,7 +901,7 @@ impl Blockstore {
        let index_meta = &mut index_meta_working_set_entry.index;
        // This gives the index of first coding shred in this FEC block
        // So, all coding shreds in a given FEC block will have the same set index
-        self.insert_coding_shred(index_meta, &shred, write_batch)
+        self.insert_coding_shred(index_meta, &shred)
            .map(|_| {
                index_meta_working_set_entry.did_insert_occur = true;
            })
@@ -905,6 +942,7 @@ impl Blockstore {
                .get((slot, set_index))
                .expect("Expect database get to succeed")
                .unwrap_or_else(|| {
+                    fs::create_dir_all(self.slot_coding_dir(slot)).unwrap();
                    ErasureMeta::new(set_index, first_coding_index, &erasure_config)
                })
        });
@@ -917,7 +955,6 @@ impl Blockstore {
                erasure_meta.config, erasure_config
            );
        }
-
        // Should be safe to modify index_meta here. Two cases
        // 1) Recovery happens: Then all inserted erasure metas are removed
        // from just_received_coding_shreds, and nothing wll be committed by
@@ -942,7 +979,6 @@ impl Blockstore {
        erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
        index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
        slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
-        write_batch: &mut WriteBatch,
        just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
        index_meta_time: &mut u64,
        is_trusted: bool,
@@ -973,9 +1009,10 @@ impl Blockstore {
        }

        let set_index = u64::from(shred.common_header.fec_set_index);
-        if let Ok(()) =
-            self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)
-        {
+        if slot_meta.received == 0 {
+            fs::create_dir_all(self.slot_data_dir(shred.slot())).unwrap();
+        }
+        if let Ok(()) = self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred) {
            just_inserted_data_shreds.insert((slot, shred_index), shred);
            index_meta_working_set_entry.did_insert_occur = true;
            slot_meta_entry.did_insert_occur = true;
@@ -1014,12 +1051,18 @@ impl Blockstore {
            || slot <= *last_root.read().unwrap())
    }

-    fn insert_coding_shred(
-        &self,
-        index_meta: &mut Index,
-        shred: &Shred,
-        write_batch: &mut WriteBatch,
-    ) -> Result<()> {
+    fn write_all(path: &str, payload: &[u8]) -> Result<()> {
+        let tmp_name = format!("{}.tmp", path);
+        let tmp_path = Path::new(&tmp_name);
+        let mut f = fs::File::create(tmp_path)?;
+        f.write_all(payload)?;
+        let real_path = Path::new(path);
+        //after rename syscall returns, the real path is on disk
+        fs::rename(tmp_path, real_path)?;
+        Ok(())
+    }
+
+    fn insert_coding_shred(&self, index_meta: &mut Index, shred: &Shred) -> Result<()> {
        let slot = shred.slot();
        let shred_index = u64::from(shred.index());
@@ -1029,7 +1072,8 @@ impl Blockstore {

        // Commit step: commit all changes to the mutable structures at once, or none at all.
        // We don't want only a subset of these changes going through.
-        write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &shred.payload)?;
+        let path = self.coding_shred_path(slot, shred_index);
+        Self::write_all(&path, &shred.payload)?;
        index_meta.coding_mut().set_present(shred_index, true);

        Ok(())
@@ -1098,7 +1142,6 @@ impl Blockstore {
        slot_meta: &mut SlotMeta,
        data_index: &mut ShredIndex,
        shred: &Shred,
-        write_batch: &mut WriteBatch,
    ) -> Result<()> {
        let slot = shred.slot();
        let index = u64::from(shred.index());
@@ -1133,7 +1176,8 @@ impl Blockstore {

        // Commit step: commit all changes to the mutable structures at once, or none at all.
        // We don't want only a subset of these changes going through.
-        write_batch.put_bytes::<cf::ShredData>((slot, index), &shred.payload)?;
+        let path = self.data_shred_path(slot, index);
+        Self::write_all(&path, &shred.payload)?;
        update_slot_meta(
            last_in_slot,
            last_in_data,
@@ -1147,8 +1191,92 @@ impl Blockstore {
        Ok(())
    }

+    fn slot_data_tar_path(&self, slot: Slot) -> String {
+        Path::new(&self.shreds_dir)
+            .join("data")
+            .join("tars")
+            .join(slot.to_string())
+            .with_extension("tar.gz")
+            .to_str()
+            .unwrap()
+            .to_string()
+    }
+
+    fn slot_data_dir(&self, slot: Slot) -> String {
+        Path::new(&self.shreds_dir)
+            .join("data")
+            .join(slot.to_string())
+            .to_str()
+            .unwrap()
+            .to_string()
+    }
+    fn slot_coding_dir(&self, slot: Slot) -> String {
+        Path::new(&self.shreds_dir)
+            .join("coding")
+            .join(slot.to_string())
+            .to_str()
+            .unwrap()
+            .to_string()
+    }
+    fn slot_coding_tar_path(&self, slot: Slot) -> String {
+        Path::new(&self.shreds_dir)
+            .join("coding")
+            .join("tars")
+            .join(slot.to_string())
+            .with_extension("tar.gz")
+            .to_str()
+            .unwrap()
+            .to_string()
+    }
+    fn data_shred_path(&self, slot: Slot, index: u64) -> String {
+        Path::new(&self.slot_data_dir(slot))
+            .join(index.to_string())
+            .to_str()
+            .unwrap()
+            .to_string()
+    }
+    fn coding_shred_path(&self, slot: Slot, index: u64) -> String {
+        Path::new(&self.slot_coding_dir(slot))
+            .join(index.to_string())
+            .to_str()
+            .unwrap()
+            .to_string()
+    }
+    fn extract_data(archive: &str, file: &str) -> Result<Option<Vec<u8>>> {
+        let args = ["xfzO", archive, file];
+        let output = std::process::Command::new("tar").args(&args).output()?;
+        if !output.status.success() {
+            warn!(
+                "tar extract shred {} {} command failed with exit code: {}",
+                archive, file, output.status,
+            );
+            use std::str::from_utf8;
+            info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?"));
+            info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?"));
+            Ok(None)
+        } else {
+            Ok(Some(output.stdout))
+        }
+    }
+    fn get_data(shred_path: &str, tgz: Option<&str>) -> Result<Option<Vec<u8>>> {
+        let path = Path::new(shred_path);
+        let f = fs::File::open(path);
+        if f.is_err() {
+            if let Some(archive) = tgz {
+                Self::extract_data(archive, shred_path)
+            } else {
+                Ok(None)
+            }
+        } else {
+            let mut buf = vec![];
+            f?.read_to_end(&mut buf)?;
+            Ok(Some(buf))
+        }
+    }
    pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result<Option<Vec<u8>>> {
-        self.data_shred_cf.get_bytes((slot, index))
+        let shred_path = self.data_shred_path(slot, index);
+        let archive_path = self.slot_data_tar_path(slot);
+        Self::get_data(&shred_path, Some(&archive_path))
    }

    pub fn get_data_shreds_for_slot(
@@ -1156,10 +1284,14 @@ impl Blockstore {
        slot: Slot,
        start_index: u64,
    ) -> ShredResult<Vec<Shred>> {
-        self.slot_data_iterator(slot, start_index)
+        let vec: ShredResult<Vec<Shred>> = self
+            .slot_data_iterator(slot, start_index)
            .expect("blockstore couldn't fetch iterator")
            .map(|data| Shred::new_from_serialized_shred(data.1.to_vec()))
-            .collect()
+            .collect();
+        let mut vec = vec?;
+        vec.sort_by_key(|s| s.index());
+        Ok(vec)
    }

    pub fn get_data_shreds(
@@ -1208,7 +1340,9 @@ impl Blockstore {
    }

    pub fn get_coding_shred(&self, slot: Slot, index: u64) -> Result<Option<Vec<u8>>> {
-        self.code_shred_cf.get_bytes((slot, index))
+        let shred_path = self.coding_shred_path(slot, index);
+        let archive_path = self.slot_data_tar_path(slot);
+        Self::get_data(&shred_path, Some(&archive_path))
    }

    pub fn get_coding_shreds_for_slot(
@@ -1216,10 +1350,14 @@ impl Blockstore {
        slot: Slot,
        start_index: u64,
    ) -> ShredResult<Vec<Shred>> {
-        self.slot_coding_iterator(slot, start_index)
+        let vec: ShredResult<Vec<Shred>> = self
+            .slot_coding_iterator(slot, start_index)
            .expect("blockstore couldn't fetch iterator")
            .map(|code| Shred::new_from_serialized_shred(code.1.to_vec()))
-            .collect()
+            .collect();
+        let mut vec = vec?;
+        vec.sort_by_key(|s| s.index());
+        Ok(vec)
    }

    // Only used by tests
@@ -1306,110 +1444,71 @@ impl Blockstore {
        self.meta_cf.put_bytes(slot, bytes)
    }

-    // Given a start and end entry index, find all the missing
-    // indexes in the ledger in the range [start_index, end_index)
-    // for the slot with the specified slot
-    fn find_missing_indexes<C>(
-        db_iterator: &mut DBRawIterator,
-        slot: Slot,
-        first_timestamp: u64,
-        start_index: u64,
-        end_index: u64,
-        max_missing: usize,
-    ) -> Vec<u64>
-    where
-        C: Column,
-    {
-        if start_index >= end_index || max_missing == 0 {
-            return vec![];
-        }
-
-        let mut missing_indexes = vec![];
-        let ticks_since_first_insert =
-            DEFAULT_TICKS_PER_SECOND * (timestamp() - first_timestamp) / 1000;
-
-        // Seek to the first shred with index >= start_index
-        db_iterator.seek(&C::key((slot, start_index)));
-
-        // The index of the first missing shred in the slot
-        let mut prev_index = start_index;
-        'outer: loop {
-            if !db_iterator.valid() {
-                for i in prev_index..end_index {
-                    missing_indexes.push(i);
-                    if missing_indexes.len() == max_missing {
-                        break;
-                    }
-                }
-                break;
-            }
-            let (current_slot, index) = C::index(&db_iterator.key().expect("Expect a valid key"));
-
-            let current_index = {
-                if current_slot > slot {
-                    end_index
-                } else {
-                    index
-                }
-            };
-
-            let upper_index = cmp::min(current_index, end_index);
-            // the tick that will be used to figure out the timeout for this hole
-            let reference_tick = u64::from(Shred::reference_tick_from_data(
-                &db_iterator.value().expect("couldn't read value"),
-            ));
-
-            if ticks_since_first_insert < reference_tick + MAX_TURBINE_DELAY_IN_TICKS {
-                // The higher index holes have not timed out yet
-                break 'outer;
-            }
-            for i in prev_index..upper_index {
-                missing_indexes.push(i);
-                if missing_indexes.len() == max_missing {
-                    break 'outer;
-                }
-            }
-
-            if current_slot > slot {
-                break;
-            }
-
-            if current_index >= end_index {
-                break;
-            }
-
-            prev_index = current_index + 1;
-            db_iterator.next();
-        }
-
-        missing_indexes
-    }
-
    pub fn find_missing_data_indexes(
        &self,
        slot: Slot,
-        first_timestamp: u64,
        start_index: u64,
        end_index: u64,
        max_missing: usize,
    ) -> Vec<u64> {
-        if let Ok(mut db_iterator) = self
-            .db
-            .raw_iterator_cf(self.db.cf_handle::<cf::ShredData>())
-        {
-            Self::find_missing_indexes::<cf::ShredData>(
-                &mut db_iterator,
-                slot,
-                first_timestamp,
-                start_index,
-                end_index,
-                max_missing,
-            )
+        self.find_missing_data_indexes_ts(slot, 0, start_index, end_index, max_missing)
+    }
+    fn find_data_indexes(
+        &self,
+        slot: Slot,
+        start_index: u64,
+        end_index: u64,
+        first_ts: &mut u64,
+    ) -> HashSet<u64> {
+        let dir = fs::read_dir(self.slot_data_dir(slot));
+        let min_ts = *first_ts;
+        if let Ok(dir) = dir {
+            dir.filter_map(|e| {
+                let e = e.ok()?;
+                let ix: u64 = std::str::FromStr::from_str(e.file_name().to_str()?).ok()?;
+                if ix >= start_index || ix <= end_index {
+                    if min_ts > 0 {
+                        let ts = fs::metadata(e.path())
+                            .ok()?
+                            .modified()
+                            .ok()?
+                            .duration_since(std::time::UNIX_EPOCH)
+                            .ok()?;
+                        let ts = duration_as_ms(&ts);
+                        if ts > min_ts {
+                            *first_ts = cmp::min(ts, *first_ts);
+                        }
+                    }
+                    Some(ix)
+                } else {
+                    None
+                }
+            })
+            .collect()
        } else {
-            vec![]
+            HashSet::new()
        }
    }
+    pub fn find_missing_data_indexes_ts(
+        &self,
+        slot: Slot,
+        mut first_ts: u64,
+        start_index: u64,
+        end_index: u64,
+        max_missing: usize,
+    ) -> Vec<u64> {
+        let current = self.find_data_indexes(slot, start_index, end_index, &mut first_ts);
+        let now = timestamp();
+        if now < first_ts || now - first_ts < DEFAULT_MS_PER_SLOT / 2 {
+            return vec![];
+        }
+        (start_index..end_index)
+            .filter(|ix| !current.contains(ix))
+            .take(max_missing)
+            .collect()
+    }
+
    pub fn get_block_time(
        &self,
        slot: Slot,
@@ -1878,6 +1977,75 @@ impl Blockstore {
            .map(|x| x.0)
    }
+    fn verify_shred(&self, leader: &Pubkey, slot: Slot, x: u64) {
+        let payload = self.get_data_shred(slot, x).unwrap().unwrap();
+        let shred = Shred::new_from_serialized_shred(payload).unwrap();
+        assert!(
+            shred.verify(leader),
+            "shred failed verification {} {} {}",
+            slot,
+            x,
+            self.shreds_dir,
+        );
+    }
+
+    fn verify_expected_shreds(
+        &self,
+        ls: Option<&Arc<LeaderScheduleCache>>,
+        slot: Slot,
+        total: &mut usize,
+    ) -> ShredIndex {
+        let index = self
+            .index_cf
+            .get(slot)
+            .unwrap()
+            .unwrap_or_else(|| Index::new(slot));
+        let data = index.data();
+        if slot != 0 {
+            if let Some(ls) = ls {
+                let leader = ls.slot_leader_at(slot, None).unwrap();
+                for ix in data.index.iter() {
+                    self.verify_shred(&leader, slot, *ix);
+                    *total += 1;
+                }
+            }
+        }
+        data.clone()
+    }
+
+    pub fn reconcile_shreds(
+        &self,
+        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+    ) -> Result<usize> {
+        let root_slot: Slot = *self.last_root.read().unwrap();
+        let mut total = 0;
+        info!("reconciling shreds from root {}", root_slot);
+        let slot_iterator = self
+            .db
+            .iter::(IteratorMode::From(root_slot, IteratorDirection::Forward))?;
+        for (slot, _) in slot_iterator {
+            info!("reconciling shreds slot {}", slot);
+            let existing = self.find_data_indexes(slot, 0, std::u64::MAX, &mut 0);
+            let expected = self.verify_expected_shreds(leader_schedule, slot, &mut total);
+            let new_shreds: Vec<Shred> = existing
+                .into_iter()
+                .filter_map(|x| {
+                    if expected.is_present(x) {
+                        None
+                    } else {
+                        let payload = self.get_data_shred(slot, x).ok()??;
+                        Shred::new_from_serialized_shred(payload).ok()
+                    }
+                })
+                .collect();
+            if !new_shreds.is_empty() {
+                self.insert_shreds(new_shreds, leader_schedule, false)?;
+            }
+        }
+        info!("Done reconciling shreds. Verified {}", total);
+        Ok(total)
+    }
+
    /// Returns the entry vector for the slot starting with `shred_start_index`, the number of
    /// shreds that comprise the entry vector, and whether the slot is full (consumed all shreds).
    pub fn get_slot_entries_with_shred_info(
@@ -1966,26 +2134,26 @@ impl Blockstore {
        end_index: u32,
        slot_meta: &SlotMeta,
    ) -> Result<Vec<Entry>> {
-        let data_shred_cf = self.db.column::<cf::ShredData>();
-
        // Short circuit on first error
        let data_shreds: Result<Vec<Shred>> = (start_index..=end_index)
            .map(|i| {
-                data_shred_cf
-                    .get_bytes((slot, u64::from(i)))
+                self.get_data_shred(slot, u64::from(i))
                    .and_then(|serialized_shred| {
                        Shred::new_from_serialized_shred(serialized_shred.unwrap_or_else(|| {
+                            let index = self.index_cf.get(slot).unwrap().map(|i| i.data().clone());
                            panic!(
                                "Shred with
                                slot: {},
                                index: {},
                                consumed: {},
-                                completed_indexes: {:?}
+                                completed_indexes: {:?},
+                                index_meta data: {:?},
                                must exist if shred index was included in a range: {} {}",
                                slot,
                                i,
                                slot_meta.consumed,
                                slot_meta.completed_data_indexes,
+                                index,
                                start_index,
                                end_index
                            )
@@ -2182,7 +2350,27 @@ impl Blockstore {
    }

    pub fn storage_size(&self) -> Result<u64> {
-        self.db.storage_size()
+        let storage = self.db.storage_size()?;
+        let mut dir_len = 0;
+        let path = Path::new(&self.shreds_dir);
+        Self::dir_size(path, &mut dir_len)?;
+        Ok(storage + dir_len)
+    }
+
+    fn dir_size(dir: &Path, size: &mut u64) -> Result<()> {
+        if dir.is_dir() {
+            for entry in fs::read_dir(dir)? {
+                let entry = entry?;
+                let path = entry.path();
+                if path.is_dir() {
+                    Self::dir_size(&path, size)?;
+                } else {
+                    let len = fs::metadata(path)?.len();
+                    *size += len;
+                }
+            }
+        }
+        Ok(())
+    }
 }
@@ -2902,20 +3090,6 @@ pub mod tests {
            .next()
            .map(|(slot, _)| slot >= min_slot)
            .unwrap_or(true)
-            & blockstore
-                .db
-                .iter::<cf::ShredData>(IteratorMode::Start)
-                .unwrap()
-                .next()
-                .map(|((slot, _), _)| slot >= min_slot)
-                .unwrap_or(true)
-            & blockstore
-                .db
-                .iter::<cf::ShredCode>(IteratorMode::Start)
-                .unwrap()
-                .next()
-                .map(|((slot, _), _)| slot >= min_slot)
-                .unwrap_or(true)
            & blockstore
                .db
                .iter::(IteratorMode::Start)
@@ -3015,8 +3189,7 @@ pub mod tests {
            .unwrap();

        let serialized_shred = ledger
-            .data_shred_cf
-            .get_bytes((0, last_shred.index() as u64))
+            .get_data_shred(0, last_shred.index() as u64)
            .unwrap()
            .unwrap();
        let deserialized_shred = Shred::new_from_serialized_shred(serialized_shred).unwrap();
@@ -3126,56 +3299,6 @@ pub mod tests {
        Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
    }

-    #[test]
-    fn test_put_get_simple() {
-        let ledger_path = get_tmp_ledger_path!();
-        let ledger = Blockstore::open(&ledger_path).unwrap();
-
-        // Test meta column family
-        let meta = SlotMeta::new(0, 1);
-        ledger.meta_cf.put(0, &meta).unwrap();
-        let result = ledger
-            .meta_cf
-            .get(0)
-            .unwrap()
-            .expect("Expected meta object to exist");
-
-        assert_eq!(result, meta);
-
-        // Test erasure column family
-        let erasure = vec![1u8; 16];
-        let erasure_key = (0, 0);
-        ledger
-            .code_shred_cf
-            .put_bytes(erasure_key, &erasure)
-            .unwrap();
-
-        let result = ledger
-            .code_shred_cf
-            .get_bytes(erasure_key)
-            .unwrap()
-            .expect("Expected erasure object to exist");
-
-        assert_eq!(result, erasure);
-
-        // Test data column family
-        let data = vec![2u8; 16];
-        let data_key = (0, 0);
-        ledger.data_shred_cf.put_bytes(data_key, &data).unwrap();
-
-        let result = ledger
-            .data_shred_cf
-            .get_bytes(data_key)
-            .unwrap()
-            .expect("Expected data object to exist");
-
-        assert_eq!(result, data);
-
-        // Destroying database without closing it first is undefined behavior
-        drop(ledger);
-        Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
-    }
-
    #[test]
    fn test_read_shred_bytes() {
        let slot = 0;
@@ -4294,27 +4417,27 @@ pub mod tests {
        // range of [0, gap)
        let expected: Vec<u64> = (1..gap).collect();
        assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, 0, gap, gap as usize),
+            blockstore.find_missing_data_indexes(slot, 0, gap, gap as usize),
            expected
        );
        assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, 1, gap, (gap - 1) as usize),
+            blockstore.find_missing_data_indexes(slot, 1, gap, (gap - 1) as usize),
            expected,
        );
        assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, 0, gap - 1, (gap - 1) as usize),
+            blockstore.find_missing_data_indexes(slot, 0, gap - 1, (gap - 1) as usize),
            &expected[..expected.len() - 1],
        );
        assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, gap - 2, gap, gap as usize),
+            blockstore.find_missing_data_indexes(slot, gap - 2, gap, gap as usize),
            vec![gap - 2, gap - 1],
        );
        assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, gap - 2, gap, 1),
+            blockstore.find_missing_data_indexes(slot, gap - 2, gap, 1),
            vec![gap - 2],
        );
        assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, 0, gap, 1),
+            blockstore.find_missing_data_indexes(slot, 0, gap, 1),
            vec![1],
        );
@@ -4323,11 +4446,11 @@ pub mod tests {
        let mut expected: Vec<u64> = (1..gap).collect();
        expected.push(gap + 1);
        assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, 0, gap + 2, (gap + 2) as usize),
+            blockstore.find_missing_data_indexes(slot, 0, gap + 2, (gap + 2) as usize),
            expected,
        );
        assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, 0, gap + 2, (gap - 1) as usize),
+            blockstore.find_missing_data_indexes(slot, 0, gap + 2, (gap - 1) as usize),
            &expected[..expected.len() - 1],
        );
@@ -4343,7 +4466,6 @@ pub mod tests {
        assert_eq!(
            blockstore.find_missing_data_indexes(
                slot,
-                0,
                j * gap,
                i * gap,
                ((i - j) * gap) as usize
@@ -4381,15 +4503,14 @@ pub mod tests {
            })
            .collect();
        blockstore.insert_shreds(shreds, None, false).unwrap();
-        let empty: Vec<u64> = vec![];
        assert_eq!(
-            blockstore.find_missing_data_indexes(slot, timestamp(), 0, 50, 1),
+            blockstore.find_missing_data_indexes_ts(slot, timestamp() + 1, 0, 50, 1),
            empty
        );
        let expected: Vec<_> = (1..=9).collect();
        assert_eq!(
-            blockstore.find_missing_data_indexes(slot, timestamp() - 400, 0, 50, 9),
+            blockstore.find_missing_data_indexes_ts(slot, timestamp() - 400, 0, 50, 9),
            expected
        );
@@ -4406,22 +4527,10 @@ pub mod tests {

        // Early exit conditions
        let empty: Vec<u64> = vec![];
-        assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, 0, 0, 1),
-            empty
-        );
-        assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, 5, 5, 1),
-            empty
-        );
-        assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, 4, 3, 1),
-            empty
-        );
-        assert_eq!(
-            blockstore.find_missing_data_indexes(slot, 0, 1, 2, 0),
-            empty
-        );
+        assert_eq!(blockstore.find_missing_data_indexes(slot, 0, 0, 1), empty);
+        assert_eq!(blockstore.find_missing_data_indexes(slot, 5, 5, 1), empty);
+        assert_eq!(blockstore.find_missing_data_indexes(slot, 4, 3, 1), empty);
+        assert_eq!(blockstore.find_missing_data_indexes(slot, 1, 2, 0), empty);

        let entries = create_ticks(100, 0, Hash::default());
        let mut shreds = entries_to_test_shreds(entries, slot, 0, true, 0);
@@ -4445,7 +4554,7 @@ pub mod tests {
        // [i, first_index - 1]
        for start in 0..STARTS {
            let result = blockstore.find_missing_data_indexes(
-                slot, 0, start, // start
+                slot, start, // start
                END,   //end
                MAX,   //max
            );
@@ -4475,7 +4584,7 @@ pub mod tests {
        for i in 0..num_shreds as u64 {
            for j in 0..i {
                assert_eq!(
-                    blockstore.find_missing_data_indexes(slot, 0, j, i, (i - j) as usize),
+                    blockstore.find_missing_data_indexes(slot, j, i, (i - j) as usize),
                    empty
                );
            }
@@ -4775,9 +4884,8 @@ pub mod tests {
        }

        // Slot doesnt exist, iterator should be empty
-        let shred_iter = blockstore.slot_data_iterator(5, 0).unwrap();
-        let result: Vec<_> = shred_iter.collect();
-        assert_eq!(result, vec![]);
+        let shred_iter = blockstore.slot_data_iterator(5, 0);
+        assert!(shred_iter.is_err());

        // Test that the iterator for slot 8 contains what was inserted earlier
        let shred_iter = blockstore.slot_data_iterator(8, 0).unwrap();
@@ -4831,14 +4939,11 @@ pub mod tests {
            assert!(slot <= 5);
            assert_eq!(meta.last_index, shreds_per_slot - 1)
        });
-
-        let data_iter = blockstore
-            .data_shred_cf
-            .iter(IteratorMode::From((0, 0), IteratorDirection::Forward))
-            .unwrap();
-        for ((slot, _), _) in data_iter {
+        for slot in 0..50 {
            if slot > 5 {
-                assert!(false);
+                assert!(blockstore.slot_data_iterator(slot, 0).is_err());
+            } else {
+                assert!(blockstore.slot_data_iterator(slot, 0).is_ok());
            }
        }
@@ -6556,11 +6661,13 @@ pub mod tests {
        let index = blockstore.get_index(slot).unwrap().unwrap();
        // Test the set of data shreds in the index and in the data column
        // family are the same
-        let data_iter = blockstore.slot_data_iterator(slot, 0).unwrap();
        let mut num_data = 0;
-        for ((slot, index), _) in data_iter {
-            num_data += 1;
-            assert!(blockstore.get_data_shred(slot, index).unwrap().is_some());
+        let data_iter = blockstore.slot_data_iterator(slot, 0);
+        if data_iter.is_ok() {
+            for ((slot, index), _) in data_iter.unwrap() {
+                num_data += 1;
+                assert!(blockstore.get_data_shred(slot, index).unwrap().is_some());
+            }
        }

        // Test the data index doesn't have anything extra
@@ -6630,4 +6737,48 @@ pub mod tests {

        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
    }
+
+    #[test]
+    fn test_reconcile() {
+        let slot = 1;
+        let num_entries = 100;
+        let (data_shreds, _, leader_schedule_cache) =
+            setup_erasure_shreds(slot, 0, num_entries, 1.0);
+
+        let ledger_path = get_tmp_ledger_path!();
+        let ledger = Blockstore::open(&ledger_path).unwrap();
+
+        let len = data_shreds.len();
+        ledger
+            .insert_shreds(data_shreds, Some(&leader_schedule_cache), false)
+            .unwrap();
+
+        let num = ledger
+            .reconcile_shreds(Some(&leader_schedule_cache))
+            .unwrap();
+        assert_eq!(num, len);
+        drop(ledger);
+        Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
+    }
+
+    #[test]
+    fn test_reconcile_missing() {
+        let slot = 1;
+        let num_entries = 100;
+        let (data_shreds, _, leader_schedule_cache) =
+            setup_erasure_shreds(slot, 0, num_entries, 1.0);
+
+        let ledger_path = get_tmp_ledger_path!();
+        let ledger = Blockstore::open(&ledger_path).unwrap();
+
+        ledger
+            .insert_shreds(data_shreds, Some(&leader_schedule_cache), false)
+            .unwrap();
+        let shred_path = ledger.data_shred_path(slot, 0);
+        fs::remove_file(shred_path.clone()).unwrap();
+        let result = std::panic::catch_unwind(|| {
+            let _ = ledger.reconcile_shreds(Some(&leader_schedule_cache));
+        });
+        assert!(result.is_err());
+    }
 }
diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs
index 0488f24c30..b82134fa60 100644
--- a/ledger/src/blockstore_meta.rs
+++ b/ledger/src/blockstore_meta.rs
@@ -43,7 +43,7 @@ pub struct Index {
 #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
 pub struct ShredIndex {
    /// Map representing presence/absence of shreds
-    index: BTreeSet<u64>,
+    pub index: BTreeSet<u64>,
 }

 #[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
diff --git a/ledger/src/hardened_unpack.rs b/ledger/src/hardened_unpack.rs
index 3ba7d9345e..cd0fc27471 100644
--- a/ledger/src/hardened_unpack.rs
+++ b/ledger/src/hardened_unpack.rs
@@ -206,6 +206,9 @@ fn is_valid_genesis_archive_entry(parts: &[&str], kind: tar::EntryType) -> bool
        (["rocksdb"], Directory) => true,
        (["rocksdb", ..], GNUSparse) => true,
        (["rocksdb", ..], Regular) => true,
+        (["shreds", ..], Directory) => true,
+        (["shreds", ..], GNUSparse) => true,
+        (["shreds", ..], Regular) => true,
        _ => false,
    }
 }
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index 8b687de76b..64d7d66e84 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -5,7 +5,7 @@ use solana_client::rpc_client::RpcClient;
 use solana_client::thin_client::create_client;
 use solana_core::{
    broadcast_stage::BroadcastStageType, consensus::VOTE_THRESHOLD_DEPTH,
-    gossip_service::discover_cluster, validator::ValidatorConfig,
+    contact_info::ContactInfo, gossip_service::discover_cluster, validator::ValidatorConfig,
 };
 use solana_download_utils::download_snapshot;
 use solana_ledger::bank_forks::CompressionType;
@@ -274,7 +274,7 @@ fn run_cluster_partition(
    for node in &cluster_nodes {
        let node_client = RpcClient::new_socket(node.rpc);
        if let Ok(epoch_info) = node_client.get_epoch_info() {
-            info!("slots_per_epoch: {:?}", epoch_info);
+            debug!("slots_per_epoch: {:?}", epoch_info);
            if epoch_info.slots_in_epoch <= (1 << VOTE_THRESHOLD_DEPTH) {
                reached_epoch = false;
                break;
@@ -343,13 +343,16 @@ fn run_cluster_partition(
        alive_node_contact_infos.len(),
    )
    .unwrap();
+    assert!(wait_for_new_roots(&alive_node_contact_infos, 1024, 16));
    info!("PARTITION_TEST discovered {} nodes", cluster_nodes.len());
-    info!("PARTITION_TEST looking for new roots on all nodes");
-    let mut roots = vec![HashSet::new(); alive_node_contact_infos.len()];
-    let mut done = false;
+}
+
+pub fn wait_for_new_roots(nodes: &[ContactInfo], mut tries: usize, min_roots: usize) -> bool {
+    info!("looking for new roots on all nodes");
+    let mut roots = vec![HashSet::new(); nodes.len()];
    let mut last_print = Instant::now();
-    while !done {
-        for (i, ingress_node) in alive_node_contact_infos.iter().enumerate() {
+    while tries > 0 {
+        for (i, ingress_node) in nodes.iter().enumerate() {
            let client = create_client(
                ingress_node.client_facing_addr(),
                solana_core::cluster_info::VALIDATOR_PORT_RANGE,
@@ -358,14 +361,24 @@ fn run_cluster_partition(
                roots[i].insert(slot);
                let min_node = roots.iter().map(|r| r.len()).min().unwrap_or(0);
                if last_print.elapsed().as_secs() > 3 {
-                    info!("PARTITION_TEST min observed roots {}/16", min_node);
+                    info!(
+                        "{}: min observed roots {}/{} in {} nodes",
+                        tries,
+                        min_node,
+                        min_roots,
+                        roots.len()
+                    );
                    last_print = Instant::now();
                }
-                done = min_node >= 16;
+                if min_node >= min_roots {
+                    return true;
+                }
            }
        }
        sleep(Duration::from_millis(clock::DEFAULT_MS_PER_SLOT / 2));
+        tries -= 1;
    }
-    info!("PARTITION_TEST done waiting for roots");
+    info!("failed waiting for roots");
+    false
 }

 #[allow(unused_attributes)]
@@ -863,6 +876,7 @@ fn test_snapshot_download() {
 #[test]
 #[serial]
 fn test_snapshot_restart_tower() {
+    solana_logger::setup();
    // First set up the cluster with 2 nodes
    let snapshot_interval_slots = 10;
    let num_account_paths = 2;
@@ -920,12 +934,11 @@ fn test_snapshot_restart_tower() {
    // Use the restarted node as the discovery point so that we get updated
    // validator's ContactInfo
    let restarted_node_info = cluster.get_contact_info(&validator_id).unwrap();
-    cluster_tests::spend_and_verify_all_nodes(
-        &restarted_node_info,
-        &cluster.funding_keypair,
-        1,
-        HashSet::new(),
-    );
+
+    let (cluster_nodes, _) =
+        discover_cluster(&restarted_node_info.gossip, cluster.validators.len()).unwrap();
+
+    assert!(wait_for_new_roots(&cluster_nodes, 512, 16));
 }

 #[test]