Fix unnecessarily copying shreds in broadcast stage (#6588)

* Optimize coalesce_shreds to not explicitly clone

* Remove Coalesce Shreds altogether

* fn no longer needs clippy exception
This commit is contained in:
Sagar Dhawan 2019-10-28 14:58:27 -07:00 committed by GitHub
parent b04c8c1c1a
commit 579a02529d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 65 additions and 79 deletions

View File

@ -3,8 +3,10 @@ use super::*;
use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo;
use solana_ledger::entry::Entry;
use solana_ledger::shred::{Shred, Shredder, RECOMMENDED_FEC_RATE};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use solana_sdk::timing::duration_as_us;
use std::collections::HashMap;
use std::time::Duration;
#[derive(Default)]
@ -14,7 +16,7 @@ struct BroadcastStats {
insert_shreds_elapsed: u64,
broadcast_elapsed: u64,
receive_elapsed: u64,
clone_and_seed_elapsed: u64,
seed_elapsed: u64,
}
impl BroadcastStats {
@ -23,7 +25,7 @@ impl BroadcastStats {
self.shredding_elapsed = 0;
self.broadcast_elapsed = 0;
self.receive_elapsed = 0;
self.clone_and_seed_elapsed = 0;
self.seed_elapsed = 0;
}
}
@ -75,27 +77,6 @@ impl StandardBroadcastRun {
last_unfinished_slot_shred
}
/// Merges the data shreds, the coding shreds and, when present, the last
/// shred of an unfinished slot into a single vector.
///
/// The result holds every element of `data_shreds` first, then every element
/// of `coding_shreds`, then `last_unfinished_slot_shred` if it is `Some`.
///
/// All three arguments are taken by value, so the shreds are moved into the
/// result; nothing is cloned.
fn coalesce_shreds(
    data_shreds: Vec<Shred>,
    coding_shreds: Vec<Shred>,
    last_unfinished_slot_shred: Option<Shred>,
) -> Vec<Shred> {
    // Reuse the data-shred vector as the output buffer instead of building a
    // fresh vector out of cloned elements.
    let mut all_shreds = data_shreds;
    all_shreds.extend(coding_shreds);
    // `Option<T>` is `IntoIterator`, yielding zero or one item, so this
    // appends the trailing shred only when there is one.
    all_shreds.extend(last_unfinished_slot_shred);
    all_shreds
}
fn entries_to_shreds(
&mut self,
blocktree: &Blocktree,
@ -170,79 +151,88 @@ impl StandardBroadcastRun {
let last_unfinished_slot_shred = self.check_for_interrupted_slot();
// 2) Convert entries to shreds and coding shreds
let (data_shreds, coding_shreds) = self.entries_to_shreds(
let (mut data_shreds, coding_shreds) = self.entries_to_shreds(
blocktree,
&receive_results.entries,
last_tick_height == bank.max_tick_height(),
);
if let Some(last_shred) = last_unfinished_slot_shred {
data_shreds.push(last_shred);
}
let to_shreds_elapsed = to_shreds_start.elapsed();
let clone_and_seed_start = Instant::now();
let all_shreds =
Self::coalesce_shreds(data_shreds, coding_shreds, last_unfinished_slot_shred);
let all_shreds_ = all_shreds.clone();
let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect();
let clone_and_seed_elapsed = clone_and_seed_start.elapsed();
// 3) Insert shreds into blocktree
let insert_shreds_start = Instant::now();
blocktree
.insert_shreds(all_shreds_, None)
.expect("Failed to insert shreds in blocktree");
let insert_shreds_elapsed = insert_shreds_start.elapsed();
// 4) Broadcast the shreds
let broadcast_start = Instant::now();
let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
let all_shred_bufs: Vec<Vec<u8>> = all_shreds.into_iter().map(|s| s.payload).collect();
trace!("Broadcasting {:?} shreds", all_shred_bufs.len());
cluster_info.read().unwrap().broadcast_shreds(
sock,
all_shred_bufs,
&all_seeds,
self.insert_and_broadcast(data_shreds, blocktree, cluster_info, stakes.as_ref(), sock)?;
self.insert_and_broadcast(
coding_shreds,
blocktree,
cluster_info,
stakes.as_ref(),
sock,
)?;
let broadcast_elapsed = broadcast_start.elapsed();
self.update_broadcast_stats(
duration_as_us(&receive_elapsed),
duration_as_us(&to_shreds_elapsed),
duration_as_us(&insert_shreds_elapsed),
duration_as_us(&broadcast_elapsed),
duration_as_us(&clone_and_seed_elapsed),
last_tick_height == bank.max_tick_height(),
);
self.update_broadcast_stats(BroadcastStats {
shredding_elapsed: duration_as_us(&to_shreds_elapsed),
receive_elapsed: duration_as_us(&receive_elapsed),
..BroadcastStats::default()
});
if last_tick_height == bank.max_tick_height() {
self.report_and_reset_stats();
self.unfinished_slot = None;
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn update_broadcast_stats(
fn insert_and_broadcast(
&mut self,
receive_entries_elapsed: u64,
shredding_elapsed: u64,
insert_shreds_elapsed: u64,
broadcast_elapsed: u64,
clone_and_seed_elapsed: u64,
slot_ended: bool,
) {
self.stats.receive_elapsed += receive_entries_elapsed;
self.stats.shredding_elapsed += shredding_elapsed;
self.stats.insert_shreds_elapsed += insert_shreds_elapsed;
self.stats.broadcast_elapsed += broadcast_elapsed;
self.stats.clone_and_seed_elapsed += clone_and_seed_elapsed;
shreds: Vec<Shred>,
blocktree: &Arc<Blocktree>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
stakes: Option<&HashMap<Pubkey, u64>>,
sock: &UdpSocket,
) -> Result<()> {
let seed_start = Instant::now();
let seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect();
let seed_elapsed = seed_start.elapsed();
if slot_ended {
self.report_and_reset_stats()
}
// Insert shreds into blocktree
let insert_shreds_start = Instant::now();
blocktree
.insert_shreds(shreds.clone(), None)
.expect("Failed to insert shreds in blocktree");
let insert_shreds_elapsed = insert_shreds_start.elapsed();
// Broadcast the shreds
let broadcast_start = Instant::now();
let shred_bufs: Vec<Vec<u8>> = shreds.into_iter().map(|s| s.payload).collect();
trace!("Broadcasting {:?} shreds", shred_bufs.len());
cluster_info
.read()
.unwrap()
.broadcast_shreds(sock, shred_bufs, &seeds, stakes)?;
let broadcast_elapsed = broadcast_start.elapsed();
self.update_broadcast_stats(BroadcastStats {
insert_shreds_elapsed: duration_as_us(&insert_shreds_elapsed),
broadcast_elapsed: duration_as_us(&broadcast_elapsed),
seed_elapsed: duration_as_us(&seed_elapsed),
..BroadcastStats::default()
});
Ok(())
}
fn update_broadcast_stats(&mut self, stats: BroadcastStats) {
self.stats.receive_elapsed += stats.receive_elapsed;
self.stats.shredding_elapsed += stats.shredding_elapsed;
self.stats.insert_shreds_elapsed += stats.insert_shreds_elapsed;
self.stats.broadcast_elapsed += stats.broadcast_elapsed;
self.stats.seed_elapsed += stats.seed_elapsed;
}
fn report_and_reset_stats(&mut self) {
@ -258,11 +248,7 @@ impl StandardBroadcastRun {
),
("broadcast_time", self.stats.broadcast_elapsed as i64, i64),
("receive_time", self.stats.receive_elapsed as i64, i64),
(
"clone_and_seed",
self.stats.clone_and_seed_elapsed as i64,
i64
),
("seed", self.stats.seed_elapsed as i64, i64),
(
"num_shreds",
i64::from(self.unfinished_slot.unwrap().next_shred_index),