ReplayStage metrics (#6358)

* ReplayStage metrics

* Add more metrics

* Refactor get_slot_entries_with_shred_count() to detect wasted work

* Update dashboard

* Update slot broadcast time to micros

* Add broadcast dashboard
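
The heart of the refactor is a useful/wasted timing split: every fetch against the
blocktree is timed, and the elapsed microseconds count as useful work when the fetch
yields entries, or as wasted work when it comes back empty or fails. A minimal,
self-contained Rust sketch of that accounting pattern (simplified names; the real
logic is in get_slot_entries_with_shred_count() below):

    use std::time::Instant;

    // Sketch: time each fetch attempt and classify the elapsed micros as useful
    // (the attempt produced entries) or wasted (empty result or error).
    fn drain_entries<F>(mut fetch: F) -> Result<(Vec<u64>, u64, u64), String>
    where
        F: FnMut() -> Result<Vec<u64>, String>,
    {
        let (mut useful_time, mut wasted_time) = (0u64, 0u64);
        let mut all_entries = vec![];
        loop {
            let now = Instant::now();
            let mut res = fetch();
            let elapsed = now.elapsed().as_micros() as u64;
            if let Ok(ref mut entries) = res {
                if !entries.is_empty() {
                    all_entries.append(entries);
                    useful_time += elapsed;
                    continue;
                }
            }
            // Errors and empty fetches both count as wasted work.
            wasted_time += elapsed;
            res?; // propagate a fetch error; an Ok-but-empty result just ends the loop
            break;
        }
        Ok((all_entries, useful_time, wasted_time))
    }

    fn main() {
        let mut chunks = vec![vec![1, 2], vec![3], vec![]].into_iter();
        let result = drain_entries(|| Ok(chunks.next().unwrap_or_default()));
        println!("{:?}", result); // entries [1, 2, 3], plus useful/wasted micros
    }

Counting empty fetches as wasted work is what feeds the new fetch_entries_fail_time
metric, which surfaces replay iterations that poll slots holding no new complete
data blocks.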
commit ccb4e32ee0 (parent 2d351d3952)
Author: carllin (committed by GitHub)
Date:   2019-10-16 14:32:18 -07:00
5 changed files with 1053 additions and 285 deletions


@@ -25,6 +25,7 @@ use std::path::{Path, PathBuf};
 use std::rc::Rc;
 use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
 use std::sync::{Arc, RwLock};
+use std::time::Instant;
 
 pub use self::meta::*;
 pub use self::rooted_slot_iterator::*;
@@ -981,65 +982,73 @@ impl Blocktree {
         &self,
         slot: u64,
         mut start_index: u64,
-    ) -> Result<(Vec<Entry>, usize)> {
-        // Find the next consecutive block of shreds.
-        let mut serialized_shreds: Vec<Vec<u8>> = vec![];
-        let data_shred_cf = self.db.column::<cf::ShredData>();
-        while let Some(serialized_shred) = data_shred_cf.get_bytes((slot, start_index))? {
-            serialized_shreds.push(serialized_shred);
-            start_index += 1;
-        }
-
-        trace!(
-            "Found {:?} shreds for slot {:?}",
-            serialized_shreds.len(),
-            slot
-        );
-
-        let mut shreds: Vec<Shred> = serialized_shreds
-            .into_iter()
-            .filter_map(|serialized_shred| Shred::new_from_serialized_shred(serialized_shred).ok())
-            .collect();
-
+    ) -> Result<(Vec<Entry>, usize, u64, u64)> {
+        let mut useful_time = 0;
+        let mut wasted_time = 0;
         let mut all_entries = vec![];
-        let mut num = 0;
+        let mut num_shreds = 0;
         loop {
-            let mut look_for_last_shred = true;
-
-            let mut shred_chunk = vec![];
-            while look_for_last_shred && !shreds.is_empty() {
-                let shred = shreds.remove(0);
-                if shred.data_complete() || shred.last_in_slot() {
-                    look_for_last_shred = false;
-                }
-                shred_chunk.push(shred);
-            }
-
-            debug!(
-                "{:?} shreds in last FEC set. Looking for last shred {:?}",
-                shred_chunk.len(),
-                look_for_last_shred
-            );
-
-            // Break if we didn't find the last shred (as more data is required)
-            if look_for_last_shred {
-                break;
-            }
-
-            if let Ok(deshred_payload) = Shredder::deshred(&shred_chunk) {
-                let entries: Vec<Entry> = bincode::deserialize(&deshred_payload)?;
-                trace!("Found entries: {:#?}", entries);
-                all_entries.extend(entries);
-                num += shred_chunk.len();
-            } else {
-                debug!("Failed in deshredding shred payloads");
-                break;
-            }
+            let now = Instant::now();
+            let mut res = self.get_entries_in_data_block(slot, &mut start_index);
+            let elapsed = now.elapsed().as_micros();
+            if let Ok((ref mut entries, new_num_shreds)) = res {
+                if !entries.is_empty() {
+                    all_entries.append(entries);
+                    num_shreds += new_num_shreds;
+                    useful_time += elapsed;
+                    continue;
+                }
+            }
+
+            // All unsuccessful cases (errors, incomplete data blocks) will count as wasted work
+            wasted_time += elapsed;
+            res?;
+            break;
        }
 
         trace!("Found {:?} entries", all_entries.len());
-        Ok((all_entries, num))
+        Ok((
+            all_entries,
+            num_shreds,
+            useful_time as u64,
+            wasted_time as u64,
+        ))
+    }
+
+    pub fn get_entries_in_data_block(
+        &self,
+        slot: u64,
+        start_index: &mut u64,
+    ) -> Result<(Vec<Entry>, usize)> {
+        let mut shred_chunk: Vec<Shred> = vec![];
+        let data_shred_cf = self.db.column::<cf::ShredData>();
+        while let Some(serialized_shred) = data_shred_cf.get_bytes((slot, *start_index))? {
+            *start_index += 1;
+            let new_shred = Shred::new_from_serialized_shred(serialized_shred).ok();
+            if let Some(shred) = new_shred {
+                let is_complete = shred.data_complete() || shred.last_in_slot();
+                shred_chunk.push(shred);
+                if is_complete {
+                    if let Ok(deshred_payload) = Shredder::deshred(&shred_chunk) {
+                        debug!("{:?} shreds in last FEC set", shred_chunk.len());
+                        let entries: Vec<Entry> = bincode::deserialize(&deshred_payload)?;
+                        return Ok((entries, shred_chunk.len()));
+                    } else {
+                        debug!("Failed in deshredding shred payloads");
+                        break;
+                    }
+                }
+            } else {
+                // Didn't find a valid shred, this slot is dead.
+                // TODO: Mark as dead, but have to carefully handle last shred of interrupted
+                // slots.
+                break;
+            }
+        }
+
+        Ok((vec![], 0))
     }
 
     // Returns slots connecting to any element of the list `slots`.
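
A note on the cursor contract in the new get_entries_in_data_block(): the caller
hands in &mut start_index, each call consumes shreds from that index forward and
advances it, and a call returns either the entries of one complete data block or
(vec![], 0) when the next block is still incomplete. A toy sketch of the same
contract, with hypothetical stand-in types:

    // Each call reads items starting at *start_index, advances the cursor past
    // whatever it consumed, and hands back one complete chunk -- or an empty Vec
    // when the remaining items do not yet form a complete chunk.
    fn next_complete_chunk(store: &[(u32, bool)], start_index: &mut usize) -> Vec<u32> {
        let mut chunk = vec![];
        while let Some(&(value, is_last_in_chunk)) = store.get(*start_index) {
            *start_index += 1;
            chunk.push(value);
            if is_last_in_chunk {
                return chunk; // a complete data block: hand it back
            }
        }
        vec![] // ran out of items mid-chunk; more data is required
    }

    fn main() {
        // Two complete chunks followed by a partial one.
        let store = [(1, false), (2, true), (3, true), (4, false)];
        let mut cursor = 0;
        assert_eq!(next_complete_chunk(&store, &mut cursor), vec![1, 2]);
        assert_eq!(next_complete_chunk(&store, &mut cursor), vec![3]);
        assert!(next_complete_chunk(&store, &mut cursor).is_empty());
    }

In the loop above, get_slot_entries_with_shred_count() only commits progress
(num_shreds, useful_time) on non-empty results, so an incomplete trailing block is
simply retried from the last committed shred index on a later call.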


@@ -298,7 +298,7 @@ mod test {
         );
 
         let blocktree = broadcast_service.blocktree;
-        let (entries, _) = blocktree
+        let (entries, _, _, _) = blocktree
            .get_slot_entries_with_shred_count(slot, 0)
            .expect("Expect entries to be present");
         assert_eq!(entries.len(), max_tick_height as usize);


@@ -270,7 +270,7 @@ impl StandardBroadcastRun {
             ),
             (
                 "slot_broadcast_time",
-                self.slot_broadcast_start.unwrap().elapsed().as_millis() as i64,
+                self.slot_broadcast_start.unwrap().elapsed().as_micros() as i64,
                 i64
             ),
         );


@@ -59,20 +59,73 @@ pub struct ReplayStage {
     confidence_service: AggregateConfidenceService,
 }
 
+#[derive(Default)]
+struct ReplaySlotStats {
+    // Per-slot elapsed time
+    slot: u64,
+    fetch_entries_elapsed: u64,
+    fetch_entries_fail_elapsed: u64,
+    entry_verification_elapsed: u64,
+    replay_elapsed: u64,
+    replay_start: Instant,
+}
+
+impl ReplaySlotStats {
+    pub fn new(slot: u64) -> Self {
+        Self {
+            slot,
+            fetch_entries_elapsed: 0,
+            fetch_entries_fail_elapsed: 0,
+            entry_verification_elapsed: 0,
+            replay_elapsed: 0,
+            replay_start: Instant::now(),
+        }
+    }
+
+    pub fn report_stats(&self, total_entries: usize, total_shreds: usize) {
+        datapoint_info!(
+            "replay-slot-stats",
+            ("slot", self.slot as i64, i64),
+            ("fetch_entries_time", self.fetch_entries_elapsed as i64, i64),
+            (
+                "fetch_entries_fail_time",
+                self.fetch_entries_fail_elapsed as i64,
+                i64
+            ),
+            (
+                "entry_verification_time",
+                self.entry_verification_elapsed as i64,
+                i64
+            ),
+            ("replay_time", self.replay_elapsed as i64, i64),
+            (
+                "replay_total_elapsed",
+                self.replay_start.elapsed().as_micros() as i64,
+                i64
+            ),
+            ("total_entries", total_entries as i64, i64),
+            ("total_shreds", total_shreds as i64, i64),
+        );
+    }
+}
+
 struct ForkProgress {
     last_entry: Hash,
-    num_blobs: usize,
+    num_shreds: usize,
     num_entries: usize,
     started_ms: u64,
     is_dead: bool,
+    stats: ReplaySlotStats,
 }
 impl ForkProgress {
-    pub fn new(last_entry: Hash) -> Self {
+    pub fn new(slot: u64, last_entry: Hash) -> Self {
         Self {
             last_entry,
-            num_blobs: 0,
+            num_shreds: 0,
             num_entries: 0,
             started_ms: timing::timestamp(),
             is_dead: false,
+            stats: ReplaySlotStats::new(slot),
         }
     }
 }
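
The ReplaySlotStats added here follows a create/accumulate/report lifecycle: new()
pins replay_start the first time a fork is touched, the replay path adds elapsed
microseconds to the per-phase counters, and report_stats() emits a single datapoint
once the slot's bank reaches its max tick height. A self-contained analogue, with
println! standing in for the datapoint_info! metrics macro and the field set
trimmed:

    use std::time::Instant;

    struct SlotStats {
        slot: u64,
        replay_elapsed: u64,   // micros spent processing entries
        replay_start: Instant, // wall-clock start for the whole slot
    }

    impl SlotStats {
        fn new(slot: u64) -> Self {
            Self {
                slot,
                replay_elapsed: 0,
                replay_start: Instant::now(),
            }
        }

        fn report_stats(&self, total_entries: usize) {
            // println! stands in for datapoint_info! so the sketch runs standalone.
            println!(
                "replay-slot-stats slot={} replay_time={}us replay_total_elapsed={}us total_entries={}",
                self.slot,
                self.replay_elapsed,
                self.replay_start.elapsed().as_micros(),
                total_entries,
            );
        }
    }

    fn main() {
        let mut stats = SlotStats::new(5);
        let now = Instant::now();
        // ... a batch of entries would be replayed here ...
        stats.replay_elapsed += now.elapsed().as_micros() as u64;
        stats.report_stats(0);
    }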
@@ -369,24 +422,41 @@ impl ReplayStage {
         progress: &mut HashMap<u64, ForkProgress>,
     ) -> (Result<()>, usize) {
         let mut tx_count = 0;
-        let result =
-            Self::load_blocktree_entries(bank, blocktree, progress).and_then(|(entries, num)| {
-                debug!("Replaying {:?} entries, num {:?}", entries.len(), num);
+        let bank_progress = &mut progress
+            .entry(bank.slot())
+            .or_insert_with(|| ForkProgress::new(bank.slot(), bank.last_blockhash()));
+        let now = Instant::now();
+        let load_result = Self::load_blocktree_entries(bank, blocktree, bank_progress);
+        let fetch_entries_elapsed = now.elapsed().as_micros();
+        if load_result.is_err() {
+            bank_progress.stats.fetch_entries_fail_elapsed += fetch_entries_elapsed as u64;
+        }
+        let replay_result =
+            load_result.and_then(|(entries, num_shreds, useful_time, wasted_time)| {
+                trace!(
+                    "Fetch entries for slot {}, {:?} entries, num shreds {:?}",
+                    bank.slot(),
+                    entries.len(),
+                    num_shreds
+                );
                 tx_count += entries.iter().map(|e| e.transactions.len()).sum::<usize>();
-                Self::replay_entries_into_bank(bank, entries, progress, num)
+                bank_progress.stats.fetch_entries_elapsed += useful_time as u64;
+                bank_progress.stats.fetch_entries_fail_elapsed += wasted_time as u64;
+                Self::replay_entries_into_bank(bank, entries, bank_progress, num_shreds)
             });
 
-        if Self::is_replay_result_fatal(&result) {
+        if Self::is_replay_result_fatal(&replay_result) {
             warn!(
                 "Fatal replay result in slot: {}, result: {:?}",
                 bank.slot(),
-                result
+                replay_result
             );
             datapoint_warn!("replay-stage-mark_dead_slot", ("slot", bank.slot(), i64),);
             Self::mark_dead_slot(bank.slot(), blocktree, progress);
         }
 
-        (result, tx_count)
+        (replay_result, tx_count)
     }
 
     fn mark_dead_slot(slot: u64, blocktree: &Blocktree, progress: &mut HashMap<u64, ForkProgress>) {
@@ -542,6 +612,11 @@ impl ReplayStage {
                 }
                 assert_eq!(*bank_slot, bank.slot());
                 if bank.tick_height() == bank.max_tick_height() {
+                    if let Some(bank_progress) = &mut progress.get(&bank.slot()) {
+                        bank_progress
+                            .stats
+                            .report_stats(bank_progress.num_entries, bank_progress.num_shreds);
+                    }
                     did_complete_bank = true;
                     Self::process_completed_bank(my_pubkey, bank, slot_full_senders);
                 } else {
@@ -665,31 +740,26 @@ impl ReplayStage {
     fn load_blocktree_entries(
         bank: &Bank,
         blocktree: &Blocktree,
-        progress: &mut HashMap<u64, ForkProgress>,
-    ) -> Result<(Vec<Entry>, usize)> {
+        bank_progress: &mut ForkProgress,
+    ) -> Result<(Vec<Entry>, usize, u64, u64)> {
         let bank_slot = bank.slot();
-        let bank_progress = &mut progress
-            .entry(bank_slot)
-            .or_insert_with(|| ForkProgress::new(bank.last_blockhash()));
-        blocktree.get_slot_entries_with_shred_count(bank_slot, bank_progress.num_blobs as u64)
+        blocktree.get_slot_entries_with_shred_count(bank_slot, bank_progress.num_shreds as u64)
     }
 
     fn replay_entries_into_bank(
         bank: &Arc<Bank>,
         entries: Vec<Entry>,
-        progress: &mut HashMap<u64, ForkProgress>,
+        bank_progress: &mut ForkProgress,
         num: usize,
     ) -> Result<()> {
-        let bank_progress = &mut progress
-            .entry(bank.slot())
-            .or_insert_with(|| ForkProgress::new(bank.last_blockhash()));
         let result = Self::verify_and_process_entries(
             &bank,
             &entries,
-            &bank_progress.last_entry,
-            bank_progress.num_blobs,
+            bank_progress.num_shreds,
+            bank_progress,
         );
-        bank_progress.num_blobs += num;
+        bank_progress.num_shreds += num;
         bank_progress.num_entries += entries.len();
         if let Some(last_entry) = entries.last() {
             bank_progress.last_entry = last_entry.hash;
         }
@@ -697,15 +767,21 @@ impl ReplayStage {
         result
     }
 
-    pub fn verify_and_process_entries(
+    fn verify_and_process_entries(
         bank: &Arc<Bank>,
         entries: &[Entry],
-        last_entry: &Hash,
         shred_index: usize,
+        bank_progress: &mut ForkProgress,
     ) -> Result<()> {
-        if !entries.verify(last_entry) {
-            warn!(
-                "entry verification failed {} {} {} {} {}",
+        let now = Instant::now();
+        let last_entry = &bank_progress.last_entry;
+        let verify_result = entries.verify(last_entry);
+        let verify_entries_elapsed = now.elapsed().as_micros();
+        bank_progress.stats.entry_verification_elapsed += verify_entries_elapsed as u64;
+        if !verify_result {
+            info!(
+                "entry verification failed, slot: {}, entry len: {}, tick_height: {}, last entry: {}, last_blockhash: {}, shred_index: {}",
                 bank.slot(),
                 entries.len(),
                 bank.tick_height(),
                 last_entry,
@@ -720,8 +796,13 @@ impl ReplayStage {
             );
             return Err(Error::BlobError(BlobError::VerificationFailed));
         }
-        blocktree_processor::process_entries(bank, entries, true)?;
+
+        let now = Instant::now();
+        let res = blocktree_processor::process_entries(bank, entries, true);
+        let replay_elapsed = now.elapsed().as_micros();
+        bank_progress.stats.replay_elapsed += replay_elapsed as u64;
+        res?;
 
         Ok(())
     }
@@ -859,7 +940,7 @@ mod test {
         let bank0 = Bank::new(&genesis_block);
         let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank0)));
         let mut progress = HashMap::new();
-        progress.insert(5, ForkProgress::new(Hash::default()));
+        progress.insert(5, ForkProgress::new(0, Hash::default()));
         ReplayStage::handle_new_root(&bank_forks, &mut progress);
         assert!(progress.is_empty());
     }
@@ -963,7 +1044,7 @@
         let bank0 = Arc::new(Bank::new(&genesis_block));
         let mut progress = HashMap::new();
         let last_blockhash = bank0.last_blockhash();
-        progress.insert(bank0.slot(), ForkProgress::new(last_blockhash));
+        progress.insert(bank0.slot(), ForkProgress::new(0, last_blockhash));
         let shreds = shred_to_insert(&last_blockhash, bank0.slot());
         blocktree.insert_shreds(shreds, None).unwrap();
         let (res, _tx_count) =