solana/src/streamer.rs

550 lines
18 KiB
Rust
Raw Normal View History

2018-04-17 19:46:50 -07:00
//! The `streamer` module defines a set of services for effecently pulling data from udp sockets.
2018-03-26 21:07:11 -07:00
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, NUM_BLOBS};
use result::Result;
use std::collections::VecDeque;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, RwLock};
use std::thread::{spawn, JoinHandle};
2018-03-26 21:07:11 -07:00
use std::time::Duration;
2018-04-18 20:12:30 -07:00
use subscribers::Subscribers;
pub type PacketReceiver = mpsc::Receiver<SharedPackets>;
pub type PacketSender = mpsc::Sender<SharedPackets>;
pub type BlobSender = mpsc::Sender<VecDeque<SharedBlob>>;
pub type BlobReceiver = mpsc::Receiver<VecDeque<SharedBlob>>;
fn recv_loop(
sock: &UdpSocket,
2018-03-22 13:31:58 -07:00
exit: &Arc<AtomicBool>,
re: &PacketRecycler,
channel: &PacketSender,
) -> Result<()> {
loop {
let msgs = re.allocate();
let msgs_ = msgs.clone();
loop {
match msgs.write().unwrap().recv_from(sock) {
Ok(()) => {
channel.send(msgs_)?;
break;
}
Err(_) => {
2018-03-22 13:05:23 -07:00
if exit.load(Ordering::Relaxed) {
re.recycle(msgs_);
return Ok(());
}
}
}
}
}
}
pub fn receiver(
sock: UdpSocket,
2018-03-22 13:05:23 -07:00
exit: Arc<AtomicBool>,
2018-03-24 23:31:54 -07:00
recycler: PacketRecycler,
channel: PacketSender,
) -> Result<JoinHandle<()>> {
let timer = Duration::new(1, 0);
sock.set_read_timeout(Some(timer))?;
Ok(spawn(move || {
2018-03-22 13:31:58 -07:00
let _ = recv_loop(&sock, &exit, &recycler, &channel);
()
}))
}
fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> {
let timer = Duration::new(1, 0);
let mut msgs = r.recv_timeout(timer)?;
Blob::send_to(recycler, sock, &mut msgs)?;
Ok(())
}
2018-03-24 23:31:54 -07:00
pub fn responder(
sock: UdpSocket,
2018-03-22 13:05:23 -07:00
exit: Arc<AtomicBool>,
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
spawn(move || loop {
2018-03-26 21:07:11 -07:00
if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) {
break;
}
})
}
//TODO, we would need to stick block authentication before we create the
//window.
fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> {
let dq = Blob::recv_from(recycler, sock)?;
if !dq.is_empty() {
s.send(dq)?;
}
Ok(())
}
pub fn blob_receiver(
exit: Arc<AtomicBool>,
recycler: BlobRecycler,
sock: UdpSocket,
s: BlobSender,
) -> Result<JoinHandle<()>> {
//DOCUMENTED SIDE-EFFECT
//1 second timeout on socket read
let timer = Duration::new(1, 0);
sock.set_read_timeout(Some(timer))?;
let t = spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let ret = recv_blobs(&recycler, &sock, &s);
if ret.is_err() {
break;
}
});
Ok(t)
}
fn recv_window(
window: &mut Vec<Option<SharedBlob>>,
2018-04-18 20:12:30 -07:00
subs: &Arc<RwLock<Subscribers>>,
recycler: &BlobRecycler,
consumed: &mut usize,
r: &BlobReceiver,
s: &BlobSender,
2018-04-18 20:12:30 -07:00
retransmit: &BlobSender,
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq)
}
{
//retransmit all leader blocks
2018-04-18 20:12:30 -07:00
let mut retransmitq = VecDeque::new();
let rsubs = subs.read().unwrap();
for b in &dq {
let p = b.read().unwrap();
2018-04-17 11:07:43 -07:00
//TODO this check isn't safe against adverserial packets
//we need to maintain a sequence window
trace!(
"idx: {} addr: {:?} leader: {:?}",
p.get_index().unwrap(),
p.meta.addr(),
rsubs.leader.addr
);
if p.meta.addr() == rsubs.leader.addr {
//TODO
//need to copy the retransmited blob
//otherwise we get into races with which thread
//should do the recycling
//
//a better absraction would be to recycle when the blob
//is dropped via a weakref to the recycler
let nv = recycler.allocate();
{
let mut mnv = nv.write().unwrap();
let sz = p.meta.size;
mnv.meta.size = sz;
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
}
2018-04-18 20:12:30 -07:00
retransmitq.push_back(nv);
}
}
2018-04-18 20:12:30 -07:00
if !retransmitq.is_empty() {
retransmit.send(retransmitq)?;
}
}
//send a contiguous set of blocks
let mut contq = VecDeque::new();
while let Some(b) = dq.pop_front() {
let b_ = b.clone();
2018-04-19 07:00:16 -07:00
let p = b.write().unwrap();
let pix = p.get_index()? as usize;
let w = pix % NUM_BLOBS;
//TODO, after the block are authenticated
//if we get different blocks at the same index
//that is a network failure/attack
trace!("window w: {} size: {}", w, p.meta.size);
{
if window[w].is_none() {
window[w] = Some(b_);
} else {
debug!("duplicate blob at index {:}", w);
}
loop {
let k = *consumed % NUM_BLOBS;
trace!("k: {} consumed: {}", k, *consumed);
if window[k].is_none() {
break;
}
contq.push_back(window[k].clone().unwrap());
window[k] = None;
*consumed += 1;
}
}
}
trace!("sending contq.len: {}", contq.len());
if !contq.is_empty() {
s.send(contq)?;
}
Ok(())
}
pub fn window(
exit: Arc<AtomicBool>,
2018-04-18 20:12:30 -07:00
subs: Arc<RwLock<Subscribers>>,
recycler: BlobRecycler,
r: BlobReceiver,
s: BlobSender,
2018-04-18 20:12:30 -07:00
retransmit: BlobSender,
) -> JoinHandle<()> {
spawn(move || {
let mut window = vec![None; NUM_BLOBS];
let mut consumed = 0;
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = recv_window(
&mut window,
&subs,
&recycler,
&mut consumed,
&r,
&s,
&retransmit,
);
}
})
}
fn retransmit(
2018-04-18 20:12:30 -07:00
subs: &Arc<RwLock<Subscribers>>,
recycler: &BlobRecycler,
r: &BlobReceiver,
sock: &UdpSocket,
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq);
}
{
let wsubs = subs.read().unwrap();
for b in &dq {
let mut mb = b.write().unwrap();
wsubs.retransmit(&mut mb, sock)?;
}
}
while let Some(b) = dq.pop_front() {
recycler.recycle(b);
}
Ok(())
}
2018-04-17 19:46:50 -07:00
/// Service to retransmit messages from the leader to layer 1 nodes.
/// See `subscribers` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to read from. Read timeout is set to 1.
/// * `exit` - Boolean to signal system exit.
/// * `subs` - Shared Subscriber structure. This structure needs to be updated and popualted by
/// the accountant.
/// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn retransmitter(
sock: UdpSocket,
exit: Arc<AtomicBool>,
2018-04-18 20:12:30 -07:00
subs: Arc<RwLock<Subscribers>>,
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = retransmit(&subs, &recycler, &r, &sock);
})
}
#[cfg(all(feature = "unstable", test))]
mod bench {
extern crate test;
use self::test::Bencher;
2018-03-26 21:07:11 -07:00
use packet::{Packet, PacketRecycler, PACKET_DATA_SIZE};
use result::Result;
use std::net::{SocketAddr, UdpSocket};
2018-03-26 21:07:11 -07:00
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use std::time::SystemTime;
use streamer::{receiver, PacketReceiver};
2018-03-25 15:37:00 -07:00
fn producer(
addr: &SocketAddr,
recycler: PacketRecycler,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let send = UdpSocket::bind("0.0.0.0:0").unwrap();
let msgs = recycler.allocate();
2018-03-25 15:37:00 -07:00
let msgs_ = msgs.clone();
msgs.write().unwrap().packets.resize(10, Packet::default());
for w in msgs.write().unwrap().packets.iter_mut() {
2018-03-26 21:07:11 -07:00
w.meta.size = PACKET_DATA_SIZE;
2018-03-25 15:37:00 -07:00
w.meta.set_addr(&addr);
}
spawn(move || loop {
2018-03-22 13:05:23 -07:00
if exit.load(Ordering::Relaxed) {
return;
}
let mut num = 0;
2018-03-25 15:37:00 -07:00
for p in msgs_.read().unwrap().packets.iter() {
let a = p.meta.addr();
2018-03-25 15:37:00 -07:00
send.send_to(&p.data[..p.meta.size], &a).unwrap();
num += 1;
}
assert_eq!(num, 10);
})
}
2018-03-29 11:20:54 -07:00
fn sink(
2018-03-25 15:37:00 -07:00
recycler: PacketRecycler,
2018-03-22 13:05:23 -07:00
exit: Arc<AtomicBool>,
rvs: Arc<Mutex<usize>>,
r: PacketReceiver,
) -> JoinHandle<()> {
spawn(move || loop {
2018-03-22 13:05:23 -07:00
if exit.load(Ordering::Relaxed) {
return;
}
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(msgs) => {
let msgs_ = msgs.clone();
*rvs.lock().unwrap() += msgs.read().unwrap().packets.len();
recycler.recycle(msgs_);
}
_ => (),
}
})
}
fn run_streamer_bench() -> Result<()> {
let read = UdpSocket::bind("127.0.0.1:0")?;
let addr = read.local_addr()?;
2018-03-22 13:05:23 -07:00
let exit = Arc::new(AtomicBool::new(false));
let pack_recycler = PacketRecycler::default();
let (s_reader, r_reader) = channel();
let t_reader = receiver(read, exit.clone(), pack_recycler.clone(), s_reader)?;
let t_producer1 = producer(&addr, pack_recycler.clone(), exit.clone());
let t_producer2 = producer(&addr, pack_recycler.clone(), exit.clone());
let t_producer3 = producer(&addr, pack_recycler.clone(), exit.clone());
let rvs = Arc::new(Mutex::new(0));
let t_sink = sink(pack_recycler.clone(), exit.clone(), rvs.clone(), r_reader);
let start = SystemTime::now();
let start_val = *rvs.lock().unwrap();
sleep(Duration::new(5, 0));
let elapsed = start.elapsed().unwrap();
let end_val = *rvs.lock().unwrap();
let time = elapsed.as_secs() * 10000000000 + elapsed.subsec_nanos() as u64;
let ftime = (time as f64) / 10000000000f64;
let fcount = (end_val - start_val) as f64;
println!("performance: {:?}", fcount / ftime);
2018-03-22 13:05:23 -07:00
exit.store(true, Ordering::Relaxed);
t_reader.join()?;
t_producer1.join()?;
t_producer2.join()?;
t_producer3.join()?;
2018-03-29 11:20:54 -07:00
t_sink.join()?;
Ok(())
}
#[bench]
pub fn streamer_bench(_bench: &mut Bencher) {
run_streamer_bench().unwrap();
}
}
#[cfg(test)]
mod test {
2018-03-26 21:07:11 -07:00
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
use std::collections::VecDeque;
use std::io;
use std::io::Write;
use std::net::UdpSocket;
2018-03-22 13:05:23 -07:00
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::time::Duration;
2018-05-02 10:05:11 -07:00
use streamer::{blob_receiver, receiver, responder, retransmitter, window, BlobReceiver,
PacketReceiver};
use subscribers::{Node, Subscribers};
fn get_msgs(r: PacketReceiver, num: &mut usize) {
2018-03-14 11:02:38 -07:00
for _t in 0..5 {
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(m) => *num += m.read().unwrap().packets.len(),
2018-03-14 11:02:38 -07:00
e => println!("error {:?}", e),
}
if *num == 10 {
break;
}
}
}
#[test]
pub fn streamer_debug() {
write!(io::sink(), "{:?}", Packet::default()).unwrap();
write!(io::sink(), "{:?}", Packets::default()).unwrap();
write!(io::sink(), "{:?}", Blob::default()).unwrap();
}
#[test]
pub fn streamer_send_test() {
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let pack_recycler = PacketRecycler::default();
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader).unwrap();
2018-03-24 23:31:54 -07:00
let (s_responder, r_responder) = channel();
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
let mut msgs = VecDeque::new();
for i in 0..10 {
let b = resp_recycler.allocate();
let b_ = b.clone();
let mut w = b.write().unwrap();
w.data[0] = i as u8;
2018-03-26 21:07:11 -07:00
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
msgs.push_back(b_);
}
2018-03-24 23:31:54 -07:00
s_responder.send(msgs).expect("send");
let mut num = 0;
get_msgs(r_reader, &mut num);
assert_eq!(num, 10);
2018-03-22 13:05:23 -07:00
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
2018-03-24 23:31:54 -07:00
t_responder.join().expect("join");
}
fn get_blobs(r: BlobReceiver, num: &mut usize) {
for _t in 0..5 {
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(m) => {
for (i, v) in m.iter().enumerate() {
assert_eq!(v.read().unwrap().get_index().unwrap() as usize, *num + i);
}
*num += m.len();
}
e => println!("error {:?}", e),
}
if *num == 10 {
break;
}
}
2018-03-14 11:28:05 -07:00
}
2018-03-11 09:22:21 -07:00
#[test]
pub fn window_send_test() {
2018-03-11 09:22:21 -07:00
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
2018-03-22 13:05:23 -07:00
let exit = Arc::new(AtomicBool::new(false));
let subs = Arc::new(RwLock::new(Subscribers::new(
Node::default(),
2018-04-16 21:02:37 -07:00
Node::new([0; 8], 0, send.local_addr().unwrap()),
2018-04-18 20:12:30 -07:00
&[],
)));
let resp_recycler = BlobRecycler::default();
2018-03-11 09:22:21 -07:00
let (s_reader, r_reader) = channel();
let t_receiver =
blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap();
let (s_window, r_window) = channel();
2018-04-18 20:12:30 -07:00
let (s_retransmit, r_retransmit) = channel();
let t_window = window(
exit.clone(),
subs,
resp_recycler.clone(),
r_reader,
s_window,
2018-04-18 20:12:30 -07:00
s_retransmit,
);
2018-03-24 23:31:54 -07:00
let (s_responder, r_responder) = channel();
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
let mut msgs = VecDeque::new();
for v in 0..10 {
let i = 9 - v;
let b = resp_recycler.allocate();
let b_ = b.clone();
let mut w = b.write().unwrap();
w.set_index(i).unwrap();
assert_eq!(i, w.get_index().unwrap());
2018-03-26 21:07:11 -07:00
w.meta.size = PACKET_DATA_SIZE;
2018-03-24 18:01:40 -07:00
w.meta.set_addr(&addr);
msgs.push_back(b_);
2018-03-11 09:22:21 -07:00
}
2018-03-24 23:31:54 -07:00
s_responder.send(msgs).expect("send");
2018-03-11 09:22:21 -07:00
let mut num = 0;
get_blobs(r_window, &mut num);
2018-03-11 09:22:21 -07:00
assert_eq!(num, 10);
2018-04-18 20:12:30 -07:00
let mut q = r_retransmit.recv().unwrap();
while let Ok(mut nq) = r_retransmit.try_recv() {
2018-04-16 20:57:15 -07:00
q.append(&mut nq);
}
assert_eq!(q.len(), 10);
2018-03-22 13:05:23 -07:00
exit.store(true, Ordering::Relaxed);
2018-03-11 09:22:21 -07:00
t_receiver.join().expect("join");
2018-03-24 23:31:54 -07:00
t_responder.join().expect("join");
t_window.join().expect("join");
2018-03-11 09:22:21 -07:00
}
#[test]
pub fn retransmit() {
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let subs = Arc::new(RwLock::new(Subscribers::new(
Node::default(),
Node::default(),
&[Node::new([0; 8], 1, read.local_addr().unwrap())],
)));
2018-04-17 11:05:15 -07:00
let (s_retransmit, r_retransmit) = channel();
let blob_recycler = BlobRecycler::default();
let saddr = send.local_addr().unwrap();
2018-04-17 11:05:35 -07:00
let t_retransmit = retransmitter(
send,
exit.clone(),
subs,
blob_recycler.clone(),
r_retransmit,
);
let mut bq = VecDeque::new();
2018-04-17 11:05:15 -07:00
let b = blob_recycler.allocate();
b.write().unwrap().meta.size = 10;
bq.push_back(b);
2018-04-17 11:05:15 -07:00
s_retransmit.send(bq).unwrap();
let (s_blob_receiver, r_blob_receiver) = channel();
2018-04-17 11:05:35 -07:00
let t_receiver =
blob_receiver(exit.clone(), blob_recycler.clone(), read, s_blob_receiver).unwrap();
2018-04-17 11:05:15 -07:00
let mut oq = r_blob_receiver.recv().unwrap();
assert_eq!(oq.len(), 1);
let o = oq.pop_front().unwrap();
let ro = o.read().unwrap();
assert_eq!(ro.meta.size, 10);
assert_eq!(ro.meta.addr(), saddr);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_retransmit.join().expect("join");
}
}