Use retain on Packets instead of creating new ones (#5804)
* Use remove on Packets instead of creating a new one * Fix compile after rebase
This commit is contained in:
parent
3d3b03a123
commit
a452249bf3
|
@ -4,13 +4,12 @@
|
||||||
use crate::blocktree::Blocktree;
|
use crate::blocktree::Blocktree;
|
||||||
use crate::cluster_info::ClusterInfo;
|
use crate::cluster_info::ClusterInfo;
|
||||||
use crate::leader_schedule_cache::LeaderScheduleCache;
|
use crate::leader_schedule_cache::LeaderScheduleCache;
|
||||||
use crate::packet::Packets;
|
|
||||||
use crate::repair_service::{RepairService, RepairStrategy};
|
use crate::repair_service::{RepairService, RepairStrategy};
|
||||||
use crate::result::{Error, Result};
|
use crate::result::{Error, Result};
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::shred::Shred;
|
use crate::shred::Shred;
|
||||||
use crate::streamer::{PacketReceiver, PacketSender};
|
use crate::streamer::{PacketReceiver, PacketSender};
|
||||||
use rayon::iter::{ParallelBridge, ParallelIterator};
|
use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator};
|
||||||
use rayon::ThreadPool;
|
use rayon::ThreadPool;
|
||||||
use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
|
use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
|
||||||
use solana_runtime::bank::Bank;
|
use solana_runtime::bank::Bank;
|
||||||
|
@ -77,18 +76,18 @@ where
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
inc_new_counter_debug!("streamer-recv_window-recv", packets.packets.len());
|
inc_new_counter_debug!("streamer-recv_window-recv", packets.packets.len());
|
||||||
|
|
||||||
let (shreds, packets): (Vec<_>, Vec<_>) = thread_pool.install(|| {
|
let (shreds, packets_ix): (Vec<_>, Vec<_>) = thread_pool.install(|| {
|
||||||
packets
|
packets
|
||||||
.packets
|
.packets
|
||||||
.drain(..)
|
.par_iter_mut()
|
||||||
.par_bridge()
|
.enumerate()
|
||||||
.filter_map(|mut packet| {
|
.filter_map(|(i, packet)| {
|
||||||
if let Ok(s) = bincode::deserialize(&packet.data) {
|
if let Ok(s) = bincode::deserialize(&packet.data) {
|
||||||
let shred: Shred = s;
|
let shred: Shred = s;
|
||||||
if shred_filter(&shred, &packet.data) {
|
if shred_filter(&shred, &packet.data) {
|
||||||
packet.meta.slot = shred.slot();
|
packet.meta.slot = shred.slot();
|
||||||
packet.meta.seed = shred.seed();
|
packet.meta.seed = shred.seed();
|
||||||
Some((shred, packet))
|
Some((shred, i))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
@ -98,7 +97,21 @@ where
|
||||||
})
|
})
|
||||||
.unzip()
|
.unzip()
|
||||||
});
|
});
|
||||||
let packets = Packets::new(packets);
|
// to avoid lookups into the `packets_ix` vec, this block manually tracks where we are in that vec
|
||||||
|
// and since `packets.packets.retain` and the `packets_ix` vec are both in order,
|
||||||
|
// we should be able to automatically drop any packets in the index gaps.
|
||||||
|
let mut retain_ix = 0;
|
||||||
|
let mut i = 0;
|
||||||
|
packets.packets.retain(|_| {
|
||||||
|
let retain = if !packets_ix.is_empty() && i == packets_ix[retain_ix] {
|
||||||
|
retain_ix = (packets_ix.len() - 1).min(retain_ix + 1);
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
};
|
||||||
|
i += 1;
|
||||||
|
retain
|
||||||
|
});
|
||||||
|
|
||||||
trace!("{:?} shreds from packets", shreds.len());
|
trace!("{:?} shreds from packets", shreds.len());
|
||||||
|
|
||||||
|
@ -258,22 +271,29 @@ impl Service for WindowService {
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::bank_forks::BankForks;
|
use crate::bank_forks::BankForks;
|
||||||
|
use crate::blocktree::tests::make_many_slot_entries;
|
||||||
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
|
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
|
||||||
use crate::cluster_info::{ClusterInfo, Node};
|
use crate::cluster_info::{ClusterInfo, Node};
|
||||||
|
use crate::contact_info::ContactInfo;
|
||||||
use crate::entry::{make_consecutive_blobs, make_tiny_test_entries, Entry};
|
use crate::entry::{make_consecutive_blobs, make_tiny_test_entries, Entry};
|
||||||
use crate::genesis_utils::create_genesis_block_with_leader;
|
use crate::genesis_utils::create_genesis_block_with_leader;
|
||||||
|
use crate::packet::{Packet, Packets};
|
||||||
use crate::recycler::Recycler;
|
use crate::recycler::Recycler;
|
||||||
|
use crate::repair_service::RepairSlotRange;
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::shred::Shredder;
|
use crate::shred::Shredder;
|
||||||
use crate::streamer::{receiver, responder};
|
use crate::streamer::{receiver, responder};
|
||||||
|
use rand::seq::SliceRandom;
|
||||||
|
use rand::thread_rng;
|
||||||
use solana_runtime::epoch_schedule::MINIMUM_SLOTS_PER_EPOCH;
|
use solana_runtime::epoch_schedule::MINIMUM_SLOTS_PER_EPOCH;
|
||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||||
use std::fs::remove_dir_all;
|
use std::fs::remove_dir_all;
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::channel;
|
use std::sync::mpsc::{channel, Receiver};
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
|
use std::thread::sleep;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
fn local_entries_to_shred(entries: Vec<Entry>, keypair: &Arc<Keypair>) -> Vec<Shred> {
|
fn local_entries_to_shred(entries: Vec<Entry>, keypair: &Arc<Keypair>) -> Vec<Shred> {
|
||||||
|
@ -539,4 +559,68 @@ mod test {
|
||||||
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
||||||
let _ignored = remove_dir_all(&blocktree_path);
|
let _ignored = remove_dir_all(&blocktree_path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn make_test_window(
|
||||||
|
packet_receiver: Receiver<Packets>,
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
|
) -> WindowService {
|
||||||
|
let blocktree_path = get_tmp_ledger_path!();
|
||||||
|
let (blocktree, _, _) = Blocktree::open_with_signal(&blocktree_path)
|
||||||
|
.expect("Expected to be able to open database ledger");
|
||||||
|
|
||||||
|
let blocktree = Arc::new(blocktree);
|
||||||
|
let (retransmit_sender, _retransmit_receiver) = channel();
|
||||||
|
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
|
||||||
|
ContactInfo::new_localhost(&Pubkey::default(), 0),
|
||||||
|
)));
|
||||||
|
let repair_sock = Arc::new(UdpSocket::bind(socketaddr_any!()).unwrap());
|
||||||
|
let window = WindowService::new(
|
||||||
|
blocktree,
|
||||||
|
cluster_info,
|
||||||
|
packet_receiver,
|
||||||
|
retransmit_sender,
|
||||||
|
repair_sock,
|
||||||
|
&exit,
|
||||||
|
RepairStrategy::RepairRange(RepairSlotRange { start: 0, end: 0 }),
|
||||||
|
&Arc::new(LeaderScheduleCache::default()),
|
||||||
|
|_, _, _, _| true,
|
||||||
|
);
|
||||||
|
window
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_recv_window() {
|
||||||
|
let (packet_sender, packet_receiver) = channel();
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
let window = make_test_window(packet_receiver, exit.clone());
|
||||||
|
// send 5 slots worth of data to the window
|
||||||
|
let (shreds, _) = make_many_slot_entries(0, 5, 10);
|
||||||
|
let packets: Vec<_> = shreds
|
||||||
|
.into_iter()
|
||||||
|
.map(|s| {
|
||||||
|
let mut p = Packet::default();
|
||||||
|
p.data
|
||||||
|
.copy_from_slice(&mut bincode::serialize(&s).unwrap().as_ref());
|
||||||
|
p
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
let mut packets = Packets::new(packets);
|
||||||
|
packet_sender.send(packets.clone()).unwrap();
|
||||||
|
sleep(Duration::from_millis(500));
|
||||||
|
|
||||||
|
// add some empty packets to the data set. These should fail to deserialize
|
||||||
|
packets.packets.append(&mut vec![Packet::default(); 10]);
|
||||||
|
packets.packets.shuffle(&mut thread_rng());
|
||||||
|
packet_sender.send(packets.clone()).unwrap();
|
||||||
|
sleep(Duration::from_millis(500));
|
||||||
|
|
||||||
|
// send 1 empty packet that cannot deserialize into a shred
|
||||||
|
packet_sender
|
||||||
|
.send(Packets::new(vec![Packet::default(); 1]))
|
||||||
|
.unwrap();
|
||||||
|
sleep(Duration::from_millis(500));
|
||||||
|
|
||||||
|
exit.store(true, Ordering::Relaxed);
|
||||||
|
window.join().unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue