diff --git a/core/src/cache_block_time_service.rs b/core/src/cache_block_time_service.rs new file mode 100644 index 000000000..07f5eced2 --- /dev/null +++ b/core/src/cache_block_time_service.rs @@ -0,0 +1,76 @@ +use crossbeam_channel::{Receiver, RecvTimeoutError, Sender}; +use solana_ledger::blockstore::Blockstore; +use solana_measure::measure::Measure; +use solana_runtime::bank::Bank; +use solana_sdk::timing::slot_duration_from_slots_per_year; +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, +}; + +pub type CacheBlockTimeReceiver = Receiver>; +pub type CacheBlockTimeSender = Sender>; + +pub struct CacheBlockTimeService { + thread_hdl: JoinHandle<()>, +} + +const CACHE_BLOCK_TIME_WARNING_MS: u64 = 150; + +impl CacheBlockTimeService { + #[allow(clippy::new_ret_no_self)] + pub fn new( + cache_block_time_receiver: CacheBlockTimeReceiver, + blockstore: Arc, + exit: &Arc, + ) -> Self { + let exit = exit.clone(); + let thread_hdl = Builder::new() + .name("solana-cache-block-time".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + let recv_result = cache_block_time_receiver.recv_timeout(Duration::from_secs(1)); + match recv_result { + Err(RecvTimeoutError::Disconnected) => { + break; + } + Ok(bank) => { + let mut cache_block_time_timer = Measure::start("cache_block_time_timer"); + Self::cache_block_time(bank, &blockstore); + cache_block_time_timer.stop(); + if cache_block_time_timer.as_ms() > CACHE_BLOCK_TIME_WARNING_MS { + warn!( + "cache_block_time operation took: {}ms", + cache_block_time_timer.as_ms() + ); + } + } + _ => {} + } + }) + .unwrap(); + Self { thread_hdl } + } + + fn cache_block_time(bank: Arc, blockstore: &Arc) { + let slot_duration = slot_duration_from_slots_per_year(bank.slots_per_year()); + let epoch = bank.epoch_schedule().get_epoch(bank.slot()); + let stakes = HashMap::new(); + let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes); + + if let Err(e) = blockstore.cache_block_time(bank.slot(), slot_duration, stakes) { + error!("cache_block_time failed: slot {:?} {:?}", bank.slot(), e); + } + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 03d621674..042b0ef74 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -11,6 +11,7 @@ pub mod accounts_hash_verifier; pub mod banking_stage; pub mod bigtable_upload_service; pub mod broadcast_stage; +pub mod cache_block_time_service; pub mod cluster_info_vote_listener; pub mod commitment_service; pub mod completed_data_sets_service; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 9277720ea..67e8f1655 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -3,6 +3,7 @@ use crate::{ bank_weight_fork_choice::BankWeightForkChoice, broadcast_stage::RetransmitSlotsSender, + cache_block_time_service::CacheBlockTimeSender, cluster_info::ClusterInfo, cluster_info_vote_listener::VoteTracker, cluster_slots::ClusterSlots, @@ -106,6 +107,7 @@ pub struct ReplayStageConfig { pub block_commitment_cache: Arc>, pub transaction_status_sender: Option, pub rewards_recorder_sender: Option, + pub cache_block_time_sender: Option, } #[derive(Default)] @@ -235,6 +237,7 @@ impl ReplayStage { block_commitment_cache, transaction_status_sender, rewards_recorder_sender, + cache_block_time_sender, } = config; trace!("replay stage"); @@ -494,6 +497,7 @@ impl ReplayStage { &subscriptions, &block_commitment_cache, &mut heaviest_subtree_fork_choice, + &cache_block_time_sender, )?; }; voting_time.stop(); @@ -1004,6 +1008,7 @@ impl ReplayStage { subscriptions: &Arc, block_commitment_cache: &Arc>, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, + cache_block_time_sender: &Option, ) -> Result<()> { if bank.is_empty() { inc_new_counter_info!("replay_stage-voted_empty_bank", 1); @@ -1029,6 +1034,12 @@ impl ReplayStage { blockstore .set_roots(&rooted_slots) .expect("Ledger set roots failed"); + Self::cache_block_times( + blockstore, + bank_forks, + &rooted_slots, + cache_block_time_sender, + ); let highest_confirmed_root = Some( block_commitment_cache .read() @@ -1855,6 +1866,36 @@ impl ReplayStage { } } + fn cache_block_times( + blockstore: &Arc, + bank_forks: &Arc>, + rooted_slots: &[Slot], + cache_block_time_sender: &Option, + ) { + if let Some(cache_block_time_sender) = cache_block_time_sender { + for slot in rooted_slots { + if blockstore + .get_block_time(*slot) + .unwrap_or_default() + .is_none() + { + if let Some(rooted_bank) = bank_forks.read().unwrap().get(*slot) { + cache_block_time_sender + .send(rooted_bank.clone()) + .unwrap_or_else(|err| { + warn!("cache_block_time_sender failed: {:?}", err) + }); + } else { + error!( + "rooted_bank {:?} not available in BankForks; block time not cached", + slot + ); + } + } + } + } + } + pub fn get_unlock_switch_vote_slot(cluster_type: ClusterType) -> Slot { match cluster_type { ClusterType::Development => 0, diff --git a/core/src/rpc.rs b/core/src/rpc.rs index a191ba036..3c1bf0334 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -54,7 +54,6 @@ use solana_sdk::{ stake_history::StakeHistory, system_instruction, sysvar::{stake_history, Sysvar}, - timing::slot_duration_from_slots_per_year, transaction::{self, Transaction}, }; use solana_stake_program::stake_state::StakeState; @@ -686,18 +685,7 @@ impl JsonRpcRequestProcessor { .unwrap() .highest_confirmed_root() { - // This calculation currently assumes that bank.slots_per_year will remain unchanged after - // genesis (ie. that this bank's slot_per_year will be applicable to any rooted slot being - // queried). If these values will be variable in the future, those timing parameters will - // need to be stored persistently, and the slot_duration calculation will likely need to be - // moved upstream into blockstore. Also, an explicit commitment level will need to be set. - let bank = self.bank(None); - let slot_duration = slot_duration_from_slots_per_year(bank.slots_per_year()); - let epoch = bank.epoch_schedule().get_epoch(slot); - let stakes = HashMap::new(); - let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes); - - let result = self.blockstore.get_block_time(slot, slot_duration, stakes); + let result = self.blockstore.get_block_time(slot); self.check_slot_cleaned_up(&result, slot)?; Ok(result.ok().unwrap_or(None)) } else { @@ -2544,6 +2532,7 @@ pub mod tests { nonce, rpc_port, signature::{Keypair, Signer}, system_program, system_transaction, + timing::slot_duration_from_slots_per_year, transaction::{self, TransactionError}, }; use solana_transaction_status::{EncodedTransaction, TransactionWithStatusMeta, UiMessage}; @@ -2555,7 +2544,7 @@ pub mod tests { option::COption, solana_sdk::pubkey::Pubkey as SplTokenPubkey, state::AccountState as TokenAccountState, state::Mint, }; - use std::collections::HashMap; + use std::{collections::HashMap, time::Duration}; const TEST_MINT_LAMPORTS: u64 = 1_000_000; const TEST_SLOTS_PER_EPOCH: u64 = DELINQUENT_VALIDATOR_SLOT_DISTANCE + 1; @@ -2656,6 +2645,11 @@ pub mod tests { for root in roots.iter() { bank_forks.write().unwrap().set_root(*root, &None, Some(0)); + let mut stakes = HashMap::new(); + stakes.insert(leader_vote_keypair.pubkey(), (1, Account::default())); + blockstore + .cache_block_time(*root, Duration::from_millis(400), &stakes) + .unwrap(); } } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 95d35ccc3..6b06a8a9b 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -5,6 +5,7 @@ use crate::{ accounts_background_service::AccountsBackgroundService, accounts_hash_verifier::AccountsHashVerifier, broadcast_stage::RetransmitSlotsSender, + cache_block_time_service::CacheBlockTimeSender, cluster_info::ClusterInfo, cluster_info_vote_listener::{VerifiedVoteReceiver, VoteTracker}, cluster_slots::ClusterSlots, @@ -96,6 +97,7 @@ impl Tvu { cfg: Option>, transaction_status_sender: Option, rewards_recorder_sender: Option, + cache_block_time_sender: Option, snapshot_package_sender: Option, vote_tracker: Arc, retransmit_slots_sender: RetransmitSlotsSender, @@ -191,6 +193,7 @@ impl Tvu { block_commitment_cache, transaction_status_sender, rewards_recorder_sender, + cache_block_time_sender, }; let replay_stage = ReplayStage::new( @@ -327,6 +330,7 @@ pub mod tests { None, None, None, + None, Arc::new(VoteTracker::new(&bank)), retransmit_slots_sender, verified_vote_receiver, diff --git a/core/src/validator.rs b/core/src/validator.rs index 37d22ccfc..4b1184b18 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -2,6 +2,7 @@ use crate::{ broadcast_stage::BroadcastStageType, + cache_block_time_service::{CacheBlockTimeSender, CacheBlockTimeService}, cluster_info::{ClusterInfo, Node}, cluster_info_vote_listener::VoteTracker, completed_data_sets_service::CompletedDataSetsService, @@ -149,6 +150,8 @@ struct TransactionHistoryServices { transaction_status_service: Option, rewards_recorder_sender: Option, rewards_recorder_service: Option, + cache_block_time_sender: Option, + cache_block_time_service: Option, } pub struct Validator { @@ -157,6 +160,7 @@ pub struct Validator { rpc_service: Option<(JsonRpcService, PubSubService, RpcBanksService)>, transaction_status_service: Option, rewards_recorder_service: Option, + cache_block_time_service: Option, gossip_service: GossipService, serve_repair_service: ServeRepairService, completed_data_sets_service: CompletedDataSetsService, @@ -244,6 +248,8 @@ impl Validator { transaction_status_service, rewards_recorder_sender, rewards_recorder_service, + cache_block_time_sender, + cache_block_time_service, }, ) = new_banks_from_ledger(config, ledger_path, poh_verify, &exit); @@ -477,6 +483,7 @@ impl Validator { config.enable_partition.clone(), transaction_status_sender.clone(), rewards_recorder_sender, + cache_block_time_sender, snapshot_package_sender, vote_tracker.clone(), retransmit_slots_sender, @@ -523,6 +530,7 @@ impl Validator { rpc_service, transaction_status_service, rewards_recorder_service, + cache_block_time_service, snapshot_packager_service, completed_data_sets_service, tpu, @@ -587,6 +595,10 @@ impl Validator { rewards_recorder_service.join()?; } + if let Some(cache_block_time_service) = self.cache_block_time_service { + cache_block_time_service.join()?; + } + if let Some(s) = self.snapshot_packager_service { s.join()?; } @@ -772,6 +784,14 @@ fn initialize_rpc_transaction_history_services( let rewards_recorder_sender = Some(rewards_recorder_sender); let rewards_recorder_service = Some(RewardsRecorderService::new( rewards_receiver, + blockstore.clone(), + exit, + )); + + let (cache_block_time_sender, cache_block_time_receiver) = unbounded(); + let cache_block_time_sender = Some(cache_block_time_sender); + let cache_block_time_service = Some(CacheBlockTimeService::new( + cache_block_time_receiver, blockstore, exit, )); @@ -780,6 +800,8 @@ fn initialize_rpc_transaction_history_services( transaction_status_service, rewards_recorder_sender, rewards_recorder_service, + cache_block_time_sender, + cache_block_time_service, } } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index e4c87516d..1a8c3a6fb 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -78,7 +78,6 @@ thread_local!(static PAR_THREAD_POOL_ALL_CPUS: RefCell = RefCell::ne pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000; pub const MAX_TURBINE_PROPAGATION_IN_MS: u64 = 100; pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_PER_TICK; -const TIMESTAMP_SLOT_INTERVAL: u64 = 4500; const TIMESTAMP_SLOT_RANGE: usize = 16; // An upper bound on maximum number of data shreds we can handle in a slot @@ -137,6 +136,7 @@ pub struct Blockstore { transaction_status_index_cf: LedgerColumn, active_transaction_status_index: RwLock, rewards_cf: LedgerColumn, + blocktime_cf: LedgerColumn, last_root: Arc>, insert_shreds_lock: Arc>, pub new_shreds_signals: Vec>, @@ -292,6 +292,7 @@ impl Blockstore { let address_signatures_cf = db.column(); let transaction_status_index_cf = db.column(); let rewards_cf = db.column(); + let blocktime_cf = db.column(); let db = Arc::new(db); @@ -336,6 +337,7 @@ impl Blockstore { transaction_status_index_cf, active_transaction_status_index: RwLock::new(active_transaction_status_index), rewards_cf, + blocktime_cf, new_shreds_signals: vec![], completed_slots_senders: vec![], insert_shreds_lock: Arc::new(Mutex::new(())), @@ -1557,12 +1559,7 @@ impl Blockstore { } } - pub fn get_block_time( - &self, - slot: Slot, - slot_duration: Duration, - stakes: &HashMap, - ) -> Result> { + pub fn get_block_time(&self, slot: Slot) -> Result> { datapoint_info!( "blockstore-rpc-api", ("method", "get_block_time".to_string(), String) @@ -1573,18 +1570,56 @@ impl Blockstore { if *lowest_cleanup_slot > 0 && *lowest_cleanup_slot >= slot { return Err(BlockstoreError::SlotCleanedUp); } + self.blocktime_cf.get(slot) + } + fn get_timestamp_slots(&self, slot: Slot, timestamp_sample_range: usize) -> Vec { + let root_iterator = self + .db + .iter::(IteratorMode::From(slot, IteratorDirection::Reverse)); + if !self.is_root(slot) || root_iterator.is_err() { + return vec![]; + } + let mut get_slots = Measure::start("get_slots"); + let mut timestamp_slots: Vec = root_iterator + .unwrap() + .map(|(iter_slot, _)| iter_slot) + .take(timestamp_sample_range) + .collect(); + timestamp_slots.sort(); + get_slots.stop(); + datapoint_info!( + "blockstore-get-timestamp-slots", + ("slot", slot as i64, i64), + ("get_slots_us", get_slots.as_us() as i64, i64) + ); + timestamp_slots + } + + pub fn cache_block_time( + &self, + slot: Slot, + slot_duration: Duration, + stakes: &HashMap, + ) -> Result<()> { + if !self.is_root(slot) { + return Err(BlockstoreError::SlotNotRooted); + } let mut get_unique_timestamps = Measure::start("get_unique_timestamps"); let unique_timestamps: HashMap = self - .get_timestamp_slots(slot, TIMESTAMP_SLOT_INTERVAL, TIMESTAMP_SLOT_RANGE) + .get_timestamp_slots(slot, TIMESTAMP_SLOT_RANGE) .into_iter() .flat_map(|query_slot| self.get_block_timestamps(query_slot).unwrap_or_default()) .collect(); get_unique_timestamps.stop(); + if unique_timestamps.is_empty() { + return Err(BlockstoreError::NoVoteTimestampsInRange); + } let mut calculate_timestamp = Measure::start("calculate_timestamp"); - let stake_weighted_timestamps = - calculate_stake_weighted_timestamp(unique_timestamps, stakes, slot, slot_duration); + let stake_weighted_timestamp = + calculate_stake_weighted_timestamp(unique_timestamps, stakes, slot, slot_duration) + .ok_or(BlockstoreError::EmptyEpochStakes)?; calculate_timestamp.stop(); datapoint_info!( "blockstore-get-block-time", @@ -1600,52 +1635,7 @@ impl Blockstore { i64 ) ); - - Ok(stake_weighted_timestamps) - } - - fn get_timestamp_slots( - &self, - slot: Slot, - timestamp_interval: u64, - timestamp_sample_range: usize, - ) -> Vec { - let baseline_slot = slot - (slot % timestamp_interval); - let root_iterator = self.db.iter::(IteratorMode::From( - baseline_slot, - IteratorDirection::Forward, - )); - if !self.is_root(slot) || root_iterator.is_err() { - return vec![]; - } - let mut get_slots = Measure::start("get_slots"); - let mut slots: Vec = root_iterator - .unwrap() - .map(|(iter_slot, _)| iter_slot) - .take(timestamp_sample_range) - .filter(|&iter_slot| iter_slot <= slot) - .collect(); - - if slots.len() < timestamp_sample_range && baseline_slot >= timestamp_interval { - let earlier_baseline = baseline_slot - timestamp_interval; - let earlier_root_iterator = self.db.iter::(IteratorMode::From( - earlier_baseline, - IteratorDirection::Forward, - )); - if let Ok(iterator) = earlier_root_iterator { - slots = iterator - .map(|(iter_slot, _)| iter_slot) - .take(timestamp_sample_range) - .collect(); - } - } - get_slots.stop(); - datapoint_info!( - "blockstore-get-timestamp-slots", - ("slot", slot as i64, i64), - ("get_slots_us", get_slots.as_us() as i64, i64) - ); - slots + self.blocktime_cf.put(slot, &stake_weighted_timestamp) } pub fn get_first_available_block(&self) -> Result { @@ -1698,6 +1688,7 @@ impl Blockstore { .unwrap_or_else(|| panic!("Rooted slot {:?} must have blockhash", slot)); let rewards = self.rewards_cf.get(slot)?.unwrap_or_else(Vec::new); + let block_time = self.blocktime_cf.get(slot)?; let block = ConfirmedBlock { previous_blockhash: previous_blockhash.to_string(), @@ -1709,7 +1700,7 @@ impl Blockstore { slot_transaction_iterator, ), rewards, - block_time: None, // See https://github.com/solana-labs/solana/issues/10089 + block_time, }; return Ok(block); } @@ -5561,8 +5552,6 @@ pub mod tests { fn test_get_timestamp_slots() { let timestamp_sample_range = 5; let ticks_per_slot = 5; - // Smaller interval than TIMESTAMP_SLOT_INTERVAL for convenience of building blockstore - let timestamp_interval = 7; /* Build a blockstore with < TIMESTAMP_SLOT_RANGE roots */ @@ -5589,11 +5578,11 @@ pub mod tests { blockstore.set_roots(&[1, 2, 3]).unwrap(); assert_eq!( - blockstore.get_timestamp_slots(2, timestamp_interval, timestamp_sample_range), + blockstore.get_timestamp_slots(2, timestamp_sample_range), vec![0, 1, 2] ); assert_eq!( - blockstore.get_timestamp_slots(3, timestamp_interval, timestamp_sample_range), + blockstore.get_timestamp_slots(3, timestamp_sample_range), vec![0, 1, 2, 3] ); @@ -5601,14 +5590,13 @@ pub mod tests { Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); /* - Build a blockstore in the ledger with the following rooted slots: - [0, 1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 14, 15, 16, 17] + Build a blockstore in the ledger with gaps in rooted slot sequence */ let blockstore_path = get_tmp_ledger_path!(); let blockstore = Blockstore::open(&blockstore_path).unwrap(); blockstore.set_roots(&[0]).unwrap(); - let desired_roots = vec![1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 19]; + let desired_roots = vec![1, 2, 3, 5, 6, 8, 11]; let mut last_entry_hash = Hash::default(); for (i, slot) in desired_roots.iter().enumerate() { let parent = { @@ -5629,28 +5617,20 @@ pub mod tests { blockstore.set_roots(&desired_roots).unwrap(); assert_eq!( - blockstore.get_timestamp_slots(2, timestamp_interval, timestamp_sample_range), + blockstore.get_timestamp_slots(2, timestamp_sample_range), vec![0, 1, 2] ); assert_eq!( - blockstore.get_timestamp_slots(6, timestamp_interval, timestamp_sample_range), - vec![0, 1, 2, 3, 4] + blockstore.get_timestamp_slots(6, timestamp_sample_range), + vec![1, 2, 3, 5, 6] ); assert_eq!( - blockstore.get_timestamp_slots(8, timestamp_interval, timestamp_sample_range), - vec![0, 1, 2, 3, 4] + blockstore.get_timestamp_slots(8, timestamp_sample_range), + vec![2, 3, 5, 6, 8] ); assert_eq!( - blockstore.get_timestamp_slots(13, timestamp_interval, timestamp_sample_range), - vec![8, 9, 10, 11, 12] - ); - assert_eq!( - blockstore.get_timestamp_slots(18, timestamp_interval, timestamp_sample_range), - vec![8, 9, 10, 11, 12] - ); - assert_eq!( - blockstore.get_timestamp_slots(19, timestamp_interval, timestamp_sample_range), - vec![14, 16, 17, 18, 19] + blockstore.get_timestamp_slots(11, timestamp_sample_range), + vec![3, 5, 6, 8, 11] ); } @@ -5754,7 +5734,7 @@ pub mod tests { let confirmed_block = ledger.get_confirmed_block(slot + 1, None).unwrap(); assert_eq!(confirmed_block.transactions.len(), 100); - let expected_block = ConfirmedBlock { + let mut expected_block = ConfirmedBlock { transactions: expected_transactions .iter() .cloned() @@ -5774,6 +5754,14 @@ pub mod tests { let not_root = ledger.get_confirmed_block(slot + 2, None).unwrap_err(); assert_matches!(not_root, BlockstoreError::SlotNotRooted); + // Test block_time returns, if available + let timestamp = 1_576_183_541; + ledger.blocktime_cf.put(slot + 1, ×tamp).unwrap(); + expected_block.block_time = Some(timestamp); + + let confirmed_block = ledger.get_confirmed_block(slot + 1, None).unwrap(); + assert_eq!(confirmed_block, expected_block); + drop(ledger); Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); } @@ -5821,14 +5809,25 @@ pub mod tests { ); assert_eq!(blockstore.get_block_timestamps(2).unwrap(), vec![]); - // Build epoch vote_accounts HashMap to test stake-weighted block time blockstore.set_roots(&[3, 8]).unwrap(); let mut stakes = HashMap::new(); + let slot_duration = Duration::from_millis(400); + for slot in &[1, 2, 3, 8] { + assert!(blockstore + .cache_block_time(*slot, slot_duration, &stakes) + .is_err()); + } + + // Build epoch vote_accounts HashMap to test stake-weighted block time for (i, keypair) in vote_keypairs.iter().enumerate() { stakes.insert(keypair.pubkey(), (1 + i as u64, Account::default())); } - let slot_duration = Duration::from_millis(400); - let block_time_slot_3 = blockstore.get_block_time(3, slot_duration, &stakes); + for slot in &[1, 2, 3, 8] { + blockstore + .cache_block_time(*slot, slot_duration, &stakes) + .unwrap(); + } + let block_time_slot_3 = blockstore.get_block_time(3); let mut total_stake = 0; let mut expected_time: u64 = (0..6) @@ -5844,14 +5843,53 @@ pub mod tests { expected_time /= total_stake; assert_eq!(block_time_slot_3.unwrap().unwrap() as u64, expected_time); assert_eq!( - blockstore - .get_block_time(8, slot_duration, &stakes) - .unwrap() - .unwrap() as u64, + blockstore.get_block_time(8).unwrap().unwrap() as u64, expected_time + 2 // At 400ms block duration, 5 slots == 2sec ); } + #[test] + fn test_get_block_time_no_timestamps() { + let vote_keypairs: Vec = (0..6).map(|_| Keypair::new()).collect(); + + // Populate slot 1 with vote transactions, none of which have timestamps + let mut vote_entries: Vec = Vec::new(); + for (i, keypair) in vote_keypairs.iter().enumerate() { + let vote = Vote { + slots: vec![1], + hash: Hash::default(), + timestamp: None, + }; + let vote_ix = vote_instruction::vote(&keypair.pubkey(), &keypair.pubkey(), vote); + let vote_msg = Message::new(&[vote_ix], Some(&keypair.pubkey())); + let vote_tx = Transaction::new(&[keypair], vote_msg, Hash::default()); + + vote_entries.push(next_entry_mut(&mut Hash::default(), 0, vec![vote_tx])); + let mut tick = create_ticks(1, 0, hash(&serialize(&i).unwrap())); + vote_entries.append(&mut tick); + } + let shreds = entries_to_test_shreds(vote_entries, 1, 0, true, 0); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&ledger_path).unwrap(); + blockstore.insert_shreds(shreds, None, false).unwrap(); + // Populate slot 2 with ticks only + fill_blockstore_slot_with_ticks(&blockstore, 6, 2, 1, Hash::default()); + blockstore.set_roots(&[0, 1, 2]).unwrap(); + + // Build epoch vote_accounts HashMap to test stake-weighted block time + let mut stakes = HashMap::new(); + for (i, keypair) in vote_keypairs.iter().enumerate() { + stakes.insert(keypair.pubkey(), (1 + i as u64, Account::default())); + } + let slot_duration = Duration::from_millis(400); + for slot in &[1, 2, 3, 8] { + assert!(blockstore + .cache_block_time(*slot, slot_duration, &stakes) + .is_err()); + assert_eq!(blockstore.get_block_time(*slot).unwrap(), None); + } + } + #[test] fn test_calculate_stake_weighted_timestamp() { let recent_timestamp: UnixTimestamp = 1_578_909_061; diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index 649fc5f5f..db08c46cb 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -133,6 +133,10 @@ impl Blockstore { & self .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) .is_ok(); let mut w_active_transaction_status_index = self.active_transaction_status_index.write().unwrap(); @@ -223,6 +227,10 @@ impl Blockstore { && self .rewards_cf .compact_range(from_slot, to_slot) + .unwrap_or(false) + && self + .blocktime_cf + .compact_range(from_slot, to_slot) .unwrap_or(false); compact_timer.stop(); if !result { diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index f95e8c95f..772a5be76 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -10,7 +10,11 @@ use rocksdb::{ use serde::de::DeserializeOwned; use serde::Serialize; use solana_runtime::hardened_unpack::UnpackError; -use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}; +use solana_sdk::{ + clock::{Slot, UnixTimestamp}, + pubkey::Pubkey, + signature::Signature, +}; use solana_transaction_status::{Rewards, TransactionStatusMeta}; use std::{collections::HashMap, fs, marker::PhantomData, path::Path, sync::Arc}; use thiserror::Error; @@ -46,6 +50,8 @@ const ADDRESS_SIGNATURES_CF: &str = "address_signatures"; const TRANSACTION_STATUS_INDEX_CF: &str = "transaction_status_index"; /// Column family for Rewards const REWARDS_CF: &str = "rewards"; +/// Column family for Blocktime +const BLOCKTIME_CF: &str = "blocktime"; #[derive(Error, Debug)] pub enum BlockstoreError { @@ -61,6 +67,8 @@ pub enum BlockstoreError { UnpackError(#[from] UnpackError), UnableToSetOpenFileDescriptorLimit, TransactionStatusSlotMismatch, + EmptyEpochStakes, + NoVoteTimestampsInRange, } pub type Result = std::result::Result; @@ -128,6 +136,10 @@ pub mod columns { #[derive(Debug)] /// The rewards column pub struct Rewards; + + #[derive(Debug)] + /// The blocktime column + pub struct Blocktime; } pub enum AccessType { @@ -187,8 +199,9 @@ impl Rocks { recovery_mode: Option, ) -> Result { use columns::{ - AddressSignatures, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards, - Root, ShredCode, ShredData, SlotMeta, TransactionStatus, TransactionStatusIndex, + AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, + Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus, + TransactionStatusIndex, }; fs::create_dir_all(&path)?; @@ -221,6 +234,8 @@ impl Rocks { let transaction_status_index_cf_descriptor = ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options()); let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options()); + let blocktime_cf_descriptor = + ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options()); let cfs = vec![ (SlotMeta::NAME, meta_cf_descriptor), @@ -239,6 +254,7 @@ impl Rocks { transaction_status_index_cf_descriptor, ), (Rewards::NAME, rewards_cf_descriptor), + (Blocktime::NAME, blocktime_cf_descriptor), ]; // Open the database @@ -276,8 +292,9 @@ impl Rocks { fn columns(&self) -> Vec<&'static str> { use columns::{ - AddressSignatures, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards, - Root, ShredCode, ShredData, SlotMeta, TransactionStatus, TransactionStatusIndex, + AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, + Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus, + TransactionStatusIndex, }; vec![ @@ -294,6 +311,7 @@ impl Rocks { AddressSignatures::NAME, TransactionStatusIndex::NAME, Rewards::NAME, + Blocktime::NAME, ] } @@ -519,6 +537,14 @@ impl TypedColumn for columns::Rewards { type Type = Rewards; } +impl SlotColumn for columns::Blocktime {} +impl ColumnName for columns::Blocktime { + const NAME: &'static str = BLOCKTIME_CF; +} +impl TypedColumn for columns::Blocktime { + type Type = UnixTimestamp; +} + impl Column for columns::ShredCode { type Index = (u64, u64);