2018-05-25 22:00:47 -07:00
|
|
|
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
|
|
|
|
//!
|
2018-08-09 12:31:34 -07:00
|
|
|
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlobs, SharedPackets};
|
2018-07-05 14:50:42 -07:00
|
|
|
use result::{Error, Result};
|
2018-08-09 12:31:34 -07:00
|
|
|
use std::net::UdpSocket;
|
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
2018-07-05 14:50:42 -07:00
|
|
|
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
|
2018-08-09 12:31:34 -07:00
|
|
|
use std::sync::Arc;
|
2018-05-30 13:13:14 -07:00
|
|
|
use std::thread::{Builder, JoinHandle};
|
2018-08-09 12:31:34 -07:00
|
|
|
use std::time::Duration;
|
2018-03-07 13:47:13 -08:00
|
|
|
|
2018-06-27 11:33:56 -07:00
|
|
|
pub type PacketReceiver = Receiver<SharedPackets>;
|
|
|
|
pub type PacketSender = Sender<SharedPackets>;
|
|
|
|
pub type BlobSender = Sender<SharedBlobs>;
|
|
|
|
pub type BlobReceiver = Receiver<SharedBlobs>;
|
2018-07-17 15:00:22 -07:00
|
|
|
|
2018-03-07 13:47:13 -08:00
|
|
|
fn recv_loop(
|
|
|
|
sock: &UdpSocket,
|
2018-03-22 13:31:58 -07:00
|
|
|
exit: &Arc<AtomicBool>,
|
2018-04-02 19:32:58 -07:00
|
|
|
re: &PacketRecycler,
|
|
|
|
channel: &PacketSender,
|
2018-03-07 13:47:13 -08:00
|
|
|
) -> Result<()> {
|
|
|
|
loop {
|
2018-04-02 19:32:58 -07:00
|
|
|
let msgs = re.allocate();
|
2018-03-07 13:47:13 -08:00
|
|
|
loop {
|
2018-08-03 11:27:44 -07:00
|
|
|
let result = msgs
|
|
|
|
.write()
|
2018-05-11 11:38:52 -07:00
|
|
|
.expect("write lock in fn recv_loop")
|
2018-06-25 16:13:26 -07:00
|
|
|
.recv_from(sock);
|
|
|
|
match result {
|
2018-03-07 13:47:13 -08:00
|
|
|
Ok(()) => {
|
2018-06-25 16:13:26 -07:00
|
|
|
channel.send(msgs)?;
|
2018-03-07 13:47:13 -08:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
Err(_) => {
|
2018-03-22 13:05:23 -07:00
|
|
|
if exit.load(Ordering::Relaxed) {
|
2018-06-25 16:13:26 -07:00
|
|
|
re.recycle(msgs);
|
2018-03-07 13:47:13 -08:00
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn receiver(
|
2018-08-28 16:32:40 -07:00
|
|
|
sock: Arc<UdpSocket>,
|
2018-03-22 13:05:23 -07:00
|
|
|
exit: Arc<AtomicBool>,
|
2018-03-24 23:31:54 -07:00
|
|
|
recycler: PacketRecycler,
|
2018-05-15 08:53:51 -07:00
|
|
|
packet_sender: PacketSender,
|
|
|
|
) -> JoinHandle<()> {
|
2018-05-22 09:46:52 -07:00
|
|
|
let res = sock.set_read_timeout(Some(Duration::new(1, 0)));
|
|
|
|
if res.is_err() {
|
|
|
|
panic!("streamer::receiver set_read_timeout error");
|
|
|
|
}
|
2018-05-30 13:13:14 -07:00
|
|
|
Builder::new()
|
2018-05-30 13:20:58 -07:00
|
|
|
.name("solana-receiver".to_string())
|
2018-05-30 13:13:14 -07:00
|
|
|
.spawn(move || {
|
|
|
|
let _ = recv_loop(&sock, &exit, &recycler, &packet_sender);
|
|
|
|
()
|
|
|
|
})
|
|
|
|
.unwrap()
|
2018-03-07 13:47:13 -08:00
|
|
|
}
|
|
|
|
|
2018-04-02 19:32:58 -07:00
|
|
|
fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> {
|
2018-03-07 13:47:13 -08:00
|
|
|
let timer = Duration::new(1, 0);
|
2018-04-02 19:32:58 -07:00
|
|
|
let mut msgs = r.recv_timeout(timer)?;
|
|
|
|
Blob::send_to(recycler, sock, &mut msgs)?;
|
2018-03-07 13:47:13 -08:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2018-05-10 14:47:42 -07:00
|
|
|
pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
|
|
|
|
let timer = Duration::new(1, 0);
|
|
|
|
let msgs = recvr.recv_timeout(timer)?;
|
2018-05-12 19:00:22 -07:00
|
|
|
trace!("got msgs");
|
2018-05-10 14:47:42 -07:00
|
|
|
let mut len = msgs.read().unwrap().packets.len();
|
|
|
|
let mut batch = vec![msgs];
|
|
|
|
while let Ok(more) = recvr.try_recv() {
|
|
|
|
trace!("got more msgs");
|
|
|
|
len += more.read().unwrap().packets.len();
|
|
|
|
batch.push(more);
|
|
|
|
|
|
|
|
if len > 100_000 {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2018-06-28 14:51:53 -07:00
|
|
|
trace!("batch len {}", batch.len());
|
2018-05-10 14:47:42 -07:00
|
|
|
Ok((batch, len))
|
|
|
|
}
|
|
|
|
|
2018-07-11 07:38:57 -07:00
|
|
|
pub fn responder(
|
|
|
|
name: &'static str,
|
2018-08-28 16:32:40 -07:00
|
|
|
sock: Arc<UdpSocket>,
|
2018-07-11 07:38:57 -07:00
|
|
|
recycler: BlobRecycler,
|
|
|
|
r: BlobReceiver,
|
|
|
|
) -> JoinHandle<()> {
|
2018-05-30 13:13:14 -07:00
|
|
|
Builder::new()
|
2018-07-11 07:38:57 -07:00
|
|
|
.name(format!("solana-responder-{}", name))
|
2018-05-30 13:13:14 -07:00
|
|
|
.spawn(move || loop {
|
2018-07-05 15:41:03 -07:00
|
|
|
if let Err(e) = recv_send(&sock, &recycler, &r) {
|
|
|
|
match e {
|
|
|
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
|
|
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
2018-07-16 19:31:52 -07:00
|
|
|
_ => warn!("{} responder error: {:?}", name, e),
|
2018-07-05 15:41:03 -07:00
|
|
|
}
|
2018-05-30 13:13:14 -07:00
|
|
|
}
|
|
|
|
})
|
|
|
|
.unwrap()
|
2018-03-07 13:47:13 -08:00
|
|
|
}
|
|
|
|
|
2018-04-02 19:32:58 -07:00
|
|
|
//TODO, we would need to stick block authentication before we create the
|
|
|
|
//window.
|
2018-04-12 10:26:32 -07:00
|
|
|
fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> {
|
2018-07-18 12:44:59 -07:00
|
|
|
trace!("recv_blobs: receiving on {}", sock.local_addr().unwrap());
|
2018-04-12 10:26:32 -07:00
|
|
|
let dq = Blob::recv_from(recycler, sock)?;
|
|
|
|
if !dq.is_empty() {
|
|
|
|
s.send(dq)?;
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn blob_receiver(
|
2018-08-28 16:32:40 -07:00
|
|
|
sock: Arc<UdpSocket>,
|
2018-04-12 10:26:32 -07:00
|
|
|
exit: Arc<AtomicBool>,
|
|
|
|
recycler: BlobRecycler,
|
|
|
|
s: BlobSender,
|
|
|
|
) -> Result<JoinHandle<()>> {
|
|
|
|
//DOCUMENTED SIDE-EFFECT
|
|
|
|
//1 second timeout on socket read
|
|
|
|
let timer = Duration::new(1, 0);
|
|
|
|
sock.set_read_timeout(Some(timer))?;
|
2018-05-30 13:13:14 -07:00
|
|
|
let t = Builder::new()
|
2018-05-30 13:20:58 -07:00
|
|
|
.name("solana-blob_receiver".to_string())
|
2018-05-30 13:13:14 -07:00
|
|
|
.spawn(move || loop {
|
|
|
|
if exit.load(Ordering::Relaxed) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
let _ = recv_blobs(&recycler, &sock, &s);
|
|
|
|
})
|
|
|
|
.unwrap();
|
2018-04-12 10:26:32 -07:00
|
|
|
Ok(t)
|
|
|
|
}
|
|
|
|
|
2018-03-19 16:09:47 -07:00
|
|
|
#[cfg(test)]
mod test {
    use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
    use std::collections::VecDeque;
    use std::io;
    use std::io::Write;
    use std::net::UdpSocket;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc::channel;
    use std::sync::Arc;
    use std::time::Duration;
    use streamer::{receiver, responder, PacketReceiver};

    /// Adds the packet count of each received batch to `num`, making at most
    /// five one-second receive attempts and stopping early once `num` is 10.
    fn get_msgs(r: PacketReceiver, num: &mut usize) {
        for _ in 0..5 {
            match r.recv_timeout(Duration::new(1, 0)) {
                Ok(batch) => *num += batch.read().unwrap().packets.len(),
                other => info!("error {:?}", other),
            }
            if *num == 10 {
                break;
            }
        }
    }

    /// Formats default `Packet`, `Packets` and `Blob` values with `{:?}`.
    #[test]
    pub fn streamer_debug() {
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", Packets::default()).unwrap();
        write!(io::sink(), "{:?}", Blob::default()).unwrap();
    }

    /// Sends ten blobs through a responder to a receiver on loopback and
    /// checks that ten packets arrive.
    #[test]
    pub fn streamer_send_test() {
        let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
        read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();

        let addr = read.local_addr().unwrap();
        let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let exit = Arc::new(AtomicBool::new(false));
        let pack_recycler = PacketRecycler::default();
        let resp_recycler = BlobRecycler::default();
        let (s_reader, r_reader) = channel();
        let t_receiver = receiver(
            Arc::new(read),
            exit.clone(),
            pack_recycler.clone(),
            s_reader,
        );
        let t_responder = {
            // `s_responder` goes out of scope at the end of this block, which
            // disconnects the channel the responder thread is reading from.
            let (s_responder, r_responder) = channel();
            let handle = responder(
                "streamer_send_test",
                Arc::new(send),
                resp_recycler.clone(),
                r_responder,
            );
            let mut outgoing = VecDeque::new();
            for i in 0..10 {
                let blob = resp_recycler.allocate();
                {
                    // Release the write lock before the blob is queued.
                    let mut w = blob.write().unwrap();
                    w.data[0] = i as u8;
                    w.meta.size = PACKET_DATA_SIZE;
                    w.meta.set_addr(&addr);
                }
                outgoing.push_back(blob);
            }
            s_responder.send(outgoing).expect("send");
            handle
        };

        let mut num = 0;
        get_msgs(r_reader, &mut num);
        assert_eq!(num, 10);
        exit.store(true, Ordering::Relaxed);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
    }
}
|