Shred filter (#14030)

parent 201637b326
commit 5c95d8e963
core/benches/retransmit.rs

@@ -7,14 +7,18 @@ use log::*;
 use solana_core::cluster_info::{ClusterInfo, Node};
 use solana_core::contact_info::ContactInfo;
 use solana_core::retransmit_stage::retransmitter;
+use solana_ledger::entry::Entry;
 use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
 use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
+use solana_ledger::shred::Shredder;
 use solana_measure::measure::Measure;
-use solana_perf::packet::to_packets_chunked;
-use solana_perf::test_tx::test_tx;
+use solana_perf::packet::{Packet, Packets};
 use solana_runtime::bank::Bank;
 use solana_runtime::bank_forks::BankForks;
+use solana_sdk::hash::Hash;
 use solana_sdk::pubkey;
+use solana_sdk::signature::{Keypair, Signer};
+use solana_sdk::system_transaction;
 use solana_sdk::timing::timestamp;
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicUsize, Ordering};
@@ -63,14 +67,24 @@ fn bench_retransmitter(bencher: &mut Bencher) {
     let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));

     // To work reliably with higher values, this needs larger udp rmem size
-    let tx = test_tx();
-    const NUM_PACKETS: usize = 50;
-    let chunk_size = NUM_PACKETS / (4 * NUM_THREADS);
-    let batches = to_packets_chunked(
-        &std::iter::repeat(tx).take(NUM_PACKETS).collect::<Vec<_>>(),
-        chunk_size,
-    );
-    info!("batches: {}", batches.len());
+    let entries: Vec<_> = (0..5)
+        .map(|_| {
+            let keypair0 = Keypair::new();
+            let keypair1 = Keypair::new();
+            let tx0 =
+                system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
+            Entry::new(&Hash::default(), 1, vec![tx0])
+        })
+        .collect();
+
+    let keypair = Arc::new(Keypair::new());
+    let slot = 0;
+    let parent = 0;
+    let shredder =
+        Shredder::new(slot, parent, 0.0, keypair, 0, 0).expect("Failed to create entry shredder");
+    let mut data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
+
+    let num_packets = data_shreds.len();

     let retransmitter_handles = retransmitter(
         Arc::new(sockets),
@@ -80,6 +94,8 @@ fn bench_retransmitter(bencher: &mut Bencher) {
         packet_receiver,
     );

+    let mut index = 0;
+    let mut slot = 0;
     let total = Arc::new(AtomicUsize::new(0));
     bencher.iter(move || {
         let peer_sockets1 = peer_sockets.clone();
@@ -96,7 +112,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
                     while peer_sockets2[p].recv(&mut buf).is_ok() {
                         total2.fetch_add(1, Ordering::Relaxed);
                     }
-                    if total2.load(Ordering::Relaxed) >= NUM_PACKETS {
+                    if total2.load(Ordering::Relaxed) >= num_packets {
                         break;
                     }
                     info!("{} recv", total2.load(Ordering::Relaxed));
@@ -107,9 +123,17 @@ fn bench_retransmitter(bencher: &mut Bencher) {
                 })
                 .collect();

-        for packets in batches.clone() {
-            packet_sender.send(packets).unwrap();
+        for shred in data_shreds.iter_mut() {
+            shred.set_slot(slot);
+            shred.set_index(index);
+            index += 1;
+            index %= 200;
+            let mut p = Packet::default();
+            shred.copy_to_packet(&mut p);
+            let _ = packet_sender.send(Packets::new(vec![p]));
         }
+        slot += 1;
+
         info!("sent...");

         let mut join_time = Measure::start("join");
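Note on the bench changes above: instead of repeating one test transaction, the bench now shreds five real entries and, on every iteration, advances the shred index (wrapping at 200) and the slot before sending, so that each packet differs from the previous iteration's, which presumably keeps the duplicate filter introduced below from dropping the bench traffic. The receive loop correspondingly waits for num_packets, derived from the generated shreds, rather than the removed NUM_PACKETS constant.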
core/src/retransmit_stage.rs

@@ -1,5 +1,7 @@
 //! The `retransmit_stage` retransmits shreds between validators

+use crate::shred_fetch_stage::ShredFetchStage;
+use crate::shred_fetch_stage::ShredFetchStats;
 use crate::{
     cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
     cluster_info_vote_listener::VerifiedVoteReceiver,
@@ -12,7 +14,10 @@ use crate::{
     result::{Error, Result},
     window_service::{should_retransmit_and_persist, WindowService},
 };
+use ahash::AHasher;
 use crossbeam_channel::Receiver;
+use lru::LruCache;
+use rand::{thread_rng, Rng};
 use solana_ledger::{
     blockstore::{Blockstore, CompletedSlotsReceiver},
     leader_schedule_cache::LeaderScheduleCache,
@@ -27,6 +32,7 @@ use solana_sdk::epoch_schedule::EpochSchedule;
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::timing::timestamp;
 use solana_streamer::streamer::PacketReceiver;
+use std::hash::Hasher;
 use std::{
     cmp,
     collections::hash_set::HashSet,
@@ -41,6 +47,9 @@ use std::{
     time::Duration,
 };

+const MAX_DUPLICATE_COUNT: usize = 2;
+const DEFAULT_LRU_SIZE: usize = 10_000;
+
 // Limit a given thread to consume about this many packets so that
 // it doesn't pull up too much work.
 const MAX_PACKET_BATCH_SIZE: usize = 100;
@@ -196,6 +205,9 @@ struct EpochStakesCache {
     stakes_and_index: Vec<(u64, usize)>,
 }

+pub type ShredFilterAndSeeds = (LruCache<(Slot, u32), Vec<u64>>, u128, u128);
+
+#[allow(clippy::too_many_arguments)]
 fn retransmit(
     bank_forks: &Arc<RwLock<BankForks>>,
     leader_schedule_cache: &Arc<LeaderScheduleCache>,
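ShredFilterAndSeeds bundles the filter state shared by the retransmit threads: an LRU map from (slot, shred index) to the payload hashes already retransmitted for that key, plus the two u128 keys used to seed AHasher. A standalone sketch of how this state is used follows the packet-filtering hunk further below.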
@@ -206,6 +218,7 @@ fn retransmit(
     stats: &Arc<RetransmitStats>,
     epoch_stakes_cache: &Arc<RwLock<EpochStakesCache>>,
     last_peer_update: &Arc<AtomicU64>,
+    shreds_received: &Arc<Mutex<ShredFilterAndSeeds>>,
 ) -> Result<()> {
     let timer = Duration::new(1, 0);
     let r_lock = r.lock().unwrap();
@@ -254,6 +267,12 @@ fn retransmit(
         w_epoch_stakes_cache.stakes_and_index = stakes_and_index;
         drop(w_epoch_stakes_cache);
         r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
+        {
+            let mut sr = shreds_received.lock().unwrap();
+            sr.0.clear();
+            sr.1 = thread_rng().gen::<u128>();
+            sr.2 = thread_rng().gen::<u128>();
+        }
     }
     let mut peers_len = 0;
     epoch_cache_update.stop();
@@ -279,6 +298,33 @@ fn retransmit(
             continue;
         }

+        match ShredFetchStage::get_slot_index(packet, &mut ShredFetchStats::default()) {
+            Some(slot_index) => {
+                let mut received = shreds_received.lock().unwrap();
+                let seed1 = received.1;
+                let seed2 = received.2;
+                if let Some(sent) = received.0.get_mut(&slot_index) {
+                    if sent.len() < MAX_DUPLICATE_COUNT {
+                        let mut hasher = AHasher::new_with_keys(seed1, seed2);
+                        hasher.write(&packet.data[0..packet.meta.size]);
+                        let hash = hasher.finish();
+                        if sent.contains(&hash) {
+                            continue;
+                        }
+
+                        sent.push(hash);
+                    } else {
+                        continue;
+                    }
+                } else {
+                    let mut hasher = AHasher::new_with_keys(seed1, seed2);
+                    hasher.write(&packet.data[0..packet.meta.size]);
+                    let hash = hasher.finish();
+                    received.0.put(slot_index, vec![hash]);
+                }
+            }
+            None => continue,
+        }
         let mut compute_turbine_peers = Measure::start("turbine_start");
         let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
             &my_id,
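Taken together, the new pieces form a per-packet duplicate filter: the shred's (slot, index) is parsed with ShredFetchStage::get_slot_index, the packet payload is hashed with an AHasher keyed by the two random seeds, and the packet is skipped if that exact hash was already seen for the key, or if the key has already produced MAX_DUPLICATE_COUNT distinct payloads; the whole cache is cleared and re-seeded whenever the epoch stakes cache is refreshed. The sketch below restates that logic as a free function so it can be read and run on its own. It is an illustration, not the code in the tree: the function and variable names are made up, and it assumes crate versions matching the diff (lru with LruCache::new(usize) and ahash with AHasher::new_with_keys(u128, u128)).

// Sketch of the duplicate-shred filter added to retransmit(); names are hypothetical.
use ahash::AHasher;
use lru::LruCache;
use rand::{thread_rng, Rng};
use std::hash::Hasher;

const MAX_DUPLICATE_COUNT: usize = 2;
const DEFAULT_LRU_SIZE: usize = 10_000;

type Slot = u64;
// Same shape as ShredFilterAndSeeds: payload hashes per (slot, index), plus two hasher seeds.
type ShredFilterAndSeeds = (LruCache<(Slot, u32), Vec<u64>>, u128, u128);

// Fresh filter with random seeds. (The code above starts the seeds at 0 and randomizes
// them, clearing the cache, whenever the epoch stakes cache is refreshed.)
fn new_filter() -> ShredFilterAndSeeds {
    let mut rng = thread_rng();
    (LruCache::new(DEFAULT_LRU_SIZE), rng.gen(), rng.gen())
}

// Returns true if the payload should be retransmitted, false if it is an exact duplicate
// or if this (slot, index) has already produced MAX_DUPLICATE_COUNT distinct payloads.
fn should_retransmit(filter: &mut ShredFilterAndSeeds, key: (Slot, u32), payload: &[u8]) -> bool {
    let mut hasher = AHasher::new_with_keys(filter.1, filter.2);
    hasher.write(payload);
    let hash = hasher.finish();
    match filter.0.get_mut(&key) {
        Some(sent) if sent.len() < MAX_DUPLICATE_COUNT => {
            if sent.contains(&hash) {
                false // same payload seen before for this key: drop
            } else {
                sent.push(hash); // new variant of this (slot, index): forward it
                true
            }
        }
        Some(_) => false, // too many distinct payloads for this key already
        None => {
            filter.0.put(key, vec![hash]); // first sighting of this (slot, index)
            true
        }
    }
}

fn main() {
    let mut filter = new_filter();
    let payload = vec![1u8; 1_200];
    assert!(should_retransmit(&mut filter, (3, 0), &payload));
    assert!(!should_retransmit(&mut filter, (3, 0), &payload)); // exact duplicate is dropped
    assert!(should_retransmit(&mut filter, (3, 0), &[2u8; 1_200])); // second distinct payload passes
    assert!(!should_retransmit(&mut filter, (3, 0), &[3u8; 1_200])); // third exceeds the limit
}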
@@ -367,6 +413,7 @@ pub fn retransmitter(
     r: Arc<Mutex<PacketReceiver>>,
 ) -> Vec<JoinHandle<()>> {
     let stats = Arc::new(RetransmitStats::default());
+    let shreds_received = Arc::new(Mutex::new((LruCache::new(DEFAULT_LRU_SIZE), 0, 0)));
     (0..sockets.len())
         .map(|s| {
             let sockets = sockets.clone();
@@ -377,6 +424,7 @@ pub fn retransmitter(
             let stats = stats.clone();
             let epoch_stakes_cache = Arc::new(RwLock::new(EpochStakesCache::default()));
             let last_peer_update = Arc::new(AtomicU64::new(0));
+            let shreds_received = shreds_received.clone();

             Builder::new()
                 .name("solana-retransmitter".to_string())
@@ -393,6 +441,7 @@ pub fn retransmitter(
                         &stats,
                         &epoch_stakes_cache,
                         &last_peer_update,
+                        &shreds_received,
                     ) {
                         match e {
                             Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
@@ -519,11 +568,12 @@ mod tests {
     use solana_ledger::create_new_tmp_ledger;
     use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
     use solana_net_utils::find_available_port_in_range;
-    use solana_perf::packet::{Meta, Packet, Packets};
+    use solana_perf::packet::{Packet, Packets};
     use std::net::{IpAddr, Ipv4Addr};

     #[test]
     fn test_skip_repair() {
+        solana_logger::setup();
         let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(123);
         let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
         let blockstore = Blockstore::open(&ledger_path).unwrap();
@@ -565,7 +615,12 @@ mod tests {
         );
         let _thread_hdls = vec![t_retransmit];

-        let packets = Packets::new(vec![Packet::default()]);
+        let mut shred =
+            solana_ledger::shred::Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0);
+        let mut packet = Packet::default();
+        shred.copy_to_packet(&mut packet);
+
+        let packets = Packets::new(vec![packet.clone()]);
         // it should send this over the sockets.
         retransmit_sender.send(packets).unwrap();
         let mut packets = Packets::new(vec![]);
@@ -573,16 +628,13 @@ mod tests {
         assert_eq!(packets.packets.len(), 1);
         assert_eq!(packets.packets[0].meta.repair, false);

-        let repair = Packet {
-            meta: Meta {
-                repair: true,
-                ..Meta::default()
-            },
-            ..Packet::default()
-        };
+        let mut repair = packet.clone();
+        repair.meta.repair = true;

+        shred.set_slot(1);
+        shred.copy_to_packet(&mut packet);
         // send 1 repair and 1 "regular" packet so that we don't block forever on the recv_from
-        let packets = Packets::new(vec![repair, Packet::default()]);
+        let packets = Packets::new(vec![repair, packet]);
         retransmit_sender.send(packets).unwrap();
         let mut packets = Packets::new(vec![]);
         solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap();
core/src/shred_fetch_stage.rs

@@ -28,7 +28,7 @@ const DEFAULT_LRU_SIZE: usize = 10_000;
 pub type ShredsReceived = LruCache<u64, ()>;

 #[derive(Default)]
-struct ShredFetchStats {
+pub struct ShredFetchStats {
     index_overrun: usize,
     shred_count: usize,
     index_bad_deserialize: usize,
@@ -43,7 +43,7 @@ pub struct ShredFetchStage {
 }

 impl ShredFetchStage {
-    fn get_slot_index(p: &Packet, stats: &mut ShredFetchStats) -> Option<(u64, u32)> {
+    pub fn get_slot_index(p: &Packet, stats: &mut ShredFetchStats) -> Option<(u64, u32)> {
         let index_start = OFFSET_OF_SHRED_INDEX;
         let index_end = index_start + SIZE_OF_SHRED_INDEX;
         let slot_start = OFFSET_OF_SHRED_SLOT;
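The shred_fetch_stage changes are visibility-only: ShredFetchStats and ShredFetchStage::get_slot_index become pub so retransmit_stage can parse a shred's slot and index straight out of a raw packet. A minimal sketch of that call from elsewhere in the core crate follows; the helper name is made up, and the paths assume the same in-crate module layout as the imports in the retransmit_stage diff above.

// Illustrative helper (not in the tree): extract the duplicate-filter key from a packet
// using the now-public ShredFetchStage::get_slot_index.
use crate::shred_fetch_stage::{ShredFetchStage, ShredFetchStats};
use solana_perf::packet::Packet;

fn filter_key(packet: &Packet) -> Option<(u64, u32)> {
    let mut stats = ShredFetchStats::default();
    ShredFetchStage::get_slot_index(packet, &mut stats)
}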