Broadcast stage tuning (#5989)

Pankaj Garg 2019-09-19 16:29:52 -07:00 committed by GitHub
parent 6657312f44
commit ca9d4e34df
6 changed files with 110 additions and 80 deletions

View File

@@ -52,7 +52,7 @@ pub const NUM_THREADS: u32 = 4;
const TOTAL_BUFFERED_PACKETS: usize = 500_000;
const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 512;
const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
/// Stores the stage's thread handle and output receiver.
pub struct BankingStage {

View File

@@ -8,7 +8,7 @@ use crate::poh_recorder::WorkingBankEntry;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::staking_utils;
use solana_metrics::{datapoint, inc_new_counter_error, inc_new_counter_info};
use solana_metrics::{datapoint_info, inc_new_counter_error, inc_new_counter_info};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};

View File

@@ -15,6 +15,11 @@ pub(super) struct ReceiveResults {
pub last_tick: u64,
}
/// This parameter tunes how many entries are received in one iteration of the recv loop.
/// It prevents the broadcast stage from consuming too many entries at a time, which could
/// delay shredding and broadcasting shreds to peer validators.
const RECEIVE_ENTRY_COUNT_THRESHOLD: usize = 8;
pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result<ReceiveResults> {
let timer = Duration::new(1, 0);
let (bank, (entry, mut last_tick)) = receiver.recv_timeout(timer)?;
@@ -38,6 +43,10 @@ pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result
last_tick = tick_height;
entries.push(entry);
if entries.len() >= RECEIVE_ENTRY_COUNT_THRESHOLD {
break;
}
assert!(last_tick <= max_tick_height);
if last_tick == max_tick_height {
break;
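
For orientation, here is a minimal, self-contained sketch of the capped receive loop this hunk introduces. It is not the real recv_slot_entries: the Entry and WorkingEntry types below are stand-ins for the actual entry and WorkingBankEntry payloads, and the bank/slot bookkeeping is omitted. It only illustrates how RECEIVE_ENTRY_COUNT_THRESHOLD bounds one iteration of the loop.

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::time::Duration;

// Stand-in types for the real entry and WorkingBankEntry payloads.
type Entry = u64;
type WorkingEntry = (Entry, u64); // (entry, tick_height)

const RECEIVE_ENTRY_COUNT_THRESHOLD: usize = 8;

// Sketch of the capped receive loop: block for the first entry, then drain
// whatever else is queued, stopping early once the threshold is hit so that
// shredding and broadcasting are not delayed by an ever-growing batch.
fn recv_entries(
    receiver: &Receiver<WorkingEntry>,
    max_tick_height: u64,
) -> Result<Vec<Entry>, RecvTimeoutError> {
    let (entry, mut last_tick) = receiver.recv_timeout(Duration::new(1, 0))?;
    let mut entries = vec![entry];
    while let Ok((entry, tick_height)) = receiver.try_recv() {
        last_tick = tick_height;
        entries.push(entry);
        if entries.len() >= RECEIVE_ENTRY_COUNT_THRESHOLD {
            break;
        }
        if last_tick == max_tick_height {
            break;
        }
    }
    Ok(entries)
}

fn main() {
    let (sender, receiver) = channel();
    // Queue up more entries than the threshold allows in one iteration.
    for i in 0..20 {
        sender.send((i, i)).unwrap();
    }
    let batch = recv_entries(&receiver, 100).unwrap();
    assert_eq!(batch.len(), RECEIVE_ENTRY_COUNT_THRESHOLD);
    println!("received {} entries", batch.len());
}
```

The effect of the threshold is that a backlog on the channel is drained in several short shred-and-broadcast iterations instead of one long pass, which is the delay the doc comment above refers to.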

View File

@@ -23,16 +23,19 @@ impl StandardBroadcastRun {
fn update_broadcast_stats(
&mut self,
receive_entries_elapsed: u64,
shredding_elapsed: u64,
insert_shreds_elapsed: u64,
broadcast_elapsed: u64,
run_elapsed: u64,
num_entries: usize,
to_blobs_elapsed: u64,
num_shreds: usize,
blob_index: u64,
) {
inc_new_counter_info!("broadcast_service-time_ms", broadcast_elapsed as usize);
self.stats.num_entries.push(num_entries);
self.stats.to_blobs_elapsed.push(to_blobs_elapsed);
self.stats.to_blobs_elapsed.push(shredding_elapsed);
self.stats.run_elapsed.push(run_elapsed);
if self.stats.num_entries.len() >= 16 {
info!(
@@ -44,7 +47,16 @@ impl StandardBroadcastRun {
self.stats.run_elapsed.clear();
}
datapoint!("broadcast-service", ("transmit-index", blob_index, i64));
datapoint_info!(
"broadcast-service",
("num_entries", num_entries as i64, i64),
("num_shreds", num_shreds as i64, i64),
("receive_time", receive_entries_elapsed as i64, i64),
("shredding_time", shredding_elapsed as i64, i64),
("insert_shred_time", insert_shreds_elapsed as i64, i64),
("broadcast_time", broadcast_elapsed as i64, i64),
("transmit-index", blob_index as i64, i64),
);
}
}
@@ -65,7 +77,6 @@ impl BroadcastRun for StandardBroadcastRun {
inc_new_counter_info!("broadcast_service-entries_received", num_entries);
// 2) Convert entries to blobs + generate coding blobs
let to_blobs_start = Instant::now();
let keypair = &cluster_info.read().unwrap().keypair.clone();
let latest_shred_index = blocktree
.meta(bank.slot())
@@ -79,6 +90,7 @@ impl BroadcastRun for StandardBroadcastRun {
0
};
let to_shreds_start = Instant::now();
let (shred_infos, latest_shred_index) = entries_to_shreds(
receive_results.entries,
last_tick,
@@ -88,14 +100,15 @@ impl BroadcastRun for StandardBroadcastRun {
latest_shred_index,
parent_slot,
);
let to_shreds_elapsed = to_shreds_start.elapsed();
let all_seeds: Vec<[u8; 32]> = shred_infos.iter().map(|s| s.seed()).collect();
let num_shreds = shred_infos.len();
let insert_shreds_start = Instant::now();
blocktree
.insert_shreds(shred_infos.clone(), None)
.expect("Failed to insert shreds in blocktree");
let to_blobs_elapsed = to_blobs_start.elapsed();
let insert_shreds_elapsed = insert_shreds_start.elapsed();
// 3) Start broadcast step
let broadcast_start = Instant::now();
@@ -111,14 +124,17 @@ impl BroadcastRun for StandardBroadcastRun {
stakes.as_ref(),
)?;
inc_new_counter_debug!("streamer-broadcast-sent", num_shreds);
let broadcast_elapsed = broadcast_start.elapsed();
self.update_broadcast_stats(
duration_as_ms(&receive_elapsed),
duration_as_ms(&to_shreds_elapsed),
duration_as_ms(&insert_shreds_elapsed),
duration_as_ms(&broadcast_elapsed),
duration_as_ms(&(receive_elapsed + to_blobs_elapsed + broadcast_elapsed)),
duration_as_ms(
&(receive_elapsed + to_shreds_elapsed + insert_shreds_elapsed + broadcast_elapsed),
),
num_entries,
duration_as_ms(&to_blobs_elapsed),
num_shreds,
latest_shred_index,
);
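
The stats changes above follow a simple per-phase timing pattern: each step gets its own Instant so the datapoint can report receive, shredding, insert, and broadcast times separately rather than one lumped number. Below is a minimal sketch of that pattern only, not the real StandardBroadcastRun; duration_as_ms is defined locally here as a stand-in for the timing helper used in the diff.

```rust
use std::time::{Duration, Instant};

// Local stand-in for the timing helper used in the diff above.
fn duration_as_ms(d: &Duration) -> u64 {
    d.as_millis() as u64
}

fn main() {
    // Each phase is timed independently so the metrics can attribute latency
    // to receiving, shredding, blocktree insertion, or broadcasting.
    let receive_start = Instant::now();
    // ... receive entries from the PoH recorder channel ...
    let receive_elapsed = receive_start.elapsed();

    let to_shreds_start = Instant::now();
    // ... convert entries to shreds ...
    let to_shreds_elapsed = to_shreds_start.elapsed();

    let insert_start = Instant::now();
    // ... insert shreds into the blocktree ...
    let insert_shreds_elapsed = insert_start.elapsed();

    let broadcast_start = Instant::now();
    // ... transmit shreds to peers ...
    let broadcast_elapsed = broadcast_start.elapsed();

    let run_elapsed =
        receive_elapsed + to_shreds_elapsed + insert_shreds_elapsed + broadcast_elapsed;
    println!(
        "receive={}ms shredding={}ms insert={}ms broadcast={}ms total={}ms",
        duration_as_ms(&receive_elapsed),
        duration_as_ms(&to_shreds_elapsed),
        duration_as_ms(&insert_shreds_elapsed),
        duration_as_ms(&broadcast_elapsed),
        duration_as_ms(&run_elapsed),
    );
}
```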

View File

@@ -694,7 +694,12 @@ impl ReplayStage {
let bank_progress = &mut progress
.entry(bank.slot())
.or_insert_with(|| ForkProgress::new(bank.last_blockhash()));
let result = Self::verify_and_process_entries(&bank, &entries, &bank_progress.last_entry);
let result = Self::verify_and_process_entries(
&bank,
&entries,
&bank_progress.last_entry,
bank_progress.num_blobs,
);
bank_progress.num_blobs += num;
if let Some(last_entry) = entries.last() {
bank_progress.last_entry = last_entry.hash;
@@ -707,14 +712,16 @@ impl ReplayStage {
bank: &Bank,
entries: &[Entry],
last_entry: &Hash,
shred_index: usize,
) -> Result<()> {
if !entries.verify(last_entry) {
warn!(
"entry verification failed {} {} {} {}",
"entry verification failed {} {} {} {} {}",
entries.len(),
bank.tick_height(),
last_entry,
bank.last_blockhash()
bank.last_blockhash(),
shred_index
);
datapoint_error!(

View File

@@ -23,8 +23,6 @@ lazy_static! {
{ serialized_size(&DataShredHeader::default()).unwrap() as usize };
static ref SIZE_OF_SIGNATURE: usize =
{ bincode::serialized_size(&Signature::default()).unwrap() as usize };
static ref SIZE_OF_EMPTY_VEC: usize =
{ bincode::serialized_size(&vec![0u8; 0]).unwrap() as usize };
static ref SIZE_OF_SHRED_TYPE: usize = { bincode::serialized_size(&0u8).unwrap() as usize };
}
@@ -37,6 +35,69 @@ thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::
pub const DATA_SHRED: u8 = 0b1010_0101;
pub const CODING_SHRED: u8 = 0b0101_1010;
/// This limit comes from the reed-solomon library, which unfortunately does not
/// define a public constant for it.
const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 16;
/// Based on rse benchmarks, the optimal erasure config uses 16 data shreds and 4 coding shreds
pub const RECOMMENDED_FEC_RATE: f32 = 0.25;
const LAST_SHRED_IN_SLOT: u8 = 0b0000_0001;
const DATA_COMPLETE_SHRED: u8 = 0b0000_0010;
/// A common header that is present at start of every shred
#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)]
pub struct ShredCommonHeader {
pub signature: Signature,
pub slot: u64,
pub index: u32,
}
/// A common header that is present at start of every data shred
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct DataShredHeader {
pub common_header: CodingShredHeader,
pub data_header: ShredCommonHeader,
pub parent_offset: u16,
pub flags: u8,
}
/// The coding shred header has FEC information
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct CodingShredHeader {
pub shred_type: u8,
pub coding_header: ShredCommonHeader,
pub num_data_shreds: u16,
pub num_coding_shreds: u16,
pub position: u16,
}
impl Default for DataShredHeader {
fn default() -> Self {
DataShredHeader {
common_header: CodingShredHeader {
shred_type: DATA_SHRED,
..CodingShredHeader::default()
},
data_header: ShredCommonHeader::default(),
parent_offset: 0,
flags: 0,
}
}
}
impl Default for CodingShredHeader {
fn default() -> Self {
CodingShredHeader {
shred_type: CODING_SHRED,
coding_header: ShredCommonHeader::default(),
num_data_shreds: 0,
num_coding_shreds: 0,
position: 0,
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct Shred {
pub headers: DataShredHeader,
@@ -180,69 +241,6 @@ impl Shred {
}
}
/// This limit comes from the reed-solomon library, which unfortunately does not
/// define a public constant for it.
const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 16;
/// Based on rse benchmarks, the optimal erasure config uses 16 data shreds and 4 coding shreds
pub const RECOMMENDED_FEC_RATE: f32 = 0.25;
const LAST_SHRED_IN_SLOT: u8 = 0b0000_0001;
const DATA_COMPLETE_SHRED: u8 = 0b0000_0010;
/// A common header that is present at start of every shred
#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)]
pub struct ShredCommonHeader {
pub signature: Signature,
pub slot: u64,
pub index: u32,
}
/// A common header that is present at start of every data shred
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct DataShredHeader {
pub common_header: CodingShredHeader,
pub data_header: ShredCommonHeader,
pub parent_offset: u16,
pub flags: u8,
}
/// The coding shred header has FEC information
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct CodingShredHeader {
pub shred_type: u8,
pub coding_header: ShredCommonHeader,
pub num_data_shreds: u16,
pub num_coding_shreds: u16,
pub position: u16,
}
impl Default for DataShredHeader {
fn default() -> Self {
DataShredHeader {
common_header: CodingShredHeader {
shred_type: DATA_SHRED,
..CodingShredHeader::default()
},
data_header: ShredCommonHeader::default(),
parent_offset: 0,
flags: 0,
}
}
}
impl Default for CodingShredHeader {
fn default() -> Self {
CodingShredHeader {
shred_type: CODING_SHRED,
coding_header: ShredCommonHeader::default(),
num_data_shreds: 0,
num_coding_shreds: 0,
position: 0,
}
}
}
#[derive(Debug)]
pub struct Shredder {
slot: u64,
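
A closing note on the FEC constants this file moves: RECOMMENDED_FEC_RATE = 0.25 together with MAX_DATA_SHREDS_PER_FEC_BLOCK = 16 matches the "16 data shreds and 4 coding shreds" configuration named in the comment. The sketch below is only illustrative and assumes the rate is applied per data-shred batch; num_coding_shreds is a hypothetical helper, not part of the crate.

```rust
// Illustrative only: derive the coding-shred count for one FEC block from a
// fractional FEC rate, under the assumption that the rate is applied to the
// number of data shreds in the block.
const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 16;
pub const RECOMMENDED_FEC_RATE: f32 = 0.25;

// Hypothetical helper, not part of the crate.
fn num_coding_shreds(num_data_shreds: u32, fec_rate: f32) -> u32 {
    (num_data_shreds as f32 * fec_rate).ceil() as u32
}

fn main() {
    let coding = num_coding_shreds(MAX_DATA_SHREDS_PER_FEC_BLOCK, RECOMMENDED_FEC_RATE);
    assert_eq!(coding, 4);
    println!(
        "{} data shreds at rate {} -> {} coding shreds",
        MAX_DATA_SHREDS_PER_FEC_BLOCK, RECOMMENDED_FEC_RATE, coding
    );
}
```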