From 6a9005645a3bd5096f35d953fd475589b77366d4 Mon Sep 17 00:00:00 2001
From: Sagar Dhawan
Date: Wed, 18 Dec 2019 11:50:09 -0800
Subject: [PATCH] Update "limit-ledger-size" to use DeleteRange for much faster deletes (#7515)

* Update "limit-ledger-size" to use DeleteRange for much faster deletes

* Update core/src/ledger_cleanup_service.rs

Co-Authored-By: Michael Vines

* Rewrite more idiomatically

* Move max_ledger_slots to a fn for clippy

* Remove unused import

* Detect when all columns have been purged and fix a bug in deletion

* Check that more than 1 column is actually deleted

* Add helper to test that ledger meets minimum slot bounds

* Remove manual batching of deletes

* Refactor to keep some N slots older than the highest root

* Define MAX_LEDGER_SLOTS that ledger_cleanup_service will try to keep around

* Refactor compact range
---
 core/src/ledger_cleanup_service.rs |  51 +++++---
 core/src/replay_stage.rs           |   9 ++
 core/src/tvu.rs                    |   3 +-
 ledger/src/blocktree.rs            | 195 +++++++++++++++--------------
 ledger/src/blocktree_db.rs         |  30 +++++
 validator/src/main.rs              |   6 +-
 6 files changed, 184 insertions(+), 110 deletions(-)

diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs
index faee12d5fe..553e3942ea 100644
--- a/core/src/ledger_cleanup_service.rs
+++ b/core/src/ledger_cleanup_service.rs
@@ -3,8 +3,7 @@
 use crate::result::{Error, Result};
 use solana_ledger::blocktree::Blocktree;
 use solana_metrics::datapoint_debug;
-use solana_sdk::clock::DEFAULT_SLOTS_PER_EPOCH;
-use solana_sdk::pubkey::Pubkey;
+use solana_sdk::clock::Slot;
 use std::string::ToString;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{Receiver, RecvTimeoutError};
@@ -13,7 +12,14 @@ use std::thread;
 use std::thread::{Builder, JoinHandle};
 use std::time::Duration;

-pub const DEFAULT_MAX_LEDGER_SLOTS: u64 = 3 * DEFAULT_SLOTS_PER_EPOCH;
+// This value is chosen to:
+// - Try to keep the RocksDB size under 128GB at 50k tps (100 slots take ~2GB).
+// - Allow enough time for a validator to download a snapshot from a peer and boot from it.
+// - Make sure that if a validator needs to reboot from its own snapshot, it has enough slots locally
+//   to catch back up to where it was when it stopped.
+pub const MAX_LEDGER_SLOTS: u64 = 6400;
+// Remove a fixed number of slots at a time; this is more efficient than purging slots one by one.
+pub const DEFAULT_PURGE_BATCH_SIZE: u64 = 256;

 pub struct LedgerCleanupService {
     t_cleanup: JoinHandle<()>,
@@ -21,7 +27,7 @@ impl LedgerCleanupService {
     pub fn new(
-        slot_full_receiver: Receiver<(u64, Pubkey)>,
+        new_root_receiver: Receiver<Slot>,
         blocktree: Arc<Blocktree>,
         max_ledger_slots: u64,
         exit: &Arc<AtomicBool>,
@@ -31,15 +37,19 @@ impl LedgerCleanupService {
             max_ledger_slots
         );
         let exit = exit.clone();
+        let mut next_purge_batch = max_ledger_slots;
         let t_cleanup = Builder::new()
             .name("solana-ledger-cleanup".to_string())
             .spawn(move || loop {
                 if exit.load(Ordering::Relaxed) {
                     break;
                 }
-                if let Err(e) =
-                    Self::cleanup_ledger(&slot_full_receiver, &blocktree, max_ledger_slots)
-                {
+                if let Err(e) = Self::cleanup_ledger(
+                    &new_root_receiver,
+                    &blocktree,
+                    max_ledger_slots,
+                    &mut next_purge_batch,
+                ) {
                     match e {
                         Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                         Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@@ -52,16 +62,18 @@ impl LedgerCleanupService {
     }

     fn cleanup_ledger(
-        slot_full_receiver: &Receiver<(u64, Pubkey)>,
+        new_root_receiver: &Receiver<Slot>,
         blocktree: &Arc<Blocktree>,
         max_ledger_slots: u64,
+        next_purge_batch: &mut u64,
     ) -> Result<()> {
         let disk_utilization_pre = blocktree.storage_size();

-        let (slot, _) = slot_full_receiver.recv_timeout(Duration::from_secs(1))?;
-        if slot > max_ledger_slots {
+        let root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
+        if root > *next_purge_batch {
             //cleanup
-            blocktree.purge_slots(0, Some(slot - max_ledger_slots));
+            blocktree.purge_slots(0, Some(root - max_ledger_slots));
+            *next_purge_batch += DEFAULT_PURGE_BATCH_SIZE;
         }

         let disk_utilization_post = blocktree.storage_size();
@@ -105,8 +117,10 @@ mod tests {
         let (sender, receiver) = channel();

         //send a signal to kill slots 0-40
-        sender.send((50, Pubkey::default())).unwrap();
-        LedgerCleanupService::cleanup_ledger(&receiver, &blocktree, 10).unwrap();
+        let mut next_purge_slot = 0;
+        sender.send(50).unwrap();
+        LedgerCleanupService::cleanup_ledger(&receiver, &blocktree, 10, &mut next_purge_slot)
+            .unwrap();

         //check that 0-40 don't exist
         blocktree
@@ -137,8 +151,15 @@ mod tests {
         // send signal to cleanup slots
         let (sender, receiver) = channel();
-        sender.send((n, Pubkey::default())).unwrap();
-        LedgerCleanupService::cleanup_ledger(&receiver, &blocktree, max_ledger_slots).unwrap();
+        sender.send(n).unwrap();
+        let mut next_purge_batch = 0;
+        LedgerCleanupService::cleanup_ledger(
+            &receiver,
+            &blocktree,
+            max_ledger_slots,
+            &mut next_purge_batch,
+        )
+        .unwrap();

         thread::sleep(Duration::from_secs(2));

diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index fbff7d6530..6668de57e4 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -73,6 +73,7 @@ pub struct ReplayStageConfig {
     pub subscriptions: Arc<RpcSubscriptions>,
     pub leader_schedule_cache: Arc<LeaderScheduleCache>,
     pub slot_full_senders: Vec<Sender<(u64, Pubkey)>>,
+    pub latest_root_senders: Vec<Sender<Slot>>,
     pub snapshot_package_sender: Option<SnapshotPackageSender>,
     pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
     pub transaction_status_sender: Option<TransactionStatusSender>,
@@ -193,6 +194,7 @@ impl ReplayStage {
             subscriptions,
             leader_schedule_cache,
             slot_full_senders,
+            latest_root_senders,
             snapshot_package_sender,
             block_commitment_cache,
             transaction_status_sender,
@@ -315,6 +317,7 @@ impl ReplayStage {
                         stats.total_staked,
                         &lockouts_sender,
                         &snapshot_package_sender,
+                        &latest_root_senders,
                     )?;
                 }
                 datapoint_debug!(
@@ -615,6 +618,7 @@ impl ReplayStage {
         total_staked: u64,
         lockouts_sender: &Sender<CommitmentAggregationData>,
         snapshot_package_sender: &Option<SnapshotPackageSender>,
+        latest_root_senders: &[Sender<Slot>],
     ) -> Result<()> {
         if bank.is_empty() {
             inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
@@ -645,6 +649,11 @@ impl ReplayStage {
                 .unwrap()
                 .set_root(new_root, snapshot_package_sender);
             Self::handle_new_root(&bank_forks, progress);
+            latest_root_senders.iter().for_each(|s| {
+                if let Err(e) = s.send(new_root) {
+                    trace!("latest root send failed: {:?}", e);
+                }
+            });
             trace!("new root {}", new_root);
             if let Err(e) = root_bank_sender.send(rooted_banks) {
                 trace!("root_bank_sender failed: {:?}", e);

diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index f89ce9fb4a..d8f2275400 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -166,7 +166,8 @@ impl Tvu {
             exit: exit.clone(),
             subscriptions: subscriptions.clone(),
             leader_schedule_cache: leader_schedule_cache.clone(),
-            slot_full_senders: vec![blockstream_slot_sender, ledger_cleanup_slot_sender],
+            slot_full_senders: vec![blockstream_slot_sender],
+            latest_root_senders: vec![ledger_cleanup_slot_sender],
             snapshot_package_sender,
             block_commitment_cache,
             transaction_status_sender,

diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs
index 2e0b26de62..28bab1e173 100644
--- a/ledger/src/blocktree.rs
+++ b/ledger/src/blocktree.rs
@@ -252,111 +252,83 @@ impl Blocktree {
     /// Does not check for integrity and does not update slot metas that refer to deleted slots
     /// Modifies multiple column families simultaneously
     pub fn purge_slots(&self, mut from_slot: Slot, to_slot: Option<Slot>) {
-        // split the purge request into batches of 1000 slots
+        // if there's no upper bound, split the purge request into batches of 1000 slots
         const PURGE_BATCH_SIZE: u64 = 1000;
-        let mut batch_end = to_slot
-            .unwrap_or(from_slot + PURGE_BATCH_SIZE)
-            .min(from_slot + PURGE_BATCH_SIZE);
+        let mut batch_end = to_slot.unwrap_or(from_slot + PURGE_BATCH_SIZE);
         while from_slot < batch_end {
-            if let Ok(end) = self.run_purge_batch(from_slot, batch_end) {
-                // no more slots to iter or reached the upper bound
-                if end {
+            match self.run_purge(from_slot, batch_end) {
+                Ok(end) => {
+                    if let Err(e) = self.compact_storage(from_slot, batch_end) {
+                        // This error is not fatal and indicates an internal error
+                        error!(
+                            "Error: {:?}; Couldn't compact storage from {:?} to {:?}",
+                            e, from_slot, batch_end
+                        );
+                    }
+
+                    if end {
+                        break;
+                    } else {
+                        // update the next batch bounds
+                        from_slot = batch_end;
+                        batch_end = to_slot.unwrap_or(batch_end + PURGE_BATCH_SIZE);
+                    }
+                }
+                Err(e) => {
+                    error!(
+                        "Error: {:?}; Purge failed in range {:?} to {:?}",
+                        e, from_slot, batch_end
+                    );
                     break;
-                } else {
-                    // update the next batch bounds
-                    from_slot = batch_end;
-                    batch_end = to_slot
-                        .unwrap_or(batch_end + PURGE_BATCH_SIZE)
-                        .min(batch_end + PURGE_BATCH_SIZE);
                 }
             }
         }
     }

-    // Returns whether or not all iterators have reached their end
-    fn run_purge_batch(&self, from_slot: Slot, batch_end: Slot) -> Result<bool> {
-        let some_from_slot = Some(from_slot);
-        let some_batch_end = Some(batch_end);
-
+    // Returns whether or not all columns have been purged until their end
+    fn run_purge(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
         let mut write_batch = self
             .db
             .batch()
             .expect("Database Error: Failed to get write batch");
-        let end = self
-            .meta_cf
-            .delete_slot(&mut write_batch, some_from_slot, some_batch_end)
-            .unwrap_or(false)
-            & self
-                .meta_cf
-                .compact_range(from_slot, batch_end)
-                .unwrap_or(false)
-            & self
-                .erasure_meta_cf
-                .delete_slot(&mut write_batch, some_from_slot, some_batch_end)
-                .unwrap_or(false)
-            & self
-                .erasure_meta_cf
-                .compact_range(from_slot, batch_end)
-                .unwrap_or(false)
-            & self
-                .data_shred_cf
-                .delete_slot(&mut write_batch, some_from_slot, some_batch_end)
-                .unwrap_or(false)
-            & self
-                .data_shred_cf
-                .compact_range(from_slot, batch_end)
-                .unwrap_or(false)
-            & self
-                .code_shred_cf
-                .delete_slot(&mut write_batch, some_from_slot, some_batch_end)
-                .unwrap_or(false)
-            & self
-                .code_shred_cf
-                .compact_range(from_slot, batch_end)
-                .unwrap_or(false)
-            & self
-                .transaction_status_cf
-                .delete_slot(&mut write_batch, some_from_slot, some_batch_end)
-                .unwrap_or(false)
-            & self
-                .transaction_status_cf
-                .compact_range(from_slot, batch_end)
-                .unwrap_or(false)
-            & self
-                .orphans_cf
-                .delete_slot(&mut write_batch, some_from_slot, some_batch_end)
-                .unwrap_or(false)
-            & self
-                .orphans_cf
-                .compact_range(from_slot, batch_end)
-                .unwrap_or(false)
-            & self
-                .index_cf
-                .delete_slot(&mut write_batch, some_from_slot, some_batch_end)
-                .unwrap_or(false)
-            & self
-                .index_cf
-                .compact_range(from_slot, batch_end)
-                .unwrap_or(false)
-            & self
-                .dead_slots_cf
-                .delete_slot(&mut write_batch, some_from_slot, some_batch_end)
-                .unwrap_or(false)
-            & self
-                .dead_slots_cf
-                .compact_range(from_slot, batch_end)
-                .unwrap_or(false)
+        // delete range cf is not inclusive
+        let to_slot = to_slot.checked_add(1).unwrap_or_else(|| std::u64::MAX);
+        let columns_empty = self
+            .db
+            .delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
+            .unwrap_or_else(|_| false)
             & self
                 .db
-                .column::<cf::Root>()
-                .delete_slot(&mut write_batch, some_from_slot, some_batch_end)
-                .unwrap_or(false)
+                .delete_range_cf::<cf::Root>(&mut write_batch, from_slot, to_slot)
+                .unwrap_or_else(|_| false)
             & self
                 .db
-                .column::<cf::Root>()
-                .compact_range(from_slot, batch_end)
-                .unwrap_or(false);
-
+                .delete_range_cf::<cf::ShredData>(&mut write_batch, from_slot, to_slot)
+                .unwrap_or_else(|_| false)
+            & self
+                .db
+                .delete_range_cf::<cf::ShredCode>(&mut write_batch, from_slot, to_slot)
+                .unwrap_or_else(|_| false)
+            & self
+                .db
+                .delete_range_cf::<cf::DeadSlots>(&mut write_batch, from_slot, to_slot)
+                .unwrap_or_else(|_| false)
+            & self
+                .db
+                .delete_range_cf::<cf::ErasureMeta>(&mut write_batch, from_slot, to_slot)
+                .unwrap_or_else(|_| false)
+            & self
+                .db
+                .delete_range_cf::<cf::Orphans>(&mut write_batch, from_slot, to_slot)
+                .unwrap_or_else(|_| false)
+            & self
+                .db
+                .delete_range_cf::<cf::Index>(&mut write_batch, from_slot, to_slot)
+                .unwrap_or_else(|_| false)
+            & self
+                .db
+                .delete_range_cf::<cf::TransactionStatus>(&mut write_batch, from_slot, to_slot)
+                .unwrap_or_else(|_| false);
         if let Err(e) = self.db.write(write_batch) {
             error!(
                 "Error: {:?} while submitting write batch for slot {:?} retrying...",
@@ -364,7 +336,48 @@ impl Blocktree {
             );
             return Err(e);
         }
-        Ok(end)
+        Ok(columns_empty)
+    }
+
+    pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
+        let result = self
+            .meta_cf
+            .compact_range(from_slot, to_slot)
+            .unwrap_or(false)
+            && self
+                .db
+                .column::<cf::Root>()
+                .compact_range(from_slot, to_slot)
+                .unwrap_or(false)
+            && self
+                .data_shred_cf
+                .compact_range(from_slot, to_slot)
+                .unwrap_or(false)
+            && self
+                .code_shred_cf
+                .compact_range(from_slot, to_slot)
+                .unwrap_or(false)
+            && self
+                .dead_slots_cf
+                .compact_range(from_slot, to_slot)
+                .unwrap_or(false)
+            && self
+                .erasure_meta_cf
+                .compact_range(from_slot, to_slot)
+                .unwrap_or(false)
+            && self
+                .orphans_cf
+                .compact_range(from_slot, to_slot)
+                .unwrap_or(false)
+            && self
+                .index_cf
+                .compact_range(from_slot, to_slot)
+                .unwrap_or(false)
+            && self
+                .transaction_status_cf
+                .compact_range(from_slot, to_slot)
+                .unwrap_or(false);
+        Ok(result)
     }

     pub fn erasure_meta(&self, slot: Slot, set_index: u64) -> Result<Option<ErasureMeta>> {
@@ -4856,7 +4869,7 @@ pub mod tests {
             blocktree.insert_shreds(shreds, None, false).unwrap();
         }
         assert_eq!(blocktree.lowest_slot(), 1);
-        blocktree.run_purge_batch(0, 5).unwrap();
+        blocktree.run_purge(0, 5).unwrap();
         assert_eq!(blocktree.lowest_slot(), 6);
     }
     Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");

diff --git a/ledger/src/blocktree_db.rs b/ledger/src/blocktree_db.rs
index 383c6a5c2d..8c7f543bd4 100644
--- a/ledger/src/blocktree_db.rs
+++ b/ledger/src/blocktree_db.rs
@@ -588,6 +588,25 @@ impl Database {
     pub fn storage_size(&self) -> Result<u64> {
         Ok(fs_extra::dir::get_size(&self.path)?)
     }
+
+    // Adds a range delete to the given write batch and returns whether or not
+    // the column has reached its end
+    pub fn delete_range_cf<C>(&self, batch: &mut WriteBatch, from: Slot, to: Slot) -> Result<bool>
+    where
+        C: Column,
+    {
+        let cf = self.cf_handle::<C>();
+        let from_index = C::as_index(from);
+        let to_index = C::as_index(to);
+        let result = batch.delete_range_cf::<C>(cf, from_index, to_index);
+        let max_slot = self
+            .iter::<C>(IteratorMode::End)?
+            .next()
+            .map(|(i, _)| C::slot(i))
+            .unwrap_or(0);
+        let end = max_slot <= to;
+        result.map(|_| end)
+    }
 }

 impl<C> LedgerColumn<C>
@@ -715,6 +734,17 @@ impl<'a> WriteBatch<'a> {
     fn get_cf<C: Column>(&self) -> &'a ColumnFamily {
         self.map[C::NAME]
     }
+
+    pub fn delete_range_cf<C: Column>(
+        &mut self,
+        cf: &ColumnFamily,
+        from: C::Index,
+        to: C::Index,
+    ) -> Result<()> {
+        self.write_batch
+            .delete_range_cf(cf, C::key(from), C::key(to))?;
+        Ok(())
+    }
 }

 fn get_cf_options() -> Options {

diff --git a/validator/src/main.rs b/validator/src/main.rs
index ef461eeca1..02fe1be166 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -12,11 +12,11 @@ use solana_clap_utils::{
     },
 };
 use solana_client::rpc_client::RpcClient;
+use solana_core::ledger_cleanup_service::MAX_LEDGER_SLOTS;
 use solana_core::{
     cluster_info::{ClusterInfo, Node, VALIDATOR_PORT_RANGE},
     contact_info::ContactInfo,
     gossip_service::GossipService,
-    ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS,
     validator::{Validator, ValidatorConfig},
 };
 use solana_ledger::bank_forks::SnapshotConfig;
@@ -507,7 +507,7 @@ pub fn main() {
             clap::Arg::with_name("limit_ledger_size")
                 .long("limit-ledger-size")
                 .takes_value(false)
-                .help("drop older slots in the ledger"),
+                .help("Drop older slots in the ledger"),
         )
         .arg(
             clap::Arg::with_name("skip_poh_verify")
@@ -635,7 +635,7 @@ pub fn main() {
     });

     if matches.is_present("limit_ledger_size") {
-        validator_config.max_ledger_slots = Some(DEFAULT_MAX_LEDGER_SLOTS);
+        validator_config.max_ledger_slots = Some(MAX_LEDGER_SLOTS);
     }

     if matches.value_of("signer_addr").is_some() {
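
Note on the new purge cadence (not part of the diff above): the standalone sketch below illustrates the trigger arithmetic that the rewritten cleanup_ledger uses. It is a simplified assumption-laden sketch, not the real service: purge_slots here is a stand-in for Blocktree::purge_slots, and the channel plumbing, metrics, and error handling of LedgerCleanupService are omitted. The constants mirror the ones defined in this patch. The idea it demonstrates: wait for the rooted slot to pass an advancing boundary, purge everything older than MAX_LEDGER_SLOTS behind that root, then push the boundary out by DEFAULT_PURGE_BATCH_SIZE so the service does not purge on every single new root.

// Minimal sketch of the purge cadence introduced by this patch (assumptions:
// purge_slots is a stand-in; the real service receives roots over a channel).

const MAX_LEDGER_SLOTS: u64 = 6400;
const DEFAULT_PURGE_BATCH_SIZE: u64 = 256;

/// Stand-in for `blocktree.purge_slots(0, Some(upper_bound))`.
fn purge_slots(upper_bound: u64) {
    println!("purging slots 0..={}", upper_bound);
}

/// Mirrors the body of the new `cleanup_ledger`: purge only when the rooted
/// slot has moved past the next batch boundary, then advance the boundary.
fn cleanup_on_root(root: u64, max_ledger_slots: u64, next_purge_batch: &mut u64) {
    if root > *next_purge_batch {
        purge_slots(root - max_ledger_slots);
        *next_purge_batch += DEFAULT_PURGE_BATCH_SIZE;
    }
}

fn main() {
    // `new()` seeds the boundary with max_ledger_slots, so nothing is purged
    // until the ledger actually holds more than MAX_LEDGER_SLOTS slots.
    let mut next_purge_batch = MAX_LEDGER_SLOTS;
    for root in (0..=8000u64).step_by(100) {
        cleanup_on_root(root, MAX_LEDGER_SLOTS, &mut next_purge_batch);
    }
}

Running the sketch prints one purge roughly every 256 rooted slots once the root passes 6400: each purge keeps the most recent MAX_LEDGER_SLOTS slots and moves the trigger DEFAULT_PURGE_BATCH_SIZE slots further out, which is the batching behavior the DeleteRange-based purge is meant to amortize.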