diff --git a/core/src/lib.rs b/core/src/lib.rs
index 99ac98b5d..44e7a8ab8 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -22,8 +22,6 @@ pub mod cost_update_service;
 pub mod drop_bank_service;
 pub mod fetch_stage;
 pub mod gen_keys;
-pub mod ledger_cleanup_service;
-pub mod ledger_metric_report_service;
 pub mod next_leader;
 pub mod optimistic_confirmation_verifier;
 pub mod poh_timing_report_service;
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 639670479..be39b5f9d 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -14,7 +14,6 @@ use {
        consensus::{tower_storage::TowerStorage, Tower},
        cost_update_service::CostUpdateService,
        drop_bank_service::DropBankService,
-        ledger_cleanup_service::LedgerCleanupService,
        repair::{quic_endpoint::LocalRequest, repair_service::RepairInfo},
        replay_stage::{ReplayStage, ReplayStageConfig},
        rewards_recorder_service::RewardsRecorderSender,
@@ -32,8 +31,9 @@ use {
        duplicate_shred_listener::DuplicateShredListener,
    },
    solana_ledger::{
-        blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
-        entry_notifier_service::EntryNotifierSender, leader_schedule_cache::LeaderScheduleCache,
+        blockstore::Blockstore, blockstore_cleanup_service::BlockstoreCleanupService,
+        blockstore_processor::TransactionStatusSender, entry_notifier_service::EntryNotifierSender,
+        leader_schedule_cache::LeaderScheduleCache,
    },
    solana_poh::poh_recorder::PohRecorder,
    solana_rpc::{
@@ -63,7 +63,7 @@ pub struct Tvu {
    window_service: WindowService,
    cluster_slots_service: ClusterSlotsService,
    replay_stage: ReplayStage,
-    ledger_cleanup_service: Option<LedgerCleanupService>,
+    blockstore_cleanup_service: Option<BlockstoreCleanupService>,
    cost_update_service: CostUpdateService,
    voting_service: VotingService,
    warm_quic_cache_service: Option<WarmQuicCacheService>,
@@ -236,14 +236,14 @@ impl Tvu {
            exit.clone(),
        );

-        let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = unbounded();
+        let (blockstore_cleanup_slot_sender, blockstore_cleanup_slot_receiver) = unbounded();
        let replay_stage_config = ReplayStageConfig {
            vote_account: *vote_account,
            authorized_voter_keypairs,
            exit: exit.clone(),
            rpc_subscriptions: rpc_subscriptions.clone(),
            leader_schedule_cache: leader_schedule_cache.clone(),
-            latest_root_senders: vec![ledger_cleanup_slot_sender],
+            latest_root_senders: vec![blockstore_cleanup_slot_sender],
            accounts_background_request_sender,
            block_commitment_cache,
            transaction_status_sender,
@@ -311,9 +311,9 @@ impl Tvu {
            popular_pruned_forks_receiver,
        )?;

-        let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
-            LedgerCleanupService::new(
-                ledger_cleanup_slot_receiver,
+        let blockstore_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
+            BlockstoreCleanupService::new(
+                blockstore_cleanup_slot_receiver,
                blockstore.clone(),
                max_ledger_shreds,
                exit.clone(),
@@ -337,7 +337,7 @@ impl Tvu {
            window_service,
            cluster_slots_service,
            replay_stage,
-            ledger_cleanup_service,
+            blockstore_cleanup_service,
            cost_update_service,
            voting_service,
            warm_quic_cache_service,
@@ -352,8 +352,8 @@ impl Tvu {
        self.cluster_slots_service.join()?;
        self.fetch_stage.join()?;
        self.shred_sigverify.join()?;
-        if self.ledger_cleanup_service.is_some() {
-            self.ledger_cleanup_service.unwrap().join()?;
+        if self.blockstore_cleanup_service.is_some() {
+            self.blockstore_cleanup_service.unwrap().join()?;
        }
        self.replay_stage.join()?;
        self.cost_update_service.join()?;
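
The tvu.rs hunks above are a pure rename, but the wiring they touch is worth seeing in isolation: the cleanup service only exists when a shred limit is configured, so Tvu stores an Option and joins it conditionally. Below is a minimal standalone sketch of that pattern, assuming a stand-in service in place of the real BlockstoreCleanupService (the type, thread name, and channel payload here are illustrative, not the real API):

use crossbeam_channel::{unbounded, Receiver};
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread::{Builder, JoinHandle},
    time::Duration,
};

/// Stand-in for BlockstoreCleanupService: a named thread that drains root
/// slots off a channel until the shared exit flag is raised.
struct CleanupService {
    t_cleanup: JoinHandle<()>,
}

impl CleanupService {
    fn new(root_receiver: Receiver<u64>, max_shreds: u64, exit: Arc<AtomicBool>) -> Self {
        let t_cleanup = Builder::new()
            .name("cleanup".to_string())
            .spawn(move || {
                while !exit.load(Ordering::Relaxed) {
                    if let Ok(root) = root_receiver.recv_timeout(Duration::from_millis(100)) {
                        println!("would purge down to {max_shreds} shreds at root {root}");
                    }
                }
            })
            .unwrap();
        Self { t_cleanup }
    }

    fn join(self) -> std::thread::Result<()> {
        self.t_cleanup.join()
    }
}

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let (root_sender, root_receiver) = unbounded();

    // As in Tvu::new: the service is only spawned if a limit was configured.
    let max_ledger_shreds: Option<u64> = Some(50_000_000);
    let cleanup_service =
        max_ledger_shreds.map(|max| CleanupService::new(root_receiver, max, exit.clone()));

    root_sender.send(42).unwrap(); // ReplayStage would send each new root here
    exit.store(true, Ordering::Relaxed);

    // As in Tvu::join, but with `if let` instead of is_some()/unwrap().
    if let Some(service) = cleanup_service {
        service.join().unwrap();
    }
}

The diff's is_some()/unwrap() join is behaviorally the same as the if-let used in this sketch.
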
diff --git a/core/src/validator.rs b/core/src/validator.rs
index 4aa6fb992..241105e28 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -14,7 +14,6 @@ use {
            tower_storage::{NullTowerStorage, TowerStorage},
            ExternalRootSource, Tower,
        },
-        ledger_metric_report_service::LedgerMetricReportService,
        poh_timing_report_service::PohTimingReportService,
        repair::{self, serve_repair::ServeRepair, serve_repair_service::ServeRepairService},
        rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService},
@@ -56,6 +55,7 @@ use {
        blockstore::{
            Blockstore, BlockstoreError, BlockstoreSignals, CompletedSlotsReceiver, PurgeType,
        },
+        blockstore_metric_report_service::BlockstoreMetricReportService,
        blockstore_options::{BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions},
        blockstore_processor::{self, TransactionStatusSender},
        entry_notifier_interface::EntryNotifierArc,
@@ -465,7 +465,7 @@ pub struct Validator {
    pub bank_forks: Arc<RwLock<BankForks>>,
    pub blockstore: Arc<Blockstore>,
    geyser_plugin_service: Option<GeyserPluginService>,
-    ledger_metric_report_service: LedgerMetricReportService,
+    blockstore_metric_report_service: BlockstoreMetricReportService,
    accounts_background_service: AccountsBackgroundService,
    accounts_hash_verifier: AccountsHashVerifier,
    turbine_quic_endpoint: Endpoint,
@@ -1102,8 +1102,8 @@ impl Validator {
        )
        .map_err(|err| format!("wait_for_supermajority failed: {err:?}"))?;

-        let ledger_metric_report_service =
-            LedgerMetricReportService::new(blockstore.clone(), exit.clone());
+        let blockstore_metric_report_service =
+            BlockstoreMetricReportService::new(blockstore.clone(), exit.clone());

        let wait_for_vote_to_start_leader =
            !waited_for_supermajority && !config.no_wait_for_vote_to_start_leader;
@@ -1378,7 +1378,7 @@ impl Validator {
            bank_forks,
            blockstore,
            geyser_plugin_service,
-            ledger_metric_report_service,
+            blockstore_metric_report_service,
            accounts_background_service,
            accounts_hash_verifier,
            turbine_quic_endpoint,
@@ -1507,7 +1507,7 @@ impl Validator {
        self.stats_reporter_service
            .join()
            .expect("stats_reporter_service");
-        self.ledger_metric_report_service
+        self.blockstore_metric_report_service
            .join()
            .expect("ledger_metric_report_service");
        self.accounts_background_service
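
validator.rs builds the renamed metric reporter from nothing more than a Blockstore handle and the shared exit flag. A rough sketch of the periodic-report loop such a service runs, with the Blockstore dependency stubbed out (the thread name and 10-second period come from the renamed file further down; the println stands in for the real column-family metric reporting):

use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread::{sleep, Builder, JoinHandle},
    time::Duration,
};

const BLOCKSTORE_METRICS_REPORT_PERIOD_MILLIS: u64 = 10000;

struct MetricReportService {
    t_cf_metric: JoinHandle<()>,
}

impl MetricReportService {
    fn new(exit: Arc<AtomicBool>) -> Self {
        let t_cf_metric = Builder::new()
            .name("solRocksCfMtrcs".to_string())
            .spawn(move || loop {
                if exit.load(Ordering::Relaxed) {
                    break;
                }
                // The real service reports RocksDB column-family metrics here.
                println!("reporting blockstore metrics");
                sleep(Duration::from_millis(BLOCKSTORE_METRICS_REPORT_PERIOD_MILLIS));
            })
            .unwrap();
        Self { t_cf_metric }
    }

    fn join(self) -> std::thread::Result<()> {
        self.t_cf_metric.join()
    }
}

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let service = MetricReportService::new(exit.clone());
    exit.store(true, Ordering::Relaxed);
    // May block for up to one report period if the loop is mid-sleep.
    service.join().unwrap();
}
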
diff --git a/core/tests/ledger_cleanup.rs b/core/tests/ledger_cleanup.rs
deleted file mode 100644
index 1a096c738..000000000
--- a/core/tests/ledger_cleanup.rs
+++ /dev/null
@@ -1,613 +0,0 @@
-#![allow(clippy::arithmetic_side_effects)]
-// Long-running ledger_cleanup tests
-
-#[cfg(test)]
-mod tests {
-    use {
-        crossbeam_channel::unbounded,
-        log::*,
-        solana_core::ledger_cleanup_service::LedgerCleanupService,
-        solana_ledger::{
-            blockstore::{make_many_slot_shreds, Blockstore},
-            blockstore_options::{
-                BlockstoreOptions, BlockstoreRocksFifoOptions, LedgerColumnOptions,
-                ShredStorageType,
-            },
-            get_tmp_ledger_path,
-        },
-        solana_measure::measure::Measure,
-        std::{
-            collections::VecDeque,
-            str::FromStr,
-            sync::{
-                atomic::{AtomicBool, AtomicU64, Ordering},
-                Arc, Mutex, RwLock,
-            },
-            thread::{self, Builder, JoinHandle},
-            time::{Duration, Instant},
-        },
-        systemstat::{CPULoad, Platform, System},
-    };
-
-    const DEFAULT_BENCHMARK_SLOTS: u64 = 50;
-    const DEFAULT_BATCH_SIZE_SLOTS: u64 = 1;
-    const DEFAULT_MAX_LEDGER_SHREDS: u64 = 50;
-    const DEFAULT_SHREDS_PER_SLOT: u64 = 25;
-    const DEFAULT_STOP_SIZE_BYTES: u64 = 0;
-    const DEFAULT_STOP_SIZE_ITERATIONS: u64 = 0;
-    const DEFAULT_STOP_SIZE_CF_DATA_BYTES: u64 = 0;
-    const DEFAULT_SHRED_DATA_CF_SIZE_BYTES: u64 = 125 * 1024 * 1024 * 1024;
-
-    #[derive(Debug)]
-    struct BenchmarkConfig {
-        benchmark_slots: u64,
-        batch_size_slots: u64,
-        max_ledger_shreds: u64,
-        shreds_per_slot: u64,
-        stop_size_bytes: u64,
-        stop_size_iterations: u64,
-        stop_size_cf_data_bytes: u64,
-        pre_generate_data: bool,
-        cleanup_blockstore: bool,
-        num_writers: u64,
-        cleanup_service: bool,
-        fifo_compaction: bool,
-        shred_data_cf_size: u64,
-    }
-
-    #[derive(Clone, Copy, Debug)]
-    struct CpuStatsInner {
-        cpu_user: f32,
-        cpu_system: f32,
-        cpu_idle: f32,
-    }
-
-    impl From<CPULoad> for CpuStatsInner {
-        fn from(cpu: CPULoad) -> Self {
-            Self {
-                cpu_user: cpu.user * 100.0,
-                cpu_system: cpu.system * 100.0,
-                cpu_idle: cpu.idle * 100.0,
-            }
-        }
-    }
-
-    impl Default for CpuStatsInner {
-        fn default() -> Self {
-            Self {
-                cpu_user: 0.0,
-                cpu_system: 0.0,
-                cpu_idle: 0.0,
-            }
-        }
-    }
-
-    struct CpuStats {
-        stats: RwLock<CpuStatsInner>,
-        sys: System,
-    }
-
-    impl Default for CpuStats {
-        fn default() -> Self {
-            Self {
-                stats: RwLock::new(CpuStatsInner::default()),
-                sys: System::new(),
-            }
-        }
-    }
-
-    impl CpuStats {
-        fn update(&self) {
-            if let Ok(cpu) = self.sys.cpu_load_aggregate() {
-                std::thread::sleep(Duration::from_millis(400));
-                let cpu_new = CpuStatsInner::from(cpu.done().unwrap());
-                *self.stats.write().unwrap() = cpu_new;
-            }
-        }
-
-        fn get_stats(&self) -> CpuStatsInner {
-            *self.stats.read().unwrap()
-        }
-    }
-
-    struct CpuStatsUpdater {
-        cpu_stats: Arc<CpuStats>,
-        t_cleanup: JoinHandle<()>,
-    }
-
-    impl CpuStatsUpdater {
-        pub fn new(exit: Arc<AtomicBool>) -> Self {
-            let cpu_stats = Arc::new(CpuStats::default());
-            let cpu_stats_clone = cpu_stats.clone();
-
-            let t_cleanup = Builder::new()
-                .name("cpu_info".to_string())
-                .spawn(move || loop {
-                    if exit.load(Ordering::Relaxed) {
-                        break;
-                    }
-                    cpu_stats_clone.update();
-                })
-                .unwrap();
-
-            Self {
-                cpu_stats,
-                t_cleanup,
-            }
-        }
-
-        pub fn get_stats(&self) -> CpuStatsInner {
-            self.cpu_stats.get_stats()
-        }
-
-        pub fn join(self) -> std::thread::Result<()> {
-            self.t_cleanup.join()
-        }
-    }
-
-    fn read_env<T>(key: &str, default: T) -> T
-    where
-        T: FromStr,
-    {
-        match std::env::var(key) {
-            Ok(val) => val.parse().unwrap_or(default),
-            Err(_e) => default,
-        }
-    }
-
-    /// Obtains the benchmark config from the following environmental arguments:
-    ///
-    /// Basic benchmark settings:
-    /// - `BENCHMARK_SLOTS`: the number of slots in the benchmark.
-    /// - `BATCH_SIZE`: the number of slots in each write batch.
-    /// - `SHREDS_PER_SLOT`: the number of shreds in each slot. Together with
-    ///   the `BATCH_SIZE` and `BENCHMARK_SLOTS`, it means:
-    ///   - the number of shreds in one write batch is `BATCH_SIZE` * `SHREDS_PER_SLOT`.
-    ///   - the total number of batches is `BENCHMARK_SLOTS` / `BATCH_SIZE`.
-    ///   - the total number of shreds is `BENCHMARK_SLOTS` * `SHREDS_PER_SLOT`.
-    /// - `NUM_WRITERS`: controls the number of concurrent threads performing
-    ///   shred insertion. Default: 1.
-    ///
-    /// Advanced benchmark settings:
-    /// - `STOP_SIZE_BYTES`: if specified, the benchmark will count how
-    ///   many times the ledger store size exceeds the specified threshold.
-    /// - `STOP_SIZE_CF_DATA_BYTES`: if specified, the benchmark will count how
-    ///   many times the storage size of `cf::ShredData` which stores data shred
-    ///   exceeds the specified threshold.
-    /// - `STOP_SIZE_ITERATIONS`: when any of the stop size is specified, the
-    ///   benchmark will stop immediately when the number of consecutive times
-    ///   where the ledger store size exceeds the configured `STOP_SIZE_BYTES`.
-    ///   These configs are used to make sure the benchmark runs successfully
-    ///   under the storage limitation.
-    /// - `CLEANUP_BLOCKSTORE`: if true, the ledger store created in the current
-    ///   benchmark run will be deleted. Default: true.
-    ///
-    /// Cleanup-service related settings:
-    /// - `MAX_LEDGER_SHREDS`: when the clean-up service is on, the service will
-    ///   clean up the ledger store when the number of shreds exceeds this value.
-    /// - `CLEANUP_SERVICE`: whether to enable the background cleanup service.
-    ///   If set to false, the ledger store in the benchmark will be purely relied
-    ///   on RocksDB's compaction. Default: true.
-    ///
-    /// Fifo-compaction settings:
-    /// - `FIFO_COMPACTION`: if true, then RocksDB's Fifo compaction will be
-    ///   used for storing data shreds. Default: false.
-    /// - `SHRED_DATA_CF_SIZE_BYTES`: the maximum size of the data-shred column family.
-    ///   Default: 125 * 1024 * 1024 * 1024.
-    fn get_benchmark_config() -> BenchmarkConfig {
-        let benchmark_slots = read_env("BENCHMARK_SLOTS", DEFAULT_BENCHMARK_SLOTS);
-        let batch_size_slots = read_env("BATCH_SIZE", DEFAULT_BATCH_SIZE_SLOTS);
-        let max_ledger_shreds = read_env("MAX_LEDGER_SHREDS", DEFAULT_MAX_LEDGER_SHREDS);
-        let shreds_per_slot = read_env("SHREDS_PER_SLOT", DEFAULT_SHREDS_PER_SLOT);
-        let stop_size_bytes = read_env("STOP_SIZE_BYTES", DEFAULT_STOP_SIZE_BYTES);
-        let stop_size_iterations = read_env("STOP_SIZE_ITERATIONS", DEFAULT_STOP_SIZE_ITERATIONS);
-        let stop_size_cf_data_bytes =
-            read_env("STOP_SIZE_CF_DATA_BYTES", DEFAULT_STOP_SIZE_CF_DATA_BYTES);
-        let pre_generate_data = read_env("PRE_GENERATE_DATA", false);
-        let cleanup_blockstore = read_env("CLEANUP_BLOCKSTORE", true);
-        let num_writers = read_env("NUM_WRITERS", 1);
-        // A flag indicating whether to have a background clean-up service.
-        // If set to false, the ledger store will purely rely on RocksDB's
-        // compaction to perform the clean-up.
-        let cleanup_service = read_env("CLEANUP_SERVICE", true);
-        let fifo_compaction = read_env("FIFO_COMPACTION", false);
-        let shred_data_cf_size =
-            read_env("SHRED_DATA_CF_SIZE_BYTES", DEFAULT_SHRED_DATA_CF_SIZE_BYTES);
-
-        BenchmarkConfig {
-            benchmark_slots,
-            batch_size_slots,
-            max_ledger_shreds,
-            shreds_per_slot,
-            stop_size_bytes,
-            stop_size_iterations,
-            stop_size_cf_data_bytes,
-            pre_generate_data,
-            cleanup_blockstore,
-            num_writers,
-            cleanup_service,
-            fifo_compaction,
-            shred_data_cf_size,
-        }
-    }
-
-    fn emit_header() {
-        println!("TIME_MS,DELTA_MS,START_SLOT,BATCH_SIZE,SHREDS,MAX,SIZE,DELTA_SIZE,DATA_SHRED_SIZE,DATA_SHRED_SIZE_DELTA,CPU_USER,CPU_SYSTEM,CPU_IDLE");
-    }
-
-    #[allow(clippy::too_many_arguments)]
-    fn emit_stats(
-        time_initial: Instant,
-        time_previous: &mut Instant,
-        storage_previous: &mut u64,
-        data_shred_storage_previous: &mut u64,
-        start_slot: u64,
-        batch_size: u64,
-        num_shreds: u64,
-        max_shreds: i64,
-        blockstore: &Blockstore,
-        cpu: &CpuStatsInner,
-    ) {
-        let time_now = Instant::now();
-        let storage_now = blockstore.storage_size().unwrap_or(0);
-        let data_shred_storage_now = blockstore.total_data_shred_storage_size().unwrap();
-        let (cpu_user, cpu_system, cpu_idle) = (cpu.cpu_user, cpu.cpu_system, cpu.cpu_idle);
-
-        info!(
-            "{},{},{},{},{},{},{},{},{},{},{:.2},{:.2},{:.2}",
-            time_now.duration_since(time_initial).as_millis(),
-            time_now.duration_since(*time_previous).as_millis(),
-            start_slot,
-            batch_size,
-            num_shreds,
-            max_shreds,
-            storage_now,
-            storage_now as i64 - *storage_previous as i64,
-            data_shred_storage_now,
-            data_shred_storage_now - *data_shred_storage_previous as i64,
-            cpu_user,
-            cpu_system,
-            cpu_idle,
-        );
-
-        *time_previous = time_now;
-        *storage_previous = storage_now;
-        *data_shred_storage_previous = data_shred_storage_now.try_into().unwrap();
-    }
-
-    /// Helper function of the benchmark `test_ledger_cleanup_compaction` which
-    /// returns true if the benchmark fails the size limitation check.
-    fn is_exceeded_stop_size_iterations(
-        storage_size: u64,
-        stop_size: u64,
-        exceeded_iterations: &mut u64,
-        iteration_limit: u64,
-        storage_desc: &str,
-    ) -> bool {
-        if stop_size > 0 {
-            if storage_size >= stop_size {
-                *exceeded_iterations += 1;
-                warn!(
-                    "{} size {} exceeds the stop size {} for {} times!",
-                    storage_desc, storage_size, stop_size, exceeded_iterations
-                );
-            } else {
-                *exceeded_iterations = 0;
-            }
-
-            if *exceeded_iterations >= iteration_limit {
-                error!(
-                    "{} size exceeds the configured limit {} for {} times",
-                    storage_desc, stop_size, exceeded_iterations,
-                );
-                return true;
-            }
-        }
-        false
-    }
-
-    /// The ledger cleanup test which can also be used as a benchmark
-    /// measuring shred insertion performance of the blockstore.
-    ///
-    /// The benchmark is controlled by several environmental arguments.
-    /// Check [`get_benchmark_config`] for the full list of arguments.
-    ///
-    /// Example command:
-    /// BENCHMARK_SLOTS=1000000 BATCH_SIZE=1 SHREDS_PER_SLOT=25 NUM_WRITERS=8 \
-    /// PRE_GENERATE_DATA=false cargo test --release tests::test_ledger_cleanup \
-    /// -- --exact --nocapture
-    #[test]
-    fn test_ledger_cleanup() {
-        solana_logger::setup_with("error,ledger_cleanup::tests=info");
-
-        let ledger_path = get_tmp_ledger_path!();
-        let config = get_benchmark_config();
-        let blockstore = Blockstore::open_with_options(
-            &ledger_path,
-            if config.fifo_compaction {
-                BlockstoreOptions {
-                    column_options: LedgerColumnOptions {
-                        shred_storage_type: ShredStorageType::RocksFifo(
-                            BlockstoreRocksFifoOptions {
-                                shred_data_cf_size: config.shred_data_cf_size,
-                                shred_code_cf_size: config.shred_data_cf_size,
-                            },
-                        ),
-                        ..LedgerColumnOptions::default()
-                    },
-                    ..BlockstoreOptions::default()
-                }
-            } else {
-                BlockstoreOptions::default()
-            },
-        )
-        .unwrap();
-        let blockstore = Arc::new(blockstore);
-
-        info!("Benchmark configuration: {:#?}", config);
-        info!("Ledger path: {:?}", &ledger_path);
-
-        let benchmark_slots = config.benchmark_slots;
-        let batch_size_slots = config.batch_size_slots;
-        let max_ledger_shreds = config.max_ledger_shreds;
-        let shreds_per_slot = config.shreds_per_slot;
-        let stop_size_bytes = config.stop_size_bytes;
-        let stop_size_iterations = config.stop_size_iterations;
-        let stop_size_cf_data_bytes = config.stop_size_cf_data_bytes;
-        let pre_generate_data = config.pre_generate_data;
-        let num_writers = config.num_writers;
-        let cleanup_service = config.cleanup_service;
-
-        let num_batches = benchmark_slots / batch_size_slots;
-        let num_shreds_total = benchmark_slots * shreds_per_slot;
-
-        let (sender, receiver) = unbounded();
-        let exit = Arc::new(AtomicBool::new(false));
-
-        let cleaner = if cleanup_service {
-            Some(LedgerCleanupService::new(
-                receiver,
-                blockstore.clone(),
-                max_ledger_shreds,
-                exit.clone(),
-            ))
-        } else {
-            None
-        };
-
-        let exit_cpu = Arc::new(AtomicBool::new(false));
-        let sys = CpuStatsUpdater::new(exit_cpu.clone());
-
-        let mut shreds = VecDeque::new();
-
-        if pre_generate_data {
-            let mut pre_generate_data_timer = Measure::start("Pre-generate data");
-            info!("Pre-generate data ... this may take a while");
-            for i in 0..num_batches {
-                let start_slot = i * batch_size_slots;
-                let (new_shreds, _) =
-                    make_many_slot_shreds(start_slot, batch_size_slots, shreds_per_slot);
-                shreds.push_back(new_shreds);
-            }
-            pre_generate_data_timer.stop();
-            info!("{}", pre_generate_data_timer);
-        }
-        let shreds = Arc::new(Mutex::new(shreds));
-
-        info!(
-            "Bench info num_batches: {}, batch size (slots): {}, shreds_per_slot: {}, num_shreds_total: {}",
-            num_batches,
-            batch_size_slots,
-            shreds_per_slot,
-            num_shreds_total
-        );
-
-        let time_initial = Instant::now();
-        let mut time_previous = time_initial;
-        let mut storage_previous = 0;
-        let mut data_shred_storage_previous = 0;
-        let mut stop_size_bytes_exceeded_iterations = 0;
-        let mut stop_size_cf_data_exceeded_iterations = 0;
-
-        emit_header();
-        emit_stats(
-            time_initial,
-            &mut time_previous,
-            &mut storage_previous,
-            &mut data_shred_storage_previous,
-            0,
-            0,
-            0,
-            0,
-            &blockstore,
-            &sys.get_stats(),
-        );
-
-        let mut insert_threads = vec![];
-        let insert_exit = Arc::new(AtomicBool::new(false));
-
-        info!("Begin inserting shreds ...");
-        let mut insert_timer = Measure::start("Shred insertion");
-        let current_batch_id = Arc::new(AtomicU64::new(0));
-        let finished_batch_count = Arc::new(AtomicU64::new(0));
-
-        for i in 0..num_writers {
-            let cloned_insert_exit = insert_exit.clone();
-            let cloned_blockstore = blockstore.clone();
-            let cloned_shreds = shreds.clone();
-            let shared_batch_id = current_batch_id.clone();
-            let shared_finished_count = finished_batch_count.clone();
-            let insert_thread = Builder::new()
-                .name(format!("insert_shreds-{i}"))
-                .spawn(move || {
-                    let start = Instant::now();
-                    let mut now = Instant::now();
-                    let mut total = 0;
-                    let mut total_batches = 0;
-                    let mut total_inserted_shreds = 0;
-                    let mut num_shreds = 0;
-                    let mut max_speed = 0f32;
-                    let mut min_speed = f32::MAX;
-                    let (first_shreds, _) = make_many_slot_shreds(
-                        0, batch_size_slots, shreds_per_slot);
-                    loop {
-                        let batch_id = shared_batch_id.fetch_add(1, Ordering::Relaxed);
-                        let start_slot = batch_id * batch_size_slots;
-                        if start_slot >= benchmark_slots {
-                            break;
-                        }
-                        let len = batch_id;
-
-                        // No duplicates being generated, so all shreds
-                        // being passed to insert() are getting inserted
-                        let num_shred_inserted = if pre_generate_data {
-                            let mut sl = cloned_shreds.lock().unwrap();
-                            if let Some(shreds_from_queue) = sl.pop_front() {
-                                let num_shreds = shreds_from_queue.len();
-                                total += num_shreds;
-                                cloned_blockstore.insert_shreds(
-                                    shreds_from_queue, None, false).unwrap();
-                                num_shreds
-                            } else {
-                                // If the queue is empty, we're done!
-                                break;
-                            }
-                        } else {
-                            let slot_id = start_slot;
-                            if slot_id > 0 {
-                                let (shreds_with_parent, _) = make_many_slot_shreds(
-                                    slot_id, batch_size_slots, shreds_per_slot);
-                                let num_shreds = shreds_with_parent.len();
-                                total += num_shreds;
-                                cloned_blockstore.insert_shreds(
-                                    shreds_with_parent.clone(), None, false).unwrap();
-                                num_shreds
-                            } else {
-                                let num_shreds = first_shreds.len();
-                                total += num_shreds;
-                                cloned_blockstore.insert_shreds(
-                                    first_shreds.clone(), None, false).unwrap();
-                                num_shreds
-                            }
-                        };
-
-                        total_batches += 1;
-                        total_inserted_shreds += num_shred_inserted;
-                        num_shreds += num_shred_inserted;
-                        shared_finished_count.fetch_add(1, Ordering::Relaxed);
-
-                        // as_secs() returns whole number of seconds, so this runs every second
-                        if now.elapsed().as_secs() > 0 {
-                            let shreds_per_second = num_shreds as f32 / now.elapsed().as_secs() as f32;
-                            warn!(
-                                "insert-{} tried: {} inserted: {} batches: {} len: {} shreds_per_second: {}",
-                                i, total, total_inserted_shreds, total_batches, len, shreds_per_second,
-                            );
-                            let average_speed =
-                                total_inserted_shreds as f32 / start.elapsed().as_secs() as f32;
-                            max_speed = max_speed.max(shreds_per_second);
-                            min_speed = min_speed.min(shreds_per_second);
-                            warn!(
-                                "highest: {} lowest: {} avg: {}",
-                                max_speed, min_speed, average_speed
-                            );
-                            now = Instant::now();
-                            num_shreds = 0;
-                        }
-
-                        if cloned_insert_exit.load(Ordering::Relaxed) {
-                            if max_speed > 0.0 {
-                                info!(
-                                    "insert-{} exiting highest shreds/s: {}, lowest shreds/s: {}",
-                                    i, max_speed, min_speed
-                                );
-                            } else {
-                                // Not enough time elapsed to sample
-                                info!(
-                                    "insert-{} exiting",
-                                    i
-                                );
-                            }
-                            break;
-                        }
-                    }
-                })
-                .unwrap();
-            insert_threads.push(insert_thread);
-        }
-
-        loop {
-            let finished_batch = finished_batch_count.load(Ordering::Relaxed);
-            let finished_slot = (finished_batch + 1) * batch_size_slots - 1;
-
-            if cleanup_service {
-                sender.send(finished_slot).unwrap();
-            }
-
-            emit_stats(
-                time_initial,
-                &mut time_previous,
-                &mut storage_previous,
-                &mut data_shred_storage_previous,
-                finished_slot,
-                batch_size_slots,
-                shreds_per_slot,
-                max_ledger_shreds as i64,
-                &blockstore,
-                &sys.get_stats(),
-            );
-
-            if is_exceeded_stop_size_iterations(
-                storage_previous,
-                stop_size_bytes,
-                &mut stop_size_bytes_exceeded_iterations,
-                stop_size_iterations,
-                "Storage",
-            ) {
-                break;
-            }
-
-            if is_exceeded_stop_size_iterations(
-                data_shred_storage_previous,
-                stop_size_cf_data_bytes,
-                &mut stop_size_cf_data_exceeded_iterations,
-                stop_size_iterations,
-                "cf::ShredData",
-            ) {
-                break;
-            }
-
-            if finished_batch >= num_batches {
-                break;
-            } else {
-                thread::sleep(Duration::from_millis(500));
-            }
-        }
-        // Send exit signal to stop all the writer threads.
-        insert_exit.store(true, Ordering::Relaxed);
-
-        while let Some(thread) = insert_threads.pop() {
-            thread.join().unwrap();
-        }
-        insert_timer.stop();
-
-        info!(
-            "Done inserting shreds: {}, {} shreds/s",
-            insert_timer,
-            num_shreds_total as f32 / insert_timer.as_s(),
-        );
-
-        exit.store(true, Ordering::SeqCst);
-        if cleanup_service {
-            cleaner.unwrap().join().unwrap();
-        }
-
-        exit_cpu.store(true, Ordering::SeqCst);
-        sys.join().unwrap();
-
-        if config.cleanup_blockstore {
-            drop(blockstore);
-            Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
-        }
-    }
-}
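
That removed benchmark configured itself entirely through environment variables via a tiny FromStr helper. The helper pattern on its own, lifted from the deleted file (the main function is a hypothetical usage added for illustration):

use std::str::FromStr;

/// Parse `key` from the environment, falling back to `default` when the
/// variable is unset or fails to parse -- as in the deleted benchmark.
fn read_env<T: FromStr>(key: &str, default: T) -> T {
    match std::env::var(key) {
        Ok(val) => val.parse().unwrap_or(default),
        Err(_) => default,
    }
}

fn main() {
    // e.g. BENCHMARK_SLOTS=1000000 CLEANUP_SERVICE=false cargo run
    let benchmark_slots: u64 = read_env("BENCHMARK_SLOTS", 50);
    let cleanup_service: bool = read_env("CLEANUP_SERVICE", true);
    println!("slots={benchmark_slots} cleanup_service={cleanup_service}");
}
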
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index b7a592151..3ea9525fc 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -7821,7 +7821,7 @@ pub mod tests {
        assert_eq!(counter, 1);
    }

-    fn do_test_lowest_cleanup_slot_and_special_cfs(simulate_ledger_cleanup_service: bool) {
+    fn do_test_lowest_cleanup_slot_and_special_cfs(simulate_blockstore_cleanup_service: bool) {
        solana_logger::setup();

        let ledger_path = get_tmp_ledger_path_auto_delete!();
@@ -7929,13 +7929,13 @@ pub mod tests {
        assert_eq!(are_missing, (false, false));
        assert_existing_always();

-        if simulate_ledger_cleanup_service {
+        if simulate_blockstore_cleanup_service {
            *blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
            blockstore.purge_slots(0, lowest_cleanup_slot, PurgeType::CompactionFilter);
        }

        let are_missing = check_for_missing();
-        if simulate_ledger_cleanup_service {
+        if simulate_blockstore_cleanup_service {
            // ... when either simulation (or both) is effective, we should observe to be missing
            // consistently
            assert_eq!(are_missing, (true, true));
@@ -7947,12 +7947,12 @@ pub mod tests {
    }

    #[test]
-    fn test_lowest_cleanup_slot_and_special_cfs_with_ledger_cleanup_service_simulation() {
+    fn test_lowest_cleanup_slot_and_special_cfs_with_blockstore_cleanup_service_simulation() {
        do_test_lowest_cleanup_slot_and_special_cfs(true);
    }

    #[test]
-    fn test_lowest_cleanup_slot_and_special_cfs_without_ledger_cleanup_service_simulation() {
+    fn test_lowest_cleanup_slot_and_special_cfs_without_blockstore_cleanup_service_simulation() {
        do_test_lowest_cleanup_slot_and_special_cfs(false);
    }
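
Next comes the renamed service itself. The heart of it is find_slots_to_clean: count live data shreds, and once the count exceeds max_ledger_shreds, mark slots for purging oldest-first, never past the latest root. The toy model below captures only that threshold arithmetic; it is my simplification over an in-memory slot list, not the real method, which takes a &Blockstore and also returns the total shred count:

// Simplified model of the FIFO purge decision (assumed semantics, not the
// real Blockstore API): `shreds_per_slot` is (slot, num_shreds), ascending.
fn find_slots_to_clean(
    shreds_per_slot: &[(u64, u64)],
    last_root: u64,
    max_ledger_shreds: u64,
) -> (bool, u64) {
    let total: u64 = shreds_per_slot.iter().map(|(_, n)| n).sum();
    if total <= max_ledger_shreds {
        return (false, 0);
    }
    let mut excess = total - max_ledger_shreds;
    let mut lowest_cleanup_slot = 0;
    for &(slot, num_shreds) in shreds_per_slot {
        if slot > last_root {
            break; // never purge past the latest root
        }
        lowest_cleanup_slot = slot;
        excess = excess.saturating_sub(num_shreds);
        if excess == 0 {
            break;
        }
    }
    (true, lowest_cleanup_slot)
}

fn main() {
    // Ten slots of 25 shreds each, root at slot 9, keep at most 100 shreds:
    let slots: Vec<(u64, u64)> = (0..10).map(|s| (s, 25)).collect();
    let (should_clean, lowest) = find_slots_to_clean(&slots, 9, 100);
    // 250 - 100 = 150 excess shreds, covered by slots 0..=5 (6 * 25 = 150).
    assert!(should_clean && lowest == 5);
    println!("purge up to slot {lowest}");
}
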
diff --git a/core/src/ledger_cleanup_service.rs b/ledger/src/blockstore_cleanup_service.rs
similarity index 93%
rename from core/src/ledger_cleanup_service.rs
rename to ledger/src/blockstore_cleanup_service.rs
index 80924bf76..dbd8e64e6 100644
--- a/core/src/ledger_cleanup_service.rs
+++ b/ledger/src/blockstore_cleanup_service.rs
@@ -1,15 +1,15 @@
-//! The `ledger_cleanup_service` drops older ledger data to limit disk space usage.
+//! The `blockstore_cleanup_service` drops older ledger data to limit disk space usage.
 //! The service works by counting the number of live data shreds in the ledger; this
 //! can be done quickly and should have a fairly stable correlation to actual bytes.
 //! Once the shred count (and thus roughly the byte count) reaches a threshold,
 //! the services begins removing data in FIFO order.

 use {
-    crossbeam_channel::{Receiver, RecvTimeoutError},
-    solana_ledger::{
+    crate::{
        blockstore::{Blockstore, PurgeType},
        blockstore_db::{Result as BlockstoreResult, DATA_SHRED_CF},
    },
+    crossbeam_channel::{Receiver, RecvTimeoutError},
    solana_measure::measure::Measure,
    solana_sdk::clock::Slot,
    std::{
@@ -40,11 +40,11 @@ pub const DEFAULT_MIN_MAX_LEDGER_SHREDS: u64 = 50_000_000;
 // and starve other blockstore users.
 pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512;

-pub struct LedgerCleanupService {
+pub struct BlockstoreCleanupService {
    t_cleanup: JoinHandle<()>,
 }

-impl LedgerCleanupService {
+impl BlockstoreCleanupService {
    pub fn new(
        new_root_receiver: Receiver<Slot>,
        blockstore: Arc<Blockstore>,
@@ -54,12 +54,12 @@ impl LedgerCleanupService {
        let mut last_purge_slot = 0;

        info!(
-            "LedgerCleanupService active. max ledger shreds={}",
+            "BlockstoreCleanupService active. max ledger shreds={}",
            max_ledger_shreds
        );

        let t_cleanup = Builder::new()
-            .name("solLedgerClean".to_string())
+            .name("solBstoreClean".to_string())
            .spawn(move || loop {
                if exit.load(Ordering::Relaxed) {
                    break;
@@ -296,8 +296,8 @@ impl LedgerCleanupService {
 mod tests {
    use {
        super::*,
+        crate::{blockstore::make_many_slot_entries, get_tmp_ledger_path_auto_delete},
        crossbeam_channel::unbounded,
-        solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path_auto_delete},
    };

    fn flush_blockstore_contents_to_disk(blockstore: Blockstore) -> Blockstore {
@@ -314,7 +314,7 @@ mod tests {

    #[test]
    fn test_find_slots_to_clean() {
-        // LedgerCleanupService::find_slots_to_clean() does not modify the
+        // BlockstoreCleanupService::find_slots_to_clean() does not modify the
        // Blockstore, so we can make repeated calls on the same slots
        solana_logger::setup();
        let ledger_path = get_tmp_ledger_path_auto_delete!();
@@ -334,22 +334,31 @@ mod tests {
        // Ensure no cleaning of slots > last_root
        let last_root = 0;
        let max_ledger_shreds = 0;
-        let (should_clean, lowest_purged, _) =
-            LedgerCleanupService::find_slots_to_clean(&blockstore, last_root, max_ledger_shreds);
+        let (should_clean, lowest_purged, _) = BlockstoreCleanupService::find_slots_to_clean(
+            &blockstore,
+            last_root,
+            max_ledger_shreds,
+        );
        // Slot 0 will exist in blockstore with zero shreds since it is slot
        // 1's parent. Thus, slot 0 will be identified for clean.
        assert!(should_clean && lowest_purged == 0);

        // Now, set max_ledger_shreds to 1, slot 0 still eligible for clean
        let max_ledger_shreds = 1;
-        let (should_clean, lowest_purged, _) =
-            LedgerCleanupService::find_slots_to_clean(&blockstore, last_root, max_ledger_shreds);
+        let (should_clean, lowest_purged, _) = BlockstoreCleanupService::find_slots_to_clean(
+            &blockstore,
+            last_root,
+            max_ledger_shreds,
+        );
        assert!(should_clean && lowest_purged == 0);

        // Ensure no cleaning if blockstore contains fewer than max_ledger_shreds
        let last_root = num_slots;
        let max_ledger_shreds = (shreds_per_slot * num_slots) + 1;
-        let (should_clean, lowest_purged, _) =
-            LedgerCleanupService::find_slots_to_clean(&blockstore, last_root, max_ledger_shreds);
+        let (should_clean, lowest_purged, _) = BlockstoreCleanupService::find_slots_to_clean(
+            &blockstore,
+            last_root,
+            max_ledger_shreds,
+        );
        assert!(!should_clean && lowest_purged == 0);

        for slot in 1..=num_slots {
@@ -357,7 +366,7 @@ mod tests {
            let last_root = slot;
            // Set max_ledger_shreds to 0 so that all eligible slots are cleaned
            let max_ledger_shreds = 0;
-            let (should_clean, lowest_purged, _) = LedgerCleanupService::find_slots_to_clean(
+            let (should_clean, lowest_purged, _) = BlockstoreCleanupService::find_slots_to_clean(
                &blockstore,
                last_root,
                max_ledger_shreds,
@@ -369,7 +378,7 @@
            // Set max_ledger_shreds to the number of shreds in slots > slot.
            // This will make it so that slots [1, slot] are cleaned
            let max_ledger_shreds = shreds_per_slot * (num_slots - slot);
-            let (should_clean, lowest_purged, _) = LedgerCleanupService::find_slots_to_clean(
+            let (should_clean, lowest_purged, _) = BlockstoreCleanupService::find_slots_to_clean(
                &blockstore,
                last_root,
                max_ledger_shreds,
@@ -393,8 +402,14 @@
        //send a signal to kill all but 5 shreds, which will be in the newest slots
        let mut last_purge_slot = 0;
        sender.send(50).unwrap();
-        LedgerCleanupService::cleanup_ledger(&receiver, &blockstore, 5, &mut last_purge_slot, 10)
-            .unwrap();
+        BlockstoreCleanupService::cleanup_ledger(
+            &receiver,
+            &blockstore,
+            5,
+            &mut last_purge_slot,
+            10,
+        )
+        .unwrap();
        assert_eq!(last_purge_slot, 50);

        //check that 0-40 don't exist
@@ -437,7 +452,7 @@
            let mut time = Measure::start("purge time");
            sender.send(slot + num_slots).unwrap();
-            LedgerCleanupService::cleanup_ledger(
+            BlockstoreCleanupService::cleanup_ledger(
                &receiver,
                &blockstore,
                initial_slots,
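
Between the renamed cleanup service and the metric reporter, one more sketch: the receive loop that drives cleanup_ledger. It keeps only the newest root off the channel and skips purging until at least DEFAULT_PURGE_SLOT_INTERVAL slots have passed since the last purge, which is the batching behavior the constant above controls. This is a stand-in without the Blockstore parameters, with a print in place of the actual purge (assumed shape, not the real signature):

use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError};
use std::time::Duration;

type Slot = u64;

const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512;

// Drain the root channel, then decide whether enough new roots have been
// seen since `last_purge_slot` to make a purge pass worthwhile.
fn cleanup_ledger(
    new_root_receiver: &Receiver<Slot>,
    last_purge_slot: &mut Slot,
    purge_interval: u64,
) -> Result<(), RecvTimeoutError> {
    let mut root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
    // Keep only the most recent root if several arrived.
    while let Ok(new_root) = new_root_receiver.try_recv() {
        root = new_root;
    }
    if root.saturating_sub(*last_purge_slot) <= purge_interval {
        return Ok(());
    }
    *last_purge_slot = root;
    println!("would run find_slots_to_clean + purge up to root {root}");
    Ok(())
}

fn main() {
    let (sender, receiver) = unbounded();
    let mut last_purge_slot = 0;
    sender.send(600).unwrap();
    cleanup_ledger(&receiver, &mut last_purge_slot, DEFAULT_PURGE_SLOT_INTERVAL).unwrap();
    assert_eq!(last_purge_slot, 600);
}
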
diff --git a/core/src/ledger_metric_report_service.rs b/ledger/src/blockstore_metric_report_service.rs
similarity index 75%
rename from core/src/ledger_metric_report_service.rs
rename to ledger/src/blockstore_metric_report_service.rs
index 2e91013eb..393442a3e 100644
--- a/core/src/ledger_metric_report_service.rs
+++ b/ledger/src/blockstore_metric_report_service.rs
@@ -1,7 +1,7 @@
-//! The `ledger_metric_report_service` periodically reports ledger store metrics.
+//! The `blockstore_metric_report_service` periodically reports ledger store metrics.

 use {
-    solana_ledger::blockstore::Blockstore,
+    crate::blockstore::Blockstore,
    std::{
        string::ToString,
        sync::{
@@ -14,15 +14,15 @@ use {
 };

 // Determines how often we report blockstore metrics under
-// LedgerMetricReportService. Note that there're other blockstore
-// metrics that are reported outside LedgerMetricReportService.
+// BlockstoreMetricReportService. Note that there are other blockstore
+// metrics that are reported outside BlockstoreMetricReportService.
 const BLOCKSTORE_METRICS_REPORT_PERIOD_MILLIS: u64 = 10000;

-pub struct LedgerMetricReportService {
+pub struct BlockstoreMetricReportService {
    t_cf_metric: JoinHandle<()>,
 }

-impl LedgerMetricReportService {
+impl BlockstoreMetricReportService {
    pub fn new(blockstore: Arc<Blockstore>, exit: Arc<AtomicBool>) -> Self {
        let t_cf_metric = Builder::new()
            .name("solRocksCfMtrcs".to_string())
diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs
index 0f311ca12..10dd51827 100644
--- a/ledger/src/lib.rs
+++ b/ledger/src/lib.rs
@@ -9,8 +9,10 @@ pub mod block_error;
 #[macro_use]
 pub mod blockstore;
 pub mod ancestor_iterator;
+pub mod blockstore_cleanup_service;
 pub mod blockstore_db;
 pub mod blockstore_meta;
+pub mod blockstore_metric_report_service;
 pub mod blockstore_metrics;
 pub mod blockstore_options;
 pub mod blockstore_processor;
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index d675feda0..f6791307d 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -4274,49 +4274,6 @@ fn test_leader_failure_4() {
    );
 }

-#[test]
-#[serial]
-fn test_ledger_cleanup_service() {
-    solana_logger::setup_with_default(RUST_LOG_FILTER);
-    error!("test_ledger_cleanup_service");
-    let num_nodes = 3;
-    let validator_config = ValidatorConfig {
-        max_ledger_shreds: Some(100),
-        ..ValidatorConfig::default_for_test()
-    };
-    let mut config = ClusterConfig {
-        cluster_lamports: DEFAULT_CLUSTER_LAMPORTS,
-        poh_config: PohConfig::new_sleep(Duration::from_millis(50)),
-        node_stakes: vec![DEFAULT_NODE_STAKE; num_nodes],
-        validator_configs: make_identical_validator_configs(&validator_config, num_nodes),
-        ..ClusterConfig::default()
-    };
-    let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
-    // 200ms/per * 100 = 20 seconds, so sleep a little longer than that.
-    sleep(Duration::from_secs(60));
-
-    cluster_tests::spend_and_verify_all_nodes(
-        &cluster.entry_point_info,
-        &cluster.funding_keypair,
-        num_nodes,
-        HashSet::new(),
-        SocketAddrSpace::Unspecified,
-        &cluster.connection_cache,
-    );
-    cluster.close_preserve_ledgers();
-    //check everyone's ledgers and make sure only ~100 slots are stored
-    for info in cluster.validators.values() {
-        let mut slots = 0;
-        let blockstore = Blockstore::open(&info.info.ledger_path).unwrap();
-        blockstore
-            .slot_meta_iterator(0)
-            .unwrap()
-            .for_each(|_| slots += 1);
-        // with 3 nodes up to 3 slots can be in progress and not complete so max slots in blockstore should be up to 103
-        assert!(slots <= 103, "got {slots}");
-    }
-}
-
 // This test verifies that even if votes from a validator end up taking too long to land, and thus
 // some of the referenced slots are slots are no longer present in the slot hashes sysvar,
 // consensus can still be attained.
diff --git a/validator/src/main.rs b/validator/src/main.rs
index 4c247c9a9..bb8fa537b 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -22,7 +22,6 @@ use {
    solana_core::{
        banking_trace::DISABLED_BAKING_TRACE_DIR,
        consensus::tower_storage,
-        ledger_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS},
        system_monitor_service::SystemMonitorService,
        tpu::DEFAULT_TPU_COALESCE,
        validator::{
@@ -32,6 +31,7 @@ use {
    },
    solana_gossip::{cluster_info::Node, legacy_contact_info::LegacyContactInfo as ContactInfo},
    solana_ledger::{
+        blockstore_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS},
        blockstore_options::{
            BlockstoreCompressionType, BlockstoreRecoveryMode, LedgerColumnOptions,
            ShredStorageType,