2019-11-13 20:10:38 -08:00
|
|
|
use clap::{crate_description, crate_name, App, Arg};
|
2019-11-04 20:13:43 -08:00
|
|
|
use solana_core::blob::BLOB_SIZE;
|
|
|
|
use solana_core::packet::{Packet, Packets, PacketsRecycler, PACKET_DATA_SIZE};
|
2019-08-21 10:23:33 -07:00
|
|
|
use solana_core::result::Result;
|
|
|
|
use solana_core::streamer::{receiver, PacketReceiver};
|
2018-09-06 14:13:40 -07:00
|
|
|
use std::cmp::max;
|
|
|
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
|
2018-07-20 10:09:26 -07:00
|
|
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
2018-07-10 19:33:16 -07:00
|
|
|
use std::sync::mpsc::channel;
|
2018-07-20 10:09:26 -07:00
|
|
|
use std::sync::Arc;
|
2018-07-10 19:33:16 -07:00
|
|
|
use std::thread::sleep;
|
|
|
|
use std::thread::{spawn, JoinHandle};
|
|
|
|
use std::time::Duration;
|
|
|
|
use std::time::SystemTime;
|
|
|
|
|
2018-09-26 09:50:12 -07:00
|
|
|
fn producer(addr: &SocketAddr, exit: Arc<AtomicBool>) -> JoinHandle<()> {
|
2018-07-10 19:33:16 -07:00
|
|
|
let send = UdpSocket::bind("0.0.0.0:0").unwrap();
|
2019-04-17 18:15:50 -07:00
|
|
|
let mut msgs = Packets::default();
|
|
|
|
msgs.packets.resize(10, Packet::default());
|
2019-06-27 00:32:32 -07:00
|
|
|
for w in msgs.packets.iter_mut() {
|
2018-07-10 19:33:16 -07:00
|
|
|
w.meta.size = PACKET_DATA_SIZE;
|
|
|
|
w.meta.set_addr(&addr);
|
|
|
|
}
|
2019-05-20 09:15:00 -07:00
|
|
|
let msgs = Arc::new(msgs);
|
2018-07-10 19:33:16 -07:00
|
|
|
spawn(move || loop {
|
|
|
|
if exit.load(Ordering::Relaxed) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
let mut num = 0;
|
2019-05-20 09:15:00 -07:00
|
|
|
for p in &msgs.packets {
|
2018-07-10 19:33:16 -07:00
|
|
|
let a = p.meta.addr();
|
|
|
|
assert!(p.meta.size < BLOB_SIZE);
|
|
|
|
send.send_to(&p.data[..p.meta.size], &a).unwrap();
|
|
|
|
num += 1;
|
|
|
|
}
|
|
|
|
assert_eq!(num, 10);
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2018-09-20 15:28:45 -07:00
|
|
|
fn sink(exit: Arc<AtomicBool>, rvs: Arc<AtomicUsize>, r: PacketReceiver) -> JoinHandle<()> {
|
2018-07-10 19:33:16 -07:00
|
|
|
spawn(move || loop {
|
|
|
|
if exit.load(Ordering::Relaxed) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
let timer = Duration::new(1, 0);
|
2018-07-20 10:09:26 -07:00
|
|
|
if let Ok(msgs) = r.recv_timeout(timer) {
|
2019-04-17 18:15:50 -07:00
|
|
|
rvs.fetch_add(msgs.packets.len(), Ordering::Relaxed);
|
2018-07-10 19:33:16 -07:00
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2018-07-19 13:05:36 -07:00
|
|
|
fn main() -> Result<()> {
|
2018-09-06 14:13:40 -07:00
|
|
|
let mut num_sockets = 1usize;
|
|
|
|
|
2019-03-13 20:54:30 -07:00
|
|
|
let matches = App::new(crate_name!())
|
|
|
|
.about(crate_description!())
|
2019-11-13 20:10:38 -08:00
|
|
|
.version(solana_clap_utils::version!())
|
2018-09-06 14:13:40 -07:00
|
|
|
.arg(
|
|
|
|
Arg::with_name("num-recv-sockets")
|
|
|
|
.long("num-recv-sockets")
|
|
|
|
.value_name("NUM")
|
|
|
|
.takes_value(true)
|
|
|
|
.help("Use NUM receive sockets"),
|
2018-12-07 19:01:28 -08:00
|
|
|
)
|
|
|
|
.get_matches();
|
2018-09-06 14:13:40 -07:00
|
|
|
|
|
|
|
if let Some(n) = matches.value_of("num-recv-sockets") {
|
|
|
|
num_sockets = max(num_sockets, n.to_string().parse().expect("integer"));
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut port = 0;
|
|
|
|
let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
|
2018-07-10 19:33:16 -07:00
|
|
|
|
|
|
|
let exit = Arc::new(AtomicBool::new(false));
|
|
|
|
|
2018-09-06 14:13:40 -07:00
|
|
|
let mut read_channels = Vec::new();
|
|
|
|
let mut read_threads = Vec::new();
|
2019-06-27 00:32:32 -07:00
|
|
|
let recycler = PacketsRecycler::default();
|
2018-09-06 14:13:40 -07:00
|
|
|
for _ in 0..num_sockets {
|
2019-11-12 12:37:13 -08:00
|
|
|
let read = solana_net_utils::bind_to(port, false).unwrap();
|
2018-09-06 14:13:40 -07:00
|
|
|
read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
|
|
|
|
|
|
|
|
addr = read.local_addr().unwrap();
|
|
|
|
port = addr.port();
|
|
|
|
|
|
|
|
let (s_reader, r_reader) = channel();
|
|
|
|
read_channels.push(r_reader);
|
2019-06-27 00:32:32 -07:00
|
|
|
read_threads.push(receiver(
|
|
|
|
Arc::new(read),
|
|
|
|
&exit,
|
|
|
|
s_reader,
|
|
|
|
recycler.clone(),
|
|
|
|
"bench-streamer-test",
|
|
|
|
));
|
2018-09-06 14:13:40 -07:00
|
|
|
}
|
|
|
|
|
2018-09-26 09:50:12 -07:00
|
|
|
let t_producer1 = producer(&addr, exit.clone());
|
|
|
|
let t_producer2 = producer(&addr, exit.clone());
|
|
|
|
let t_producer3 = producer(&addr, exit.clone());
|
2018-07-10 19:33:16 -07:00
|
|
|
|
2018-07-20 10:09:26 -07:00
|
|
|
let rvs = Arc::new(AtomicUsize::new(0));
|
2018-09-06 14:13:40 -07:00
|
|
|
let sink_threads: Vec<_> = read_channels
|
|
|
|
.into_iter()
|
2018-09-20 15:28:45 -07:00
|
|
|
.map(|r_reader| sink(exit.clone(), rvs.clone(), r_reader))
|
2018-09-06 14:13:40 -07:00
|
|
|
.collect();
|
2018-07-10 19:33:16 -07:00
|
|
|
let start = SystemTime::now();
|
2018-07-20 10:09:26 -07:00
|
|
|
let start_val = rvs.load(Ordering::Relaxed);
|
2018-07-10 19:33:16 -07:00
|
|
|
sleep(Duration::new(5, 0));
|
|
|
|
let elapsed = start.elapsed().unwrap();
|
2018-07-20 10:09:26 -07:00
|
|
|
let end_val = rvs.load(Ordering::Relaxed);
|
|
|
|
let time = elapsed.as_secs() * 10_000_000_000 + u64::from(elapsed.subsec_nanos());
|
|
|
|
let ftime = (time as f64) / 10_000_000_000_f64;
|
2018-07-10 19:33:16 -07:00
|
|
|
let fcount = (end_val - start_val) as f64;
|
2018-07-19 13:05:36 -07:00
|
|
|
println!("performance: {:?}", fcount / ftime);
|
2018-07-10 19:33:16 -07:00
|
|
|
exit.store(true, Ordering::Relaxed);
|
2018-09-06 14:13:40 -07:00
|
|
|
for t_reader in read_threads {
|
|
|
|
t_reader.join()?;
|
|
|
|
}
|
2018-07-10 19:33:16 -07:00
|
|
|
t_producer1.join()?;
|
|
|
|
t_producer2.join()?;
|
|
|
|
t_producer3.join()?;
|
2018-09-06 14:13:40 -07:00
|
|
|
for t_sink in sink_threads {
|
|
|
|
t_sink.join()?;
|
|
|
|
}
|
2018-07-10 19:33:16 -07:00
|
|
|
Ok(())
|
|
|
|
}
|