Distinguish between shred type in shred fetch stage duplicate filter (#10068)
* Shred type check * Test
This commit is contained in:
parent
c4a096d8d4
commit
f562ed4cc8
|
@ -4,7 +4,8 @@ use bv::BitVec;
|
||||||
use solana_ledger::bank_forks::BankForks;
|
use solana_ledger::bank_forks::BankForks;
|
||||||
use solana_ledger::blockstore::MAX_DATA_SHREDS_PER_SLOT;
|
use solana_ledger::blockstore::MAX_DATA_SHREDS_PER_SLOT;
|
||||||
use solana_ledger::shred::{
|
use solana_ledger::shred::{
|
||||||
OFFSET_OF_SHRED_INDEX, OFFSET_OF_SHRED_SLOT, SIZE_OF_SHRED_INDEX, SIZE_OF_SHRED_SLOT,
|
CODING_SHRED, DATA_SHRED, OFFSET_OF_SHRED_INDEX, OFFSET_OF_SHRED_SLOT, OFFSET_OF_SHRED_TYPE,
|
||||||
|
SIZE_OF_SHRED_INDEX, SIZE_OF_SHRED_SLOT,
|
||||||
};
|
};
|
||||||
use solana_perf::cuda_runtime::PinnedVec;
|
use solana_perf::cuda_runtime::PinnedVec;
|
||||||
use solana_perf::packet::{limited_deserialize, Packet, PacketsRecycler};
|
use solana_perf::packet::{limited_deserialize, Packet, PacketsRecycler};
|
||||||
|
@ -20,7 +21,7 @@ use std::sync::RwLock;
|
||||||
use std::thread::{self, Builder, JoinHandle};
|
use std::thread::{self, Builder, JoinHandle};
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
pub type ShredsReceived = HashMap<Slot, BitVec<u64>>;
|
pub type ShredsReceived = HashMap<(Slot, u8), BitVec<u64>>;
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
struct ShredFetchStats {
|
struct ShredFetchStats {
|
||||||
|
@ -78,17 +79,26 @@ impl ShredFetchStage {
|
||||||
p.meta.discard = true;
|
p.meta.discard = true;
|
||||||
if let Some((slot, index)) = Self::get_slot_index(p, stats) {
|
if let Some((slot, index)) = Self::get_slot_index(p, stats) {
|
||||||
// Seems reasonable to limit shreds to 2 epochs away
|
// Seems reasonable to limit shreds to 2 epochs away
|
||||||
if slot > last_root && slot < (last_slot + 2 * slots_per_epoch) {
|
if slot > last_root
|
||||||
// Shred filter
|
&& slot < (last_slot + 2 * slots_per_epoch)
|
||||||
let slot_received = shreds_received
|
&& p.meta.size > OFFSET_OF_SHRED_TYPE
|
||||||
.entry(slot)
|
{
|
||||||
.or_insert_with(|| BitVec::new_fill(false, MAX_DATA_SHREDS_PER_SLOT as u64));
|
let shred_type = p.data[OFFSET_OF_SHRED_TYPE];
|
||||||
if !slot_received.get(index.into()) {
|
if shred_type == DATA_SHRED || shred_type == CODING_SHRED {
|
||||||
p.meta.discard = false;
|
// Shred filter
|
||||||
modify(p);
|
let slot_received =
|
||||||
slot_received.set(index.into(), true);
|
shreds_received
|
||||||
} else {
|
.entry((slot, shred_type))
|
||||||
stats.duplicate_shred += 1;
|
.or_insert_with(|| {
|
||||||
|
BitVec::new_fill(false, MAX_DATA_SHREDS_PER_SLOT as u64)
|
||||||
|
});
|
||||||
|
if !slot_received.get(index.into()) {
|
||||||
|
p.meta.discard = false;
|
||||||
|
modify(p);
|
||||||
|
slot_received.set(index.into(), true);
|
||||||
|
} else {
|
||||||
|
stats.duplicate_shred += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
stats.slot_out_of_range += 1;
|
stats.slot_out_of_range += 1;
|
||||||
|
@ -258,6 +268,51 @@ mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use solana_ledger::shred::Shred;
|
use solana_ledger::shred::Shred;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_data_code_same_index() {
|
||||||
|
solana_logger::setup();
|
||||||
|
let mut shreds_received = ShredsReceived::default();
|
||||||
|
let mut packet = Packet::default();
|
||||||
|
let mut stats = ShredFetchStats::default();
|
||||||
|
|
||||||
|
let slot = 1;
|
||||||
|
let shred = Shred::new_from_data(slot, 3, 0, None, true, true, 0, 0, 0);
|
||||||
|
shred.copy_to_packet(&mut packet);
|
||||||
|
|
||||||
|
let last_root = 0;
|
||||||
|
let last_slot = 100;
|
||||||
|
let slots_per_epoch = 10;
|
||||||
|
ShredFetchStage::process_packet(
|
||||||
|
&mut packet,
|
||||||
|
&mut shreds_received,
|
||||||
|
&mut stats,
|
||||||
|
last_root,
|
||||||
|
last_slot,
|
||||||
|
slots_per_epoch,
|
||||||
|
&|_p| {},
|
||||||
|
);
|
||||||
|
assert!(!packet.meta.discard);
|
||||||
|
|
||||||
|
let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
|
||||||
|
slot,
|
||||||
|
1.0f32,
|
||||||
|
&[shred],
|
||||||
|
10,
|
||||||
|
false,
|
||||||
|
);
|
||||||
|
coding[0].copy_to_packet(&mut packet);
|
||||||
|
ShredFetchStage::process_packet(
|
||||||
|
&mut packet,
|
||||||
|
&mut shreds_received,
|
||||||
|
&mut stats,
|
||||||
|
last_root,
|
||||||
|
last_slot,
|
||||||
|
slots_per_epoch,
|
||||||
|
&|_p| {},
|
||||||
|
);
|
||||||
|
assert!(!packet.meta.discard);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_shred_filter() {
|
fn test_shred_filter() {
|
||||||
solana_logger::setup();
|
solana_logger::setup();
|
||||||
|
|
|
@ -41,6 +41,7 @@ pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE
|
||||||
- SIZE_OF_DATA_SHRED_HEADER
|
- SIZE_OF_DATA_SHRED_HEADER
|
||||||
- SIZE_OF_DATA_SHRED_IGNORED_TAIL;
|
- SIZE_OF_DATA_SHRED_IGNORED_TAIL;
|
||||||
|
|
||||||
|
pub const OFFSET_OF_SHRED_TYPE: usize = SIZE_OF_SIGNATURE;
|
||||||
pub const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE;
|
pub const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE;
|
||||||
pub const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT;
|
pub const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue