Rpc: Speed up getBlockTime (#9510)
* Add get-block-time metrics * Add datapoints to blockstore rpc apis * Tune timestamp_slot_range * Refactor get_timestamp_slots * Cargo.lock
This commit is contained in:
parent
7aa4d401f7
commit
530c542002
|
@ -12,7 +12,6 @@ use crate::{
|
||||||
erasure::ErasureConfig,
|
erasure::ErasureConfig,
|
||||||
leader_schedule_cache::LeaderScheduleCache,
|
leader_schedule_cache::LeaderScheduleCache,
|
||||||
next_slots_iterator::NextSlotsIterator,
|
next_slots_iterator::NextSlotsIterator,
|
||||||
rooted_slot_iterator::RootedSlotIterator,
|
|
||||||
shred::{Result as ShredResult, Shred, Shredder},
|
shred::{Result as ShredResult, Shred, Shredder},
|
||||||
};
|
};
|
||||||
use bincode::deserialize;
|
use bincode::deserialize;
|
||||||
|
@ -66,7 +65,7 @@ thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::
|
||||||
pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
|
pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
|
||||||
pub const MAX_TURBINE_PROPAGATION_IN_MS: u64 = 100;
|
pub const MAX_TURBINE_PROPAGATION_IN_MS: u64 = 100;
|
||||||
pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_PER_TICK;
|
pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_PER_TICK;
|
||||||
const TIMESTAMP_SLOT_RANGE: usize = 50;
|
const TIMESTAMP_SLOT_RANGE: usize = 16;
|
||||||
|
|
||||||
// An upper bound on maximum number of data shreds we can handle in a slot
|
// An upper bound on maximum number of data shreds we can handle in a slot
|
||||||
// 32K shreds would allow ~320K peak TPS
|
// 32K shreds would allow ~320K peak TPS
|
||||||
|
@ -415,7 +414,7 @@ impl Blockstore {
|
||||||
write_timer.stop();
|
write_timer.stop();
|
||||||
datapoint_info!(
|
datapoint_info!(
|
||||||
"blockstore-purge",
|
"blockstore-purge",
|
||||||
("write_batch_ns", write_timer.as_us() as i64, i64)
|
("write_batch_us", write_timer.as_us() as i64, i64)
|
||||||
);
|
);
|
||||||
Ok(columns_empty)
|
Ok(columns_empty)
|
||||||
}
|
}
|
||||||
|
@ -1417,6 +1416,10 @@ impl Blockstore {
|
||||||
slot_duration: Duration,
|
slot_duration: Duration,
|
||||||
stakes: &HashMap<Pubkey, (u64, Account)>,
|
stakes: &HashMap<Pubkey, (u64, Account)>,
|
||||||
) -> Result<Option<UnixTimestamp>> {
|
) -> Result<Option<UnixTimestamp>> {
|
||||||
|
datapoint_info!(
|
||||||
|
"blockstore-rpc-api",
|
||||||
|
("method", "get_block_time".to_string(), String)
|
||||||
|
);
|
||||||
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
|
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
|
||||||
// lowest_cleanup_slot is the last slot that was not cleaned up by
|
// lowest_cleanup_slot is the last slot that was not cleaned up by
|
||||||
// LedgerCleanupService
|
// LedgerCleanupService
|
||||||
|
@ -1424,18 +1427,34 @@ impl Blockstore {
|
||||||
return Err(BlockstoreError::SlotCleanedUp);
|
return Err(BlockstoreError::SlotCleanedUp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut get_unique_timestamps = Measure::start("get_unique_timestamps");
|
||||||
let unique_timestamps: HashMap<Pubkey, (Slot, UnixTimestamp)> = self
|
let unique_timestamps: HashMap<Pubkey, (Slot, UnixTimestamp)> = self
|
||||||
.get_timestamp_slots(slot, TIMESTAMP_SLOT_INTERVAL, TIMESTAMP_SLOT_RANGE)
|
.get_timestamp_slots(slot, TIMESTAMP_SLOT_INTERVAL, TIMESTAMP_SLOT_RANGE)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.flat_map(|query_slot| self.get_block_timestamps(query_slot).unwrap_or_default())
|
.flat_map(|query_slot| self.get_block_timestamps(query_slot).unwrap_or_default())
|
||||||
.collect();
|
.collect();
|
||||||
|
get_unique_timestamps.stop();
|
||||||
|
|
||||||
Ok(calculate_stake_weighted_timestamp(
|
let mut calculate_timestamp = Measure::start("calculate_timestamp");
|
||||||
unique_timestamps,
|
let stake_weighted_timestamps =
|
||||||
stakes,
|
calculate_stake_weighted_timestamp(unique_timestamps, stakes, slot, slot_duration);
|
||||||
slot,
|
calculate_timestamp.stop();
|
||||||
slot_duration,
|
datapoint_info!(
|
||||||
))
|
"blockstore-get-block-time",
|
||||||
|
("slot", slot as i64, i64),
|
||||||
|
(
|
||||||
|
"get_unique_timestamps_us",
|
||||||
|
get_unique_timestamps.as_us() as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"calculate_stake_weighted_timestamp_us",
|
||||||
|
calculate_timestamp.as_us() as i64,
|
||||||
|
i64
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(stake_weighted_timestamps)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_timestamp_slots(
|
fn get_timestamp_slots(
|
||||||
|
@ -1444,41 +1463,42 @@ impl Blockstore {
|
||||||
timestamp_interval: u64,
|
timestamp_interval: u64,
|
||||||
timestamp_sample_range: usize,
|
timestamp_sample_range: usize,
|
||||||
) -> Vec<Slot> {
|
) -> Vec<Slot> {
|
||||||
let root_iterator = self.db.iter::<cf::Root>(IteratorMode::Start);
|
let baseline_slot = slot - (slot % timestamp_interval);
|
||||||
|
let root_iterator = self.db.iter::<cf::Root>(IteratorMode::From(
|
||||||
|
baseline_slot,
|
||||||
|
IteratorDirection::Forward,
|
||||||
|
));
|
||||||
if !self.is_root(slot) || root_iterator.is_err() {
|
if !self.is_root(slot) || root_iterator.is_err() {
|
||||||
return vec![];
|
return vec![];
|
||||||
}
|
}
|
||||||
let lowest_nonzero_root = root_iterator.unwrap().map(|(slot, _)| slot).nth(1).unwrap();
|
let mut get_slots = Measure::start("get_slots");
|
||||||
let rooted_slots = RootedSlotIterator::new(lowest_nonzero_root, &self);
|
let mut slots: Vec<Slot> = root_iterator
|
||||||
let slots: Vec<Slot> = rooted_slots
|
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.map(|(iter_slot, _)| iter_slot)
|
.map(|(iter_slot, _)| iter_slot)
|
||||||
|
.take(timestamp_sample_range)
|
||||||
.filter(|&iter_slot| iter_slot <= slot)
|
.filter(|&iter_slot| iter_slot <= slot)
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
if slots.len() < timestamp_sample_range {
|
if slots.len() < timestamp_sample_range && baseline_slot >= timestamp_interval {
|
||||||
return slots;
|
let earlier_baseline = baseline_slot - timestamp_interval;
|
||||||
}
|
let earlier_root_iterator = self.db.iter::<cf::Root>(IteratorMode::From(
|
||||||
|
earlier_baseline,
|
||||||
let recent_timestamp_slot_position = slots
|
IteratorDirection::Forward,
|
||||||
.iter()
|
));
|
||||||
.position(|&x| x >= slot - (slot % timestamp_interval))
|
if let Ok(iterator) = earlier_root_iterator {
|
||||||
.unwrap();
|
slots = iterator
|
||||||
|
.map(|(iter_slot, _)| iter_slot)
|
||||||
let filtered_iter =
|
|
||||||
if slots.len() - timestamp_sample_range >= recent_timestamp_slot_position {
|
|
||||||
slots.iter().skip(recent_timestamp_slot_position)
|
|
||||||
} else {
|
|
||||||
let earlier_timestamp_slot_position = slots
|
|
||||||
.iter()
|
|
||||||
.position(|&x| x >= slot - (slot % timestamp_interval) - timestamp_interval)
|
|
||||||
.unwrap();
|
|
||||||
slots.iter().skip(earlier_timestamp_slot_position)
|
|
||||||
};
|
|
||||||
filtered_iter
|
|
||||||
.take(timestamp_sample_range)
|
.take(timestamp_sample_range)
|
||||||
.cloned()
|
.collect();
|
||||||
.collect()
|
}
|
||||||
|
}
|
||||||
|
get_slots.stop();
|
||||||
|
datapoint_info!(
|
||||||
|
"blockstore-get-timestamp-slots",
|
||||||
|
("slot", slot as i64, i64),
|
||||||
|
("get_slots_us", get_slots.as_us() as i64, i64)
|
||||||
|
);
|
||||||
|
slots
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_confirmed_block(
|
pub fn get_confirmed_block(
|
||||||
|
@ -1486,6 +1506,10 @@ impl Blockstore {
|
||||||
slot: Slot,
|
slot: Slot,
|
||||||
encoding: Option<TransactionEncoding>,
|
encoding: Option<TransactionEncoding>,
|
||||||
) -> Result<ConfirmedBlock> {
|
) -> Result<ConfirmedBlock> {
|
||||||
|
datapoint_info!(
|
||||||
|
"blockstore-rpc-api",
|
||||||
|
("method", "get_confirmed_block".to_string(), String)
|
||||||
|
);
|
||||||
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
|
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
|
||||||
// lowest_cleanup_slot is the last slot that was not cleaned up by
|
// lowest_cleanup_slot is the last slot that was not cleaned up by
|
||||||
// LedgerCleanupService
|
// LedgerCleanupService
|
||||||
|
@ -1711,6 +1735,10 @@ impl Blockstore {
|
||||||
&self,
|
&self,
|
||||||
signature: Signature,
|
signature: Signature,
|
||||||
) -> Result<Option<(Slot, TransactionStatusMeta)>> {
|
) -> Result<Option<(Slot, TransactionStatusMeta)>> {
|
||||||
|
datapoint_info!(
|
||||||
|
"blockstore-rpc-api",
|
||||||
|
("method", "get_transaction_status".to_string(), String)
|
||||||
|
);
|
||||||
self.get_transaction_status_with_counter(signature)
|
self.get_transaction_status_with_counter(signature)
|
||||||
.map(|(status, _)| status)
|
.map(|(status, _)| status)
|
||||||
}
|
}
|
||||||
|
@ -1721,6 +1749,10 @@ impl Blockstore {
|
||||||
signature: Signature,
|
signature: Signature,
|
||||||
encoding: Option<TransactionEncoding>,
|
encoding: Option<TransactionEncoding>,
|
||||||
) -> Result<Option<ConfirmedTransaction>> {
|
) -> Result<Option<ConfirmedTransaction>> {
|
||||||
|
datapoint_info!(
|
||||||
|
"blockstore-rpc-api",
|
||||||
|
("method", "get_confirmed_transaction".to_string(), String)
|
||||||
|
);
|
||||||
if let Some((slot, status)) = self.get_transaction_status(signature.clone())? {
|
if let Some((slot, status)) = self.get_transaction_status(signature.clone())? {
|
||||||
let transaction = self.find_transaction_in_slot(slot, signature)?
|
let transaction = self.find_transaction_in_slot(slot, signature)?
|
||||||
.expect("Transaction to exist in slot entries if it exists in statuses and hasn't been cleaned up");
|
.expect("Transaction to exist in slot entries if it exists in statuses and hasn't been cleaned up");
|
||||||
|
@ -1790,6 +1822,14 @@ impl Blockstore {
|
||||||
start_slot: Slot,
|
start_slot: Slot,
|
||||||
end_slot: Slot,
|
end_slot: Slot,
|
||||||
) -> Result<Vec<Signature>> {
|
) -> Result<Vec<Signature>> {
|
||||||
|
datapoint_info!(
|
||||||
|
"blockstore-rpc-api",
|
||||||
|
(
|
||||||
|
"method",
|
||||||
|
"get_confirmed_signatures_for_address".to_string(),
|
||||||
|
String
|
||||||
|
)
|
||||||
|
);
|
||||||
self.find_address_signatures(pubkey, start_slot, end_slot)
|
self.find_address_signatures(pubkey, start_slot, end_slot)
|
||||||
.map(|signatures| signatures.iter().map(|(_, signature)| *signature).collect())
|
.map(|signatures| signatures.iter().map(|(_, signature)| *signature).collect())
|
||||||
}
|
}
|
||||||
|
@ -5066,11 +5106,11 @@ pub mod tests {
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
blockstore.get_timestamp_slots(2, timestamp_interval, timestamp_sample_range),
|
blockstore.get_timestamp_slots(2, timestamp_interval, timestamp_sample_range),
|
||||||
vec![1, 2]
|
vec![0, 1, 2]
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
blockstore.get_timestamp_slots(3, timestamp_interval, timestamp_sample_range),
|
blockstore.get_timestamp_slots(3, timestamp_interval, timestamp_sample_range),
|
||||||
vec![1, 2, 3]
|
vec![0, 1, 2, 3]
|
||||||
);
|
);
|
||||||
|
|
||||||
drop(blockstore);
|
drop(blockstore);
|
||||||
|
@ -5106,11 +5146,15 @@ pub mod tests {
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
blockstore.get_timestamp_slots(2, timestamp_interval, timestamp_sample_range),
|
blockstore.get_timestamp_slots(2, timestamp_interval, timestamp_sample_range),
|
||||||
vec![1, 2]
|
vec![0, 1, 2]
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
blockstore.get_timestamp_slots(6, timestamp_interval, timestamp_sample_range),
|
||||||
|
vec![0, 1, 2, 3, 4]
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
blockstore.get_timestamp_slots(8, timestamp_interval, timestamp_sample_range),
|
blockstore.get_timestamp_slots(8, timestamp_interval, timestamp_sample_range),
|
||||||
vec![1, 2, 3, 4, 5]
|
vec![0, 1, 2, 3, 4]
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
blockstore.get_timestamp_slots(13, timestamp_interval, timestamp_sample_range),
|
blockstore.get_timestamp_slots(13, timestamp_interval, timestamp_sample_range),
|
||||||
|
|
Loading…
Reference in New Issue