removes fec_set_offset from UnfinishedSlotInfo (#25815)
If the blockstore has shreds for a slot, it should not recreate the slot:
https://github.com/solana-labs/solana/blob/ff68bf6c2/ledger/src/leader_schedule_cache.rs#L142-L146
https://github.com/solana-labs/solana/pull/15849/files#r596657314
Therefore, in broadcast stage, if UnfinishedSlotInfo is None, then fec_set_offset will be zero:
https://github.com/solana-labs/solana/blob/ff68bf6c2/core/src/broadcast_stage/standard_broadcast_run.rs#L111-L120
As a result, fec_set_offset is always zero; it is therefore redundant and can be removed.
parent 59797af390
commit 6c9f2eac78
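Before the per-file hunks, a standalone sketch (not part of the commit) of the arithmetic the message above relies on: with fec_set_offset fixed at zero, Shredder::fec_set_index just rounds a data-shred index down to a multiple of MAX_DATA_SHREDS_PER_FEC_BLOCK. The function body mirrors the shredder code in the last hunk below; the constant value 32 is assumed here for illustration.

const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 32; // assumed value of the shredder constant

// Local copy of the computation in Shredder::fec_set_index (see the last hunk below).
fn fec_set_index(data_shred_index: u32, fec_set_offset: u32) -> Option<u32> {
    let diff = data_shred_index.checked_sub(fec_set_offset)?;
    Some(data_shred_index - diff % MAX_DATA_SHREDS_PER_FEC_BLOCK)
}

fn main() {
    // With an offset of zero the FEC set index is the shred index rounded
    // down to a multiple of MAX_DATA_SHREDS_PER_FEC_BLOCK.
    assert_eq!(fec_set_index(0, 0), Some(0));
    assert_eq!(fec_set_index(37, 0), Some(32));
    assert_eq!(fec_set_index(95, 0), Some(64));
    // A non-zero offset would shift the set boundaries; per the message
    // above, that case can no longer occur.
    assert_eq!(fec_set_index(37, 5), Some(37));
    // An index below the offset is rejected.
    assert_eq!(fec_set_index(3, 5), None);
}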
@@ -169,25 +169,38 @@ impl BroadcastRun for BroadcastDuplicatesRun {
 if let Some(index) = coding_shreds.iter().map(Shred::index).max() {
 self.next_code_index = index + 1;
 }
-let last_shreds = last_entries.map(|(original_last_entry, duplicate_extra_last_entries)| {
-let (original_last_data_shred, _) =
-shredder.entries_to_shreds(keypair, &[original_last_entry], true, self.next_shred_index, self.next_code_index);
-
-let (partition_last_data_shred, _) =
-// Don't mark the last shred as last so that validators won't know that
-// they've gotten all the shreds, and will continue trying to repair
-shredder.entries_to_shreds(keypair, &duplicate_extra_last_entries, true, self.next_shred_index, self.next_code_index);
-
-let sigs: Vec<_> = partition_last_data_shred.iter().map(|s| (s.signature(), s.index())).collect();
+let last_shreds =
+last_entries.map(|(original_last_entry, duplicate_extra_last_entries)| {
+let (original_last_data_shred, _) = shredder.entries_to_shreds(
+keypair,
+&[original_last_entry],
+true,
+self.next_shred_index,
+self.next_code_index,
+);
+// Don't mark the last shred as last so that validators won't
+// know that they've gotten all the shreds, and will continue
+// trying to repair.
+let (partition_last_data_shred, _) = shredder.entries_to_shreds(
+keypair,
+&duplicate_extra_last_entries,
+true,
+self.next_shred_index,
+self.next_code_index,
+);
+let sigs: Vec<_> = partition_last_data_shred
+.iter()
+.map(|s| (s.signature(), s.index()))
+.collect();
 info!(
 "duplicate signatures for slot {}, sigs: {:?}",
 bank.slot(),
 sigs,
 );
 
-self.next_shred_index += 1;
-(original_last_data_shred, partition_last_data_shred)
-});
+self.next_shred_index += 1;
+(original_last_data_shred, partition_last_data_shred)
+});
 
 let data_shreds = Arc::new(data_shreds);
 blockstore_sender.send((data_shreds.clone(), None))?;
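A note on the hunk above: both entries_to_shreds calls use the same self.next_shred_index, so the original and the partition versions of the last batch land on the same shred index with different payloads, which is what makes the broadcast a duplicate block. A toy sketch of that property, where shred_payloads is a hypothetical stand-in for the real shredder call:

// Hypothetical stand-in for the shredder: pairs each entry with the shred
// index it would occupy, starting at next_shred_index.
fn shred_payloads(entries: &[&str], next_shred_index: u32) -> Vec<(u32, String)> {
    entries
        .iter()
        .enumerate()
        .map(|(i, e)| (next_shred_index + i as u32, e.to_string()))
        .collect()
}

fn main() {
    let next_shred_index = 11;
    let original = shred_payloads(&["original last entry"], next_shred_index);
    let partition = shred_payloads(&["extra entry for the partition"], next_shred_index);
    // Same index, different contents: two conflicting shreds for one slot position.
    assert_eq!(original[0].0, partition[0].0);
    assert_ne!(original[0].1, partition[0].1);
}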
@@ -28,7 +28,6 @@ pub struct UnfinishedSlotInfo {
 // Data shreds buffered to make a batch of size
 // MAX_DATA_SHREDS_PER_FEC_BLOCK.
 pub(crate) data_shreds_buffer: Vec<Shred>,
-pub(crate) fec_set_offset: u32, // See Shredder::fec_set_index.
 }
 
 /// This parameter tunes how many entries are received in one iteration of recv loop
@@ -99,14 +99,23 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
 self.next_code_index = index + 1;
 }
 let last_shreds = last_entries.map(|(good_last_entry, bad_last_entry)| {
-let (good_last_data_shred, _) =
-shredder.entries_to_shreds(keypair, &[good_last_entry], true, self.next_shred_index, self.next_code_index);
-
-let (bad_last_data_shred, _) =
-// Don't mark the last shred as last so that validators won't know that
-// they've gotten all the shreds, and will continue trying to repair
-shredder.entries_to_shreds(keypair, &[bad_last_entry], false, self.next_shred_index, self.next_code_index);
-
+let (good_last_data_shred, _) = shredder.entries_to_shreds(
+keypair,
+&[good_last_entry],
+true,
+self.next_shred_index,
+self.next_code_index,
+);
+// Don't mark the last shred as last so that validators won't know
+// that they've gotten all the shreds, and will continue trying to
+// repair.
+let (bad_last_data_shred, _) = shredder.entries_to_shreds(
+keypair,
+&[bad_last_entry],
+false,
+self.next_shred_index,
+self.next_code_index,
+);
 self.next_shred_index += 1;
 (good_last_data_shred, bad_last_data_shred)
 });
@@ -70,8 +70,12 @@ impl StandardBroadcastRun {
 Some(ref mut state) => {
 let parent_offset = state.slot - state.parent;
 let reference_tick = max_ticks_in_slot & SHRED_TICK_REFERENCE_MASK;
-let fec_set_index =
-Shredder::fec_set_index(state.next_shred_index, state.fec_set_offset);
+let fec_set_offset = state
+.data_shreds_buffer
+.first()
+.map(Shred::index)
+.unwrap_or(state.next_shred_index);
+let fec_set_index = Shredder::fec_set_index(state.next_shred_index, fec_set_offset);
 let mut shred = Shred::new_from_data(
 state.slot,
 state.next_shred_index,
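The hunk above replaces the stored offset with one recovered from the buffered data shreds: the first shred still sitting in data_shreds_buffer marks the start of the current FEC set, and an empty buffer means the next shred starts a new set. A self-contained sketch of that lookup, with Shred reduced to a minimal stand-in type:

// Minimal stand-in for the real Shred type, for illustration only.
struct Shred {
    index: u32,
}

impl Shred {
    fn index(&self) -> u32 {
        self.index
    }
}

fn main() {
    let next_shred_index = 40;
    // Data shreds buffered since the last full FEC batch; the first one
    // marks the start of the current FEC set.
    let data_shreds_buffer = vec![Shred { index: 37 }, Shred { index: 38 }, Shred { index: 39 }];
    let fec_set_offset = data_shreds_buffer
        .first()
        .map(Shred::index)
        .unwrap_or(next_shred_index);
    assert_eq!(fec_set_offset, 37);

    // With an empty buffer the next shred starts a new FEC set.
    let empty_buffer: Vec<Shred> = Vec::new();
    let fec_set_offset = empty_buffer
        .first()
        .map(Shred::index)
        .unwrap_or(next_shred_index);
    assert_eq!(fec_set_offset, 40);
}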
@@ -108,15 +112,22 @@ impl StandardBroadcastRun {
 process_stats: &mut ProcessShredsStats,
 ) -> Vec<Shred> {
 let (slot, parent_slot) = self.current_slot_and_parent.unwrap();
-let (next_shred_index, fec_set_offset) = match &self.unfinished_slot {
-Some(state) => (state.next_shred_index, state.fec_set_offset),
-None => match blockstore.meta(slot).unwrap() {
-Some(slot_meta) => {
-let shreds_consumed = slot_meta.consumed as u32;
-(shreds_consumed, shreds_consumed)
+let next_shred_index = match &self.unfinished_slot {
+Some(state) => state.next_shred_index,
+None => {
+// If the blockstore has shreds for the slot, it should not
+// recreate the slot:
+// https://github.com/solana-labs/solana/blob/ff68bf6c2/ledger/src/leader_schedule_cache.rs#L142-L146
+if let Some(slot_meta) = blockstore.meta(slot).unwrap() {
+if slot_meta.received > 0 || slot_meta.consumed > 0 {
+process_stats.num_extant_slots += 1;
+// This is a faulty situation that should not happen.
+// Refrain from generating shreds for the slot.
+return Vec::default();
+}
 }
-None => (0, 0),
-},
+0u32
+}
 };
 let data_shreds = Shredder::new(slot, parent_slot, reference_tick, self.shred_version)
 .unwrap()
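The new None arm above refuses to generate shreds when the blockstore already holds shreds for the slot, and counts the event in num_extant_slots. A simplified sketch of that guard, with SlotMeta reduced to the two counters the check reads (a stand-in, not the real ledger struct):

// Simplified stand-in for the blockstore's SlotMeta.
#[derive(Default)]
struct SlotMeta {
    received: u64,
    consumed: u64,
}

// Returns the starting data-shred index for a slot the leader is about to
// broadcast, or None if the blockstore already holds shreds for it.
fn starting_shred_index(meta: Option<&SlotMeta>) -> Option<u32> {
    if let Some(meta) = meta {
        if meta.received > 0 || meta.consumed > 0 {
            // Faulty situation: refuse to generate shreds for an extant slot.
            return None;
        }
    }
    Some(0)
}

fn main() {
    assert_eq!(starting_shred_index(None), Some(0));
    assert_eq!(starting_shred_index(Some(&SlotMeta::default())), Some(0));
    let extant = SlotMeta { received: 3, consumed: 1 };
    assert_eq!(starting_shred_index(Some(&extant)), None);
}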
@@ -125,7 +136,7 @@ impl StandardBroadcastRun {
 entries,
 is_slot_end,
 next_shred_index,
-fec_set_offset,
+0, // fec_set_offset
 process_stats,
 );
 let mut data_shreds_buffer = match &mut self.unfinished_slot {
@@ -150,7 +161,6 @@ impl StandardBroadcastRun {
 slot,
 parent: parent_slot,
 data_shreds_buffer,
-fec_set_offset,
 });
 data_shreds
 }
@@ -571,7 +581,6 @@ mod test {
 slot,
 parent,
 data_shreds_buffer: Vec::default(),
-fec_set_offset: next_shred_index,
 });
 run.slot_broadcast_start = Some(Instant::now());
 
@@ -21,6 +21,8 @@ pub struct ProcessShredsStats {
 // Number of data shreds from serializing ledger entries which do not make
 // a full batch of MAX_DATA_SHREDS_PER_FEC_BLOCK; counted in 4 buckets.
 num_residual_data_shreds: [usize; 4],
+// If the blockstore already has shreds for the broadcast slot.
+pub num_extant_slots: u64,
 }
 
 #[derive(Default, Debug, Eq, PartialEq)]
@@ -67,6 +69,7 @@ impl ProcessShredsStats {
 ("gen_coding_time", self.gen_coding_elapsed, i64),
 ("sign_coding_time", self.sign_coding_elapsed, i64),
 ("coding_send_time", self.coding_send_elapsed, i64),
+("num_extant_slots", self.num_extant_slots, i64),
 (
 "residual_data_shreds_08",
 self.num_residual_data_shreds[0],
@@ -133,6 +136,7 @@ impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
 coding_send_elapsed,
 get_leader_schedule_elapsed,
 num_residual_data_shreds,
+num_extant_slots,
 } = rhs;
 self.shredding_elapsed += shredding_elapsed;
 self.receive_elapsed += receive_elapsed;
@@ -142,6 +146,7 @@ impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
 self.sign_coding_elapsed += sign_coding_elapsed;
 self.coding_send_elapsed += coding_send_elapsed;
 self.get_leader_schedule_elapsed += get_leader_schedule_elapsed;
+self.num_extant_slots += num_extant_slots;
 for (i, bucket) in self.num_residual_data_shreds.iter_mut().enumerate() {
 *bucket += num_residual_data_shreds[i];
 }
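The AddAssign hunks above follow the stats type's exhaustive-destructuring pattern: rhs is taken apart field by field, so a newly added field such as num_extant_slots that is not aggregated becomes a compile error rather than a silently dropped metric. A toy version of the same pattern (field names other than num_extant_slots are illustrative):

use std::ops::AddAssign;

#[derive(Default, Debug, PartialEq, Eq)]
struct Stats {
    shredding_elapsed: u64,
    num_extant_slots: u64,
}

impl AddAssign<Stats> for Stats {
    fn add_assign(&mut self, rhs: Stats) {
        // Destructuring (instead of reading fields ad hoc) makes the compiler
        // flag any field that is added to the struct but not aggregated here.
        let Stats {
            shredding_elapsed,
            num_extant_slots,
        } = rhs;
        self.shredding_elapsed += shredding_elapsed;
        self.num_extant_slots += num_extant_slots;
    }
}

fn main() {
    let mut total = Stats::default();
    total += Stats { shredding_elapsed: 10, num_extant_slots: 1 };
    total += Stats { shredding_elapsed: 5, num_extant_slots: 0 };
    assert_eq!(
        total,
        Stats {
            shredding_elapsed: 15,
            num_extant_slots: 1,
        }
    );
}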
@@ -83,14 +83,16 @@ impl Shredder {
 (data_shreds, coding_shreds)
 }
 
-// Each FEC block has maximum MAX_DATA_SHREDS_PER_FEC_BLOCK shreds.
-// "FEC set index" is the index of first data shred in that FEC block.
-// Shred indices with the same value of:
-// (shred_index - fec_set_offset) / MAX_DATA_SHREDS_PER_FEC_BLOCK
-// belong to the same FEC set.
-pub fn fec_set_index(shred_index: u32, fec_set_offset: u32) -> Option<u32> {
-let diff = shred_index.checked_sub(fec_set_offset)?;
-Some(shred_index - diff % MAX_DATA_SHREDS_PER_FEC_BLOCK)
+/// Each FEC block has maximum MAX_DATA_SHREDS_PER_FEC_BLOCK shreds.
+/// "FEC set index" is the index of first data shred in that FEC block.
+/// **Data** shreds with the same value of:
+/// (data_shred.index() - fec_set_offset) / MAX_DATA_SHREDS_PER_FEC_BLOCK
+/// belong to the same FEC set.
+/// Coding shreds inherit their fec_set_index from the data shreds that
+/// they are generated from.
+pub fn fec_set_index(data_shred_index: u32, fec_set_offset: u32) -> Option<u32> {
+let diff = data_shred_index.checked_sub(fec_set_offset)?;
+Some(data_shred_index - diff % MAX_DATA_SHREDS_PER_FEC_BLOCK)
 }
 
 pub fn entries_to_data_shreds(