Finish unfinished slot before processing new slots (#6197)

Pankaj Garg 2019-10-01 11:46:14 -07:00 committed by GitHub
parent faae122375
commit 774e9df2e5
4 changed files with 55 additions and 8 deletions
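
In outline: entries_to_shreds now takes and returns an Option<UnfinishedSlotInfo>, and StandardBroadcastRun carries that value from one broadcast iteration to the next, so a slot that was interrupted before its last tick is finalized before shreds for the new slot are produced. Below is a minimal sketch of that state-threading pattern with simplified stand-in types; BroadcastState, process_entries, and the println! placeholder are illustrative, not the actual Solana APIs.

/// Stand-in for the UnfinishedSlotInfo struct added by this commit.
#[derive(Copy, Clone)]
struct UnfinishedSlotInfo {
    next_index: u64,
    slot: u64,
    parent: u64,
}

/// Stand-in for StandardBroadcastRun: carries the unfinished-slot state across iterations.
struct BroadcastState {
    unfinished_slot: Option<UnfinishedSlotInfo>,
}

impl BroadcastState {
    /// Called once per batch of entries received for `slot`.
    fn process_entries(&mut self, slot: u64, parent: u64, is_last_tick: bool, next_index: u64) {
        // The real code hands self.unfinished_slot to entries_to_shreds, which finalizes
        // a stale slot before shredding the new entries (see the broadcast_utils hunks below).
        if let Some(prev) = self.unfinished_slot {
            if prev.slot != slot {
                // A new slot started before the previous one was closed out.
                println!("finalize slot {} starting at shred index {}", prev.slot, prev.next_index);
            }
        }
        // Remember whether the current slot is still open after this batch.
        self.unfinished_slot = if is_last_tick {
            None
        } else {
            Some(UnfinishedSlotInfo { next_index, slot, parent })
        };
    }
}

fn main() {
    let mut state = BroadcastState { unfinished_slot: None };
    state.process_entries(1, 0, false, 10); // slot 1 is left unfinished
    state.process_entries(2, 1, true, 0); // slot 2 arrives, so slot 1 is finalized first
}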


@@ -45,6 +45,7 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
keypair,
latest_blob_index,
bank.parent().unwrap().slot(),
+None,
);
// If the last blockhash is default, a new block is being created
@@ -65,6 +66,7 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
keypair,
latest_blob_index,
bank.parent().unwrap().slot(),
+None,
);
// If it's the last tick, reset the last block hash to default


@@ -15,6 +15,13 @@ pub(super) struct ReceiveResults {
pub last_tick: u64,
}
+#[derive(Copy, Clone)]
+pub struct UnfinishedSlotInfo {
+pub next_index: u64,
+pub slot: u64,
+pub parent: u64,
+}
/// This parameter tunes how many entries are received in one iteration of the recv loop.
/// It prevents the broadcast stage from consuming more entries than it can handle, which
/// could delay shredding and broadcasting shreds to peer validators.
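
What this comment documents (a receive-size tuning parameter) is not shown in the hunk, but the mechanism it describes is a bounded drain of the entry channel. A rough sketch of such a loop, assuming a std::sync::mpsc channel of entry batches; recv_up_to and the 100 ms timeout are illustrative, not the crate's actual recv_slot_entries.

use std::sync::mpsc::Receiver;
use std::time::Duration;

/// Illustrative only: receive entry batches until roughly `threshold` entries are
/// collected, so downstream shredding and broadcasting are not starved by an
/// unbounded drain. `T` stands in for the ledger Entry type.
fn recv_up_to<T>(receiver: &Receiver<Vec<T>>, threshold: usize) -> Vec<T> {
    // Block briefly for the first batch, then take only what is immediately ready.
    let mut entries = match receiver.recv_timeout(Duration::from_millis(100)) {
        Ok(batch) => batch,
        Err(_) => return Vec::new(),
    };
    while entries.len() < threshold {
        match receiver.try_recv() {
            Ok(mut batch) => entries.append(&mut batch),
            Err(_) => break,
        }
    }
    entries
}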
@@ -72,7 +79,27 @@ pub(super) fn entries_to_shreds(
keypair: &Arc<Keypair>,
latest_shred_index: u64,
parent_slot: u64,
-) -> (Vec<Shred>, u64) {
+last_unfinished_slot: Option<UnfinishedSlotInfo>,
+) -> (Vec<Shred>, Option<UnfinishedSlotInfo>) {
+let mut shreds = if let Some(unfinished_slot) = last_unfinished_slot {
+if unfinished_slot.slot != slot {
+let mut shredder = Shredder::new(
+unfinished_slot.slot,
+unfinished_slot.parent,
+RECOMMENDED_FEC_RATE,
+keypair,
+unfinished_slot.next_index as u32,
+)
+.expect("Expected to create a new shredder");
+shredder.finalize_slot();
+shredder.shreds.drain(..).collect()
+} else {
+vec![]
+}
+} else {
+vec![]
+};
let mut shredder = Shredder::new(
slot,
parent_slot,
@@ -85,17 +112,23 @@ pub(super) fn entries_to_shreds(
bincode::serialize_into(&mut shredder, &entries)
.expect("Expect to write all entries to shreds");
-if last_tick == bank_max_tick {
+let unfinished_slot = if last_tick == bank_max_tick {
shredder.finalize_slot();
+None
} else {
shredder.finalize_data();
-}
+Some(UnfinishedSlotInfo {
+next_index: u64::from(shredder.index),
+slot,
+parent: parent_slot,
+})
+};
-let shred_infos: Vec<Shred> = shredder.shreds.drain(..).collect();
+shreds.append(&mut shredder.shreds);
-trace!("Inserting {:?} shreds in blocktree", shred_infos.len());
+trace!("Inserting {:?} shreds in blocktree", shreds.len());
-(shred_infos, u64::from(shredder.index))
+(shreds, unfinished_slot)
}
#[cfg(test)]
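
Taken together, the two hunks above change the contract of entries_to_shreds: a carried-over slot whose number differs from the current one is finalized first, and the second element of the return value tells the caller whether the current slot is still open (Some, carrying the next shred index) or closed at the bank's last tick (None). A condensed, simplified sketch of that contract follows; u64 stands in for Shred and the actual shredding is elided, so this is illustrative rather than the real implementation.

#[derive(Copy, Clone)]
struct UnfinishedSlotInfo {
    next_index: u64,
    slot: u64,
    parent: u64,
}

/// Illustrative only: same decision flow as the new entries_to_shreds, minus the Shredder.
fn entries_to_shreds_sketch(
    slot: u64,
    parent_slot: u64,
    is_last_tick: bool,
    next_index: u64,
    last_unfinished_slot: Option<UnfinishedSlotInfo>,
) -> (Vec<u64>, Option<UnfinishedSlotInfo>) {
    // 1. Emit the closing shreds of the previous slot if it was left open.
    let mut shreds = match last_unfinished_slot {
        Some(prev) if prev.slot != slot => vec![prev.slot], // placeholder for finalize_slot()
        _ => Vec::new(),
    };
    // 2. Shred the current entries (elided) and append the result.
    shreds.push(slot); // placeholder for the current slot's shreds
    // 3. Report None when the bank's last tick closed the slot, otherwise Some so the
    //    next call can finish this slot starting at `next_index`.
    let unfinished = if is_last_tick {
        None
    } else {
        Some(UnfinishedSlotInfo { next_index, slot, parent: parent_slot })
    };
    (shreds, unfinished)
}

The fake-blobs and fail-entry-verification broadcast runs in this diff simply pass None for this argument, since they do not track unfinished slots.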


@@ -44,6 +44,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
keypair,
latest_blob_index,
bank.parent().unwrap().slot(),
+None,
);
let seeds: Vec<[u8; 32]> = shred_infos.iter().map(|s| s.seed()).collect();


@@ -1,6 +1,6 @@
use super::broadcast_utils;
use super::*;
-use crate::broadcast_stage::broadcast_utils::entries_to_shreds;
+use crate::broadcast_stage::broadcast_utils::{entries_to_shreds, UnfinishedSlotInfo};
use solana_sdk::timing::duration_as_ms;
#[derive(Default)]
@@ -12,12 +12,14 @@ struct BroadcastStats {
pub(super) struct StandardBroadcastRun {
stats: BroadcastStats,
+unfinished_slot: Option<UnfinishedSlotInfo>,
}
impl StandardBroadcastRun {
pub(super) fn new() -> Self {
Self {
stats: BroadcastStats::default(),
+unfinished_slot: None,
}
}
@@ -91,7 +93,7 @@ impl BroadcastRun for StandardBroadcastRun {
};
let to_shreds_start = Instant::now();
-let (shred_infos, latest_shred_index) = entries_to_shreds(
+let (shred_infos, unfinished_slot) = entries_to_shreds(
receive_results.entries,
last_tick,
bank.slot(),
@@ -99,8 +101,10 @@ impl BroadcastRun for StandardBroadcastRun {
keypair,
latest_shred_index,
parent_slot,
+self.unfinished_slot,
);
let to_shreds_elapsed = to_shreds_start.elapsed();
+self.unfinished_slot = unfinished_slot;
let all_seeds: Vec<[u8; 32]> = shred_infos.iter().map(|s| s.seed()).collect();
let num_shreds = shred_infos.len();
@@ -125,6 +129,13 @@ impl BroadcastRun for StandardBroadcastRun {
)?;
let broadcast_elapsed = broadcast_start.elapsed();
+let latest_shred_index = unfinished_slot.map(|s| s.next_index).unwrap_or_else(|| {
+blocktree
+.meta(bank.slot())
+.expect("Database error")
+.map(|meta| meta.consumed)
+.unwrap_or(0)
+});
self.update_broadcast_stats(
duration_as_ms(&receive_elapsed),
duration_as_ms(&to_shreds_elapsed),