diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 3394a35a97..86331586da 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -12,7 +12,6 @@ use crate::{ erasure::ErasureConfig, leader_schedule_cache::LeaderScheduleCache, next_slots_iterator::NextSlotsIterator, - rooted_slot_iterator::RootedSlotIterator, shred::{Result as ShredResult, Shred, Shredder}, }; use bincode::deserialize; @@ -66,7 +65,7 @@ thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon:: pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000; pub const MAX_TURBINE_PROPAGATION_IN_MS: u64 = 100; pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_PER_TICK; -const TIMESTAMP_SLOT_RANGE: usize = 50; +const TIMESTAMP_SLOT_RANGE: usize = 16; // An upper bound on maximum number of data shreds we can handle in a slot // 32K shreds would allow ~320K peak TPS @@ -415,7 +414,7 @@ impl Blockstore { write_timer.stop(); datapoint_info!( "blockstore-purge", - ("write_batch_ns", write_timer.as_us() as i64, i64) + ("write_batch_us", write_timer.as_us() as i64, i64) ); Ok(columns_empty) } @@ -1417,6 +1416,10 @@ impl Blockstore { slot_duration: Duration, stakes: &HashMap, ) -> Result> { + datapoint_info!( + "blockstore-rpc-api", + ("method", "get_block_time".to_string(), String) + ); let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap(); // lowest_cleanup_slot is the last slot that was not cleaned up by // LedgerCleanupService @@ -1424,18 +1427,34 @@ impl Blockstore { return Err(BlockstoreError::SlotCleanedUp); } + let mut get_unique_timestamps = Measure::start("get_unique_timestamps"); let unique_timestamps: HashMap = self .get_timestamp_slots(slot, TIMESTAMP_SLOT_INTERVAL, TIMESTAMP_SLOT_RANGE) .into_iter() .flat_map(|query_slot| self.get_block_timestamps(query_slot).unwrap_or_default()) .collect(); + get_unique_timestamps.stop(); - Ok(calculate_stake_weighted_timestamp( - unique_timestamps, - stakes, - slot, - slot_duration, - )) + let mut calculate_timestamp = Measure::start("calculate_timestamp"); + let stake_weighted_timestamps = + calculate_stake_weighted_timestamp(unique_timestamps, stakes, slot, slot_duration); + calculate_timestamp.stop(); + datapoint_info!( + "blockstore-get-block-time", + ("slot", slot as i64, i64), + ( + "get_unique_timestamps_us", + get_unique_timestamps.as_us() as i64, + i64 + ), + ( + "calculate_stake_weighted_timestamp_us", + calculate_timestamp.as_us() as i64, + i64 + ) + ); + + Ok(stake_weighted_timestamps) } fn get_timestamp_slots( @@ -1444,41 +1463,42 @@ impl Blockstore { timestamp_interval: u64, timestamp_sample_range: usize, ) -> Vec { - let root_iterator = self.db.iter::(IteratorMode::Start); + let baseline_slot = slot - (slot % timestamp_interval); + let root_iterator = self.db.iter::(IteratorMode::From( + baseline_slot, + IteratorDirection::Forward, + )); if !self.is_root(slot) || root_iterator.is_err() { return vec![]; } - let lowest_nonzero_root = root_iterator.unwrap().map(|(slot, _)| slot).nth(1).unwrap(); - let rooted_slots = RootedSlotIterator::new(lowest_nonzero_root, &self); - let slots: Vec = rooted_slots + let mut get_slots = Measure::start("get_slots"); + let mut slots: Vec = root_iterator .unwrap() .map(|(iter_slot, _)| iter_slot) + .take(timestamp_sample_range) .filter(|&iter_slot| iter_slot <= slot) .collect(); - if slots.len() < timestamp_sample_range { - return slots; + if slots.len() < timestamp_sample_range && baseline_slot >= timestamp_interval { + let earlier_baseline = baseline_slot - timestamp_interval; + let earlier_root_iterator = self.db.iter::(IteratorMode::From( + earlier_baseline, + IteratorDirection::Forward, + )); + if let Ok(iterator) = earlier_root_iterator { + slots = iterator + .map(|(iter_slot, _)| iter_slot) + .take(timestamp_sample_range) + .collect(); + } } - - let recent_timestamp_slot_position = slots - .iter() - .position(|&x| x >= slot - (slot % timestamp_interval)) - .unwrap(); - - let filtered_iter = - if slots.len() - timestamp_sample_range >= recent_timestamp_slot_position { - slots.iter().skip(recent_timestamp_slot_position) - } else { - let earlier_timestamp_slot_position = slots - .iter() - .position(|&x| x >= slot - (slot % timestamp_interval) - timestamp_interval) - .unwrap(); - slots.iter().skip(earlier_timestamp_slot_position) - }; - filtered_iter - .take(timestamp_sample_range) - .cloned() - .collect() + get_slots.stop(); + datapoint_info!( + "blockstore-get-timestamp-slots", + ("slot", slot as i64, i64), + ("get_slots_us", get_slots.as_us() as i64, i64) + ); + slots } pub fn get_confirmed_block( @@ -1486,6 +1506,10 @@ impl Blockstore { slot: Slot, encoding: Option, ) -> Result { + datapoint_info!( + "blockstore-rpc-api", + ("method", "get_confirmed_block".to_string(), String) + ); let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap(); // lowest_cleanup_slot is the last slot that was not cleaned up by // LedgerCleanupService @@ -1711,6 +1735,10 @@ impl Blockstore { &self, signature: Signature, ) -> Result> { + datapoint_info!( + "blockstore-rpc-api", + ("method", "get_transaction_status".to_string(), String) + ); self.get_transaction_status_with_counter(signature) .map(|(status, _)| status) } @@ -1721,6 +1749,10 @@ impl Blockstore { signature: Signature, encoding: Option, ) -> Result> { + datapoint_info!( + "blockstore-rpc-api", + ("method", "get_confirmed_transaction".to_string(), String) + ); if let Some((slot, status)) = self.get_transaction_status(signature.clone())? { let transaction = self.find_transaction_in_slot(slot, signature)? .expect("Transaction to exist in slot entries if it exists in statuses and hasn't been cleaned up"); @@ -1790,6 +1822,14 @@ impl Blockstore { start_slot: Slot, end_slot: Slot, ) -> Result> { + datapoint_info!( + "blockstore-rpc-api", + ( + "method", + "get_confirmed_signatures_for_address".to_string(), + String + ) + ); self.find_address_signatures(pubkey, start_slot, end_slot) .map(|signatures| signatures.iter().map(|(_, signature)| *signature).collect()) } @@ -5066,11 +5106,11 @@ pub mod tests { assert_eq!( blockstore.get_timestamp_slots(2, timestamp_interval, timestamp_sample_range), - vec![1, 2] + vec![0, 1, 2] ); assert_eq!( blockstore.get_timestamp_slots(3, timestamp_interval, timestamp_sample_range), - vec![1, 2, 3] + vec![0, 1, 2, 3] ); drop(blockstore); @@ -5106,11 +5146,15 @@ pub mod tests { assert_eq!( blockstore.get_timestamp_slots(2, timestamp_interval, timestamp_sample_range), - vec![1, 2] + vec![0, 1, 2] + ); + assert_eq!( + blockstore.get_timestamp_slots(6, timestamp_interval, timestamp_sample_range), + vec![0, 1, 2, 3, 4] ); assert_eq!( blockstore.get_timestamp_slots(8, timestamp_interval, timestamp_sample_range), - vec![1, 2, 3, 4, 5] + vec![0, 1, 2, 3, 4] ); assert_eq!( blockstore.get_timestamp_slots(13, timestamp_interval, timestamp_sample_range),