solana/src/streamer.rs

393 lines
12 KiB
Rust
Raw Normal View History

use std::sync::{Arc, Mutex, RwLock};
2018-03-22 13:05:23 -07:00
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
2018-03-14 11:02:38 -07:00
use std::fmt;
use std::time::Duration;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
use std::thread::{spawn, JoinHandle};
use result::{Error, Result};
const BLOCK_SIZE: usize = 1024 * 8;
2018-03-24 18:01:40 -07:00
pub const PACKET_SIZE: usize = 256;
#[derive(Clone, Default)]
pub struct Meta {
pub size: usize,
pub addr: [u16; 8],
pub port: u16,
pub v6: bool,
}
2018-03-24 18:01:40 -07:00
#[derive(Clone)]
pub struct Packet {
pub data: [u8; PACKET_SIZE],
pub meta: Meta,
}
2018-03-14 11:02:38 -07:00
impl fmt::Debug for Packet {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2018-03-14 11:14:40 -07:00
write!(
f,
"Packet {{ size: {:?}, addr: {:?} }}",
2018-03-24 18:01:40 -07:00
self.meta.size,
self.meta.get_addr()
2018-03-14 11:14:40 -07:00
)
2018-03-14 11:02:38 -07:00
}
}
impl Default for Packet {
fn default() -> Packet {
Packet {
data: [0u8; PACKET_SIZE],
2018-03-24 18:01:40 -07:00
meta: Meta::default(),
}
}
}
2018-03-24 18:01:40 -07:00
impl Meta {
pub fn get_addr(&self) -> SocketAddr {
if !self.v6 {
let ipv4 = Ipv4Addr::new(
self.addr[0] as u8,
self.addr[1] as u8,
self.addr[2] as u8,
self.addr[3] as u8,
);
SocketAddr::new(IpAddr::V4(ipv4), self.port)
} else {
let ipv6 = Ipv6Addr::new(
self.addr[0],
self.addr[1],
self.addr[2],
self.addr[3],
self.addr[4],
self.addr[5],
self.addr[6],
self.addr[7],
);
SocketAddr::new(IpAddr::V6(ipv6), self.port)
}
}
pub fn set_addr(&mut self, a: &SocketAddr) {
2018-03-22 13:31:58 -07:00
match *a {
SocketAddr::V4(v4) => {
let ip = v4.ip().octets();
2018-03-22 13:50:24 -07:00
self.addr[0] = u16::from(ip[0]);
self.addr[1] = u16::from(ip[1]);
self.addr[2] = u16::from(ip[2]);
self.addr[3] = u16::from(ip[3]);
self.port = a.port();
}
2018-03-22 13:31:58 -07:00
SocketAddr::V6(v6) => {
self.addr = v6.ip().segments();
self.port = a.port();
self.v6 = true;
}
}
}
}
2018-03-22 13:50:24 -07:00
#[derive(Clone, Debug, Default)]
pub struct PacketData {
pub packets: Vec<Packet>,
}
pub type SharedPacketData = Arc<RwLock<PacketData>>;
pub type Recycler = Arc<Mutex<Vec<SharedPacketData>>>;
pub type Receiver = mpsc::Receiver<SharedPacketData>;
pub type Sender = mpsc::Sender<SharedPacketData>;
impl PacketData {
pub fn new() -> PacketData {
PacketData {
packets: vec![Packet::default(); BLOCK_SIZE],
}
}
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
self.packets.resize(BLOCK_SIZE, Packet::default());
let mut i = 0;
2018-03-22 13:31:58 -07:00
for p in &mut self.packets {
2018-03-24 18:01:40 -07:00
p.meta.size = 0;
match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => {
trace!("got {:?} messages", i);
break;
}
Err(e) => {
info!("recv_from err {:?}", e);
return Err(Error::IO(e));
}
Ok((nrecv, from)) => {
2018-03-24 18:01:40 -07:00
p.meta.size = nrecv;
p.meta.set_addr(&from);
if i == 0 {
socket.set_nonblocking(true)?;
}
}
}
i += 1;
}
2018-03-11 09:45:17 -07:00
Ok(i)
}
fn read_from(&mut self, socket: &UdpSocket) -> Result<()> {
let sz = self.run_read_from(socket)?;
self.packets.resize(sz, Packet::default());
2018-03-11 09:45:17 -07:00
Ok(())
}
fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> {
2018-03-22 13:31:58 -07:00
for p in &self.packets {
2018-03-24 18:01:40 -07:00
let a = p.meta.get_addr();
socket.send_to(&p.data[..p.meta.size], &a)?;
//TODO(anatoly): wtf do we do about errors?
*num += 1;
}
2018-03-11 09:45:17 -07:00
Ok(())
}
}
2018-03-22 13:31:58 -07:00
pub fn allocate(recycler: &Recycler) -> SharedPacketData {
let mut gc = recycler.lock().expect("lock");
gc.pop()
.unwrap_or_else(|| Arc::new(RwLock::new(PacketData::new())))
}
2018-03-22 13:31:58 -07:00
pub fn recycle(recycler: &Recycler, msgs: SharedPacketData) {
let mut gc = recycler.lock().expect("lock");
gc.push(msgs);
}
fn recv_loop(
sock: &UdpSocket,
2018-03-22 13:31:58 -07:00
exit: &Arc<AtomicBool>,
recycler: &Recycler,
channel: &Sender,
) -> Result<()> {
loop {
2018-03-22 13:31:58 -07:00
let msgs = allocate(recycler);
let msgs_ = msgs.clone();
loop {
2018-03-22 13:31:58 -07:00
match msgs.write().unwrap().read_from(sock) {
Ok(()) => {
channel.send(msgs_)?;
break;
}
Err(_) => {
2018-03-22 13:05:23 -07:00
if exit.load(Ordering::Relaxed) {
2018-03-22 13:31:58 -07:00
recycle(recycler, msgs_);
return Ok(());
}
}
}
}
}
}
pub fn receiver(
sock: UdpSocket,
2018-03-22 13:05:23 -07:00
exit: Arc<AtomicBool>,
recycler: Recycler,
channel: Sender,
) -> Result<JoinHandle<()>> {
let timer = Duration::new(1, 0);
sock.set_read_timeout(Some(timer))?;
Ok(spawn(move || {
2018-03-22 13:31:58 -07:00
let _ = recv_loop(&sock, &exit, &recycler, &channel);
()
}))
}
2018-03-22 13:31:58 -07:00
fn recv_send(sock: &UdpSocket, recycler: &Recycler, r: &Receiver) -> Result<()> {
let timer = Duration::new(1, 0);
let msgs = r.recv_timeout(timer)?;
let msgs_ = msgs.clone();
let mut num = 0;
msgs.read().unwrap().send_to(sock, &mut num)?;
recycle(recycler, msgs_);
Ok(())
}
pub fn sender(
sock: UdpSocket,
2018-03-22 13:05:23 -07:00
exit: Arc<AtomicBool>,
recycler: Recycler,
r: Receiver,
) -> JoinHandle<()> {
spawn(move || loop {
2018-03-22 13:31:58 -07:00
if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) {
break;
}
})
}
#[cfg(all(feature = "unstable", test))]
mod bench {
extern crate test;
use self::test::Bencher;
use std::thread::sleep;
use std::sync::{Arc, Mutex};
use std::net::{SocketAddr, UdpSocket};
use std::time::Duration;
use std::time::SystemTime;
use std::thread::{spawn, JoinHandle};
use std::sync::mpsc::channel;
use result::Result;
use streamer::{allocate, receiver, recycle, Packet, Receiver, Recycler, PACKET_SIZE};
2018-03-22 13:31:58 -07:00
fn producer(addr: &SocketAddr, recycler: &Recycler, exit: Arc<AtomicBool>) -> JoinHandle<()> {
let send = UdpSocket::bind("0.0.0.0:0").unwrap();
2018-03-22 13:31:58 -07:00
let msgs = allocate(recycler);
msgs.write().unwrap().packets.resize(10, Packet::default());
for w in msgs.write().unwrap().packets.iter_mut() {
w.size = PACKET_SIZE;
w.set_addr(&addr);
}
spawn(move || loop {
2018-03-22 13:05:23 -07:00
if exit.load(Ordering::Relaxed) {
return;
}
let mut num = 0;
msgs.read().unwrap().send_to(&send, &mut num).unwrap();
assert_eq!(num, 10);
})
}
fn sinc(
2018-03-22 13:31:58 -07:00
recycler: &Recycler,
2018-03-22 13:05:23 -07:00
exit: Arc<AtomicBool>,
rvs: Arc<Mutex<usize>>,
r: Receiver,
) -> JoinHandle<()> {
spawn(move || loop {
2018-03-22 13:05:23 -07:00
if exit.load(Ordering::Relaxed) {
return;
}
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(msgs) => {
let msgs_ = msgs.clone();
*rvs.lock().unwrap() += msgs.read().unwrap().packets.len();
2018-03-22 13:31:58 -07:00
recycle(recycler, msgs_);
}
_ => (),
}
})
}
fn run_streamer_bench() -> Result<()> {
let read = UdpSocket::bind("127.0.0.1:0")?;
let addr = read.local_addr()?;
2018-03-22 13:05:23 -07:00
let exit = Arc::new(AtomicBool::new(false));
let recycler = Arc::new(Mutex::new(Vec::new()));
let (s_reader, r_reader) = channel();
2018-03-22 13:31:58 -07:00
let t_reader = receiver(read, exit.clone(), &recycler, s_reader)?;
let t_producer1 = producer(&addr, &recycler, exit.clone());
let t_producer2 = producer(&addr, &recycler, exit.clone());
let t_producer3 = producer(&addr, &recycler, exit.clone());
let rvs = Arc::new(Mutex::new(0));
let t_sinc = sinc(recycler.clone(), exit.clone(), rvs.clone(), r_reader);
let start = SystemTime::now();
let start_val = *rvs.lock().unwrap();
sleep(Duration::new(5, 0));
let elapsed = start.elapsed().unwrap();
let end_val = *rvs.lock().unwrap();
let time = elapsed.as_secs() * 10000000000 + elapsed.subsec_nanos() as u64;
let ftime = (time as f64) / 10000000000f64;
let fcount = (end_val - start_val) as f64;
println!("performance: {:?}", fcount / ftime);
2018-03-22 13:05:23 -07:00
exit.store(true, Ordering::Relaxed);
t_reader.join()?;
t_producer1.join()?;
t_producer2.join()?;
t_producer3.join()?;
t_sinc.join()?;
Ok(())
}
#[bench]
pub fn streamer_bench(_bench: &mut Bencher) {
run_streamer_bench().unwrap();
}
}
#[cfg(test)]
mod test {
use std::sync::{Arc, Mutex};
use std::net::UdpSocket;
use std::time::Duration;
2018-03-22 13:05:23 -07:00
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::io::Write;
use std::io;
use streamer::{allocate, receiver, sender, Packet, Receiver, PACKET_SIZE};
fn get_msgs(r: Receiver, num: &mut usize) {
2018-03-14 11:02:38 -07:00
for _t in 0..5 {
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(m) => *num += m.read().unwrap().packets.len(),
2018-03-14 11:02:38 -07:00
e => println!("error {:?}", e),
}
if *num == 10 {
break;
}
}
}
2018-03-11 09:22:21 -07:00
#[cfg(ipv6)]
#[test]
2018-03-11 09:22:21 -07:00
pub fn streamer_send_test_ipv6() {
let read = UdpSocket::bind("[::1]:0").expect("bind");
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("[::1]:0").expect("bind");
let exit = Arc::new(Mutex::new(false));
let recycler = Arc::new(Mutex::new(Vec::new()));
let (s_reader, r_reader) = channel();
let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap();
let (s_sender, r_sender) = channel();
let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender);
2018-03-22 13:31:58 -07:00
let msgs = allocate(&recycler);
msgs.write().unwrap().packets.resize(10, Packet::default());
for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() {
w.data[0] = i as u8;
w.size = PACKET_SIZE;
w.set_addr(&addr);
assert_eq!(w.get_addr(), addr);
}
s_sender.send(msgs).expect("send");
let mut num = 0;
get_msgs(r_reader, &mut num);
assert_eq!(num, 10);
2018-03-22 13:05:23 -07:00
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_sender.join().expect("join");
}
2018-03-14 11:28:05 -07:00
#[test]
pub fn streamer_debug() {
write!(io::sink(), "{:?}", Packet::default()).unwrap();
}
2018-03-11 09:22:21 -07:00
#[test]
pub fn streamer_send_test() {
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
2018-03-22 13:05:23 -07:00
let exit = Arc::new(AtomicBool::new(false));
2018-03-11 09:22:21 -07:00
let recycler = Arc::new(Mutex::new(Vec::new()));
let (s_reader, r_reader) = channel();
let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap();
let (s_sender, r_sender) = channel();
let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender);
2018-03-22 13:31:58 -07:00
let msgs = allocate(&recycler);
2018-03-11 09:22:21 -07:00
msgs.write().unwrap().packets.resize(10, Packet::default());
for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() {
w.data[0] = i as u8;
2018-03-24 18:01:40 -07:00
w.meta.size = PACKET_SIZE;
w.meta.set_addr(&addr);
assert_eq!(w.meta.get_addr(), addr);
2018-03-11 09:22:21 -07:00
}
s_sender.send(msgs).expect("send");
let mut num = 0;
get_msgs(r_reader, &mut num);
assert_eq!(num, 10);
2018-03-22 13:05:23 -07:00
exit.store(true, Ordering::Relaxed);
2018-03-11 09:22:21 -07:00
t_receiver.join().expect("join");
t_sender.join().expect("join");
}
}