removes packet modifier from shred_fetch_stage

... in favor of just passing packet flags.
This commit is contained in:
behzad nouri 2022-06-20 16:19:53 -04:00
parent 1f0f5dc03e
commit faa6c32162
1 changed files with 80 additions and 149 deletions

View File

@ -26,46 +26,15 @@ pub(crate) struct ShredFetchStage {
}
impl ShredFetchStage {
fn process_packet<F>(
packet: &mut Packet,
shreds_received: &mut ShredsReceived,
stats: &mut ShredFetchStats,
shred_version: u16,
last_root: Slot,
last_slot: Slot,
slots_per_epoch: u64,
modify: &F,
packet_hasher: &PacketHasher,
) where
F: Fn(&mut Packet),
{
// Limit shreds to 2 epochs away.
let slot_bounds = (last_root + 1)..(last_slot + 2 * slots_per_epoch);
if should_discard_packet(
packet,
slot_bounds,
shred_version,
packet_hasher,
shreds_received,
stats,
) {
packet.meta.set_discard(true);
} else {
modify(packet);
}
}
// updates packets received on a channel and sends them on another channel
fn modify_packets<F>(
fn modify_packets(
recvr: PacketBatchReceiver,
sendr: Sender<Vec<PacketBatch>>,
bank_forks: &RwLock<BankForks>,
shred_version: u16,
name: &'static str,
modify: F,
) where
F: Fn(&mut Packet),
{
flags: PacketFlags,
) {
const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1);
let mut shreds_received = LruCache::new(DEFAULT_LRU_SIZE);
let mut last_updated = Instant::now();
@ -93,19 +62,22 @@ impl ShredFetchStage {
}
}
stats.shred_count += packet_batch.len();
packet_batch.iter_mut().for_each(|packet| {
Self::process_packet(
// Limit shreds to 2 epochs away.
let slot_bounds = (last_root + 1)..(last_slot + 2 * slots_per_epoch);
for packet in packet_batch.iter_mut() {
if should_discard_packet(
packet,
slot_bounds.clone(),
shred_version,
&packet_hasher,
&mut shreds_received,
&mut stats,
shred_version,
last_root,
last_slot,
slots_per_epoch,
&modify,
&packet_hasher,
);
});
) {
packet.meta.set_discard(true);
} else {
packet.meta.flags.insert(flags);
}
}
stats.maybe_submit(name, STATS_SUBMIT_CADENCE);
if sendr.send(vec![packet_batch]).is_err() {
break;
@ -113,7 +85,7 @@ impl ShredFetchStage {
}
}
fn packet_modifier<F>(
fn packet_modifier(
sockets: Vec<Arc<UdpSocket>>,
exit: &Arc<AtomicBool>,
sender: Sender<Vec<PacketBatch>>,
@ -121,11 +93,8 @@ impl ShredFetchStage {
bank_forks: Arc<RwLock<BankForks>>,
shred_version: u16,
name: &'static str,
modify: F,
) -> (Vec<JoinHandle<()>>, JoinHandle<()>)
where
F: Fn(&mut Packet) + Send + 'static,
{
flags: PacketFlags,
) -> (Vec<JoinHandle<()>>, JoinHandle<()>) {
let (packet_sender, packet_receiver) = unbounded();
let streamers = sockets
.into_iter()
@ -152,7 +121,7 @@ impl ShredFetchStage {
&bank_forks,
shred_version,
name,
modify,
flags,
)
})
.unwrap();
@ -178,7 +147,7 @@ impl ShredFetchStage {
bank_forks.clone(),
shred_version,
"shred_fetch",
|_| {},
PacketFlags::empty(),
);
let (tvu_forwards_threads, fwd_thread_hdl) = Self::packet_modifier(
@ -189,7 +158,7 @@ impl ShredFetchStage {
bank_forks.clone(),
shred_version,
"shred_fetch_tvu_forwards",
|p| p.meta.flags.insert(PacketFlags::FORWARDED),
PacketFlags::FORWARDED,
);
let (repair_receiver, repair_handler) = Self::packet_modifier(
@ -200,7 +169,7 @@ impl ShredFetchStage {
bank_forks,
shred_version,
"shred_fetch_repair",
|p| p.meta.flags.insert(PacketFlags::REPAIR),
PacketFlags::REPAIR,
);
tvu_threads.extend(tvu_forwards_threads.into_iter());
@ -292,36 +261,29 @@ mod tests {
let last_root = 0;
let last_slot = 100;
let slots_per_epoch = 10;
ShredFetchStage::process_packet(
&mut packet,
let slot_bounds = (last_root + 1)..(last_slot + 2 * slots_per_epoch);
assert!(!should_discard_packet(
&packet,
slot_bounds.clone(),
shred_version,
&hasher,
&mut shreds_received,
&mut stats,
shred_version,
last_root,
last_slot,
slots_per_epoch,
&|_p| {},
&hasher,
);
assert!(!packet.meta.discard());
));
let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
&[shred],
false, // is_last_in_slot
3, // next_code_index
);
coding[0].copy_to_packet(&mut packet);
ShredFetchStage::process_packet(
&mut packet,
assert!(!should_discard_packet(
&packet,
slot_bounds,
shred_version,
&hasher,
&mut shreds_received,
&mut stats,
shred_version,
last_root,
last_slot,
slots_per_epoch,
&|_p| {},
&hasher,
);
assert!(!packet.meta.discard());
));
}
#[test]
@ -334,24 +296,20 @@ mod tests {
let last_slot = 100;
let slots_per_epoch = 10;
let shred_version = 59445;
let slot_bounds = (last_root + 1)..(last_slot + 2 * slots_per_epoch);
let hasher = PacketHasher::default();
// packet size is 0, so cannot get index
ShredFetchStage::process_packet(
&mut packet,
assert!(should_discard_packet(
&packet,
slot_bounds.clone(),
shred_version,
&hasher,
&mut shreds_received,
&mut stats,
shred_version,
last_root,
last_slot,
slots_per_epoch,
&|_p| {},
&hasher,
);
));
assert_eq!(stats.index_overrun, 1);
assert!(packet.meta.discard());
packet.meta.set_discard(false);
let shred = Shred::new_from_data(
1,
3,
@ -365,63 +323,46 @@ mod tests {
shred.copy_to_packet(&mut packet);
// rejected slot is 1, root is 3
ShredFetchStage::process_packet(
&mut packet,
&mut shreds_received,
&mut stats,
assert!(should_discard_packet(
&packet,
3..slot_bounds.end,
shred_version,
3,
last_slot,
slots_per_epoch,
&|_p| {},
&hasher,
);
assert!(packet.meta.discard());
packet.meta.set_discard(false);
ShredFetchStage::process_packet(
&mut packet,
&mut shreds_received,
&mut stats,
));
assert_eq!(stats.slot_out_of_range, 1);
assert!(should_discard_packet(
&packet,
slot_bounds.clone(),
345, // shred_version
last_root,
last_slot,
slots_per_epoch,
&|_p| {},
&hasher,
);
assert!(packet.meta.discard());
packet.meta.set_discard(false);
&mut shreds_received,
&mut stats,
));
assert_eq!(stats.shred_version_mismatch, 1);
// Accepted for 1,3
ShredFetchStage::process_packet(
&mut packet,
assert!(!should_discard_packet(
&packet,
slot_bounds.clone(),
shred_version,
&hasher,
&mut shreds_received,
&mut stats,
shred_version,
last_root,
last_slot,
slots_per_epoch,
&|_p| {},
&hasher,
);
assert!(!packet.meta.discard());
));
// shreds_received should filter duplicate
ShredFetchStage::process_packet(
&mut packet,
assert!(should_discard_packet(
&packet,
slot_bounds.clone(),
shred_version,
&hasher,
&mut shreds_received,
&mut stats,
shred_version,
last_root,
last_slot,
slots_per_epoch,
&|_p| {},
&hasher,
);
assert!(packet.meta.discard());
packet.meta.set_discard(false);
));
assert_eq!(stats.duplicate_shred, 1);
let shred = Shred::new_from_data(
1_000_000,
@ -436,35 +377,25 @@ mod tests {
shred.copy_to_packet(&mut packet);
// Slot 1 million is too high
ShredFetchStage::process_packet(
&mut packet,
assert!(should_discard_packet(
&packet,
slot_bounds.clone(),
shred_version,
&hasher,
&mut shreds_received,
&mut stats,
shred_version,
last_root,
last_slot,
slots_per_epoch,
&|_p| {},
&hasher,
);
assert!(packet.meta.discard());
packet.meta.set_discard(false);
));
let index = MAX_DATA_SHREDS_PER_SLOT as u32;
let shred = Shred::new_from_data(5, index, 0, &[], ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 0);
shred.copy_to_packet(&mut packet);
ShredFetchStage::process_packet(
&mut packet,
assert!(should_discard_packet(
&packet,
slot_bounds,
shred_version,
&hasher,
&mut shreds_received,
&mut stats,
shred_version,
last_root,
last_slot,
slots_per_epoch,
&|_p| {},
&hasher,
);
assert!(packet.meta.discard());
packet.meta.set_discard(false);
));
}
}