Move and rename ledger services from core to ledger (#33947)

These services currently live in core/; however, they operate on the
ledger. Moreover, these two services operate on the blockstore only,
and not necessarily the entire ledger. So, it makes sense to move these
services out of core and into ledger. We've recently been making
similar changes, breaking pieces out into individual crates in order to
reduce the scope of core.

So, this change moves the services from core/ to ledger/, and renames
them, replacing "ledger" with "blockstore".
steviez 2023-11-08 11:58:31 -06:00 committed by GitHub
parent 8c5b5f18be
commit 73815aee51
10 changed files with 67 additions and 708 deletions
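
For downstream code, the practical effect of the changes below is an import-path and type-name change. Here is a minimal sketch of the new usage, assuming a hypothetical consumer that spawns both services directly; the function name and channel wiring are illustrative, while the crate paths and constructor signatures follow the diff below.

// Before: the services were exported from the core crate.
// use solana_core::ledger_cleanup_service::LedgerCleanupService;
// use solana_core::ledger_metric_report_service::LedgerMetricReportService;

// After: they live in the ledger crate under blockstore_* names.
use {
    crossbeam_channel::{unbounded, Sender},
    solana_ledger::{
        blockstore::Blockstore,
        blockstore_cleanup_service::BlockstoreCleanupService,
        blockstore_metric_report_service::BlockstoreMetricReportService,
    },
    solana_sdk::clock::Slot,
    std::sync::{atomic::AtomicBool, Arc},
};

fn spawn_blockstore_services(
    blockstore: Arc<Blockstore>,
    exit: Arc<AtomicBool>,
) -> (Sender<Slot>, BlockstoreCleanupService, BlockstoreMetricReportService) {
    // New roots are fed to the cleanup service over a channel; in the validator the
    // sender is handed to ReplayStage via latest_root_senders (see tvu.rs below).
    let (root_sender, root_receiver) = unbounded();
    let cleanup_service = BlockstoreCleanupService::new(
        root_receiver,
        blockstore.clone(),
        50_000_000, // max_ledger_shreds; illustrative value
        exit.clone(),
    );
    let metric_report_service = BlockstoreMetricReportService::new(blockstore, exit);
    (root_sender, cleanup_service, metric_report_service)
}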


@ -22,8 +22,6 @@ pub mod cost_update_service;
pub mod drop_bank_service;
pub mod fetch_stage;
pub mod gen_keys;
pub mod ledger_cleanup_service;
pub mod ledger_metric_report_service;
pub mod next_leader;
pub mod optimistic_confirmation_verifier;
pub mod poh_timing_report_service;


@ -14,7 +14,6 @@ use {
consensus::{tower_storage::TowerStorage, Tower},
cost_update_service::CostUpdateService,
drop_bank_service::DropBankService,
ledger_cleanup_service::LedgerCleanupService,
repair::{quic_endpoint::LocalRequest, repair_service::RepairInfo},
replay_stage::{ReplayStage, ReplayStageConfig},
rewards_recorder_service::RewardsRecorderSender,
@ -32,8 +31,9 @@ use {
duplicate_shred_listener::DuplicateShredListener,
},
solana_ledger::{
blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
entry_notifier_service::EntryNotifierSender, leader_schedule_cache::LeaderScheduleCache,
blockstore::Blockstore, blockstore_cleanup_service::BlockstoreCleanupService,
blockstore_processor::TransactionStatusSender, entry_notifier_service::EntryNotifierSender,
leader_schedule_cache::LeaderScheduleCache,
},
solana_poh::poh_recorder::PohRecorder,
solana_rpc::{
@ -63,7 +63,7 @@ pub struct Tvu {
window_service: WindowService,
cluster_slots_service: ClusterSlotsService,
replay_stage: ReplayStage,
ledger_cleanup_service: Option<LedgerCleanupService>,
blockstore_cleanup_service: Option<BlockstoreCleanupService>,
cost_update_service: CostUpdateService,
voting_service: VotingService,
warm_quic_cache_service: Option<WarmQuicCacheService>,
@ -236,14 +236,14 @@ impl Tvu {
exit.clone(),
);
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = unbounded();
let (blockstore_cleanup_slot_sender, blockstore_cleanup_slot_receiver) = unbounded();
let replay_stage_config = ReplayStageConfig {
vote_account: *vote_account,
authorized_voter_keypairs,
exit: exit.clone(),
rpc_subscriptions: rpc_subscriptions.clone(),
leader_schedule_cache: leader_schedule_cache.clone(),
latest_root_senders: vec![ledger_cleanup_slot_sender],
latest_root_senders: vec![blockstore_cleanup_slot_sender],
accounts_background_request_sender,
block_commitment_cache,
transaction_status_sender,
@ -311,9 +311,9 @@ impl Tvu {
popular_pruned_forks_receiver,
)?;
let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
LedgerCleanupService::new(
ledger_cleanup_slot_receiver,
let blockstore_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
BlockstoreCleanupService::new(
blockstore_cleanup_slot_receiver,
blockstore.clone(),
max_ledger_shreds,
exit.clone(),
@ -337,7 +337,7 @@ impl Tvu {
window_service,
cluster_slots_service,
replay_stage,
ledger_cleanup_service,
blockstore_cleanup_service,
cost_update_service,
voting_service,
warm_quic_cache_service,
@ -352,8 +352,8 @@ impl Tvu {
self.cluster_slots_service.join()?;
self.fetch_stage.join()?;
self.shred_sigverify.join()?;
if self.ledger_cleanup_service.is_some() {
self.ledger_cleanup_service.unwrap().join()?;
if self.blockstore_cleanup_service.is_some() {
self.blockstore_cleanup_service.unwrap().join()?;
}
self.replay_stage.join()?;
self.cost_update_service.join()?;


@ -14,7 +14,6 @@ use {
tower_storage::{NullTowerStorage, TowerStorage},
ExternalRootSource, Tower,
},
ledger_metric_report_service::LedgerMetricReportService,
poh_timing_report_service::PohTimingReportService,
repair::{self, serve_repair::ServeRepair, serve_repair_service::ServeRepairService},
rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService},
@ -56,6 +55,7 @@ use {
blockstore::{
Blockstore, BlockstoreError, BlockstoreSignals, CompletedSlotsReceiver, PurgeType,
},
blockstore_metric_report_service::BlockstoreMetricReportService,
blockstore_options::{BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions},
blockstore_processor::{self, TransactionStatusSender},
entry_notifier_interface::EntryNotifierArc,
@ -465,7 +465,7 @@ pub struct Validator {
pub bank_forks: Arc<RwLock<BankForks>>,
pub blockstore: Arc<Blockstore>,
geyser_plugin_service: Option<GeyserPluginService>,
ledger_metric_report_service: LedgerMetricReportService,
blockstore_metric_report_service: BlockstoreMetricReportService,
accounts_background_service: AccountsBackgroundService,
accounts_hash_verifier: AccountsHashVerifier,
turbine_quic_endpoint: Endpoint,
@ -1102,8 +1102,8 @@ impl Validator {
)
.map_err(|err| format!("wait_for_supermajority failed: {err:?}"))?;
let ledger_metric_report_service =
LedgerMetricReportService::new(blockstore.clone(), exit.clone());
let blockstore_metric_report_service =
BlockstoreMetricReportService::new(blockstore.clone(), exit.clone());
let wait_for_vote_to_start_leader =
!waited_for_supermajority && !config.no_wait_for_vote_to_start_leader;
@ -1378,7 +1378,7 @@ impl Validator {
bank_forks,
blockstore,
geyser_plugin_service,
ledger_metric_report_service,
blockstore_metric_report_service,
accounts_background_service,
accounts_hash_verifier,
turbine_quic_endpoint,
@ -1507,7 +1507,7 @@ impl Validator {
self.stats_reporter_service
.join()
.expect("stats_reporter_service");
self.ledger_metric_report_service
self.blockstore_metric_report_service
.join()
.expect("ledger_metric_report_service");
self.accounts_background_service


@ -1,613 +0,0 @@
#![allow(clippy::arithmetic_side_effects)]
// Long-running ledger_cleanup tests
#[cfg(test)]
mod tests {
use {
crossbeam_channel::unbounded,
log::*,
solana_core::ledger_cleanup_service::LedgerCleanupService,
solana_ledger::{
blockstore::{make_many_slot_shreds, Blockstore},
blockstore_options::{
BlockstoreOptions, BlockstoreRocksFifoOptions, LedgerColumnOptions,
ShredStorageType,
},
get_tmp_ledger_path,
},
solana_measure::measure::Measure,
std::{
collections::VecDeque,
str::FromStr,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
},
systemstat::{CPULoad, Platform, System},
};
const DEFAULT_BENCHMARK_SLOTS: u64 = 50;
const DEFAULT_BATCH_SIZE_SLOTS: u64 = 1;
const DEFAULT_MAX_LEDGER_SHREDS: u64 = 50;
const DEFAULT_SHREDS_PER_SLOT: u64 = 25;
const DEFAULT_STOP_SIZE_BYTES: u64 = 0;
const DEFAULT_STOP_SIZE_ITERATIONS: u64 = 0;
const DEFAULT_STOP_SIZE_CF_DATA_BYTES: u64 = 0;
const DEFAULT_SHRED_DATA_CF_SIZE_BYTES: u64 = 125 * 1024 * 1024 * 1024;
#[derive(Debug)]
struct BenchmarkConfig {
benchmark_slots: u64,
batch_size_slots: u64,
max_ledger_shreds: u64,
shreds_per_slot: u64,
stop_size_bytes: u64,
stop_size_iterations: u64,
stop_size_cf_data_bytes: u64,
pre_generate_data: bool,
cleanup_blockstore: bool,
num_writers: u64,
cleanup_service: bool,
fifo_compaction: bool,
shred_data_cf_size: u64,
}
#[derive(Clone, Copy, Debug)]
struct CpuStatsInner {
cpu_user: f32,
cpu_system: f32,
cpu_idle: f32,
}
impl From<CPULoad> for CpuStatsInner {
fn from(cpu: CPULoad) -> Self {
Self {
cpu_user: cpu.user * 100.0,
cpu_system: cpu.system * 100.0,
cpu_idle: cpu.idle * 100.0,
}
}
}
impl Default for CpuStatsInner {
fn default() -> Self {
Self {
cpu_user: 0.0,
cpu_system: 0.0,
cpu_idle: 0.0,
}
}
}
struct CpuStats {
stats: RwLock<CpuStatsInner>,
sys: System,
}
impl Default for CpuStats {
fn default() -> Self {
Self {
stats: RwLock::new(CpuStatsInner::default()),
sys: System::new(),
}
}
}
impl CpuStats {
fn update(&self) {
if let Ok(cpu) = self.sys.cpu_load_aggregate() {
std::thread::sleep(Duration::from_millis(400));
let cpu_new = CpuStatsInner::from(cpu.done().unwrap());
*self.stats.write().unwrap() = cpu_new;
}
}
fn get_stats(&self) -> CpuStatsInner {
*self.stats.read().unwrap()
}
}
struct CpuStatsUpdater {
cpu_stats: Arc<CpuStats>,
t_cleanup: JoinHandle<()>,
}
impl CpuStatsUpdater {
pub fn new(exit: Arc<AtomicBool>) -> Self {
let cpu_stats = Arc::new(CpuStats::default());
let cpu_stats_clone = cpu_stats.clone();
let t_cleanup = Builder::new()
.name("cpu_info".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
cpu_stats_clone.update();
})
.unwrap();
Self {
cpu_stats,
t_cleanup,
}
}
pub fn get_stats(&self) -> CpuStatsInner {
self.cpu_stats.get_stats()
}
pub fn join(self) -> std::thread::Result<()> {
self.t_cleanup.join()
}
}
fn read_env<T>(key: &str, default: T) -> T
where
T: FromStr,
{
match std::env::var(key) {
Ok(val) => val.parse().unwrap_or(default),
Err(_e) => default,
}
}
/// Obtains the benchmark config from the following environmental arguments:
///
/// Basic benchmark settings:
/// - `BENCHMARK_SLOTS`: the number of slots in the benchmark.
/// - `BATCH_SIZE`: the number of slots in each write batch.
/// - `SHREDS_PER_SLOT`: the number of shreds in each slot. Together with
/// the `BATCH_SIZE` and `BENCHMARK_SLOTS`, it means:
/// - the number of shreds in one write batch is `BATCH_SIZE` * `SHREDS_PER_SLOT`.
/// - the total number of batches is `BENCHMARK_SLOTS` / `BATCH_SIZE`.
/// - the total number of shreds is `BENCHMARK_SLOTS` * `SHREDS_PER_SLOT`.
/// - `NUM_WRITERS`: controls the number of concurrent threads performing
/// shred insertion. Default: 1.
///
/// Advanced benchmark settings:
/// - `STOP_SIZE_BYTES`: if specified, the benchmark will count how
/// many times the ledger store size exceeds the specified threshold.
/// - `STOP_SIZE_CF_DATA_BYTES`: if specified, the benchmark will count how
/// many times the storage size of `cf::ShredData` which stores data shred
/// exceeds the specified threshold.
/// - `STOP_SIZE_ITERATIONS`: when any of the stop size is specified, the
/// benchmark will stop immediately when the number of consecutive times
/// where the ledger store size exceeds the configured `STOP_SIZE_BYTES`.
/// These configs are used to make sure the benchmark runs successfully
/// under the storage limitation.
/// - `CLEANUP_BLOCKSTORE`: if true, the ledger store created in the current
/// benchmark run will be deleted. Default: true.
///
/// Cleanup-service related settings:
/// - `MAX_LEDGER_SHREDS`: when the clean-up service is on, the service will
/// clean up the ledger store when the number of shreds exceeds this value.
/// - `CLEANUP_SERVICE`: whether to enable the background cleanup service.
/// If set to false, the ledger store in the benchmark will be purely relied
/// on RocksDB's compaction. Default: true.
///
/// Fifo-compaction settings:
/// - `FIFO_COMPACTION`: if true, then RocksDB's Fifo compaction will be
/// used for storing data shreds. Default: false.
/// - `SHRED_DATA_CF_SIZE_BYTES`: the maximum size of the data-shred column family.
/// Default: 125 * 1024 * 1024 * 1024.
fn get_benchmark_config() -> BenchmarkConfig {
let benchmark_slots = read_env("BENCHMARK_SLOTS", DEFAULT_BENCHMARK_SLOTS);
let batch_size_slots = read_env("BATCH_SIZE", DEFAULT_BATCH_SIZE_SLOTS);
let max_ledger_shreds = read_env("MAX_LEDGER_SHREDS", DEFAULT_MAX_LEDGER_SHREDS);
let shreds_per_slot = read_env("SHREDS_PER_SLOT", DEFAULT_SHREDS_PER_SLOT);
let stop_size_bytes = read_env("STOP_SIZE_BYTES", DEFAULT_STOP_SIZE_BYTES);
let stop_size_iterations = read_env("STOP_SIZE_ITERATIONS", DEFAULT_STOP_SIZE_ITERATIONS);
let stop_size_cf_data_bytes =
read_env("STOP_SIZE_CF_DATA_BYTES", DEFAULT_STOP_SIZE_CF_DATA_BYTES);
let pre_generate_data = read_env("PRE_GENERATE_DATA", false);
let cleanup_blockstore = read_env("CLEANUP_BLOCKSTORE", true);
let num_writers = read_env("NUM_WRITERS", 1);
// A flag indicating whether to have a background clean-up service.
// If set to false, the ledger store will purely rely on RocksDB's
// compaction to perform the clean-up.
let cleanup_service = read_env("CLEANUP_SERVICE", true);
let fifo_compaction = read_env("FIFO_COMPACTION", false);
let shred_data_cf_size =
read_env("SHRED_DATA_CF_SIZE_BYTES", DEFAULT_SHRED_DATA_CF_SIZE_BYTES);
BenchmarkConfig {
benchmark_slots,
batch_size_slots,
max_ledger_shreds,
shreds_per_slot,
stop_size_bytes,
stop_size_iterations,
stop_size_cf_data_bytes,
pre_generate_data,
cleanup_blockstore,
num_writers,
cleanup_service,
fifo_compaction,
shred_data_cf_size,
}
}
fn emit_header() {
println!("TIME_MS,DELTA_MS,START_SLOT,BATCH_SIZE,SHREDS,MAX,SIZE,DELTA_SIZE,DATA_SHRED_SIZE,DATA_SHRED_SIZE_DELTA,CPU_USER,CPU_SYSTEM,CPU_IDLE");
}
#[allow(clippy::too_many_arguments)]
fn emit_stats(
time_initial: Instant,
time_previous: &mut Instant,
storage_previous: &mut u64,
data_shred_storage_previous: &mut u64,
start_slot: u64,
batch_size: u64,
num_shreds: u64,
max_shreds: i64,
blockstore: &Blockstore,
cpu: &CpuStatsInner,
) {
let time_now = Instant::now();
let storage_now = blockstore.storage_size().unwrap_or(0);
let data_shred_storage_now = blockstore.total_data_shred_storage_size().unwrap();
let (cpu_user, cpu_system, cpu_idle) = (cpu.cpu_user, cpu.cpu_system, cpu.cpu_idle);
info!(
"{},{},{},{},{},{},{},{},{},{},{:.2},{:.2},{:.2}",
time_now.duration_since(time_initial).as_millis(),
time_now.duration_since(*time_previous).as_millis(),
start_slot,
batch_size,
num_shreds,
max_shreds,
storage_now,
storage_now as i64 - *storage_previous as i64,
data_shred_storage_now,
data_shred_storage_now - *data_shred_storage_previous as i64,
cpu_user,
cpu_system,
cpu_idle,
);
*time_previous = time_now;
*storage_previous = storage_now;
*data_shred_storage_previous = data_shred_storage_now.try_into().unwrap();
}
/// Helper function of the benchmark `test_ledger_cleanup_compaction` which
/// returns true if the benchmark fails the size limitation check.
fn is_exceeded_stop_size_iterations(
storage_size: u64,
stop_size: u64,
exceeded_iterations: &mut u64,
iteration_limit: u64,
storage_desc: &str,
) -> bool {
if stop_size > 0 {
if storage_size >= stop_size {
*exceeded_iterations += 1;
warn!(
"{} size {} exceeds the stop size {} for {} times!",
storage_desc, storage_size, stop_size, exceeded_iterations
);
} else {
*exceeded_iterations = 0;
}
if *exceeded_iterations >= iteration_limit {
error!(
"{} size exceeds the configured limit {} for {} times",
storage_desc, stop_size, exceeded_iterations,
);
return true;
}
}
false
}
/// The ledger cleanup test which can also be used as a benchmark
/// measuring shred insertion performance of the blockstore.
///
/// The benchmark is controlled by several environmental arguments.
/// Check [`get_benchmark_config`] for the full list of arguments.
///
/// Example command:
/// BENCHMARK_SLOTS=1000000 BATCH_SIZE=1 SHREDS_PER_SLOT=25 NUM_WRITERS=8 \
/// PRE_GENERATE_DATA=false cargo test --release tests::test_ledger_cleanup \
/// -- --exact --nocapture
#[test]
fn test_ledger_cleanup() {
solana_logger::setup_with("error,ledger_cleanup::tests=info");
let ledger_path = get_tmp_ledger_path!();
let config = get_benchmark_config();
let blockstore = Blockstore::open_with_options(
&ledger_path,
if config.fifo_compaction {
BlockstoreOptions {
column_options: LedgerColumnOptions {
shred_storage_type: ShredStorageType::RocksFifo(
BlockstoreRocksFifoOptions {
shred_data_cf_size: config.shred_data_cf_size,
shred_code_cf_size: config.shred_data_cf_size,
},
),
..LedgerColumnOptions::default()
},
..BlockstoreOptions::default()
}
} else {
BlockstoreOptions::default()
},
)
.unwrap();
let blockstore = Arc::new(blockstore);
info!("Benchmark configuration: {:#?}", config);
info!("Ledger path: {:?}", &ledger_path);
let benchmark_slots = config.benchmark_slots;
let batch_size_slots = config.batch_size_slots;
let max_ledger_shreds = config.max_ledger_shreds;
let shreds_per_slot = config.shreds_per_slot;
let stop_size_bytes = config.stop_size_bytes;
let stop_size_iterations = config.stop_size_iterations;
let stop_size_cf_data_bytes = config.stop_size_cf_data_bytes;
let pre_generate_data = config.pre_generate_data;
let num_writers = config.num_writers;
let cleanup_service = config.cleanup_service;
let num_batches = benchmark_slots / batch_size_slots;
let num_shreds_total = benchmark_slots * shreds_per_slot;
let (sender, receiver) = unbounded();
let exit = Arc::new(AtomicBool::new(false));
let cleaner = if cleanup_service {
Some(LedgerCleanupService::new(
receiver,
blockstore.clone(),
max_ledger_shreds,
exit.clone(),
))
} else {
None
};
let exit_cpu = Arc::new(AtomicBool::new(false));
let sys = CpuStatsUpdater::new(exit_cpu.clone());
let mut shreds = VecDeque::new();
if pre_generate_data {
let mut pre_generate_data_timer = Measure::start("Pre-generate data");
info!("Pre-generate data ... this may take a while");
for i in 0..num_batches {
let start_slot = i * batch_size_slots;
let (new_shreds, _) =
make_many_slot_shreds(start_slot, batch_size_slots, shreds_per_slot);
shreds.push_back(new_shreds);
}
pre_generate_data_timer.stop();
info!("{}", pre_generate_data_timer);
}
let shreds = Arc::new(Mutex::new(shreds));
info!(
"Bench info num_batches: {}, batch size (slots): {}, shreds_per_slot: {}, num_shreds_total: {}",
num_batches,
batch_size_slots,
shreds_per_slot,
num_shreds_total
);
let time_initial = Instant::now();
let mut time_previous = time_initial;
let mut storage_previous = 0;
let mut data_shred_storage_previous = 0;
let mut stop_size_bytes_exceeded_iterations = 0;
let mut stop_size_cf_data_exceeded_iterations = 0;
emit_header();
emit_stats(
time_initial,
&mut time_previous,
&mut storage_previous,
&mut data_shred_storage_previous,
0,
0,
0,
0,
&blockstore,
&sys.get_stats(),
);
let mut insert_threads = vec![];
let insert_exit = Arc::new(AtomicBool::new(false));
info!("Begin inserting shreds ...");
let mut insert_timer = Measure::start("Shred insertion");
let current_batch_id = Arc::new(AtomicU64::new(0));
let finished_batch_count = Arc::new(AtomicU64::new(0));
for i in 0..num_writers {
let cloned_insert_exit = insert_exit.clone();
let cloned_blockstore = blockstore.clone();
let cloned_shreds = shreds.clone();
let shared_batch_id = current_batch_id.clone();
let shared_finished_count = finished_batch_count.clone();
let insert_thread = Builder::new()
.name(format!("insert_shreds-{i}"))
.spawn(move || {
let start = Instant::now();
let mut now = Instant::now();
let mut total = 0;
let mut total_batches = 0;
let mut total_inserted_shreds = 0;
let mut num_shreds = 0;
let mut max_speed = 0f32;
let mut min_speed = f32::MAX;
let (first_shreds, _) = make_many_slot_shreds(
0, batch_size_slots, shreds_per_slot);
loop {
let batch_id = shared_batch_id.fetch_add(1, Ordering::Relaxed);
let start_slot = batch_id * batch_size_slots;
if start_slot >= benchmark_slots {
break;
}
let len = batch_id;
// No duplicates being generated, so all shreds
// being passed to insert() are getting inserted
let num_shred_inserted = if pre_generate_data {
let mut sl = cloned_shreds.lock().unwrap();
if let Some(shreds_from_queue) = sl.pop_front() {
let num_shreds = shreds_from_queue.len();
total += num_shreds;
cloned_blockstore.insert_shreds(
shreds_from_queue, None, false).unwrap();
num_shreds
} else {
// If the queue is empty, we're done!
break;
}
} else {
let slot_id = start_slot;
if slot_id > 0 {
let (shreds_with_parent, _) = make_many_slot_shreds(
slot_id, batch_size_slots, shreds_per_slot);
let num_shreds = shreds_with_parent.len();
total += num_shreds;
cloned_blockstore.insert_shreds(
shreds_with_parent.clone(), None, false).unwrap();
num_shreds
} else {
let num_shreds = first_shreds.len();
total += num_shreds;
cloned_blockstore.insert_shreds(
first_shreds.clone(), None, false).unwrap();
num_shreds
}
};
total_batches += 1;
total_inserted_shreds += num_shred_inserted;
num_shreds += num_shred_inserted;
shared_finished_count.fetch_add(1, Ordering::Relaxed);
// as_secs() returns whole number of seconds, so this runs every second
if now.elapsed().as_secs() > 0 {
let shreds_per_second = num_shreds as f32 / now.elapsed().as_secs() as f32;
warn!(
"insert-{} tried: {} inserted: {} batches: {} len: {} shreds_per_second: {}",
i, total, total_inserted_shreds, total_batches, len, shreds_per_second,
);
let average_speed =
total_inserted_shreds as f32 / start.elapsed().as_secs() as f32;
max_speed = max_speed.max(shreds_per_second);
min_speed = min_speed.min(shreds_per_second);
warn!(
"highest: {} lowest: {} avg: {}",
max_speed, min_speed, average_speed
);
now = Instant::now();
num_shreds = 0;
}
if cloned_insert_exit.load(Ordering::Relaxed) {
if max_speed > 0.0 {
info!(
"insert-{} exiting highest shreds/s: {}, lowest shreds/s: {}",
i, max_speed, min_speed
);
} else {
// Not enough time elapsed to sample
info!(
"insert-{} exiting",
i
);
}
break;
}
}
})
.unwrap();
insert_threads.push(insert_thread);
}
loop {
let finished_batch = finished_batch_count.load(Ordering::Relaxed);
let finished_slot = (finished_batch + 1) * batch_size_slots - 1;
if cleanup_service {
sender.send(finished_slot).unwrap();
}
emit_stats(
time_initial,
&mut time_previous,
&mut storage_previous,
&mut data_shred_storage_previous,
finished_slot,
batch_size_slots,
shreds_per_slot,
max_ledger_shreds as i64,
&blockstore,
&sys.get_stats(),
);
if is_exceeded_stop_size_iterations(
storage_previous,
stop_size_bytes,
&mut stop_size_bytes_exceeded_iterations,
stop_size_iterations,
"Storage",
) {
break;
}
if is_exceeded_stop_size_iterations(
data_shred_storage_previous,
stop_size_cf_data_bytes,
&mut stop_size_cf_data_exceeded_iterations,
stop_size_iterations,
"cf::ShredData",
) {
break;
}
if finished_batch >= num_batches {
break;
} else {
thread::sleep(Duration::from_millis(500));
}
}
// Send exit signal to stop all the writer threads.
insert_exit.store(true, Ordering::Relaxed);
while let Some(thread) = insert_threads.pop() {
thread.join().unwrap();
}
insert_timer.stop();
info!(
"Done inserting shreds: {}, {} shreds/s",
insert_timer,
num_shreds_total as f32 / insert_timer.as_s(),
);
exit.store(true, Ordering::SeqCst);
if cleanup_service {
cleaner.unwrap().join().unwrap();
}
exit_cpu.store(true, Ordering::SeqCst);
sys.join().unwrap();
if config.cleanup_blockstore {
drop(blockstore);
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
}
}


@ -7821,7 +7821,7 @@ pub mod tests {
assert_eq!(counter, 1);
}
fn do_test_lowest_cleanup_slot_and_special_cfs(simulate_ledger_cleanup_service: bool) {
fn do_test_lowest_cleanup_slot_and_special_cfs(simulate_blockstore_cleanup_service: bool) {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
@ -7929,13 +7929,13 @@ pub mod tests {
assert_eq!(are_missing, (false, false));
assert_existing_always();
if simulate_ledger_cleanup_service {
if simulate_blockstore_cleanup_service {
*blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
blockstore.purge_slots(0, lowest_cleanup_slot, PurgeType::CompactionFilter);
}
let are_missing = check_for_missing();
if simulate_ledger_cleanup_service {
if simulate_blockstore_cleanup_service {
// ... when either simulation (or both) is effective, we should observe to be missing
// consistently
assert_eq!(are_missing, (true, true));
@ -7947,12 +7947,12 @@ pub mod tests {
}
#[test]
fn test_lowest_cleanup_slot_and_special_cfs_with_ledger_cleanup_service_simulation() {
fn test_lowest_cleanup_slot_and_special_cfs_with_blockstore_cleanup_service_simulation() {
do_test_lowest_cleanup_slot_and_special_cfs(true);
}
#[test]
fn test_lowest_cleanup_slot_and_special_cfs_without_ledger_cleanup_service_simulation() {
fn test_lowest_cleanup_slot_and_special_cfs_without_blockstore_cleanup_service_simulation() {
do_test_lowest_cleanup_slot_and_special_cfs(false);
}


@ -1,15 +1,15 @@
//! The `ledger_cleanup_service` drops older ledger data to limit disk space usage.
//! The `blockstore_cleanup_service` drops older ledger data to limit disk space usage.
//! The service works by counting the number of live data shreds in the ledger; this
//! can be done quickly and should have a fairly stable correlation to actual bytes.
//! Once the shred count (and thus roughly the byte count) reaches a threshold,
//! the services begins removing data in FIFO order.
use {
crossbeam_channel::{Receiver, RecvTimeoutError},
solana_ledger::{
crate::{
blockstore::{Blockstore, PurgeType},
blockstore_db::{Result as BlockstoreResult, DATA_SHRED_CF},
},
crossbeam_channel::{Receiver, RecvTimeoutError},
solana_measure::measure::Measure,
solana_sdk::clock::Slot,
std::{
@ -40,11 +40,11 @@ pub const DEFAULT_MIN_MAX_LEDGER_SHREDS: u64 = 50_000_000;
// and starve other blockstore users.
pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512;
pub struct LedgerCleanupService {
pub struct BlockstoreCleanupService {
t_cleanup: JoinHandle<()>,
}
impl LedgerCleanupService {
impl BlockstoreCleanupService {
pub fn new(
new_root_receiver: Receiver<Slot>,
blockstore: Arc<Blockstore>,
@ -54,12 +54,12 @@ impl LedgerCleanupService {
let mut last_purge_slot = 0;
info!(
"LedgerCleanupService active. max ledger shreds={}",
"BlockstoreCleanupService active. max ledger shreds={}",
max_ledger_shreds
);
let t_cleanup = Builder::new()
.name("solLedgerClean".to_string())
.name("solBstoreClean".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
@ -296,8 +296,8 @@ impl LedgerCleanupService {
mod tests {
use {
super::*,
crate::{blockstore::make_many_slot_entries, get_tmp_ledger_path_auto_delete},
crossbeam_channel::unbounded,
solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path_auto_delete},
};
fn flush_blockstore_contents_to_disk(blockstore: Blockstore) -> Blockstore {
@ -314,7 +314,7 @@ mod tests {
#[test]
fn test_find_slots_to_clean() {
// LedgerCleanupService::find_slots_to_clean() does not modify the
// BlockstoreCleanupService::find_slots_to_clean() does not modify the
// Blockstore, so we can make repeated calls on the same slots
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
@ -334,22 +334,31 @@ mod tests {
// Ensure no cleaning of slots > last_root
let last_root = 0;
let max_ledger_shreds = 0;
let (should_clean, lowest_purged, _) =
LedgerCleanupService::find_slots_to_clean(&blockstore, last_root, max_ledger_shreds);
let (should_clean, lowest_purged, _) = BlockstoreCleanupService::find_slots_to_clean(
&blockstore,
last_root,
max_ledger_shreds,
);
// Slot 0 will exist in blockstore with zero shreds since it is slot
// 1's parent. Thus, slot 0 will be identified for clean.
assert!(should_clean && lowest_purged == 0);
// Now, set max_ledger_shreds to 1, slot 0 still eligible for clean
let max_ledger_shreds = 1;
let (should_clean, lowest_purged, _) =
LedgerCleanupService::find_slots_to_clean(&blockstore, last_root, max_ledger_shreds);
let (should_clean, lowest_purged, _) = BlockstoreCleanupService::find_slots_to_clean(
&blockstore,
last_root,
max_ledger_shreds,
);
assert!(should_clean && lowest_purged == 0);
// Ensure no cleaning if blockstore contains fewer than max_ledger_shreds
let last_root = num_slots;
let max_ledger_shreds = (shreds_per_slot * num_slots) + 1;
let (should_clean, lowest_purged, _) =
LedgerCleanupService::find_slots_to_clean(&blockstore, last_root, max_ledger_shreds);
let (should_clean, lowest_purged, _) = BlockstoreCleanupService::find_slots_to_clean(
&blockstore,
last_root,
max_ledger_shreds,
);
assert!(!should_clean && lowest_purged == 0);
for slot in 1..=num_slots {
@ -357,7 +366,7 @@ mod tests {
let last_root = slot;
// Set max_ledger_shreds to 0 so that all eligible slots are cleaned
let max_ledger_shreds = 0;
let (should_clean, lowest_purged, _) = LedgerCleanupService::find_slots_to_clean(
let (should_clean, lowest_purged, _) = BlockstoreCleanupService::find_slots_to_clean(
&blockstore,
last_root,
max_ledger_shreds,
@ -369,7 +378,7 @@ mod tests {
// Set max_ledger_shreds to the number of shreds in slots > slot.
// This will make it so that slots [1, slot] are cleaned
let max_ledger_shreds = shreds_per_slot * (num_slots - slot);
let (should_clean, lowest_purged, _) = LedgerCleanupService::find_slots_to_clean(
let (should_clean, lowest_purged, _) = BlockstoreCleanupService::find_slots_to_clean(
&blockstore,
last_root,
max_ledger_shreds,
@ -393,8 +402,14 @@ mod tests {
//send a signal to kill all but 5 shreds, which will be in the newest slots
let mut last_purge_slot = 0;
sender.send(50).unwrap();
LedgerCleanupService::cleanup_ledger(&receiver, &blockstore, 5, &mut last_purge_slot, 10)
.unwrap();
BlockstoreCleanupService::cleanup_ledger(
&receiver,
&blockstore,
5,
&mut last_purge_slot,
10,
)
.unwrap();
assert_eq!(last_purge_slot, 50);
//check that 0-40 don't exist
@ -437,7 +452,7 @@ mod tests {
let mut time = Measure::start("purge time");
sender.send(slot + num_slots).unwrap();
LedgerCleanupService::cleanup_ledger(
BlockstoreCleanupService::cleanup_ledger(
&receiver,
&blockstore,
initial_slots,


@ -1,7 +1,7 @@
//! The `ledger_metric_report_service` periodically reports ledger store metrics.
//! The `blockstore_metric_report_service` periodically reports ledger store metrics.
use {
solana_ledger::blockstore::Blockstore,
crate::blockstore::Blockstore,
std::{
string::ToString,
sync::{
@ -14,15 +14,15 @@ use {
};
// Determines how often we report blockstore metrics under
// LedgerMetricReportService. Note that there're other blockstore
// metrics that are reported outside LedgerMetricReportService.
// BlockstoreMetricReportService. Note that there are other blockstore
// metrics that are reported outside BlockstoreMetricReportService.
const BLOCKSTORE_METRICS_REPORT_PERIOD_MILLIS: u64 = 10000;
pub struct LedgerMetricReportService {
pub struct BlockstoreMetricReportService {
t_cf_metric: JoinHandle<()>,
}
impl LedgerMetricReportService {
impl BlockstoreMetricReportService {
pub fn new(blockstore: Arc<Blockstore>, exit: Arc<AtomicBool>) -> Self {
let t_cf_metric = Builder::new()
.name("solRocksCfMtrcs".to_string())


@ -9,8 +9,10 @@ pub mod block_error;
#[macro_use]
pub mod blockstore;
pub mod ancestor_iterator;
pub mod blockstore_cleanup_service;
pub mod blockstore_db;
pub mod blockstore_meta;
pub mod blockstore_metric_report_service;
pub mod blockstore_metrics;
pub mod blockstore_options;
pub mod blockstore_processor;


@ -4274,49 +4274,6 @@ fn test_leader_failure_4() {
);
}
#[test]
#[serial]
fn test_ledger_cleanup_service() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
error!("test_ledger_cleanup_service");
let num_nodes = 3;
let validator_config = ValidatorConfig {
max_ledger_shreds: Some(100),
..ValidatorConfig::default_for_test()
};
let mut config = ClusterConfig {
cluster_lamports: DEFAULT_CLUSTER_LAMPORTS,
poh_config: PohConfig::new_sleep(Duration::from_millis(50)),
node_stakes: vec![DEFAULT_NODE_STAKE; num_nodes],
validator_configs: make_identical_validator_configs(&validator_config, num_nodes),
..ClusterConfig::default()
};
let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
// 200ms/per * 100 = 20 seconds, so sleep a little longer than that.
sleep(Duration::from_secs(60));
cluster_tests::spend_and_verify_all_nodes(
&cluster.entry_point_info,
&cluster.funding_keypair,
num_nodes,
HashSet::new(),
SocketAddrSpace::Unspecified,
&cluster.connection_cache,
);
cluster.close_preserve_ledgers();
//check everyone's ledgers and make sure only ~100 slots are stored
for info in cluster.validators.values() {
let mut slots = 0;
let blockstore = Blockstore::open(&info.info.ledger_path).unwrap();
blockstore
.slot_meta_iterator(0)
.unwrap()
.for_each(|_| slots += 1);
// with 3 nodes up to 3 slots can be in progress and not complete so max slots in blockstore should be up to 103
assert!(slots <= 103, "got {slots}");
}
}
// This test verifies that even if votes from a validator end up taking too long to land, and thus
some of the referenced slots are no longer present in the slot hashes sysvar,
// consensus can still be attained.


@ -22,7 +22,6 @@ use {
solana_core::{
banking_trace::DISABLED_BAKING_TRACE_DIR,
consensus::tower_storage,
ledger_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS},
system_monitor_service::SystemMonitorService,
tpu::DEFAULT_TPU_COALESCE,
validator::{
@ -32,6 +31,7 @@ use {
},
solana_gossip::{cluster_info::Node, legacy_contact_info::LegacyContactInfo as ContactInfo},
solana_ledger::{
blockstore_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS},
blockstore_options::{
BlockstoreCompressionType, BlockstoreRecoveryMode, LedgerColumnOptions,
ShredStorageType,