2018-05-25 22:00:47 -07:00
|
|
|
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
|
|
|
|
//!
|
2018-07-10 12:37:39 -07:00
|
|
|
use counter::Counter;
|
2018-07-11 00:18:48 -07:00
|
|
|
use crdt::{Crdt, CrdtError, NodeInfo};
|
2018-04-28 00:31:20 -07:00
|
|
|
#[cfg(feature = "erasure")]
|
|
|
|
use erasure;
|
2018-06-27 11:33:56 -07:00
|
|
|
use packet::{
|
|
|
|
Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedBlobs, SharedPackets, BLOB_SIZE,
|
|
|
|
};
|
2018-07-05 14:50:42 -07:00
|
|
|
use result::{Error, Result};
|
2018-07-05 12:01:40 -07:00
|
|
|
use std::cmp;
|
2018-03-26 21:07:11 -07:00
|
|
|
use std::collections::VecDeque;
|
2018-06-25 15:50:58 -07:00
|
|
|
use std::mem;
|
2018-05-12 19:00:22 -07:00
|
|
|
use std::net::{SocketAddr, UdpSocket};
|
2018-07-10 12:37:39 -07:00
|
|
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
2018-07-05 14:50:42 -07:00
|
|
|
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
|
2018-04-12 10:26:32 -07:00
|
|
|
use std::sync::{Arc, RwLock};
|
2018-05-30 13:13:14 -07:00
|
|
|
use std::thread::{Builder, JoinHandle};
|
2018-03-26 21:07:11 -07:00
|
|
|
use std::time::Duration;
|
2018-03-07 13:47:13 -08:00
|
|
|
|
2018-06-27 12:35:58 -07:00
|
|
|
pub const WINDOW_SIZE: u64 = 2 * 1024;
|
2018-06-27 11:33:56 -07:00
|
|
|
pub type PacketReceiver = Receiver<SharedPackets>;
|
|
|
|
pub type PacketSender = Sender<SharedPackets>;
|
|
|
|
pub type BlobSender = Sender<SharedBlobs>;
|
|
|
|
pub type BlobReceiver = Receiver<SharedBlobs>;
|
2018-06-13 21:52:23 -07:00
|
|
|
pub type Window = Arc<RwLock<Vec<Option<SharedBlob>>>>;
|
2018-03-07 13:47:13 -08:00
|
|
|
|
2018-07-05 13:37:13 -07:00
|
|
|
#[derive(Debug, PartialEq, Eq)]
|
|
|
|
pub enum WindowError {
|
|
|
|
GenericError,
|
|
|
|
}
|
|
|
|
|
2018-03-07 13:47:13 -08:00
|
|
|
fn recv_loop(
|
|
|
|
sock: &UdpSocket,
|
2018-03-22 13:31:58 -07:00
|
|
|
exit: &Arc<AtomicBool>,
|
2018-04-02 19:32:58 -07:00
|
|
|
re: &PacketRecycler,
|
|
|
|
channel: &PacketSender,
|
2018-03-07 13:47:13 -08:00
|
|
|
) -> Result<()> {
|
|
|
|
loop {
|
2018-04-02 19:32:58 -07:00
|
|
|
let msgs = re.allocate();
|
2018-03-07 13:47:13 -08:00
|
|
|
loop {
|
2018-06-25 16:13:26 -07:00
|
|
|
let result = msgs.write()
|
2018-05-11 11:38:52 -07:00
|
|
|
.expect("write lock in fn recv_loop")
|
2018-06-25 16:13:26 -07:00
|
|
|
.recv_from(sock);
|
|
|
|
match result {
|
2018-03-07 13:47:13 -08:00
|
|
|
Ok(()) => {
|
2018-06-25 16:13:26 -07:00
|
|
|
channel.send(msgs)?;
|
2018-03-07 13:47:13 -08:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
Err(_) => {
|
2018-03-22 13:05:23 -07:00
|
|
|
if exit.load(Ordering::Relaxed) {
|
2018-06-25 16:13:26 -07:00
|
|
|
re.recycle(msgs);
|
2018-03-07 13:47:13 -08:00
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn receiver(
|
|
|
|
sock: UdpSocket,
|
2018-03-22 13:05:23 -07:00
|
|
|
exit: Arc<AtomicBool>,
|
2018-03-24 23:31:54 -07:00
|
|
|
recycler: PacketRecycler,
|
2018-05-15 08:53:51 -07:00
|
|
|
packet_sender: PacketSender,
|
|
|
|
) -> JoinHandle<()> {
|
2018-05-22 09:46:52 -07:00
|
|
|
let res = sock.set_read_timeout(Some(Duration::new(1, 0)));
|
|
|
|
if res.is_err() {
|
|
|
|
panic!("streamer::receiver set_read_timeout error");
|
|
|
|
}
|
2018-05-30 13:13:14 -07:00
|
|
|
Builder::new()
|
2018-05-30 13:20:58 -07:00
|
|
|
.name("solana-receiver".to_string())
|
2018-05-30 13:13:14 -07:00
|
|
|
.spawn(move || {
|
|
|
|
let _ = recv_loop(&sock, &exit, &recycler, &packet_sender);
|
|
|
|
()
|
|
|
|
})
|
|
|
|
.unwrap()
|
2018-03-07 13:47:13 -08:00
|
|
|
}
|
|
|
|
|
2018-04-02 19:32:58 -07:00
|
|
|
fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> {
|
2018-03-07 13:47:13 -08:00
|
|
|
let timer = Duration::new(1, 0);
|
2018-04-02 19:32:58 -07:00
|
|
|
let mut msgs = r.recv_timeout(timer)?;
|
|
|
|
Blob::send_to(recycler, sock, &mut msgs)?;
|
2018-03-07 13:47:13 -08:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2018-05-10 14:47:42 -07:00
|
|
|
pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
|
|
|
|
let timer = Duration::new(1, 0);
|
|
|
|
let msgs = recvr.recv_timeout(timer)?;
|
2018-05-12 19:00:22 -07:00
|
|
|
trace!("got msgs");
|
2018-05-10 14:47:42 -07:00
|
|
|
let mut len = msgs.read().unwrap().packets.len();
|
|
|
|
let mut batch = vec![msgs];
|
|
|
|
while let Ok(more) = recvr.try_recv() {
|
|
|
|
trace!("got more msgs");
|
|
|
|
len += more.read().unwrap().packets.len();
|
|
|
|
batch.push(more);
|
|
|
|
|
|
|
|
if len > 100_000 {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2018-06-28 14:51:53 -07:00
|
|
|
trace!("batch len {}", batch.len());
|
2018-05-10 14:47:42 -07:00
|
|
|
Ok((batch, len))
|
|
|
|
}
|
|
|
|
|
2018-07-11 07:38:57 -07:00
|
|
|
pub fn responder(
|
|
|
|
name: &'static str,
|
|
|
|
sock: UdpSocket,
|
|
|
|
recycler: BlobRecycler,
|
|
|
|
r: BlobReceiver,
|
|
|
|
) -> JoinHandle<()> {
|
2018-05-30 13:13:14 -07:00
|
|
|
Builder::new()
|
2018-07-11 07:38:57 -07:00
|
|
|
.name(format!("solana-responder-{}", name))
|
2018-05-30 13:13:14 -07:00
|
|
|
.spawn(move || loop {
|
2018-07-05 15:41:03 -07:00
|
|
|
if let Err(e) = recv_send(&sock, &recycler, &r) {
|
|
|
|
match e {
|
|
|
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
|
|
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
2018-07-16 19:31:52 -07:00
|
|
|
_ => warn!("{} responder error: {:?}", name, e),
|
2018-07-05 15:41:03 -07:00
|
|
|
}
|
2018-05-30 13:13:14 -07:00
|
|
|
}
|
|
|
|
})
|
|
|
|
.unwrap()
|
2018-03-07 13:47:13 -08:00
|
|
|
}
|
|
|
|
|
2018-04-02 19:32:58 -07:00
|
|
|
//TODO, we would need to stick block authentication before we create the
|
|
|
|
//window.
|
2018-04-12 10:26:32 -07:00
|
|
|
fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> {
|
2018-05-27 18:21:39 -07:00
|
|
|
trace!("receiving on {}", sock.local_addr().unwrap());
|
2018-04-12 10:26:32 -07:00
|
|
|
let dq = Blob::recv_from(recycler, sock)?;
|
|
|
|
if !dq.is_empty() {
|
|
|
|
s.send(dq)?;
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn blob_receiver(
|
|
|
|
exit: Arc<AtomicBool>,
|
|
|
|
recycler: BlobRecycler,
|
|
|
|
sock: UdpSocket,
|
|
|
|
s: BlobSender,
|
|
|
|
) -> Result<JoinHandle<()>> {
|
|
|
|
//DOCUMENTED SIDE-EFFECT
|
|
|
|
//1 second timeout on socket read
|
|
|
|
let timer = Duration::new(1, 0);
|
|
|
|
sock.set_read_timeout(Some(timer))?;
|
2018-05-30 13:13:14 -07:00
|
|
|
let t = Builder::new()
|
2018-05-30 13:20:58 -07:00
|
|
|
.name("solana-blob_receiver".to_string())
|
2018-05-30 13:13:14 -07:00
|
|
|
.spawn(move || loop {
|
|
|
|
if exit.load(Ordering::Relaxed) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
let _ = recv_blobs(&recycler, &sock, &s);
|
|
|
|
})
|
|
|
|
.unwrap();
|
2018-04-12 10:26:32 -07:00
|
|
|
Ok(t)
|
|
|
|
}
|
|
|
|
|
2018-05-12 19:00:22 -07:00
|
|
|
fn find_next_missing(
|
2018-06-13 21:52:23 -07:00
|
|
|
locked_window: &Window,
|
2018-05-12 19:00:22 -07:00
|
|
|
crdt: &Arc<RwLock<Crdt>>,
|
2018-06-27 12:35:58 -07:00
|
|
|
consumed: &mut u64,
|
|
|
|
received: &mut u64,
|
2018-05-12 19:00:22 -07:00
|
|
|
) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
|
|
|
|
if *received <= *consumed {
|
2018-07-05 13:37:13 -07:00
|
|
|
Err(WindowError::GenericError)?;
|
2018-05-12 19:00:22 -07:00
|
|
|
}
|
|
|
|
let window = locked_window.read().unwrap();
|
|
|
|
let reqs: Vec<_> = (*consumed..*received)
|
|
|
|
.filter_map(|pix| {
|
2018-06-27 12:35:58 -07:00
|
|
|
let i = (pix % WINDOW_SIZE) as usize;
|
2018-07-11 13:40:46 -07:00
|
|
|
if window[i].is_none() {
|
2018-05-12 19:00:22 -07:00
|
|
|
let val = crdt.read().unwrap().window_index_request(pix as u64);
|
|
|
|
if let Ok((to, req)) = val {
|
|
|
|
return Some((to, req));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
None
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
Ok(reqs)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Attempt to fill holes in the receive window.
///
/// First tries erasure recovery (when the `erasure` feature is enabled),
/// then — subject to an exponential backoff keyed on `consumed` standing
/// still — asks peers (via `crdt`) to resend the missing blob indices.
///
/// * `last`/`times` - backoff state: `last` is the `consumed` value seen on
///   the previous call, `times` counts calls with no progress.
fn repair_window(
    debug_id: u64,
    locked_window: &Window,
    crdt: &Arc<RwLock<Crdt>>,
    _recycler: &BlobRecycler,
    last: &mut u64,
    times: &mut usize,
    consumed: &mut u64,
    received: &mut u64,
) -> Result<()> {
    #[cfg(feature = "erasure")]
    {
        if erasure::recover(
            _recycler,
            &mut locked_window.write().unwrap(),
            *consumed as usize,
            *received as usize,
        ).is_err()
        {
            trace!("erasure::recover failed");
        }
    }
    //exponential backoff
    if *last != *consumed {
        // Progress was made since the previous call; reset the counter.
        *times = 0;
    }
    *last = *consumed;
    *times += 1;
    //if times flips from all 1s 7 -> 8, 15 -> 16, we retry otherwise return Ok
    // (`times & (times - 1) != 0` is true unless `times` is a power of two,
    // so repair requests go out on calls 1, 2, 4, 8, ...)
    if *times & (*times - 1) != 0 {
        trace!(
            "repair_window counter {} {} {}",
            *times,
            *consumed,
            *received
        );
        return Ok(());
    }

    let reqs = find_next_missing(locked_window, crdt, consumed, received)?;
    trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
    if !reqs.is_empty() {
        inc_new_counter!("streamer-repair_window-repair", reqs.len());
        debug!(
            "{:x}: repair_window counter times: {} consumed: {} received: {} missing: {}",
            debug_id,
            *times,
            *consumed,
            *received,
            reqs.len()
        );
    }
    // Ephemeral socket for the outgoing repair requests.
    let sock = UdpSocket::bind("0.0.0.0:0")?;
    for (to, req) in reqs {
        //todo cache socket
        debug!(
            "{:x} repair_window request {} {} {}",
            debug_id, *consumed, *received, to
        );
        assert!(req.len() < BLOB_SIZE);
        sock.send_to(&req, to)?;
    }
    Ok(())
}
|
|
|
|
|
2018-07-11 20:10:25 -07:00
|
|
|
fn retransmit_all_leader_blocks(
|
|
|
|
maybe_leader: Option<NodeInfo>,
|
|
|
|
dq: &mut SharedBlobs,
|
|
|
|
debug_id: u64,
|
|
|
|
recycler: &BlobRecycler,
|
|
|
|
consumed: &mut u64,
|
|
|
|
received: &mut u64,
|
|
|
|
retransmit: &BlobSender,
|
|
|
|
) -> Result<()> {
|
|
|
|
let mut retransmit_queue = VecDeque::new();
|
|
|
|
if let Some(leader) = maybe_leader {
|
|
|
|
for b in dq {
|
|
|
|
let p = b.read().expect("'b' read lock in fn recv_window");
|
|
|
|
//TODO this check isn't safe against adverserial packets
|
|
|
|
//we need to maintain a sequence window
|
|
|
|
let leader_id = leader.id;
|
|
|
|
trace!(
|
|
|
|
"idx: {} addr: {:?} id: {:?} leader: {:?}",
|
|
|
|
p.get_index().expect("get_index in fn recv_window"),
|
|
|
|
p.get_id().expect("get_id in trace! fn recv_window"),
|
|
|
|
p.meta.addr(),
|
|
|
|
leader_id
|
|
|
|
);
|
|
|
|
if p.get_id().expect("get_id in fn recv_window") == leader_id {
|
|
|
|
//TODO
|
|
|
|
//need to copy the retransmitted blob
|
|
|
|
//otherwise we get into races with which thread
|
|
|
|
//should do the recycling
|
|
|
|
//
|
|
|
|
//a better abstraction would be to recycle when the blob
|
|
|
|
//is dropped via a weakref to the recycler
|
|
|
|
let nv = recycler.allocate();
|
|
|
|
{
|
|
|
|
let mut mnv = nv.write().expect("recycler write lock in fn recv_window");
|
|
|
|
let sz = p.meta.size;
|
|
|
|
mnv.meta.size = sz;
|
|
|
|
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
|
|
|
|
}
|
|
|
|
retransmit_queue.push_back(nv);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
warn!("{:x}: no leader to retransmit from", debug_id);
|
|
|
|
}
|
|
|
|
if !retransmit_queue.is_empty() {
|
|
|
|
debug!(
|
|
|
|
"{:x}: RECV_WINDOW {} {}: retransmit {}",
|
|
|
|
debug_id,
|
|
|
|
*consumed,
|
|
|
|
*received,
|
|
|
|
retransmit_queue.len(),
|
|
|
|
);
|
2018-07-16 18:33:50 -07:00
|
|
|
inc_new_counter!("streamer-recv_window-retransmit", retransmit_queue.len());
|
2018-07-11 20:10:25 -07:00
|
|
|
retransmit.send(retransmit_queue)?;
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Insert blob `b` (ledger index `pix`, window slot `w`) into the shared
/// window, evict any stale blobs in the affected range, then drain the run
/// of contiguous data blobs starting at `consumed` onto `consume_queue`,
/// advancing `consumed` past them.
fn process_blob(
    b: SharedBlob,
    pix: u64,
    w: usize,
    consume_queue: &mut SharedBlobs,
    locked_window: &Window,
    debug_id: u64,
    recycler: &BlobRecycler,
    consumed: &mut u64,
) {
    let mut window = locked_window.write().unwrap();

    // Search the window for old blobs in the window
    // of consumed to received and clear any old ones
    for ix in *consumed..(pix + 1) {
        let k = (ix % WINDOW_SIZE) as usize;
        if let Some(b) = &mut window[k] {
            // Slot still holds a live (not-yet-consumed) blob; leave it.
            if b.read().unwrap().get_index().unwrap() >= *consumed as u64 {
                continue;
            }
        }
        // Slot holds a stale blob — pull it out and recycle it.
        if let Some(b) = mem::replace(&mut window[k], None) {
            recycler.recycle(b);
        }
    }

    // Insert the new blob into the window
    // spot should be free because we cleared it above
    if window[w].is_none() {
        window[w] = Some(b);
    } else if let Some(cblob) = &window[w] {
        // Slot was occupied by a different index (overrun) or the same
        // index (duplicate delivery, e.g. from a repair response).
        if cblob.read().unwrap().get_index().unwrap() != pix as u64 {
            warn!("{:x}: overrun blob at index {:}", debug_id, w);
        } else {
            debug!("{:x}: duplicate blob at index {:}", debug_id, w);
        }
    }

    // Push contiguous data blobs (starting at `consumed`) to the consumer.
    loop {
        let k = (*consumed % WINDOW_SIZE) as usize;
        trace!("k: {} consumed: {}", k, *consumed);

        if window[k].is_none() {
            break;
        }
        let mut is_coding = false;
        if let Some(ref cblob) = window[k] {
            let cblob_r = cblob
                .read()
                .expect("blob read lock for flogs streamer::window");
            // A blob older than `consumed` means the slot has wrapped; stop.
            if cblob_r.get_index().unwrap() < *consumed {
                break;
            }
            if cblob_r.is_coding() {
                is_coding = true;
            }
        }
        if !is_coding {
            consume_queue.push_back(window[k].clone().expect("clone in fn recv_window"));
            *consumed += 1;
        } else {
            #[cfg(feature = "erasure")]
            {
                let block_start = *consumed - (*consumed % erasure::NUM_CODED as u64);
                let coding_end = block_start + erasure::NUM_CODED as u64;
                // We've received all this block's data blobs, go and null out the window now
                for j in block_start..*consumed {
                    if let Some(b) = mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None) {
                        recycler.recycle(b);
                    }
                }
                for j in *consumed..coding_end {
                    window[(j % WINDOW_SIZE) as usize] = None;
                }

                // Skip over the coding blobs at the tail of the block.
                *consumed += erasure::MAX_MISSING as u64;
                debug!(
                    "skipping processing coding blob k: {} consumed: {}",
                    k, *consumed
                );
            }
        }
    }
}
|
|
|
|
|
2018-04-02 19:32:58 -07:00
|
|
|
/// One iteration of the window service: pull all pending blobs off `r`,
/// retransmit the leader's blobs, slot everything into the window, and
/// forward the contiguous run of consumable blobs on channel `s`.
fn recv_window(
    debug_id: u64,
    locked_window: &Window,
    crdt: &Arc<RwLock<Crdt>>,
    recycler: &BlobRecycler,
    consumed: &mut u64,
    received: &mut u64,
    r: &BlobReceiver,
    s: &BlobSender,
    retransmit: &BlobSender,
) -> Result<()> {
    let timer = Duration::from_millis(200);
    let mut dq = r.recv_timeout(timer)?;
    // Snapshot the current leader once per batch.
    let maybe_leader: Option<NodeInfo> = crdt.read()
        .expect("'crdt' read lock in fn recv_window")
        .leader_data()
        .cloned();
    // Drain anything else already queued into the same batch.
    while let Ok(mut nq) = r.try_recv() {
        dq.append(&mut nq)
    }
    inc_new_counter!("streamer-recv_window-recv", dq.len());
    debug!(
        "{:x}: RECV_WINDOW {} {}: got packets {}",
        debug_id,
        *consumed,
        *received,
        dq.len(),
    );

    retransmit_all_leader_blocks(
        maybe_leader,
        &mut dq,
        debug_id,
        recycler,
        consumed,
        received,
        retransmit,
    )?;

    //send a contiguous set of blocks
    let mut consume_queue = VecDeque::new();
    while let Some(b) = dq.pop_front() {
        let (pix, meta_size) = {
            let p = b.write().expect("'b' write lock in fn recv_window");
            (p.get_index()?, p.meta.size)
        };
        // Track the highest index seen so repair knows how far to look.
        if pix > *received {
            *received = pix;
        }
        // Got a blob which has already been consumed, skip it
        // probably from a repair window request
        if pix < *consumed {
            debug!(
                "{:x}: received: {} but older than consumed: {} skipping..",
                debug_id, pix, *consumed
            );
            continue;
        }
        let w = (pix % WINDOW_SIZE) as usize;
        //TODO, after the block are authenticated
        //if we get different blocks at the same index
        //that is a network failure/attack
        trace!("window w: {} size: {}", w, meta_size);

        process_blob(
            b,
            pix,
            w,
            &mut consume_queue,
            locked_window,
            debug_id,
            recycler,
            consumed,
        );
    }
    print_window(debug_id, locked_window, *consumed);
    // NOTE(review): this trace is duplicated inside the `if` below — one of
    // the two is probably redundant.
    trace!("sending consume_queue.len: {}", consume_queue.len());
    if !consume_queue.is_empty() {
        debug!(
            "{:x}: RECV_WINDOW {} {}: forwarding consume_queue {}",
            debug_id,
            *consumed,
            *received,
            consume_queue.len(),
        );
        trace!("sending consume_queue.len: {}", consume_queue.len());
        inc_new_counter!("streamer-recv_window-consume", consume_queue.len());
        s.send(consume_queue)?;
    }
    Ok(())
}
|
|
|
|
|
2018-07-09 17:35:23 -07:00
|
|
|
fn print_window(debug_id: u64, locked_window: &Window, consumed: u64) {
|
2018-05-12 19:00:22 -07:00
|
|
|
{
|
|
|
|
let buf: Vec<_> = locked_window
|
|
|
|
.read()
|
|
|
|
.unwrap()
|
|
|
|
.iter()
|
|
|
|
.enumerate()
|
|
|
|
.map(|(i, v)| {
|
2018-06-27 12:35:58 -07:00
|
|
|
if i == (consumed % WINDOW_SIZE) as usize {
|
2018-05-12 19:00:22 -07:00
|
|
|
"_"
|
|
|
|
} else if v.is_none() {
|
|
|
|
"0"
|
2018-07-11 13:40:46 -07:00
|
|
|
} else if let Some(ref cblob) = v {
|
|
|
|
if cblob.read().unwrap().is_coding() {
|
|
|
|
"C"
|
2018-06-01 12:49:47 -07:00
|
|
|
} else {
|
2018-07-11 13:40:46 -07:00
|
|
|
"1"
|
2018-06-01 12:49:47 -07:00
|
|
|
}
|
2018-07-11 13:40:46 -07:00
|
|
|
} else {
|
|
|
|
"0"
|
2018-05-12 19:00:22 -07:00
|
|
|
}
|
|
|
|
})
|
|
|
|
.collect();
|
2018-07-09 17:35:23 -07:00
|
|
|
trace!("{:x}:WINDOW ({}): {}", debug_id, consumed, buf.join(""));
|
2018-04-12 10:26:32 -07:00
|
|
|
}
|
2018-04-02 19:32:58 -07:00
|
|
|
}
|
|
|
|
|
2018-06-13 21:52:23 -07:00
|
|
|
pub fn default_window() -> Window {
|
2018-06-27 12:35:58 -07:00
|
|
|
Arc::new(RwLock::new(vec![None; WINDOW_SIZE as usize]))
|
2018-05-12 19:00:22 -07:00
|
|
|
}
|
|
|
|
|
2018-07-02 10:07:32 -07:00
|
|
|
/// Initialize a rebroadcast window with most recent Entry blobs
/// * `crdt` - gossip instance, used to set blob ids
/// * `blobs` - up to WINDOW_SIZE most recent blobs
/// * `entry_height` - current entry height
pub fn initialized_window(
    crdt: &Arc<RwLock<Crdt>>,
    blobs: Vec<SharedBlob>,
    entry_height: u64,
) -> Window {
    let window = default_window();

    {
        let mut win = window.write().unwrap();
        let me = crdt.read().unwrap().my_data().clone();

        debug!(
            "initialized window entry_height:{} blobs_len:{}",
            entry_height,
            blobs.len()
        );

        // Index the blobs
        // NOTE(review): this underflows if blobs.len() > entry_height —
        // presumably callers guarantee blobs were produced before
        // entry_height; confirm at call sites.
        let mut received = entry_height - blobs.len() as u64;
        Crdt::index_blobs(&me, &blobs, &mut received).expect("index blobs for initial window");

        // populate the window, offset by implied index
        // (if more blobs than slots were supplied, skip the oldest overflow)
        let diff = cmp::max(blobs.len() as isize - win.len() as isize, 0) as usize;
        for b in blobs.into_iter().skip(diff) {
            let ix = b.read().unwrap().get_index().expect("blob index");
            let pos = (ix % WINDOW_SIZE) as usize;
            trace!("caching {} at {}", ix, pos);
            assert!(win[pos].is_none());
            win[pos] = Some(b);
        }
    }

    window
}
|
|
|
|
|
2018-04-02 19:32:58 -07:00
|
|
|
/// Spawn the window service: receives blobs on `r`, maintains the receive
/// window starting at `entry_height`, forwards consumable blobs on `s`,
/// retransmits leader blobs on `retransmit`, and periodically issues window
/// repair requests. Exits when channel `r` disconnects.
pub fn window(
    crdt: Arc<RwLock<Crdt>>,
    window: Window,
    entry_height: u64,
    recycler: BlobRecycler,
    r: BlobReceiver,
    s: BlobSender,
    retransmit: BlobSender,
) -> JoinHandle<()> {
    Builder::new()
        .name("solana-window".to_string())
        .spawn(move || {
            // Window cursors, all seeded from the current ledger height.
            let mut consumed = entry_height;
            let mut received = entry_height;
            // Repair backoff state.
            let mut last = entry_height;
            let mut times = 0;
            let debug_id = crdt.read().unwrap().debug_id();
            trace!("{:x}: RECV_WINDOW started", debug_id);
            loop {
                if let Err(e) = recv_window(
                    debug_id,
                    &window,
                    &crdt,
                    &recycler,
                    &mut consumed,
                    &mut received,
                    &r,
                    &s,
                    &retransmit,
                ) {
                    match e {
                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
                        _ => {
                            inc_new_counter!("streamer-window-error", 1, 1);
                            error!("window error: {:?}", e);
                        }
                    }
                }
                // Repair failures are non-fatal; errors deliberately ignored.
                let _ = repair_window(
                    debug_id,
                    &window,
                    &crdt,
                    &recycler,
                    &mut last,
                    &mut times,
                    &mut consumed,
                    &mut received,
                );
                // Invariant: we can never have consumed past what we received.
                assert!(consumed <= (received + 1));
            }
        })
        .unwrap()
}
|
|
|
|
|
2018-04-28 00:31:20 -07:00
|
|
|
/// One iteration of the broadcast service: drain pending blobs from `r`,
/// index them, cache them in the broadcast `window` (recycling whatever they
/// evict), optionally generate erasure coding, and send them to the peers in
/// `broadcast_table`.
///
/// * `transmit_index` - index of the next blob to transmit (advanced by
///   `Crdt::broadcast`)
/// * `receive_index` - index assigned to the next received blob (advanced
///   here per chunk)
fn broadcast(
    me: &NodeInfo,
    broadcast_table: &[NodeInfo],
    window: &Window,
    recycler: &BlobRecycler,
    r: &BlobReceiver,
    sock: &UdpSocket,
    transmit_index: &mut u64,
    receive_index: &mut u64,
) -> Result<()> {
    let debug_id = me.debug_id();
    let timer = Duration::new(1, 0);
    let mut dq = r.recv_timeout(timer)?;
    while let Ok(mut nq) = r.try_recv() {
        dq.append(&mut nq);
    }

    // flatten deque to vec
    let blobs_vec: Vec<_> = dq.into_iter().collect();

    // We could receive more blobs than window slots so
    // break them up into window-sized chunks to process
    let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec());

    print_window(me.debug_id(), window, *receive_index);

    for mut blobs in blobs_chunked {
        // Insert the coding blobs into the blob stream
        #[cfg(feature = "erasure")]
        erasure::add_coding_blobs(recycler, &mut blobs, *receive_index);

        let blobs_len = blobs.len();
        debug!("{:x} broadcast blobs.len: {}", debug_id, blobs_len);

        // Index the blobs
        Crdt::index_blobs(&me, &blobs, receive_index)?;
        // keep the cache of blobs that are broadcast
        inc_new_counter!("streamer-broadcast-sent", blobs.len());
        {
            let mut win = window.write().unwrap();
            assert!(blobs.len() <= win.len());
            // First pass: evict and recycle whatever currently occupies the
            // slots these blobs will land in.
            for b in &blobs {
                let ix = b.read().unwrap().get_index().expect("blob index");
                let pos = (ix % WINDOW_SIZE) as usize;
                if let Some(x) = mem::replace(&mut win[pos], None) {
                    trace!(
                        "popped {} at {}",
                        x.read().unwrap().get_index().unwrap(),
                        pos
                    );
                    recycler.recycle(x);
                }
                trace!("null {}", pos);
            }
            // Second pass: move the new blobs into their (now empty) slots.
            while let Some(b) = blobs.pop() {
                let ix = b.read().unwrap().get_index().expect("blob index");
                let pos = (ix % WINDOW_SIZE) as usize;
                trace!("caching {} at {}", ix, pos);
                assert!(win[pos].is_none());
                win[pos] = Some(b);
            }
        }

        // Fill in the coding blob data from the window data blobs
        #[cfg(feature = "erasure")]
        {
            erasure::generate_coding(
                &mut window.write().unwrap(),
                *receive_index as usize,
                blobs_len,
            )?;
        }

        *receive_index += blobs_len as u64;

        // Send blobs out from the window
        Crdt::broadcast(
            &me,
            &broadcast_table,
            &window,
            &sock,
            transmit_index,
            *receive_index,
        )?;
    }
    Ok(())
}
|
|
|
|
|
|
|
|
/// Service to broadcast messages from the leader to layer 1 nodes.
/// See `crdt` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to send from.
/// * `crdt` - CRDT structure
/// * `window` - Cache of blobs that we have broadcast
/// * `entry_height` - initial index for `transmit_index`/`receive_index`
/// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn broadcaster(
    sock: UdpSocket,
    crdt: Arc<RwLock<Crdt>>,
    window: Window,
    entry_height: u64,
    recycler: BlobRecycler,
    r: BlobReceiver,
) -> JoinHandle<()> {
    Builder::new()
        .name("solana-broadcaster".to_string())
        .spawn(move || {
            let mut transmit_index = entry_height;
            let mut receive_index = entry_height;
            let me = crdt.read().unwrap().my_data().clone();
            loop {
                // Recompute the peer list each iteration so membership
                // changes are picked up.
                let broadcast_table = crdt.read().unwrap().compute_broadcast_table();
                if let Err(e) = broadcast(
                    &me,
                    &broadcast_table,
                    &window,
                    &recycler,
                    &r,
                    &sock,
                    &mut transmit_index,
                    &mut receive_index,
                ) {
                    match e {
                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
                        Error::CrdtError(CrdtError::TooSmall) => (), // TODO: Why are the unit-tests throwing hundreds of these?
                        _ => {
                            inc_new_counter!("streamer-broadcaster-error", 1, 1);
                            error!("broadcaster error: {:?}", e);
                        }
                    }
                }
            }
        })
        .unwrap()
}
|
|
|
|
|
2018-04-12 10:26:32 -07:00
|
|
|
fn retransmit(
|
2018-04-28 00:31:20 -07:00
|
|
|
crdt: &Arc<RwLock<Crdt>>,
|
2018-04-12 10:26:32 -07:00
|
|
|
recycler: &BlobRecycler,
|
|
|
|
r: &BlobReceiver,
|
|
|
|
sock: &UdpSocket,
|
|
|
|
) -> Result<()> {
|
|
|
|
let timer = Duration::new(1, 0);
|
|
|
|
let mut dq = r.recv_timeout(timer)?;
|
|
|
|
while let Ok(mut nq) = r.try_recv() {
|
|
|
|
dq.append(&mut nq);
|
|
|
|
}
|
|
|
|
{
|
|
|
|
for b in &dq {
|
2018-04-28 00:31:20 -07:00
|
|
|
Crdt::retransmit(&crdt, b, sock)?;
|
2018-04-12 10:26:32 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
while let Some(b) = dq.pop_front() {
|
|
|
|
recycler.recycle(b);
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2018-04-17 19:46:50 -07:00
|
|
|
/// Service to retransmit messages from the leader to layer 1 nodes.
/// See `crdt` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to read from. Read timeout is set to 1.
/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip.
/// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn retransmitter(
    sock: UdpSocket,
    crdt: Arc<RwLock<Crdt>>,
    recycler: BlobRecycler,
    r: BlobReceiver,
) -> JoinHandle<()> {
    Builder::new()
        .name("solana-retransmitter".to_string())
        .spawn(move || {
            trace!("retransmitter started");
            loop {
                if let Err(e) = retransmit(&crdt, &recycler, &r, &sock) {
                    match e {
                        // Channel closed: normal shutdown path.
                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
                        _ => {
                            inc_new_counter!("streamer-retransmit-error", 1, 1);
                            error!("retransmitter error: {:?}", e);
                        }
                    }
                }
            }
            trace!("exiting retransmitter");
        })
        .unwrap()
}
|
|
|
|
|
2018-03-19 16:09:47 -07:00
|
|
|
#[cfg(test)]
mod test {
    use crdt::{Crdt, TestNode};
    use logger;
    use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
    use std::collections::VecDeque;
    use std::io;
    use std::io::Write;
    use std::net::UdpSocket;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc::channel;
    use std::sync::{Arc, RwLock};
    use std::time::Duration;
    use streamer::{blob_receiver, receiver, responder, window};
    use streamer::{default_window, BlobReceiver, PacketReceiver};

    /// Drain up to five one-second reads from `r`, tallying received
    /// packets into `num`; stops early once ten have been counted.
    fn get_msgs(r: PacketReceiver, num: &mut usize) {
        let timeout = Duration::new(1, 0);
        for _attempt in 0..5 {
            match r.recv_timeout(timeout) {
                Ok(batch) => *num += batch.read().unwrap().packets.len(),
                e => info!("error {:?}", e),
            }
            if *num == 10 {
                break;
            }
        }
    }

    /// Smoke-test the Debug impls of the packet types.
    #[test]
    pub fn streamer_debug() {
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", Packets::default()).unwrap();
        write!(io::sink(), "{:?}", Blob::default()).unwrap();
    }

    /// Push ten blobs through a responder and confirm the receiver
    /// service on the other socket sees all ten packets.
    #[test]
    pub fn streamer_send_test() {
        let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
        read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();

        let addr = read.local_addr().unwrap();
        let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let exit = Arc::new(AtomicBool::new(false));
        let pack_recycler = PacketRecycler::default();
        let resp_recycler = BlobRecycler::default();
        let (s_reader, r_reader) = channel();
        let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader);
        let t_responder = {
            let (s_responder, r_responder) = channel();
            let t_responder = responder(
                "streamer_send_test",
                send,
                resp_recycler.clone(),
                r_responder,
            );
            // Build ten one-packet blobs all addressed at the read socket.
            let mut outbound = VecDeque::new();
            for idx in 0..10 {
                let blob = resp_recycler.allocate();
                {
                    let mut wb = blob.write().unwrap();
                    wb.data[0] = idx as u8;
                    wb.meta.size = PACKET_DATA_SIZE;
                    wb.meta.set_addr(&addr);
                }
                outbound.push_back(blob);
            }
            s_responder.send(outbound).expect("send");
            t_responder
        };

        let mut num = 0;
        get_msgs(r_reader, &mut num);
        assert_eq!(num, 10);
        exit.store(true, Ordering::Relaxed);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
    }

    /// Drain up to five one-second reads from `r`, asserting the blobs
    /// arrive in index order and tallying them into `num`; stops early
    /// once ten have been counted.
    fn get_blobs(r: BlobReceiver, num: &mut usize) {
        let timeout = Duration::new(1, 0);
        for _attempt in 0..5 {
            match r.recv_timeout(timeout) {
                Ok(batch) => {
                    for (i, v) in batch.iter().enumerate() {
                        assert_eq!(v.read().unwrap().get_index().unwrap() as usize, *num + i);
                    }
                    *num += batch.len();
                }
                e => info!("error {:?}", e),
            }
            if *num == 10 {
                break;
            }
        }
    }

    /// Send ten blobs in reverse index order through the window service
    /// and confirm they come out reordered (0..10) and are also queued
    /// for retransmission.
    #[test]
    pub fn window_send_test() {
        logger::setup();
        let node = TestNode::new_localhost();
        let exit = Arc::new(AtomicBool::new(false));
        let mut crdt_me = Crdt::new(node.data.clone()).expect("Crdt::new");
        let me_id = crdt_me.my_data().id;
        // Make ourselves the leader so our own blobs are accepted.
        crdt_me.set_leader(me_id);
        let subs = Arc::new(RwLock::new(crdt_me));

        let resp_recycler = BlobRecycler::default();
        let (s_reader, r_reader) = channel();
        let t_receiver = blob_receiver(
            exit.clone(),
            resp_recycler.clone(),
            node.sockets.gossip,
            s_reader,
        ).unwrap();
        let (s_window, r_window) = channel();
        let (s_retransmit, r_retransmit) = channel();
        let win = default_window();
        let t_window = window(
            subs,
            win,
            0,
            resp_recycler.clone(),
            r_reader,
            s_window,
            s_retransmit,
        );
        let t_responder = {
            let (s_responder, r_responder) = channel();
            let t_responder = responder(
                "window_send_test",
                node.sockets.replicate,
                resp_recycler.clone(),
                r_responder,
            );
            // Queue indices 9 down to 0 so the window must reorder them.
            let mut outbound = VecDeque::new();
            for v in 0..10 {
                let idx = 9 - v;
                let blob = resp_recycler.allocate();
                {
                    let mut wb = blob.write().unwrap();
                    wb.set_index(idx).unwrap();
                    wb.set_id(me_id).unwrap();
                    assert_eq!(idx, wb.get_index().unwrap());
                    wb.meta.size = PACKET_DATA_SIZE;
                    wb.meta.set_addr(&node.data.contact_info.ncp);
                }
                outbound.push_back(blob);
            }
            s_responder.send(outbound).expect("send");
            t_responder
        };

        let mut num = 0;
        get_blobs(r_window, &mut num);
        assert_eq!(num, 10);
        // Everything the window emits should also be retransmitted.
        let mut q = r_retransmit.recv().unwrap();
        while let Ok(mut nq) = r_retransmit.try_recv() {
            q.append(&mut nq);
        }
        assert_eq!(q.len(), 10);
        exit.store(true, Ordering::Relaxed);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
        t_window.join().expect("join");
    }
}
|