2018-03-26 21:07:11 -07:00
|
|
|
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, NUM_BLOBS};
|
|
|
|
use result::Result;
|
|
|
|
use std::collections::VecDeque;
|
|
|
|
use std::net::UdpSocket;
|
2018-03-26 21:03:26 -07:00
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
|
|
use std::sync::mpsc;
|
2018-04-11 19:24:14 -07:00
|
|
|
use std::sync::Arc;
|
2018-04-02 19:32:58 -07:00
|
|
|
use std::thread::{spawn, JoinHandle};
|
2018-03-26 21:07:11 -07:00
|
|
|
use std::time::Duration;
|
2018-03-07 13:47:13 -08:00
|
|
|
|
2018-04-02 19:32:58 -07:00
|
|
|
/// Receiving half of an `mpsc` channel whose messages are `SharedPackets`.
pub type PacketReceiver = mpsc::Receiver<SharedPackets>;
|
|
|
|
/// Sending half of an `mpsc` channel whose messages are `SharedPackets`.
pub type PacketSender = mpsc::Sender<SharedPackets>;
|
|
|
|
/// Sending half of an `mpsc` channel whose messages are queues of `SharedBlob`.
pub type BlobSender = mpsc::Sender<VecDeque<SharedBlob>>;
|
|
|
|
/// Receiving half of an `mpsc` channel whose messages are queues of `SharedBlob`.
pub type BlobReceiver = mpsc::Receiver<VecDeque<SharedBlob>>;
|
2018-03-07 13:47:13 -08:00
|
|
|
|
|
|
|
fn recv_loop(
|
|
|
|
sock: &UdpSocket,
|
2018-03-22 13:31:58 -07:00
|
|
|
exit: &Arc<AtomicBool>,
|
2018-04-02 19:32:58 -07:00
|
|
|
re: &PacketRecycler,
|
|
|
|
channel: &PacketSender,
|
2018-03-07 13:47:13 -08:00
|
|
|
) -> Result<()> {
|
|
|
|
loop {
|
2018-04-02 19:32:58 -07:00
|
|
|
let msgs = re.allocate();
|
2018-03-07 13:47:13 -08:00
|
|
|
let msgs_ = msgs.clone();
|
|
|
|
loop {
|
2018-04-02 19:32:58 -07:00
|
|
|
match msgs.write().unwrap().recv_from(sock) {
|
2018-03-07 13:47:13 -08:00
|
|
|
Ok(()) => {
|
|
|
|
channel.send(msgs_)?;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
Err(_) => {
|
2018-03-22 13:05:23 -07:00
|
|
|
if exit.load(Ordering::Relaxed) {
|
2018-04-02 19:32:58 -07:00
|
|
|
re.recycle(msgs_);
|
2018-03-07 13:47:13 -08:00
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn receiver(
|
|
|
|
sock: UdpSocket,
|
2018-03-22 13:05:23 -07:00
|
|
|
exit: Arc<AtomicBool>,
|
2018-03-24 23:31:54 -07:00
|
|
|
recycler: PacketRecycler,
|
2018-04-02 19:32:58 -07:00
|
|
|
channel: PacketSender,
|
2018-03-07 13:47:13 -08:00
|
|
|
) -> Result<JoinHandle<()>> {
|
|
|
|
let timer = Duration::new(1, 0);
|
|
|
|
sock.set_read_timeout(Some(timer))?;
|
|
|
|
Ok(spawn(move || {
|
2018-03-22 13:31:58 -07:00
|
|
|
let _ = recv_loop(&sock, &exit, &recycler, &channel);
|
2018-03-07 13:47:13 -08:00
|
|
|
()
|
|
|
|
}))
|
|
|
|
}
|
|
|
|
|
2018-04-02 19:32:58 -07:00
|
|
|
fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> {
|
2018-03-07 13:47:13 -08:00
|
|
|
let timer = Duration::new(1, 0);
|
2018-04-02 19:32:58 -07:00
|
|
|
let mut msgs = r.recv_timeout(timer)?;
|
|
|
|
Blob::send_to(recycler, sock, &mut msgs)?;
|
2018-03-07 13:47:13 -08:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2018-03-24 23:31:54 -07:00
|
|
|
pub fn responder(
|
2018-03-07 13:47:13 -08:00
|
|
|
sock: UdpSocket,
|
2018-03-22 13:05:23 -07:00
|
|
|
exit: Arc<AtomicBool>,
|
2018-04-02 19:32:58 -07:00
|
|
|
recycler: BlobRecycler,
|
|
|
|
r: BlobReceiver,
|
2018-03-07 13:47:13 -08:00
|
|
|
) -> JoinHandle<()> {
|
|
|
|
spawn(move || loop {
|
2018-03-26 21:07:11 -07:00
|
|
|
if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) {
|
2018-03-07 13:47:13 -08:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2018-04-02 19:32:58 -07:00
|
|
|
//TODO, we would need to stick block authentication before we create the
|
|
|
|
//window.
|
|
|
|
fn recv_window(
|
|
|
|
window: &mut Vec<Option<SharedBlob>>,
|
|
|
|
recycler: &BlobRecycler,
|
|
|
|
consumed: &mut usize,
|
|
|
|
socket: &UdpSocket,
|
|
|
|
s: &BlobSender,
|
|
|
|
) -> Result<()> {
|
|
|
|
let mut dq = Blob::recv_from(recycler, socket)?;
|
|
|
|
while let Some(b) = dq.pop_front() {
|
|
|
|
let b_ = b.clone();
|
|
|
|
let mut p = b.write().unwrap();
|
|
|
|
let pix = p.get_index()? as usize;
|
|
|
|
let w = pix % NUM_BLOBS;
|
|
|
|
//TODO, after the block are authenticated
|
|
|
|
//if we get different blocks at the same index
|
|
|
|
//that is a network failure/attack
|
|
|
|
{
|
|
|
|
if window[w].is_none() {
|
|
|
|
window[w] = Some(b_);
|
|
|
|
} else {
|
|
|
|
debug!("duplicate blob at index {:}", w);
|
|
|
|
}
|
|
|
|
//send a contiguous set of blocks
|
|
|
|
let mut dq = VecDeque::new();
|
|
|
|
loop {
|
|
|
|
let k = *consumed % NUM_BLOBS;
|
|
|
|
if window[k].is_none() {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
dq.push_back(window[k].clone().unwrap());
|
|
|
|
window[k] = None;
|
|
|
|
*consumed += 1;
|
|
|
|
}
|
|
|
|
if !dq.is_empty() {
|
|
|
|
s.send(dq)?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn window(
|
|
|
|
sock: UdpSocket,
|
|
|
|
exit: Arc<AtomicBool>,
|
|
|
|
r: BlobRecycler,
|
|
|
|
s: BlobSender,
|
|
|
|
) -> JoinHandle<()> {
|
|
|
|
spawn(move || {
|
|
|
|
let mut window = vec![None; NUM_BLOBS];
|
|
|
|
let mut consumed = 0;
|
|
|
|
let timer = Duration::new(1, 0);
|
|
|
|
sock.set_read_timeout(Some(timer)).unwrap();
|
|
|
|
loop {
|
|
|
|
if recv_window(&mut window, &r, &mut consumed, &sock, &s).is_err()
|
|
|
|
|| exit.load(Ordering::Relaxed)
|
|
|
|
{
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2018-03-19 16:09:47 -07:00
|
|
|
#[cfg(all(feature = "unstable", test))]
|
|
|
|
mod bench {
|
|
|
|
extern crate test;
|
|
|
|
use self::test::Bencher;
|
2018-03-26 21:07:11 -07:00
|
|
|
use packet::{Packet, PacketRecycler, PACKET_DATA_SIZE};
|
2018-03-26 21:03:26 -07:00
|
|
|
use result::Result;
|
2018-03-07 13:47:13 -08:00
|
|
|
use std::net::{SocketAddr, UdpSocket};
|
2018-03-26 21:07:11 -07:00
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
|
|
use std::sync::mpsc::channel;
|
2018-03-26 21:03:26 -07:00
|
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use std::thread::sleep;
|
|
|
|
use std::thread::{spawn, JoinHandle};
|
2018-03-07 13:47:13 -08:00
|
|
|
use std::time::Duration;
|
|
|
|
use std::time::SystemTime;
|
2018-04-02 19:32:58 -07:00
|
|
|
use streamer::{receiver, PacketReceiver};
|
2018-03-07 13:47:13 -08:00
|
|
|
|
2018-03-25 15:37:00 -07:00
|
|
|
fn producer(
|
|
|
|
addr: &SocketAddr,
|
|
|
|
recycler: PacketRecycler,
|
|
|
|
exit: Arc<AtomicBool>,
|
|
|
|
) -> JoinHandle<()> {
|
2018-03-07 13:47:13 -08:00
|
|
|
let send = UdpSocket::bind("0.0.0.0:0").unwrap();
|
2018-04-02 19:32:58 -07:00
|
|
|
let msgs = recycler.allocate();
|
2018-03-25 15:37:00 -07:00
|
|
|
let msgs_ = msgs.clone();
|
2018-03-07 13:47:13 -08:00
|
|
|
msgs.write().unwrap().packets.resize(10, Packet::default());
|
|
|
|
for w in msgs.write().unwrap().packets.iter_mut() {
|
2018-03-26 21:07:11 -07:00
|
|
|
w.meta.size = PACKET_DATA_SIZE;
|
2018-03-25 15:37:00 -07:00
|
|
|
w.meta.set_addr(&addr);
|
2018-03-07 13:47:13 -08:00
|
|
|
}
|
|
|
|
spawn(move || loop {
|
2018-03-22 13:05:23 -07:00
|
|
|
if exit.load(Ordering::Relaxed) {
|
2018-03-07 13:47:13 -08:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
let mut num = 0;
|
2018-03-25 15:37:00 -07:00
|
|
|
for p in msgs_.read().unwrap().packets.iter() {
|
2018-04-02 19:32:58 -07:00
|
|
|
let a = p.meta.addr();
|
2018-03-25 15:37:00 -07:00
|
|
|
send.send_to(&p.data[..p.meta.size], &a).unwrap();
|
|
|
|
num += 1;
|
|
|
|
}
|
2018-03-07 13:47:13 -08:00
|
|
|
assert_eq!(num, 10);
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2018-03-29 11:20:54 -07:00
|
|
|
fn sink(
|
2018-03-25 15:37:00 -07:00
|
|
|
recycler: PacketRecycler,
|
2018-03-22 13:05:23 -07:00
|
|
|
exit: Arc<AtomicBool>,
|
2018-03-07 13:47:13 -08:00
|
|
|
rvs: Arc<Mutex<usize>>,
|
2018-04-02 19:32:58 -07:00
|
|
|
r: PacketReceiver,
|
2018-03-07 13:47:13 -08:00
|
|
|
) -> JoinHandle<()> {
|
|
|
|
spawn(move || loop {
|
2018-03-22 13:05:23 -07:00
|
|
|
if exit.load(Ordering::Relaxed) {
|
2018-03-07 13:47:13 -08:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
let timer = Duration::new(1, 0);
|
|
|
|
match r.recv_timeout(timer) {
|
|
|
|
Ok(msgs) => {
|
|
|
|
let msgs_ = msgs.clone();
|
|
|
|
*rvs.lock().unwrap() += msgs.read().unwrap().packets.len();
|
2018-04-02 19:32:58 -07:00
|
|
|
recycler.recycle(msgs_);
|
2018-03-07 13:47:13 -08:00
|
|
|
}
|
|
|
|
_ => (),
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
fn run_streamer_bench() -> Result<()> {
|
|
|
|
let read = UdpSocket::bind("127.0.0.1:0")?;
|
|
|
|
let addr = read.local_addr()?;
|
2018-03-22 13:05:23 -07:00
|
|
|
let exit = Arc::new(AtomicBool::new(false));
|
2018-04-02 19:32:58 -07:00
|
|
|
let pack_recycler = PacketRecycler::default();
|
2018-03-07 13:47:13 -08:00
|
|
|
|
|
|
|
let (s_reader, r_reader) = channel();
|
2018-04-02 19:32:58 -07:00
|
|
|
let t_reader = receiver(read, exit.clone(), pack_recycler.clone(), s_reader)?;
|
|
|
|
let t_producer1 = producer(&addr, pack_recycler.clone(), exit.clone());
|
|
|
|
let t_producer2 = producer(&addr, pack_recycler.clone(), exit.clone());
|
|
|
|
let t_producer3 = producer(&addr, pack_recycler.clone(), exit.clone());
|
2018-03-07 13:47:13 -08:00
|
|
|
|
|
|
|
let rvs = Arc::new(Mutex::new(0));
|
2018-04-02 19:32:58 -07:00
|
|
|
let t_sink = sink(pack_recycler.clone(), exit.clone(), rvs.clone(), r_reader);
|
2018-03-07 13:47:13 -08:00
|
|
|
|
|
|
|
let start = SystemTime::now();
|
|
|
|
let start_val = *rvs.lock().unwrap();
|
|
|
|
sleep(Duration::new(5, 0));
|
|
|
|
let elapsed = start.elapsed().unwrap();
|
|
|
|
let end_val = *rvs.lock().unwrap();
|
|
|
|
let time = elapsed.as_secs() * 10000000000 + elapsed.subsec_nanos() as u64;
|
|
|
|
let ftime = (time as f64) / 10000000000f64;
|
|
|
|
let fcount = (end_val - start_val) as f64;
|
|
|
|
println!("performance: {:?}", fcount / ftime);
|
2018-03-22 13:05:23 -07:00
|
|
|
exit.store(true, Ordering::Relaxed);
|
2018-03-07 13:47:13 -08:00
|
|
|
t_reader.join()?;
|
|
|
|
t_producer1.join()?;
|
|
|
|
t_producer2.join()?;
|
|
|
|
t_producer3.join()?;
|
2018-03-29 11:20:54 -07:00
|
|
|
t_sink.join()?;
|
2018-03-07 13:47:13 -08:00
|
|
|
Ok(())
|
|
|
|
}
|
2018-03-19 16:09:47 -07:00
|
|
|
#[bench]
|
|
|
|
pub fn streamer_bench(_bench: &mut Bencher) {
|
2018-03-07 13:47:13 -08:00
|
|
|
run_streamer_bench().unwrap();
|
|
|
|
}
|
2018-03-19 16:09:47 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
mod test {
|
2018-03-26 21:07:11 -07:00
|
|
|
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
|
|
|
|
use std::collections::VecDeque;
|
|
|
|
use std::io;
|
|
|
|
use std::io::Write;
|
2018-03-19 16:09:47 -07:00
|
|
|
use std::net::UdpSocket;
|
2018-03-22 13:05:23 -07:00
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
2018-03-19 16:09:47 -07:00
|
|
|
use std::sync::mpsc::channel;
|
2018-04-11 19:24:14 -07:00
|
|
|
use std::sync::Arc;
|
2018-03-26 21:03:26 -07:00
|
|
|
use std::time::Duration;
|
2018-04-02 19:32:58 -07:00
|
|
|
use streamer::{receiver, responder, window, BlobReceiver, PacketReceiver};
|
2018-03-07 13:47:13 -08:00
|
|
|
|
2018-04-02 19:32:58 -07:00
|
|
|
fn get_msgs(r: PacketReceiver, num: &mut usize) {
|
2018-03-14 11:02:38 -07:00
|
|
|
for _t in 0..5 {
|
2018-03-07 13:47:13 -08:00
|
|
|
let timer = Duration::new(1, 0);
|
|
|
|
match r.recv_timeout(timer) {
|
|
|
|
Ok(m) => *num += m.read().unwrap().packets.len(),
|
2018-03-14 11:02:38 -07:00
|
|
|
e => println!("error {:?}", e),
|
2018-03-07 13:47:13 -08:00
|
|
|
}
|
|
|
|
if *num == 10 {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
/// Formats default `Packet`, `Packets` and `Blob` values with `{:?}` into a
/// sink, exercising their `Debug` implementations.
#[test]
pub fn streamer_debug() {
    let mut out = io::sink();
    write!(out, "{:?}", Packet::default()).unwrap();
    write!(out, "{:?}", Packets::default()).unwrap();
    write!(out, "{:?}", Blob::default()).unwrap();
}
|
|
|
|
/// Sends ten blobs through a `responder` to a `receiver` over loopback and
/// checks that ten packets come out of the receiver's channel.
#[test]
pub fn streamer_send_test() {
    let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
    let addr = read.local_addr().unwrap();
    let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
    let exit = Arc::new(AtomicBool::new(false));
    let pack_recycler = PacketRecycler::default();
    let resp_recycler = BlobRecycler::default();

    // Receiving side: packets read from `read` arrive on `r_reader`.
    let (s_reader, r_reader) = channel();
    let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader).unwrap();

    // Sending side: blobs queued on `s_responder` are sent out through `send`.
    let (s_responder, r_responder) = channel();
    let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);

    let mut outgoing = VecDeque::new();
    for i in 0..10 {
        let blob = resp_recycler.allocate();
        {
            let mut contents = blob.write().unwrap();
            contents.data[0] = i as u8;
            contents.meta.size = PACKET_DATA_SIZE;
            contents.meta.set_addr(&addr);
        }
        outgoing.push_back(blob);
    }
    s_responder.send(outgoing).expect("send");

    let mut num = 0;
    get_msgs(r_reader, &mut num);
    assert_eq!(num, 10);

    exit.store(true, Ordering::Relaxed);
    t_receiver.join().expect("join");
    t_responder.join().expect("join");
}
|
2018-04-02 19:32:58 -07:00
|
|
|
|
|
|
|
fn get_blobs(r: BlobReceiver, num: &mut usize) {
|
|
|
|
for _t in 0..5 {
|
|
|
|
let timer = Duration::new(1, 0);
|
|
|
|
match r.recv_timeout(timer) {
|
|
|
|
Ok(m) => {
|
|
|
|
for (i, v) in m.iter().enumerate() {
|
|
|
|
assert_eq!(v.read().unwrap().get_index().unwrap() as usize, *num + i);
|
|
|
|
}
|
|
|
|
*num += m.len();
|
|
|
|
}
|
|
|
|
e => println!("error {:?}", e),
|
|
|
|
}
|
|
|
|
if *num == 10 {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2018-03-14 11:28:05 -07:00
|
|
|
}
|
2018-04-02 19:32:58 -07:00
|
|
|
|
2018-03-11 09:22:21 -07:00
|
|
|
/// Sends ten blobs with indices 9 down to 0 through a `responder` to a
/// `window` thread over loopback and checks, via `get_blobs`, that ten blobs
/// come out in index order.
#[test]
pub fn window_send_test() {
    let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
    let addr = read.local_addr().unwrap();
    let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
    let exit = Arc::new(AtomicBool::new(false));
    let resp_recycler = BlobRecycler::default();

    // Receiving side: the window thread delivers ordered blobs on `r_reader`.
    let (s_reader, r_reader) = channel();
    let t_receiver = window(read, exit.clone(), resp_recycler.clone(), s_reader);

    // Sending side: blobs queued on `s_responder` are sent out through `send`.
    let (s_responder, r_responder) = channel();
    let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);

    let mut outgoing = VecDeque::new();
    // Queue the blobs in descending index order.
    for index in (0..10).rev() {
        let blob = resp_recycler.allocate();
        {
            let mut contents = blob.write().unwrap();
            contents.set_index(index).unwrap();
            assert_eq!(index, contents.get_index().unwrap());
            contents.meta.size = PACKET_DATA_SIZE;
            contents.meta.set_addr(&addr);
        }
        outgoing.push_back(blob);
    }
    s_responder.send(outgoing).expect("send");

    let mut num = 0;
    get_blobs(r_reader, &mut num);
    assert_eq!(num, 10);

    exit.store(true, Ordering::Relaxed);
    t_receiver.join().expect("join");
    t_responder.join().expect("join");
}
|
2018-03-07 13:47:13 -08:00
|
|
|
}
|