From a07b17b9b597c5ee9036270dbce49fd8c6dcc9a8 Mon Sep 17 00:00:00 2001
From: Sagar Dhawan
Date: Sat, 20 Jul 2019 13:13:55 -0700
Subject: [PATCH] Drop older slots in the ledger (#5188)

* Add facility to delete blocktree columns in range

* Add ledger cleanup service

* Add local_cluster test
---
 core/src/blocktree.rs              | 110 +++++++++++++++++------
 core/src/blocktree/db.rs           |  22 ++++--
 core/src/ledger_cleanup_service.rs | 100 ++++++++++++++++++++++++++
 core/src/lib.rs                    |   1 +
 core/src/replay_stage.rs           |  22 +++---
 core/src/tvu.rs                    |  25 ++++++-
 core/src/validator.rs              |   3 +
 core/tests/local_cluster.rs        |  39 ++++++++++
 core/tests/tvu.rs                  |   1 +
 validator/src/main.rs              |  15 ++--
 10 files changed, 271 insertions(+), 67 deletions(-)
 create mode 100644 core/src/ledger_cleanup_service.rs

diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs
index d006618f6e..4c11aa99ee 100644
--- a/core/src/blocktree.rs
+++ b/core/src/blocktree.rs
@@ -33,6 +33,7 @@ use std::sync::{Arc, RwLock};
 
 pub use self::meta::*;
 pub use self::rooted_slot_iterator::*;
+use solana_sdk::timing::Slot;
 
 mod db;
 mod meta;
@@ -193,67 +194,62 @@ impl Blocktree {
         false
     }
 
-    // silently deletes all blocktree column families starting at the given slot
-    fn delete_all_columns(&self, starting_slot: u64) {
-        match self.meta_cf.force_delete_all(Some(starting_slot)) {
-            Ok(_) => (),
-            Err(e) => error!(
+    /// Silently deletes all blocktree column families starting at the given slot until the `to` slot
+    /// Use with care; does not check for integrity and does not update slot metas that
+    /// refer to deleted slots
+    pub fn purge_slots(&self, from_slot: Slot, to_slot: Option<Slot>) {
+        let to_index = to_slot.map(|slot| (slot + 1, 0));
+        if let Err(e) = self.meta_cf.force_delete(Some(from_slot), to_slot) {
+            error!(
                 "Error: {:?} while deleting meta_cf for slot {:?}",
-                e, starting_slot
-            ),
+                e, from_slot
+            )
         }
-        match self.data_cf.force_delete_all(Some((starting_slot, 0))) {
-            Ok(_) => (),
-            Err(e) => error!(
+        if let Err(e) = self.data_cf.force_delete(Some((from_slot, 0)), to_index) {
+            error!(
                 "Error: {:?} while deleting data_cf for slot {:?}",
-                e, starting_slot
-            ),
+                e, from_slot
+            )
         }
-        match self
+        if let Err(e) = self
             .erasure_meta_cf
-            .force_delete_all(Some((starting_slot, 0)))
+            .force_delete(Some((from_slot, 0)), to_index)
         {
-            Ok(_) => (),
-            Err(e) => error!(
+            error!(
                 "Error: {:?} while deleting erasure_meta_cf for slot {:?}",
-                e, starting_slot
-            ),
+                e, from_slot
+            )
         }
-        match self.erasure_cf.force_delete_all(Some((starting_slot, 0))) {
-            Ok(_) => (),
-            Err(e) => error!(
+        if let Err(e) = self.erasure_cf.force_delete(Some((from_slot, 0)), to_index) {
+            error!(
                 "Error: {:?} while deleting erasure_cf for slot {:?}",
-                e, starting_slot
-            ),
+                e, from_slot
+            )
         }
-        match self.orphans_cf.force_delete_all(Some(starting_slot)) {
-            Ok(_) => (),
-            Err(e) => error!(
+        if let Err(e) = self.orphans_cf.force_delete(Some(from_slot), to_slot) {
+            error!(
                 "Error: {:?} while deleting orphans_cf for slot {:?}",
-                e, starting_slot
-            ),
+                e, from_slot
+            )
         }
-        match self.index_cf.force_delete_all(Some(starting_slot)) {
-            Ok(_) => (),
-            Err(e) => error!(
+        if let Err(e) = self.index_cf.force_delete(Some(from_slot), to_slot) {
+            error!(
                 "Error: {:?} while deleting index_cf for slot {:?}",
-                e, starting_slot
-            ),
+                e, from_slot
+            )
         }
-        match self.dead_slots_cf.force_delete_all(Some(starting_slot)) {
-            Ok(_) => (),
-            Err(e) => error!(
+        if let Err(e) = self.dead_slots_cf.force_delete(Some(from_slot), to_slot) {
+            error!(
                 "Error: {:?} while deleting dead_slots_cf for slot {:?}",
-                e, starting_slot
-            ),
+                e, from_slot
+            )
         }
         let roots_cf = self.db.column::<cf::Root>();
-        match roots_cf.force_delete_all(Some(starting_slot)) {
-            Ok(_) => (),
-            Err(e) => error!(
+        if let Err(e) = roots_cf.force_delete(Some(from_slot), to_slot) {
+            error!(
                 "Error: {:?} while deleting roots_cf for slot {:?}",
-                e, starting_slot
-            ),
+                e, from_slot
+            )
         }
     }
 
@@ -1007,7 +1003,7 @@ impl Blocktree {
         )
         .expect("unable to update meta for target slot");
 
-        self.delete_all_columns(target_slot + 1);
+        self.purge_slots(target_slot + 1, None);
 
         // fixup anything that refers to non-root slots and delete the rest
         for (slot, mut meta) in self
@@ -3472,6 +3468,32 @@ pub mod tests {
         Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
     }
 
+    #[test]
+    fn test_purge_slots() {
+        let blocktree_path = get_tmp_ledger_path!();
+        let blocktree = Blocktree::open(&blocktree_path).unwrap();
+        let (blobs, _) = make_many_slot_entries(0, 50, 5);
+        blocktree.write_blobs(blobs).unwrap();
+
+        blocktree.purge_slots(0, Some(5));
+
+        blocktree
+            .slot_meta_iterator(0)
+            .unwrap()
+            .for_each(|(slot, _)| {
+                assert!(slot > 5);
+            });
+
+        blocktree.purge_slots(0, None);
+
+        blocktree.slot_meta_iterator(0).unwrap().for_each(|(_, _)| {
+            assert!(false);
+        });
+
+        drop(blocktree);
+        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
+    }
+
     #[should_panic]
     #[test]
     fn test_prune_out_of_bounds() {
diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs
index ac23ae8805..a8bdf2b042 100644
--- a/core/src/blocktree/db.rs
+++ b/core/src/blocktree/db.rs
@@ -409,13 +409,21 @@ where
         Ok(iter.map(|(key, value)| (C::index(&key), value)))
     }
 
-    //TODO add a delete_until that goes the other way
-    pub fn force_delete_all(&self, start_from: Option<C::Index>) -> Result<()> {
-        let iter = self.iter(start_from)?;
-        iter.for_each(|(index, _)| match self.delete(index) {
-            Ok(_) => (),
-            Err(e) => error!("Error: {:?} while deleting {:?}", e, C::NAME),
-        });
+    pub fn force_delete(&self, from: Option<C::Index>, to: Option<C::Index>) -> Result<()>
+    where
+        C::Index: PartialOrd,
+    {
+        let iter = self.iter(from)?;
+        for (index, _) in iter {
+            if let Some(ref to) = to {
+                if &index > to {
+                    break;
+                }
+            }
+            if let Err(e) = self.delete(index) {
+                error!("Error: {:?} while deleting {:?}", e, C::NAME)
+            }
+        }
         Ok(())
     }
 
diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs
new file mode 100644
index 0000000000..c464602419
--- /dev/null
+++ b/core/src/ledger_cleanup_service.rs
@@ -0,0 +1,100 @@
+//! The `ledger_cleanup_service` drops older ledger data to limit disk space usage
+
+use crate::blocktree::Blocktree;
+use crate::result::{Error, Result};
+use crate::service::Service;
+use solana_sdk::pubkey::Pubkey;
+use solana_sdk::timing::DEFAULT_SLOTS_PER_EPOCH;
+use std::string::ToString;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc::{Receiver, RecvTimeoutError};
+use std::sync::Arc;
+use std::thread;
+use std::thread::{Builder, JoinHandle};
+use std::time::Duration;
+
+pub const DEFAULT_MAX_LEDGER_SLOTS: u64 = DEFAULT_SLOTS_PER_EPOCH * 5;
+
+pub struct LedgerCleanupService {
+    t_cleanup: JoinHandle<()>,
+}
+
+impl LedgerCleanupService {
+    pub fn new(
+        slot_full_receiver: Receiver<(u64, Pubkey)>,
+        blocktree: Arc<Blocktree>,
+        max_ledger_slots: u64,
+        exit: &Arc<AtomicBool>,
+    ) -> Self {
+        let exit = exit.clone();
+        let t_cleanup = Builder::new()
+            .name("solana-ledger-cleanup".to_string())
+            .spawn(move || loop {
+                if exit.load(Ordering::Relaxed) {
+                    break;
+                }
+                if let Err(e) =
+                    Self::cleanup_ledger(&slot_full_receiver, &blocktree, max_ledger_slots)
+                {
+                    match e {
+                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
+                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
+                        _ => info!("Error from cleanup_ledger: {:?}", e),
+                    }
+                }
+            })
+            .unwrap();
+        Self { t_cleanup }
+    }
+
+    fn cleanup_ledger(
+        slot_full_receiver: &Receiver<(u64, Pubkey)>,
+        blocktree: &Arc<Blocktree>,
+        max_ledger_slots: u64,
+    ) -> Result<()> {
+        let (slot, _) = slot_full_receiver.recv_timeout(Duration::from_secs(1))?;
+        if slot > max_ledger_slots {
+            //cleanup
+            blocktree.purge_slots(0, Some(slot - max_ledger_slots));
+        }
+        Ok(())
+    }
+}
+
+impl Service for LedgerCleanupService {
+    type JoinReturnType = ();
+
+    fn join(self) -> thread::Result<()> {
+        self.t_cleanup.join()
+    }
+}
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::blocktree::get_tmp_ledger_path;
+    use crate::blocktree::tests::make_many_slot_entries;
+    use std::sync::mpsc::channel;
+
+    #[test]
+    fn test_cleanup() {
+        let blocktree_path = get_tmp_ledger_path!();
+        let blocktree = Blocktree::open(&blocktree_path).unwrap();
+        let (blobs, _) = make_many_slot_entries(0, 50, 5);
+        blocktree.write_blobs(blobs).unwrap();
+        let blocktree = Arc::new(blocktree);
+        let (sender, receiver) = channel();
+
+        //send a signal to kill slots 0-40
+        sender.send((50, Pubkey::default())).unwrap();
+        LedgerCleanupService::cleanup_ledger(&receiver, &blocktree, 10).unwrap();
+
+        //check that 0-40 don't exist
+        blocktree
+            .slot_meta_iterator(0)
+            .unwrap()
+            .for_each(|(slot, _)| assert!(slot > 40));
+
+        drop(blocktree);
+        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
+    }
+}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 23afb45931..9b83c84d5b 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -42,6 +42,7 @@ pub mod gossip_service;
 pub mod leader_schedule;
 pub mod leader_schedule_cache;
 pub mod leader_schedule_utils;
+pub mod ledger_cleanup_service;
 pub mod local_cluster;
 pub mod local_vote_signer_service;
 pub mod packet;
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index d6add9e471..dd853e1790 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -88,12 +88,12 @@ impl ReplayStage {
         subscriptions: &Arc<RpcSubscriptions>,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
-    ) -> (Self, Receiver<(u64, Pubkey)>, Receiver<Vec<Arc<Bank>>>)
+        slot_full_senders: Vec<Sender<(u64, Pubkey)>>,
+    ) -> (Self, Receiver<Vec<Arc<Bank>>>)
     where
         T: 'static + KeypairUtil + Send + Sync,
     {
         let (root_bank_sender, root_bank_receiver) = channel();
-        let (slot_full_sender, slot_full_receiver) = channel();
         trace!("replay stage");
         let exit_ = exit.clone();
         let subscriptions = subscriptions.clone();
@@ -132,7 +132,7 @@ impl ReplayStage {
                         &bank_forks,
                         &my_pubkey,
                         &mut progress,
-                        &slot_full_sender,
+                        &slot_full_senders,
                     );
 
                     let votable = Self::generate_votable_banks(&bank_forks, &tower, &mut progress);
@@ -191,7 +191,7 @@ impl ReplayStage {
                 Ok(())
             })
             .unwrap();
-        (Self { t_replay }, slot_full_receiver, root_bank_receiver)
+        (Self { t_replay }, root_bank_receiver)
     }
 
     fn maybe_start_leader(
@@ -409,7 +409,7 @@ impl ReplayStage {
         bank_forks: &Arc<RwLock<BankForks>>,
         my_pubkey: &Pubkey,
         progress: &mut HashMap<u64, ForkProgress>,
-        slot_full_sender: &Sender<(u64, Pubkey)>,
+        slot_full_senders: &[Sender<(u64, Pubkey)>],
     ) -> bool {
         let mut did_complete_bank = false;
         let active_banks = bank_forks.read().unwrap().active_banks();
@@ -435,7 +435,7 @@ impl ReplayStage {
             assert_eq!(*bank_slot, bank.slot());
             if bank.tick_height() == bank.max_tick_height() {
                 did_complete_bank = true;
-                Self::process_completed_bank(my_pubkey, bank, slot_full_sender);
+                Self::process_completed_bank(my_pubkey, bank, slot_full_senders);
             } else {
                 trace!(
                     "bank {} not completed tick_height: {}, max_tick_height: {}",
@@ -618,13 +618,15 @@ impl ReplayStage {
     fn process_completed_bank(
         my_pubkey: &Pubkey,
         bank: Arc<Bank>,
-        slot_full_sender: &Sender<(u64, Pubkey)>,
+        slot_full_senders: &[Sender<(u64, Pubkey)>],
     ) {
         bank.freeze();
         info!("bank frozen {}", bank.slot());
-        if let Err(e) = slot_full_sender.send((bank.slot(), *bank.collector_id())) {
-            trace!("{} slot_full alert failed: {:?}", my_pubkey, e);
-        }
+        slot_full_senders.iter().for_each(|sender| {
+            if let Err(e) = sender.send((bank.slot(), *bank.collector_id())) {
+                trace!("{} slot_full alert failed: {:?}", my_pubkey, e);
+            }
+        });
     }
 
     fn generate_new_bank_forks(
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index a3abf2b722..0813f77d86 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -18,6 +18,7 @@ use crate::blockstream_service::BlockstreamService;
 use crate::blocktree::{Blocktree, CompletedSlotsReceiver};
 use crate::cluster_info::ClusterInfo;
 use crate::leader_schedule_cache::LeaderScheduleCache;
+use crate::ledger_cleanup_service::LedgerCleanupService;
 use crate::poh_recorder::PohRecorder;
 use crate::replay_stage::ReplayStage;
 use crate::retransmit_stage::RetransmitStage;
@@ -37,6 +38,7 @@ pub struct Tvu {
     retransmit_stage: RetransmitStage,
     replay_stage: ReplayStage,
     blockstream_service: Option<BlockstreamService>,
+    ledger_cleanup_service: Option<LedgerCleanupService>,
     storage_stage: StorageStage,
 }
 
@@ -64,6 +66,7 @@ impl Tvu {
         blocktree: Arc<Blocktree>,
         storage_state: &StorageState,
         blockstream: Option<&String>,
+        max_ledger_slots: Option<u64>,
         ledger_signal_receiver: Receiver<bool>,
         subscriptions: &Arc<RpcSubscriptions>,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
@@ -110,7 +113,10 @@ impl Tvu {
             *bank_forks.read().unwrap().working_bank().epoch_schedule(),
         );
 
-        let (replay_stage, slot_full_receiver, root_bank_receiver) = ReplayStage::new(
+        let (blockstream_slot_sender, blockstream_slot_receiver) = channel();
+        let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
+
+        let (replay_stage, root_bank_receiver) = ReplayStage::new(
             &keypair.pubkey(),
             vote_account,
             voting_keypair,
@@ -122,11 +128,12 @@ impl Tvu {
             subscriptions,
             poh_recorder,
             leader_schedule_cache,
+            vec![blockstream_slot_sender, ledger_cleanup_slot_sender],
         );
 
         let blockstream_service = if blockstream.is_some() {
             let blockstream_service = BlockstreamService::new(
-                slot_full_receiver,
+                blockstream_slot_receiver,
                 blocktree.clone(),
                 blockstream.unwrap().to_string(),
                 &exit,
@@ -136,6 +143,15 @@ impl Tvu {
             None
         };
 
+        let ledger_cleanup_service = max_ledger_slots.map(|max_ledger_slots| {
+            LedgerCleanupService::new(
+                ledger_cleanup_slot_receiver,
+                blocktree.clone(),
+                max_ledger_slots,
+                &exit,
+            )
+        });
+
         let storage_stage = StorageStage::new(
             storage_state,
             root_bank_receiver,
@@ -152,6 +168,7 @@ impl Tvu {
             retransmit_stage,
             replay_stage,
             blockstream_service,
+            ledger_cleanup_service,
             storage_stage,
         }
     }
@@ -167,6 +184,9 @@ impl Service for Tvu {
         if self.blockstream_service.is_some() {
             self.blockstream_service.unwrap().join()?;
         }
+        if self.ledger_cleanup_service.is_some() {
+            self.ledger_cleanup_service.unwrap().join()?;
+        }
         self.replay_stage.join()?;
         Ok(())
     }
@@ -226,6 +246,7 @@ pub mod tests {
             blocktree,
             &StorageState::default(),
             None,
+            None,
             l_receiver,
             &Arc::new(RpcSubscriptions::default()),
             &poh_recorder,
diff --git a/core/src/validator.rs b/core/src/validator.rs
index b2b6113977..d2e821ef7a 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -40,6 +40,7 @@ pub struct ValidatorConfig {
     pub account_paths: Option<String>,
     pub rpc_config: JsonRpcConfig,
     pub snapshot_path: Option<String>,
+    pub max_ledger_slots: Option<u64>,
     pub broadcast_stage_type: BroadcastStageType,
     pub erasure_config: ErasureConfig,
 }
@@ -51,6 +52,7 @@ impl Default for ValidatorConfig {
             voting_disabled: false,
             blockstream: None,
             storage_slots_per_turn: DEFAULT_SLOTS_PER_TURN,
+            max_ledger_slots: None,
             account_paths: None,
             rpc_config: JsonRpcConfig::default(),
             snapshot_path: None,
@@ -247,6 +249,7 @@ impl Validator {
             blocktree.clone(),
             &storage_state,
             config.blockstream.as_ref(),
+            config.max_ledger_slots,
             ledger_signal_receiver,
             &subscriptions,
             &poh_recorder,
diff --git a/core/tests/local_cluster.rs b/core/tests/local_cluster.rs
index 1b85abaef9..f772cc770a 100644
--- a/core/tests/local_cluster.rs
+++ b/core/tests/local_cluster.rs
@@ -3,6 +3,7 @@ extern crate solana;
 use hashbrown::HashSet;
 use log::*;
 use serial_test_derive::serial;
+use solana::blocktree::Blocktree;
 use solana::broadcast_stage::BroadcastStageType;
 use solana::cluster::Cluster;
 use solana::cluster_tests;
@@ -16,6 +17,44 @@ use solana_sdk::timing;
 use std::thread::sleep;
 use std::time::Duration;
 
+#[test]
+#[serial]
+fn test_ledger_cleanup_service() {
+    solana_logger::setup();
+    let num_nodes = 3;
+    let mut validator_config = ValidatorConfig::default();
+    validator_config.max_ledger_slots = Some(100);
+    let config = ClusterConfig {
+        cluster_lamports: 10_000,
+        poh_config: PohConfig::new_sleep(Duration::from_millis(50)),
+        node_stakes: vec![100; num_nodes],
+        validator_configs: vec![validator_config.clone(); num_nodes],
+        ..ClusterConfig::default()
+    };
+    let mut cluster = LocalCluster::new(&config);
+    // 200ms/per * 100 = 20 seconds, so sleep a little longer than that.
+    sleep(Duration::from_secs(60));
+
+    cluster_tests::spend_and_verify_all_nodes(
+        &cluster.entry_point_info,
+        &cluster.funding_keypair,
+        num_nodes,
+        HashSet::new(),
+    );
+    cluster.close_preserve_ledgers();
+    //check everyone's ledgers and make sure only ~100 slots are stored
+    for (_, info) in &cluster.fullnode_infos {
+        let mut slots = 0;
+        let blocktree = Blocktree::open(&info.info.ledger_path).unwrap();
+        blocktree
+            .slot_meta_iterator(0)
+            .unwrap()
+            .for_each(|_| slots += 1);
+        // with 3 nodes up to 3 slots can be in progress and not complete so max slots in blocktree should be up to 103
+        assert!(slots <= 103, "got {}", slots);
+    }
+}
+
 #[test]
 #[serial]
 fn test_spend_and_verify_all_nodes_1() {
diff --git a/core/tests/tvu.rs b/core/tests/tvu.rs
index 86b48896ee..9f3c2756d7 100644
--- a/core/tests/tvu.rs
+++ b/core/tests/tvu.rs
@@ -134,6 +134,7 @@ fn test_replay() {
         blocktree,
         &StorageState::default(),
         None,
+        None,
         ledger_signal_receiver,
         &Arc::new(RpcSubscriptions::default()),
         &poh_recorder,
diff --git a/validator/src/main.rs b/validator/src/main.rs
index 17ddfa5d11..0fee3afca1 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -2,6 +2,7 @@ use clap::{crate_description, crate_name, crate_version, App, Arg};
 use log::*;
 use solana::cluster_info::{Node, FULLNODE_PORT_RANGE};
 use solana::contact_info::ContactInfo;
+use solana::ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS;
 use solana::local_vote_signer_service::LocalVoteSignerService;
 use solana::service::Service;
 use solana::socketaddr;
@@ -163,6 +164,13 @@ fn main() {
                 .takes_value(true)
                 .help("Snapshot path"),
         )
+        .arg(
+            clap::Arg::with_name("limit_ledger_size")
+                .long("limit-ledger-size")
+                .takes_value(false)
+                .requires("snapshot_path")
+                .help("drop older slots in the ledger"),
+        )
         .arg(
             clap::Arg::with_name("skip_ledger_verify")
                 .long("skip-ledger-verify")
@@ -230,13 +238,12 @@ fn main() {
 
     if let Some(paths) = matches.value_of("accounts") {
         validator_config.account_paths = Some(paths.to_string());
-    } else {
-        validator_config.account_paths = None;
     }
     if let Some(paths) = matches.value_of("snapshot_path") {
         validator_config.snapshot_path = Some(paths.to_string());
-    } else {
-        validator_config.snapshot_path = None;
+    }
+    if matches.is_present("limit_ledger_size") {
+        validator_config.max_ledger_slots = Some(DEFAULT_MAX_LEDGER_SLOTS);
     }
     let cluster_entrypoint = matches.value_of("entrypoint").map(|entrypoint| {
         let entrypoint_addr = solana_netutil::parse_host_port(entrypoint)