From 5e424826ba52e643bbd8e761b7bee11f699eb46c Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> Date: Thu, 1 Jul 2021 11:32:41 -0500 Subject: [PATCH] Persist cost table to blockstore (#18123) * Add `ProgramCosts` Column Family to blockstore, implement LedgerColumn; add `delete_cf` to Rocks * Add ProgramCosts to compaction excluding list alone side with TransactionStatusIndex in one place: `excludes_from_compaction()` * Write cost table to blockstore after `replay_stage` replayed active banks; add stats to measure persist time * Deletes program from `ProgramCosts` in blockstore when they are removed from cost_table in memory * Only try to persist to blockstore when cost_table is changed. * Restore cost table during validator startup * Offload `cost_model` related operations from replay main thread to dedicated service thread, add channel to send execute_timings between these threads; * Move `cost_update_service` to its own module; replay_stage is now decoupled from cost_model. --- core/src/cost_model.rs | 11 +- core/src/cost_update_service.rs | 275 ++++++++++++++++++++++++++++++++ core/src/lib.rs | 1 + core/src/replay_stage.rs | 145 ++--------------- core/src/tvu.rs | 20 ++- core/src/validator.rs | 26 +++ ledger/src/blockstore.rs | 145 +++++++++++++++++ ledger/src/blockstore_db.rs | 96 +++++++++-- ledger/src/blockstore_meta.rs | 5 + 9 files changed, 575 insertions(+), 149 deletions(-) create mode 100644 core/src/cost_update_service.rs diff --git a/core/src/cost_model.rs b/core/src/cost_model.rs index b7ecd6ae07..149045f9ae 100644 --- a/core/src/cost_model.rs +++ b/core/src/cost_model.rs @@ -12,11 +12,14 @@ use log::*; use solana_sdk::{message::Message, pubkey::Pubkey, transaction::Transaction}; use std::collections::HashMap; -// Guestimated from mainnet-beta data, sigver averages 1us, read averages 7us and write avergae 25us -const SIGNED_WRITABLE_ACCOUNT_ACCESS_COST: u64 = 1 + 25; -const SIGNED_READONLY_ACCOUNT_ACCESS_COST: u64 = 1 + 7; -const NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST: u64 = 25; +// Guestimated from mainnet-beta data, sigver averages 1us, average read 7us and average write 25us +const SIGVER_COST: u64 = 1; const NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST: u64 = 7; +const NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST: u64 = 25; +const SIGNED_READONLY_ACCOUNT_ACCESS_COST: u64 = + SIGVER_COST + NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST; +const SIGNED_WRITABLE_ACCOUNT_ACCESS_COST: u64 = + SIGVER_COST + NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST; // Sampled from mainnet-beta, the instruction execution timings stats are (in us): // min=194, max=62164, avg=8214.49, med=2243 diff --git a/core/src/cost_update_service.rs b/core/src/cost_update_service.rs new file mode 100644 index 0000000000..f3e4253540 --- /dev/null +++ b/core/src/cost_update_service.rs @@ -0,0 +1,275 @@ +//! this service receives instruction ExecuteTimings from replay_stage, +//! update cost_model which is shared with banking_stage to optimize +//! packing transactions into block; it also triggers persisting cost +//! table to blockstore. + +use crate::cost_model::CostModel; +use solana_ledger::blockstore::Blockstore; +use solana_measure::measure::Measure; +use solana_runtime::bank::ExecuteTimings; +use solana_sdk::timing::timestamp; +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::Receiver, + Arc, RwLock, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, +}; + +#[derive(Default)] +pub struct CostUpdateServiceTiming { + last_print: u64, + update_cost_model_count: u64, + update_cost_model_elapsed: u64, + persist_cost_table_elapsed: u64, +} + +impl CostUpdateServiceTiming { + fn update( + &mut self, + update_cost_model_count: u64, + update_cost_model_elapsed: u64, + persist_cost_table_elapsed: u64, + ) { + self.update_cost_model_count += update_cost_model_count; + self.update_cost_model_elapsed += update_cost_model_elapsed; + self.persist_cost_table_elapsed += persist_cost_table_elapsed; + + let now = timestamp(); + let elapsed_ms = now - self.last_print; + if elapsed_ms > 1000 { + datapoint_info!( + "replay-service-timing-stats", + ("total_elapsed_us", elapsed_ms * 1000, i64), + ( + "update_cost_model_count", + self.update_cost_model_count as i64, + i64 + ), + ( + "update_cost_model_elapsed", + self.update_cost_model_elapsed as i64, + i64 + ), + ( + "persist_cost_table_elapsed", + self.persist_cost_table_elapsed as i64, + i64 + ), + ); + + *self = CostUpdateServiceTiming::default(); + self.last_print = now; + } + } +} + +pub type CostUpdateReceiver = Receiver; + +pub struct CostUpdateService { + thread_hdl: JoinHandle<()>, +} + +impl CostUpdateService { + #[allow(clippy::new_ret_no_self)] + pub fn new( + exit: Arc, + blockstore: Arc, + cost_model: Arc>, + cost_update_receiver: CostUpdateReceiver, + ) -> Self { + let thread_hdl = Builder::new() + .name("solana-cost-update-service".to_string()) + .spawn(move || { + Self::service_loop(exit, blockstore, cost_model, cost_update_receiver); + }) + .unwrap(); + + Self { thread_hdl } + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } + + fn service_loop( + exit: Arc, + blockstore: Arc, + cost_model: Arc>, + cost_update_receiver: CostUpdateReceiver, + ) { + let mut cost_update_service_timing = CostUpdateServiceTiming::default(); + let mut dirty = false; + let wait_timer = Duration::from_millis(100); + + loop { + if exit.load(Ordering::Relaxed) { + break; + } + + let mut update_count = 0_u64; + let mut update_cost_model_time = Measure::start("update_cost_model_time"); + for cost_update in cost_update_receiver.try_iter() { + dirty |= Self::update_cost_model(&cost_model, &cost_update); + update_count += 1; + } + update_cost_model_time.stop(); + + let mut persist_cost_table_time = Measure::start("persist_cost_table_time"); + if dirty { + Self::persist_cost_table(&blockstore, &cost_model); + } + persist_cost_table_time.stop(); + + cost_update_service_timing.update( + update_count, + update_cost_model_time.as_us(), + persist_cost_table_time.as_us(), + ); + + thread::sleep(wait_timer); + } + } + + fn update_cost_model(cost_model: &RwLock, execute_timings: &ExecuteTimings) -> bool { + let mut dirty = false; + let mut cost_model_mutable = cost_model.write().unwrap(); + for (program_id, stats) in &execute_timings.details.per_program_timings { + let cost = stats.0 / stats.1 as u64; + match cost_model_mutable.upsert_instruction_cost(program_id, &cost) { + Ok(c) => { + debug!( + "after replayed into bank, instruction {:?} has averaged cost {}", + program_id, c + ); + dirty = true; + } + Err(err) => { + debug!( + "after replayed into bank, instruction {:?} failed to update cost, err: {}", + program_id, err + ); + } + } + } + drop(cost_model_mutable); + debug!( + "after replayed into bank, updated cost model instruction cost table, current values: {:?}", + cost_model.read().unwrap().get_instruction_cost_table() + ); + dirty + } + + fn persist_cost_table(blockstore: &Blockstore, cost_model: &RwLock) { + let cost_model_read = cost_model.read().unwrap(); + let cost_table = cost_model_read.get_instruction_cost_table(); + let db_records = blockstore.read_program_costs().expect("read programs"); + + // delete records from blockstore if they are no longer in cost_table + db_records.iter().for_each(|(pubkey, _)| { + if cost_table.get(pubkey).is_none() { + blockstore + .delete_program_cost(pubkey) + .expect("delete old program"); + } + }); + + for (key, cost) in cost_table.iter() { + blockstore + .write_program_cost(key, cost) + .expect("persist program costs to blockstore"); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use solana_sdk::pubkey::Pubkey; + + #[test] + fn test_update_cost_model_with_empty_execute_timings() { + let cost_model = Arc::new(RwLock::new(CostModel::default())); + let empty_execute_timings = ExecuteTimings::default(); + CostUpdateService::update_cost_model(&cost_model, &empty_execute_timings); + + assert_eq!( + 0, + cost_model + .read() + .unwrap() + .get_instruction_cost_table() + .len() + ); + } + + #[test] + fn test_update_cost_model_with_execute_timings() { + let cost_model = Arc::new(RwLock::new(CostModel::default())); + let mut execute_timings = ExecuteTimings::default(); + + let program_key_1 = Pubkey::new_unique(); + let mut expected_cost: u64; + + // add new program + { + let accumulated_us: u64 = 1000; + let count: u32 = 10; + expected_cost = accumulated_us / count as u64; + + execute_timings + .details + .per_program_timings + .insert(program_key_1, (accumulated_us, count)); + CostUpdateService::update_cost_model(&cost_model, &execute_timings); + assert_eq!( + 1, + cost_model + .read() + .unwrap() + .get_instruction_cost_table() + .len() + ); + assert_eq!( + Some(&expected_cost), + cost_model + .read() + .unwrap() + .get_instruction_cost_table() + .get(&program_key_1) + ); + } + + // update program + { + let accumulated_us: u64 = 2000; + let count: u32 = 10; + // to expect new cost is Average(new_value, existing_value) + expected_cost = ((accumulated_us / count as u64) + expected_cost) / 2; + + execute_timings + .details + .per_program_timings + .insert(program_key_1, (accumulated_us, count)); + CostUpdateService::update_cost_model(&cost_model, &execute_timings); + assert_eq!( + 1, + cost_model + .read() + .unwrap() + .get_instruction_cost_table() + .len() + ); + assert_eq!( + Some(&expected_cost), + cost_model + .read() + .unwrap() + .get_instruction_cost_table() + .get(&program_key_1) + ); + } + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index d56ccb671f..8c93ba8c35 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -20,6 +20,7 @@ pub mod completed_data_sets_service; pub mod consensus; pub mod cost_model; pub mod cost_tracker; +pub mod cost_update_service; pub mod execute_cost_table; pub mod fetch_stage; pub mod fork_choice; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index cfe2699b5e..bb3549df28 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -13,13 +13,11 @@ use crate::{ consensus::{ ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes, SWITCH_FORK_THRESHOLD, }, - cost_model::CostModel, fork_choice::{ForkChoice, SelectVoteAndResetForkResult}, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks, progress_map::{ForkProgress, ProgressMap, PropagatedStats}, repair_service::DuplicateSlotsResetReceiver, - result::Result, rewards_recorder_service::RewardsRecorderSender, unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, window_service::DuplicateSlotReceiver, @@ -276,7 +274,7 @@ impl ReplayTiming { "process_duplicate_slots_elapsed", self.process_duplicate_slots_elapsed as i64, i64 - ) + ), ); *self = ReplayTiming::default(); @@ -286,7 +284,7 @@ impl ReplayTiming { } pub struct ReplayStage { - t_replay: JoinHandle>, + t_replay: JoinHandle<()>, commitment_service: AggregateCommitmentService, } @@ -309,7 +307,7 @@ impl ReplayStage { gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver, gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver, cluster_slots_update_sender: ClusterSlotsUpdateSender, - cost_model: Arc>, + cost_update_sender: Sender, ) -> Self { let ReplayStageConfig { vote_account, @@ -406,7 +404,7 @@ impl ReplayStage { &mut unfrozen_gossip_verified_vote_hashes, &mut latest_validator_votes_for_frozen_banks, &cluster_slots_update_sender, - &cost_model, + &cost_update_sender, ); replay_active_banks_time.stop(); @@ -734,7 +732,6 @@ impl ReplayStage { process_duplicate_slots_time.as_us(), ); } - Ok(()) }) .unwrap(); @@ -1668,10 +1665,11 @@ impl ReplayStage { unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes, latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks, cluster_slots_update_sender: &ClusterSlotsUpdateSender, - cost_model: &RwLock, + cost_update_sender: &Sender, ) -> bool { let mut did_complete_bank = false; let mut tx_count = 0; + let mut execute_timings = ExecuteTimings::default(); let active_banks = bank_forks.read().unwrap().active_banks(); trace!("active banks {:?}", active_banks); @@ -1719,7 +1717,7 @@ impl ReplayStage { replay_vote_sender, verify_recyclers, ); - Self::update_cost_model(cost_model, &bank_progress.replay_stats.execute_timings); + execute_timings.accumulate(&bank_progress.replay_stats.execute_timings); match replay_result { Ok(replay_tx_count) => tx_count += replay_tx_count, Err(err) => { @@ -1804,6 +1802,12 @@ impl ReplayStage { ); } } + + // send accumulated excute-timings to cost_update_service + cost_update_sender + .send(execute_timings) + .expect("send execution cost update to cost_model"); + inc_new_counter_info!("replay_stage-replay_transactions", tx_count); did_complete_bank } @@ -1910,44 +1914,6 @@ impl ReplayStage { new_stats } - fn update_cost_model(cost_model: &RwLock, execute_timings: &ExecuteTimings) { - let mut update_cost_model_time = Measure::start("update_cost_model_time"); - let mut cost_model_mutable = cost_model.write().unwrap(); - for (program_id, stats) in &execute_timings.details.per_program_timings { - let cost = stats.0 / stats.1 as u64; - match cost_model_mutable.upsert_instruction_cost(program_id, &cost) { - Ok(c) => { - debug!( - "after replayed into bank, instruction {:?} has averaged cost {}", - program_id, c - ); - } - Err(err) => { - debug!( - "after replayed into bank, instruction {:?} failed to update cost, err: {}", - program_id, err - ); - } - } - } - drop(cost_model_mutable); - debug!( - "after replayed into bank, updated cost model instruction cost table, current values: {:?}", - cost_model.read().unwrap().get_instruction_cost_table() - ); - update_cost_model_time.stop(); - - inc_new_counter_info!("replay_stage-update_cost_model", 1); - datapoint_info!( - "replay-loop-timing-stats", - ( - "update_cost_model_elapsed", - update_cost_model_time.as_us() as i64, - i64 - ) - ); - } - fn update_propagation_status( progress: &mut ProgressMap, slot: Slot, @@ -4911,91 +4877,6 @@ mod tests { ); assert_eq!(tower.last_voted_slot().unwrap(), 1); } - - #[test] - fn test_update_cost_model_with_empty_execute_timings() { - let cost_model = Arc::new(RwLock::new(CostModel::default())); - let empty_execute_timings = ExecuteTimings::default(); - ReplayStage::update_cost_model(&cost_model, &empty_execute_timings); - - assert_eq!( - 0, - cost_model - .read() - .unwrap() - .get_instruction_cost_table() - .len() - ); - } - - #[test] - fn test_update_cost_model_with_execute_timings() { - let cost_model = Arc::new(RwLock::new(CostModel::default())); - let mut execute_timings = ExecuteTimings::default(); - - let program_key_1 = Pubkey::new_unique(); - let mut expected_cost: u64; - - // add new program - { - let accumulated_us: u64 = 1000; - let count: u32 = 10; - expected_cost = accumulated_us / count as u64; - - execute_timings - .details - .per_program_timings - .insert(program_key_1, (accumulated_us, count)); - ReplayStage::update_cost_model(&cost_model, &execute_timings); - assert_eq!( - 1, - cost_model - .read() - .unwrap() - .get_instruction_cost_table() - .len() - ); - assert_eq!( - Some(&expected_cost), - cost_model - .read() - .unwrap() - .get_instruction_cost_table() - .get(&program_key_1) - ); - } - - // update program - { - let accumulated_us: u64 = 2000; - let count: u32 = 10; - // to expect new cost is Average(new_value, existing_value) - expected_cost = ((accumulated_us / count as u64) + expected_cost) / 2; - - execute_timings - .details - .per_program_timings - .insert(program_key_1, (accumulated_us, count)); - ReplayStage::update_cost_model(&cost_model, &execute_timings); - assert_eq!( - 1, - cost_model - .read() - .unwrap() - .get_instruction_cost_table() - .len() - ); - assert_eq!( - Some(&expected_cost), - cost_model - .read() - .unwrap() - .get_instruction_cost_table() - .get(&program_key_1) - ); - } - } - fn run_compute_and_select_forks( bank_forks: &RwLock, progress: &mut ProgressMap, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 5329d7dbfe..d59201c1fc 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -13,6 +13,7 @@ use crate::{ completed_data_sets_service::CompletedDataSetsSender, consensus::Tower, cost_model::CostModel, + cost_update_service::CostUpdateService, ledger_cleanup_service::LedgerCleanupService, replay_stage::{ReplayStage, ReplayStageConfig}, retransmit_stage::RetransmitStage, @@ -38,6 +39,7 @@ use solana_runtime::{ AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, SnapshotRequestHandler, }, accounts_db::AccountShrinkThreshold, + bank::ExecuteTimings, bank_forks::BankForks, commitment::BlockCommitmentCache, snapshot_config::SnapshotConfig, @@ -50,7 +52,7 @@ use std::{ net::UdpSocket, sync::{ atomic::AtomicBool, - mpsc::{channel, Receiver}, + mpsc::{channel, Receiver, Sender}, Arc, Mutex, RwLock, }, thread, @@ -64,6 +66,7 @@ pub struct Tvu { ledger_cleanup_service: Option, accounts_background_service: AccountsBackgroundService, accounts_hash_verifier: AccountsHashVerifier, + cost_update_service: CostUpdateService, } pub struct Sockets { @@ -270,6 +273,17 @@ impl Tvu { wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader, }; + let (cost_update_sender, cost_update_receiver): ( + Sender, + Receiver, + ) = channel(); + let cost_update_service = CostUpdateService::new( + exit.clone(), + blockstore.clone(), + cost_model.clone(), + cost_update_receiver, + ); + let replay_stage = ReplayStage::new( replay_stage_config, blockstore.clone(), @@ -287,7 +301,7 @@ impl Tvu { gossip_confirmed_slots_receiver, gossip_verified_vote_hash_receiver, cluster_slots_update_sender, - cost_model.clone(), + cost_update_sender, ); let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { @@ -318,6 +332,7 @@ impl Tvu { ledger_cleanup_service, accounts_background_service, accounts_hash_verifier, + cost_update_service, } } @@ -331,6 +346,7 @@ impl Tvu { self.accounts_background_service.join()?; self.replay_stage.join()?; self.accounts_hash_verifier.join()?; + self.cost_update_service.join()?; Ok(()) } } diff --git a/core/src/validator.rs b/core/src/validator.rs index 9a9cd52241..62eaf46616 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -662,6 +662,7 @@ impl Validator { ACCOUNT_MAX_COST, BLOCK_MAX_COST, ))); + Self::initiate_cost_model(&cost_model, &blockstore.read_program_costs().unwrap()); let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded(); let (verified_vote_sender, verified_vote_receiver) = unbounded(); @@ -892,6 +893,31 @@ impl Validator { ip_echo_server.shutdown_background(); } } + + fn initiate_cost_model(cost_model: &RwLock, cost_table: &[(Pubkey, u64)]) { + let mut cost_model_mutable = cost_model.write().unwrap(); + for (program_id, cost) in cost_table { + match cost_model_mutable.upsert_instruction_cost(program_id, cost) { + Ok(c) => { + debug!( + "initiating cost table, instruction {:?} has cost {}", + program_id, c + ); + } + Err(err) => { + debug!( + "initiating cost table, failed for instruction {:?}, err: {}", + program_id, err + ); + } + } + } + drop(cost_model_mutable); + debug!( + "restored cost model instruction cost table from blockstore, current values: {:?}", + cost_model.read().unwrap().get_instruction_cost_table() + ); + } } fn active_vote_account_exists_in_bank(bank: &Arc, vote_account: &Pubkey) -> bool { diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index dd1a4bd43f..c1b712ba7a 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -143,6 +143,7 @@ pub struct Blockstore { blocktime_cf: LedgerColumn, perf_samples_cf: LedgerColumn, block_height_cf: LedgerColumn, + program_costs_cf: LedgerColumn, last_root: Arc>, insert_shreds_lock: Arc>, pub new_shreds_signals: Vec>, @@ -342,6 +343,7 @@ impl Blockstore { let blocktime_cf = db.column(); let perf_samples_cf = db.column(); let block_height_cf = db.column(); + let program_costs_cf = db.column(); let db = Arc::new(db); @@ -390,6 +392,7 @@ impl Blockstore { blocktime_cf, perf_samples_cf, block_height_cf, + program_costs_cf, new_shreds_signals: vec![], completed_slots_senders: vec![], insert_shreds_lock: Arc::new(Mutex::new(())), @@ -2686,6 +2689,26 @@ impl Blockstore { self.perf_samples_cf.put(index, perf_sample) } + pub fn read_program_costs(&self) -> Result> { + Ok(self + .db + .iter::(IteratorMode::End)? + .map(|(pubkey, data)| { + let program_cost: ProgramCost = deserialize(&data).unwrap(); + (pubkey, program_cost.cost) + }) + .collect()) + } + + pub fn write_program_cost(&self, key: &Pubkey, value: &u64) -> Result<()> { + self.program_costs_cf + .put(*key, &ProgramCost { cost: *value }) + } + + pub fn delete_program_cost(&self, key: &Pubkey) -> Result<()> { + self.program_costs_cf.delete(*key) + } + /// Returns the entry vector for the slot starting with `shred_start_index` pub fn get_slot_entries(&self, slot: Slot, shred_start_index: u64) -> Result> { self.get_slot_entries_with_shred_info(slot, shred_start_index, false) @@ -8850,4 +8873,126 @@ pub mod tests { Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } + + #[test] + fn test_read_write_cost_table() { + let blockstore_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + let num_entries: usize = 10; + let mut cost_table: HashMap = HashMap::new(); + for x in 1..num_entries + 1 { + cost_table.insert(Pubkey::new_unique(), (x + 100) as u64); + } + + // write to db + for (key, cost) in cost_table.iter() { + blockstore + .write_program_cost(key, cost) + .expect("write a program"); + } + + // read back from db + let read_back = blockstore.read_program_costs().expect("read programs"); + // verify + assert_eq!(read_back.len(), cost_table.len()); + for (read_key, read_cost) in read_back { + assert_eq!(read_cost, *cost_table.get(&read_key).unwrap()); + } + + // update value, write to db + for val in cost_table.values_mut() { + *val += 100; + } + for (key, cost) in cost_table.iter() { + blockstore + .write_program_cost(key, cost) + .expect("write a program"); + } + // add a new record + let new_program_key = Pubkey::new_unique(); + let new_program_cost = 999; + blockstore + .write_program_cost(&new_program_key, &new_program_cost) + .unwrap(); + + // confirm value updated + let read_back = blockstore.read_program_costs().expect("read programs"); + // verify + assert_eq!(read_back.len(), cost_table.len() + 1); + for (key, cost) in cost_table.iter() { + assert_eq!(*cost, read_back.iter().find(|(k, _v)| k == key).unwrap().1); + } + assert_eq!( + new_program_cost, + read_back + .iter() + .find(|(k, _v)| *k == new_program_key) + .unwrap() + .1 + ); + + // test delete + blockstore + .delete_program_cost(&new_program_key) + .expect("delete a progrma"); + let read_back = blockstore.read_program_costs().expect("read programs"); + // verify + assert_eq!(read_back.len(), cost_table.len()); + for (read_key, read_cost) in read_back { + assert_eq!(read_cost, *cost_table.get(&read_key).unwrap()); + } + } + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } + + #[test] + fn test_delete_old_records_from_cost_table() { + let blockstore_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + let num_entries: usize = 10; + let mut cost_table: HashMap = HashMap::new(); + for x in 1..num_entries + 1 { + cost_table.insert(Pubkey::new_unique(), (x + 100) as u64); + } + + // write to db + for (key, cost) in cost_table.iter() { + blockstore + .write_program_cost(key, cost) + .expect("write a program"); + } + + // remove a record + let mut removed_key = Pubkey::new_unique(); + for (key, cost) in cost_table.iter() { + if *cost == 101_u64 { + removed_key = *key; + break; + } + } + cost_table.remove(&removed_key); + + // delete records from blockstore if they are no longer in cost_table + let db_records = blockstore.read_program_costs().expect("read programs"); + db_records.iter().for_each(|(pubkey, _)| { + if !cost_table.iter().any(|(key, _)| key == pubkey) { + assert_eq!(*pubkey, removed_key); + blockstore + .delete_program_cost(pubkey) + .expect("delete old program"); + } + }); + + // read back from db + let read_back = blockstore.read_program_costs().expect("read programs"); + // verify + assert_eq!(read_back.len(), cost_table.len()); + for (read_key, read_cost) in read_back { + assert_eq!(read_cost, *cost_table.get(&read_key).unwrap()); + } + } + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } } diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index f197aebe33..3131b606be 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -22,7 +22,7 @@ use solana_sdk::{ }; use solana_storage_proto::convert::generated; use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, ffi::{CStr, CString}, fs, marker::PhantomData, @@ -71,6 +71,8 @@ const BLOCKTIME_CF: &str = "blocktime"; const PERF_SAMPLES_CF: &str = "perf_samples"; /// Column family for BlockHeight const BLOCK_HEIGHT_CF: &str = "block_height"; +/// Column family for ProgramCosts +const PROGRAM_COSTS_CF: &str = "program_costs"; // 1 day is chosen for the same reasoning of DEFAULT_COMPACTION_SLOT_INTERVAL const PERIODIC_COMPACTION_SECONDS: u64 = 60 * 60 * 24; @@ -174,6 +176,10 @@ pub mod columns { #[derive(Debug)] /// The block height column pub struct BlockHeight; + + #[derive(Debug)] + // The program costs column + pub struct ProgramCosts; } pub enum AccessType { @@ -258,8 +264,8 @@ impl Rocks { ) -> Result { use columns::{ AddressSignatures, BlockHeight, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, - Index, Orphans, PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta, - TransactionStatus, TransactionStatusIndex, + Index, Orphans, PerfSamples, ProgramCosts, Rewards, Root, ShredCode, ShredData, + SlotMeta, TransactionStatus, TransactionStatusIndex, }; fs::create_dir_all(&path)?; @@ -340,6 +346,10 @@ impl Rocks { BlockHeight::NAME, get_cf_options::(&access_type, &oldest_slot), ); + let program_costs_cf_descriptor = ColumnFamilyDescriptor::new( + ProgramCosts::NAME, + get_cf_options::(&access_type, &oldest_slot), + ); // Don't forget to add to both run_purge_with_stats() and // compact_storage() in ledger/src/blockstore/blockstore_purge.rs!! @@ -363,6 +373,7 @@ impl Rocks { (Blocktime::NAME, blocktime_cf_descriptor), (PerfSamples::NAME, perf_samples_cf_descriptor), (BlockHeight::NAME, block_height_cf_descriptor), + (ProgramCosts::NAME, program_costs_cf_descriptor), ]; let cf_names: Vec<_> = cfs.iter().map(|c| c.0).collect(); @@ -403,9 +414,9 @@ impl Rocks { // this is only needed for LedgerCleanupService. so guard with PrimaryOnly (i.e. running solana-validator) if matches!(access_type, AccessType::PrimaryOnly) { for cf_name in cf_names { - // this special column family must be excluded from LedgerCleanupService's rocksdb + // these special column families must be excluded from LedgerCleanupService's rocksdb // compactions - if cf_name == TransactionStatusIndex::NAME { + if excludes_from_compaction(cf_name) { continue; } @@ -463,8 +474,8 @@ impl Rocks { fn columns(&self) -> Vec<&'static str> { use columns::{ AddressSignatures, BlockHeight, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, - Index, Orphans, PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta, - TransactionStatus, TransactionStatusIndex, + Index, Orphans, PerfSamples, ProgramCosts, Rewards, Root, ShredCode, ShredData, + SlotMeta, TransactionStatus, TransactionStatusIndex, }; vec![ @@ -484,6 +495,7 @@ impl Rocks { Blocktime::NAME, PerfSamples::NAME, BlockHeight::NAME, + ProgramCosts::NAME, ] } @@ -509,6 +521,11 @@ impl Rocks { Ok(()) } + fn delete_cf(&self, cf: &ColumnFamily, key: &[u8]) -> Result<()> { + self.0.delete_cf(cf, key)?; + Ok(()) + } + fn iterator_cf(&self, cf: &ColumnFamily, iterator_mode: IteratorMode) -> DBIterator where C: Column, @@ -750,6 +767,39 @@ impl TypedColumn for columns::BlockHeight { type Type = u64; } +impl ColumnName for columns::ProgramCosts { + const NAME: &'static str = PROGRAM_COSTS_CF; +} +impl TypedColumn for columns::ProgramCosts { + type Type = blockstore_meta::ProgramCost; +} +impl Column for columns::ProgramCosts { + type Index = Pubkey; + + fn key(pubkey: Pubkey) -> Vec { + let mut key = vec![0; 32]; // size_of Pubkey + key[0..32].clone_from_slice(&pubkey.as_ref()[0..32]); + key + } + + fn index(key: &[u8]) -> Self::Index { + Pubkey::new(&key[0..32]) + } + + fn primary_index(_index: Self::Index) -> u64 { + unimplemented!() + } + + fn slot(_index: Self::Index) -> Slot { + unimplemented!() + } + + #[allow(clippy::wrong_self_convention)] + fn as_index(_index: u64) -> Self::Index { + Pubkey::default() + } +} + impl Column for columns::ShredCode { type Index = (u64, u64); @@ -1113,6 +1163,10 @@ where self.backend .put_cf(self.handle(), &C::key(key), &serialized_value) } + + pub fn delete(&self, key: C::Index) -> Result<()> { + self.backend.delete_cf(self.handle(), &C::key(key)) + } } impl LedgerColumn @@ -1260,11 +1314,9 @@ fn get_cf_options( options.set_max_bytes_for_level_base(total_size_base); options.set_target_file_size_base(file_size_base); - // TransactionStatusIndex must be excluded from LedgerCleanupService's rocksdb + // TransactionStatusIndex and ProgramCosts must be excluded from LedgerCleanupService's rocksdb // compactions.... - if matches!(access_type, AccessType::PrimaryOnly) - && C::NAME != columns::TransactionStatusIndex::NAME - { + if matches!(access_type, AccessType::PrimaryOnly) && !excludes_from_compaction(C::NAME) { options.set_compaction_filter_factory(PurgedSlotFilterFactory:: { oldest_slot: oldest_slot.clone(), name: CString::new(format!("purged_slot_filter_factory({})", C::NAME)).unwrap(), @@ -1304,6 +1356,18 @@ fn get_db_options(access_type: &AccessType) -> Options { options } +fn excludes_from_compaction(cf_name: &str) -> bool { + // list of Column Families must be excluded from compaction: + let no_compaction_cfs: HashSet<&'static str> = vec![ + columns::TransactionStatusIndex::NAME, + columns::ProgramCosts::NAME, + ] + .into_iter() + .collect(); + + no_compaction_cfs.get(cf_name).is_some() +} + #[cfg(test)] pub mod tests { use super::*; @@ -1356,4 +1420,14 @@ pub mod tests { CompactionDecision::Keep ); } + + #[test] + fn test_excludes_from_compaction() { + // currently there are two CFs are excluded from compaction: + assert!(excludes_from_compaction( + columns::TransactionStatusIndex::NAME + )); + assert!(excludes_from_compaction(columns::ProgramCosts::NAME)); + assert!(!excludes_from_compaction("something else")); + } } diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 586a61adb5..42877df5e3 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -253,6 +253,11 @@ pub struct PerfSample { pub sample_period_secs: u16, } +#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] +pub struct ProgramCost { + pub cost: u64, +} + #[cfg(test)] mod test { use super::*;