From 453f5ce8f2d36d1fd18db9490e99524e35d9f440 Mon Sep 17 00:00:00 2001 From: sakridge Date: Fri, 20 Mar 2020 07:49:48 -0700 Subject: [PATCH] Shred filter (#8975) Thread bank_forks into shred fetch --- archiver-lib/src/archiver.rs | 1 + core/src/shred_fetch_stage.rs | 236 +++++++++++++++++++++++++++++++--- core/src/tvu.rs | 1 + 3 files changed, 218 insertions(+), 20 deletions(-) diff --git a/archiver-lib/src/archiver.rs b/archiver-lib/src/archiver.rs index caa59d5cb7..c7cd1cca92 100644 --- a/archiver-lib/src/archiver.rs +++ b/archiver-lib/src/archiver.rs @@ -235,6 +235,7 @@ impl Archiver { shred_forward_sockets, repair_socket.clone(), &shred_fetch_sender, + None, &exit, ); let (slot_sender, slot_receiver) = channel(); diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index ef21d65196..fb7e23d597 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -1,43 +1,127 @@ //! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel. +use bv::BitVec; +use solana_ledger::bank_forks::BankForks; use solana_ledger::blockstore::MAX_DATA_SHREDS_PER_SLOT; -use solana_ledger::shred::{OFFSET_OF_SHRED_INDEX, SIZE_OF_SHRED_INDEX}; +use solana_ledger::shred::{ + OFFSET_OF_SHRED_INDEX, OFFSET_OF_SHRED_SLOT, SIZE_OF_SHRED_INDEX, SIZE_OF_SHRED_SLOT, +}; use solana_perf::cuda_runtime::PinnedVec; use solana_perf::packet::{limited_deserialize, Packet, PacketsRecycler}; use solana_perf::recycler::Recycler; +use solana_sdk::clock::Slot; use solana_streamer::streamer::{self, PacketReceiver, PacketSender}; +use std::collections::HashMap; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::Arc; +use std::sync::RwLock; use std::thread::{self, Builder, JoinHandle}; +use std::time::Instant; + +pub type ShredsReceived = HashMap>; pub struct ShredFetchStage { thread_hdls: Vec>, } impl ShredFetchStage { - // updates packets received on a channel and sends them on another channel - fn modify_packets(recvr: PacketReceiver, sendr: PacketSender, modify: F) - where - F: Fn(&mut Packet), - { - while let Some(mut p) = recvr.iter().next() { - let index_start = OFFSET_OF_SHRED_INDEX; - let index_end = index_start + SIZE_OF_SHRED_INDEX; - p.packets.iter_mut().for_each(|p| { - p.meta.discard = true; - if index_end <= p.meta.size { - if let Ok(index) = limited_deserialize::(&p.data[index_start..index_end]) { - if index < MAX_DATA_SHREDS_PER_SLOT as u32 { - p.meta.discard = false; - modify(p); - } else { - inc_new_counter_warn!("shred_fetch_stage-shred_index_overrun", 1); - } + fn get_slot_index(p: &Packet, index_overrun: &mut usize) -> Option<(u64, u32)> { + let index_start = OFFSET_OF_SHRED_INDEX; + let index_end = index_start + SIZE_OF_SHRED_INDEX; + let slot_start = OFFSET_OF_SHRED_SLOT; + let slot_end = slot_start + SIZE_OF_SHRED_SLOT; + + if index_end <= p.meta.size { + if let Ok(index) = limited_deserialize::(&p.data[index_start..index_end]) { + if index < MAX_DATA_SHREDS_PER_SLOT as u32 && slot_end <= p.meta.size { + if let Ok(slot) = limited_deserialize::(&p.data[slot_start..slot_end]) { + return Some((slot, index)); } } + } + } else { + *index_overrun += 1; + } + None + } + + fn process_packet( + p: &mut Packet, + shreds_received: &mut ShredsReceived, + index_overrun: &mut usize, + last_root: Slot, + last_slot: Slot, + slots_per_epoch: u64, + modify: &F, + ) where + F: Fn(&mut Packet), + { + p.meta.discard = true; + if let Some((slot, index)) = Self::get_slot_index(p, index_overrun) { + info!("slot {} index {}", slot, index); + if slot > last_root && slot < (last_slot + 2 * slots_per_epoch) { + info!("slot > {}", last_root); + // Shred filter + let slot_received = shreds_received + .entry(slot) + .or_insert_with(|| BitVec::new_fill(false, MAX_DATA_SHREDS_PER_SLOT as u64)); + if !slot_received.get(index.into()) { + p.meta.discard = false; + modify(p); + slot_received.set(index.into(), true); + } + } + } + } + + // updates packets received on a channel and sends them on another channel + fn modify_packets( + recvr: PacketReceiver, + sendr: PacketSender, + bank_forks: Option>>, + modify: F, + ) where + F: Fn(&mut Packet), + { + let mut shreds_received = ShredsReceived::default(); + let mut last_cleared = Instant::now(); + + // In the case of bank_forks=None, setup to accept any slot range + let mut last_root = 0; + let mut last_slot = std::u64::MAX; + let mut slots_per_epoch = 0; + + while let Some(mut p) = recvr.iter().next() { + if last_cleared.elapsed().as_millis() > 200 { + shreds_received.clear(); + last_cleared = Instant::now(); + if let Some(bank_forks) = bank_forks.as_ref() { + let bank_forks_r = bank_forks.read().unwrap(); + last_root = bank_forks_r.root(); + let working_bank = bank_forks_r.working_bank(); + last_slot = working_bank.slot(); + let root_bank = bank_forks_r.root_bank(); + slots_per_epoch = root_bank.get_slots_in_epoch(root_bank.epoch()); + } + } + let mut index_overrun = 0; + let mut shred_count = 0; + p.packets.iter_mut().for_each(|mut packet| { + shred_count += 1; + Self::process_packet( + &mut packet, + &mut shreds_received, + &mut index_overrun, + last_root, + last_slot, + slots_per_epoch, + &modify, + ); }); + inc_new_counter_warn!("shred_fetch_stage-shred_index_overrun", index_overrun); + inc_new_counter_info!("shred_fetch_stage-shred_count", shred_count); if sendr.send(p).is_err() { break; } @@ -49,6 +133,7 @@ impl ShredFetchStage { exit: &Arc, sender: PacketSender, recycler: Recycler>, + bank_forks: Option>>, modify: F, ) -> (Vec>, JoinHandle<()>) where @@ -70,7 +155,7 @@ impl ShredFetchStage { let modifier_hdl = Builder::new() .name("solana-tvu-fetch-stage-packet-modifier".to_string()) - .spawn(|| Self::modify_packets(packet_receiver, sender, modify)) + .spawn(move || Self::modify_packets(packet_receiver, sender, bank_forks, modify)) .unwrap(); (streamers, modifier_hdl) } @@ -80,6 +165,7 @@ impl ShredFetchStage { forward_sockets: Vec>, repair_socket: Arc, sender: &PacketSender, + bank_forks: Option>>, exit: &Arc, ) -> Self { let recycler: PacketsRecycler = Recycler::warmed(100, 1024); @@ -99,6 +185,7 @@ impl ShredFetchStage { &exit, sender.clone(), recycler.clone(), + bank_forks.clone(), |p| p.meta.forward = true, ); @@ -107,6 +194,7 @@ impl ShredFetchStage { &exit, sender.clone(), recycler.clone(), + bank_forks, |p| p.meta.repair = true, ); @@ -127,3 +215,111 @@ impl ShredFetchStage { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use solana_ledger::shred::Shred; + + #[test] + fn test_shred_filter() { + solana_logger::setup(); + let mut shreds_received = ShredsReceived::default(); + let mut packet = Packet::default(); + let mut index_overrun = 0; + let last_root = 0; + let last_slot = 100; + let slots_per_epoch = 10; + // packet size is 0, so cannot get index + ShredFetchStage::process_packet( + &mut packet, + &mut shreds_received, + &mut index_overrun, + last_root, + last_slot, + slots_per_epoch, + &|_p| {}, + ); + assert_eq!(index_overrun, 1); + assert!(packet.meta.discard); + let shred = Shred::new_from_data(1, 3, 0, None, true, true, 0, 0, 0); + shred.copy_to_packet(&mut packet); + + // rejected slot is 1, root is 3 + ShredFetchStage::process_packet( + &mut packet, + &mut shreds_received, + &mut index_overrun, + 3, + last_slot, + slots_per_epoch, + &|_p| {}, + ); + assert!(packet.meta.discard); + + // Accepted for 1,3 + ShredFetchStage::process_packet( + &mut packet, + &mut shreds_received, + &mut index_overrun, + last_root, + last_slot, + slots_per_epoch, + &|_p| {}, + ); + assert!(!packet.meta.discard); + + // shreds_received should filter duplicate + ShredFetchStage::process_packet( + &mut packet, + &mut shreds_received, + &mut index_overrun, + last_root, + last_slot, + slots_per_epoch, + &|_p| {}, + ); + assert!(packet.meta.discard); + + let shred = Shred::new_from_data(1_000_000, 3, 0, None, true, true, 0, 0, 0); + shred.copy_to_packet(&mut packet); + + // Slot 1 million is too high + ShredFetchStage::process_packet( + &mut packet, + &mut shreds_received, + &mut index_overrun, + last_root, + last_slot, + slots_per_epoch, + &|_p| {}, + ); + assert!(packet.meta.discard); + + let index = MAX_DATA_SHREDS_PER_SLOT as u32; + let shred = Shred::new_from_data(5, index, 0, None, true, true, 0, 0, 0); + shred.copy_to_packet(&mut packet); + ShredFetchStage::process_packet( + &mut packet, + &mut shreds_received, + &mut index_overrun, + last_root, + last_slot, + slots_per_epoch, + &|_p| {}, + ); + assert!(packet.meta.discard); + } + + #[test] + fn test_shred_offsets() { + let shred = Shred::new_from_data(1, 3, 0, None, true, true, 0, 0, 0); + let mut packet = Packet::default(); + shred.copy_to_packet(&mut packet); + let mut index_overrun = 0; + assert_eq!( + Some((1, 3)), + ShredFetchStage::get_slot_index(&packet, &mut index_overrun) + ); + } +} diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 3df71576e1..3436128ce4 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -128,6 +128,7 @@ impl Tvu { forward_sockets, repair_socket.clone(), &fetch_sender, + Some(bank_forks.clone()), &exit, );