Blocktree metrics (#6527)

* Add metrics for blocktree performance
* Plumb metrics through window service
carllin 2019-10-26 16:15:59 -07:00 committed by GitHub
parent 08238e8307
commit 6efaaa9d7a
2 changed files with 124 additions and 29 deletions
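
In short, Blocktree::insert_shreds now returns a BlocktreeInsertionMetrics value instead of (), and the window service reports it after every insert. A minimal sketch of the new call pattern follows; the helper function is illustrative only, and the crate paths are assumptions about this revision of the tree:

    use std::sync::Arc;

    // Assumed crate paths for this revision of the tree.
    use solana_ledger::blocktree::Blocktree;
    use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
    use solana_ledger::shred::Shred;

    // Hypothetical helper showing how a caller such as the window service
    // consumes the new return value.
    fn insert_and_report(
        blocktree: &Blocktree,
        shreds: Vec<Shred>,
        leader_schedule_cache: &Arc<LeaderScheduleCache>,
    ) {
        // insert_shreds now hands back per-phase timings and counts.
        if let Ok(metrics) = blocktree.insert_shreds(shreds, Some(leader_schedule_cache)) {
            // The caller picks the datapoint name; the window service uses
            // "recv-window-insert-shreds".
            metrics.report_metrics("recv-window-insert-shreds");
        }
    }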


@@ -85,6 +85,7 @@ where
             total_packets += more_packets.packets.len();
             packets.push(more_packets)
         }

     let now = Instant::now();
     inc_new_counter_debug!("streamer-recv_window-recv", total_packets);
@@ -127,7 +128,8 @@ where
         }
     }

-    blocktree.insert_shreds(shreds, Some(leader_schedule_cache))?;
+    let blocktree_insert_metrics = blocktree.insert_shreds(shreds, Some(leader_schedule_cache))?;
+    blocktree_insert_metrics.report_metrics("recv-window-insert-shreds");

     trace!(
         "Elapsed processing time in recv_window(): {}",


@@ -17,6 +17,7 @@ use rayon::iter::IntoParallelRefIterator;
 use rayon::iter::ParallelIterator;
 use rayon::ThreadPool;
 use rocksdb::DBRawIterator;
+use solana_measure::measure::Measure;
 use solana_metrics::{datapoint_debug, datapoint_error};
 use solana_rayon_threadlimit::get_thread_count;
 use solana_sdk::clock::Slot;
@@ -60,6 +61,49 @@ pub struct Blocktree {
     pub completed_slots_senders: Vec<SyncSender<Vec<u64>>>,
 }

+pub struct BlocktreeInsertionMetrics {
+    pub num_shreds: usize,
+    pub insert_lock_elapsed: u64,
+    pub insert_shreds_elapsed: u64,
+    pub shred_recovery_elapsed: u64,
+    pub chaining_elapsed: u64,
+    pub commit_working_sets_elapsed: u64,
+    pub write_batch_elapsed: u64,
+    pub total_elapsed: u64,
+    pub num_inserted: u64,
+    pub num_recovered: usize,
+}
+
+impl BlocktreeInsertionMetrics {
+    pub fn report_metrics(&self, metric_name: &'static str) {
+        datapoint_info!(
+            metric_name,
+            ("num_shreds", self.num_shreds as i64, i64),
+            ("total_elapsed", self.total_elapsed as i64, i64),
+            ("insert_lock_elapsed", self.insert_lock_elapsed as i64, i64),
+            (
+                "insert_shreds_elapsed",
+                self.insert_shreds_elapsed as i64,
+                i64
+            ),
+            (
+                "shred_recovery_elapsed",
+                self.shred_recovery_elapsed as i64,
+                i64
+            ),
+            ("chaining_elapsed", self.chaining_elapsed as i64, i64),
+            (
+                "commit_working_sets_elapsed",
+                self.commit_working_sets_elapsed as i64,
+                i64
+            ),
+            ("write_batch_elapsed", self.write_batch_elapsed as i64, i64),
+            ("num_inserted", self.num_inserted as i64, i64),
+            ("num_recovered", self.num_recovered as i64, i64),
+        );
+    }
+}
+
 impl Blocktree {
     /// Opens a Ledger in directory, provides "infinite" window of shreds
     pub fn open(ledger_path: &Path) -> Result<Blocktree> {
@@ -360,8 +404,13 @@ impl Blocktree {
         &self,
         shreds: Vec<Shred>,
         leader_schedule: Option<&Arc<LeaderScheduleCache>>,
-    ) -> Result<()> {
+    ) -> Result<BlocktreeInsertionMetrics> {
+        let mut total_start = Measure::start("Total elapsed");
+
+        let mut start = Measure::start("Blocktree lock");
         let _lock = self.insert_shreds_lock.lock().unwrap();
+        start.stop();
+        let insert_lock_elapsed = start.as_us();

         let db = &*self.db;
         let mut write_batch = db.batch()?;
@@ -371,26 +420,40 @@ impl Blocktree {
         let mut slot_meta_working_set = HashMap::new();
         let mut index_working_set = HashMap::new();
+        let num_shreds = shreds.len();

+        let mut start = Measure::start("Shred insertion");
+        let mut num_inserted = 0;
         shreds.into_iter().for_each(|shred| {
-            if shred.is_data() {
-                self.check_insert_data_shred(
-                    shred,
-                    &mut index_working_set,
-                    &mut slot_meta_working_set,
-                    &mut write_batch,
-                    &mut just_inserted_data_shreds,
-                );
-            } else if shred.is_code() {
-                self.check_insert_coding_shred(
-                    shred,
-                    &mut erasure_metas,
-                    &mut index_working_set,
-                    &mut write_batch,
-                    &mut just_inserted_coding_shreds,
-                );
+            let insert_success = {
+                if shred.is_data() {
+                    self.check_insert_data_shred(
+                        shred,
+                        &mut index_working_set,
+                        &mut slot_meta_working_set,
+                        &mut write_batch,
+                        &mut just_inserted_data_shreds,
+                    )
+                } else if shred.is_code() {
+                    self.check_insert_coding_shred(
+                        shred,
+                        &mut erasure_metas,
+                        &mut index_working_set,
+                        &mut write_batch,
+                        &mut just_inserted_coding_shreds,
+                    )
+                } else {
+                    panic!("There should be no other case");
+                }
+            };
+            if insert_success {
+                num_inserted += 1;
             }
         });
+        start.stop();
+        let insert_shreds_elapsed = start.as_us();

+        let mut start = Measure::start("Shred recovery");
+        let mut num_recovered = 0;
         if let Some(leader_schedule_cache) = leader_schedule {
             let recovered_data = Self::try_shred_recovery(
                 &db,
@@ -400,6 +463,7 @@ impl Blocktree {
                 &mut just_inserted_coding_shreds,
             );

+            num_recovered = recovered_data.len();
             recovered_data.into_iter().for_each(|shred| {
                 if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) {
                     if shred.verify(&leader) {
@@ -409,15 +473,21 @@ impl Blocktree {
                             &mut slot_meta_working_set,
                             &mut write_batch,
                             &mut just_inserted_coding_shreds,
-                        )
+                        );
                     }
                 }
             });
         }
+        start.stop();
+        let shred_recovery_elapsed = start.as_us();

+        let mut start = Measure::start("Chaining");
         // Handle chaining for the working set
         handle_chaining(&self.db, &mut write_batch, &slot_meta_working_set)?;
+        start.stop();
+        let chaining_elapsed = start.as_us();

+        let mut start = Measure::start("Commit Working Sets");
         let (should_signal, newly_completed_slots) = commit_slot_meta_working_set(
             &slot_meta_working_set,
             &self.completed_slots_senders,
@@ -431,8 +501,13 @@ impl Blocktree {
         for (&slot, index) in index_working_set.iter() {
             write_batch.put::<cf::Index>(slot, index)?;
         }
+        start.stop();
+        let commit_working_sets_elapsed = start.as_us();

+        let mut start = Measure::start("Write Batch");
         self.db.write(write_batch)?;
+        start.stop();
+        let write_batch_elapsed = start.as_us();

         if should_signal {
             for signal in &self.new_shreds_signals {
@@ -447,7 +522,20 @@ impl Blocktree {
             newly_completed_slots,
         )?;

-        Ok(())
+        total_start.stop();
+
+        Ok(BlocktreeInsertionMetrics {
+            num_shreds,
+            total_elapsed: total_start.as_us(),
+            insert_lock_elapsed,
+            insert_shreds_elapsed,
+            shred_recovery_elapsed,
+            chaining_elapsed,
+            commit_working_sets_elapsed,
+            write_batch_elapsed,
+            num_inserted,
+            num_recovered,
+        })
     }

     fn check_insert_coding_shred(
@@ -457,7 +545,7 @@ impl Blocktree {
         index_working_set: &mut HashMap<u64, Index>,
         write_batch: &mut WriteBatch,
         just_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>,
-    ) {
+    ) -> bool {
         let slot = shred.slot();
         let shred_index = u64::from(shred.index());
@@ -468,13 +556,16 @@ impl Blocktree {
         // This gives the index of first coding shred in this FEC block
         // So, all coding shreds in a given FEC block will have the same set index
         if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) {
-            if let Ok(()) = self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch)
-            {
-                just_inserted_coding_shreds
-                    .entry((slot, shred_index))
-                    .or_insert_with(|| shred);
-                new_index_meta.map(|n| index_working_set.insert(slot, n));
-            }
+            self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch)
+                .map(|_| {
+                    just_inserted_coding_shreds
+                        .entry((slot, shred_index))
+                        .or_insert_with(|| shred);
+                    new_index_meta.map(|n| index_working_set.insert(slot, n))
+                })
+                .is_ok()
+        } else {
+            false
         }
     }
@@ -485,7 +576,7 @@ impl Blocktree {
         slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
         write_batch: &mut WriteBatch,
         just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
-    ) {
+    ) -> bool {
         let slot = shred.slot();
         let shred_index = u64::from(shred.index());
         let (index_meta, mut new_index_meta) =
@@ -524,6 +615,8 @@ impl Blocktree {
         if insert_success {
             new_slot_meta_entry.map(|n| slot_meta_working_set.insert(slot, n));
         }
+
+        insert_success
     }

fn should_insert_coding_shred( fn should_insert_coding_shred(
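
Each phase timed above follows the same solana_measure pattern: start a named Measure, stop it, and read the elapsed microseconds. A minimal sketch of that pattern, assuming the Measure::start/stop/as_us API; the function and its workload are illustrative only:

    use solana_measure::measure::Measure;

    // Returns the elapsed time of one instrumented phase, in microseconds.
    fn timed_phase() -> u64 {
        let mut measure = Measure::start("example phase");
        // ... the work being measured goes here ...
        measure.stop();
        measure.as_us()
    }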