solana/src/streamer.rs

//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
//!
use counter::Counter;
use crdt::{Crdt, NodeInfo};
#[cfg(feature = "erasure")]
use erasure;
use log::Level;
use packet::{
    Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedBlobs, SharedPackets, BLOB_SIZE,
};
use result::{Error, Result};
use std::cmp;
use std::collections::VecDeque;
use std::mem;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant};
use timing::duration_as_ms;
pub const WINDOW_SIZE: u64 = 2 * 1024;

pub type PacketReceiver = Receiver<SharedPackets>;
pub type PacketSender = Sender<SharedPackets>;
pub type BlobSender = Sender<SharedBlobs>;
pub type BlobReceiver = Receiver<SharedBlobs>;

#[derive(Clone, Default)]
pub struct WindowSlot {
    pub data: Option<SharedBlob>,
    pub coding: Option<SharedBlob>,
}

pub type SharedWindow = Arc<RwLock<Vec<WindowSlot>>>;

#[derive(Debug, PartialEq, Eq)]
pub enum WindowError {
    GenericError,
}

#[derive(Debug)]
pub struct WindowIndex {
    pub data: u64,
    pub coding: u64,
}
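// Illustrative sketch, not part of the original pipeline: the window is a
// ring buffer of WINDOW_SIZE slots, and a blob index `pix` maps to its slot
// via `pix % WINDOW_SIZE`, so indices one full window apart share a slot.
// The module and test names below are ours, added for illustration only.
#[cfg(test)]
mod window_slot_math_sketch {
    use super::WINDOW_SIZE;

    #[test]
    fn pix_maps_to_ring_slot() {
        let pix: u64 = 5;
        // An index one full window later reuses the same slot.
        assert_eq!(pix % WINDOW_SIZE, (pix + WINDOW_SIZE) % WINDOW_SIZE);
        assert_eq!((pix % WINDOW_SIZE) as usize, 5);
    }
}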
fn recv_loop(
    sock: &UdpSocket,
    exit: &Arc<AtomicBool>,
    re: &PacketRecycler,
    channel: &PacketSender,
) -> Result<()> {
    loop {
        let msgs = re.allocate();
        loop {
            let result = msgs
                .write()
                .expect("write lock in fn recv_loop")
                .recv_from(sock);
            match result {
                Ok(()) => {
                    channel.send(msgs)?;
                    break;
                }
                Err(_) => {
                    if exit.load(Ordering::Relaxed) {
                        re.recycle(msgs);
                        return Ok(());
                    }
                }
            }
        }
    }
}
pub fn receiver(
    sock: UdpSocket,
    exit: Arc<AtomicBool>,
    recycler: PacketRecycler,
    packet_sender: PacketSender,
) -> JoinHandle<()> {
    let res = sock.set_read_timeout(Some(Duration::new(1, 0)));
    if res.is_err() {
        panic!("streamer::receiver set_read_timeout error");
    }
    Builder::new()
        .name("solana-receiver".to_string())
        .spawn(move || {
            let _ = recv_loop(&sock, &exit, &recycler, &packet_sender);
        })
        .unwrap()
}
fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> {
    let timer = Duration::new(1, 0);
    let mut msgs = r.recv_timeout(timer)?;
    Blob::send_to(recycler, sock, &mut msgs)?;
    Ok(())
}
pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
    let timer = Duration::new(1, 0);
    let msgs = recvr.recv_timeout(timer)?;
    trace!("got msgs");
    let mut len = msgs.read().unwrap().packets.len();
    let mut batch = vec![msgs];
    while let Ok(more) = recvr.try_recv() {
        trace!("got more msgs");
        len += more.read().unwrap().packets.len();
        batch.push(more);
        if len > 100_000 {
            break;
        }
    }
    trace!("batch len {}", batch.len());
    Ok((batch, len))
}
pub fn responder(
    name: &'static str,
    sock: UdpSocket,
    recycler: BlobRecycler,
    r: BlobReceiver,
) -> JoinHandle<()> {
    Builder::new()
        .name(format!("solana-responder-{}", name))
        .spawn(move || loop {
            if let Err(e) = recv_send(&sock, &recycler, &r) {
                match e {
                    Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                    Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
                    _ => warn!("{} responder error: {:?}", name, e),
                }
            }
        })
        .unwrap()
}
// TODO: block authentication needs to happen before we create the window.
fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> {
    trace!("recv_blobs: receiving on {}", sock.local_addr().unwrap());
    let dq = Blob::recv_from(recycler, sock)?;
    if !dq.is_empty() {
        s.send(dq)?;
    }
    Ok(())
}
pub fn blob_receiver(
    exit: Arc<AtomicBool>,
    recycler: BlobRecycler,
    sock: UdpSocket,
    s: BlobSender,
) -> Result<JoinHandle<()>> {
    //DOCUMENTED SIDE-EFFECT
    //1 second timeout on socket read
    let timer = Duration::new(1, 0);
    sock.set_read_timeout(Some(timer))?;
    let t = Builder::new()
        .name("solana-blob_receiver".to_string())
        .spawn(move || loop {
            if exit.load(Ordering::Relaxed) {
                break;
            }
            let _ = recv_blobs(&recycler, &sock, &s);
        })
        .unwrap();
    Ok(t)
}
fn find_next_missing(
    window: &SharedWindow,
    crdt: &Arc<RwLock<Crdt>>,
    recycler: &BlobRecycler,
    consumed: u64,
    received: u64,
) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
    if received <= consumed {
        Err(WindowError::GenericError)?;
    }
    let mut window = window.write().unwrap();
    let reqs: Vec<_> = (consumed..received)
        .filter_map(|pix| {
            let i = (pix % WINDOW_SIZE) as usize;

            if let Some(blob) = mem::replace(&mut window[i].data, None) {
                let blob_idx = blob.read().unwrap().get_index().unwrap();
                if blob_idx == pix {
                    mem::replace(&mut window[i].data, Some(blob));
                } else {
                    recycler.recycle(blob);
                }
            }
            if window[i].data.is_none() {
                let val = crdt.read().unwrap().window_index_request(pix as u64);
                if let Ok((to, req)) = val {
                    return Some((to, req));
                }
            }
            None
        })
        .collect();
    Ok(reqs)
}
fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u64) -> u64 {
    // Calculate the highest blob index that this node should already have received
    // via avalanche. The avalanche splits the data stream across nodes, and each node
    // retransmits its share to its peer nodes. So there's a possibility that a blob
    // (with an index lower than the current received index) is still being
    // retransmitted by a peer node.
    let highest_lost = cmp::max(consumed, received.saturating_sub(num_peers));

    // This check prevents repairing a blob that would cause the window to roll over.
    // Even if the highest_lost blob is actually missing, asking to repair it might
    // cause our current window to move past other missing blobs.
    cmp::min(consumed + WINDOW_SIZE - 1, highest_lost)
}
fn repair_window(
    debug_id: u64,
    window: &SharedWindow,
    crdt: &Arc<RwLock<Crdt>>,
    recycler: &BlobRecycler,
    last: &mut u64,
    times: &mut usize,
    consumed: u64,
    received: u64,
) -> Result<()> {
    // exponential backoff
    if *last != consumed {
        *times = 0;
    }
    *last = consumed;
    *times += 1;

    // Retry only when `times` is a power of two (1, 2, 4, 8, ...), i.e. when the
    // counter flips from all 1s (7 -> 8, 15 -> 16); otherwise return Ok.
    if *times & (*times - 1) != 0 {
        trace!("repair_window counter {} {} {}", *times, consumed, received);
        return Ok(());
    }

    let highest_lost = calculate_highest_lost_blob_index(
        crdt.read().unwrap().table.len() as u64,
        consumed,
        received,
    );
    let reqs = find_next_missing(window, crdt, recycler, consumed, highest_lost)?;
    trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
    if !reqs.is_empty() {
        inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
        info!(
            "{:x}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}",
            debug_id,
            *times,
            consumed,
            highest_lost,
            reqs.len()
        );
    }
    let sock = UdpSocket::bind("0.0.0.0:0")?;
    for (to, req) in reqs {
        // TODO: cache the socket
        debug!(
            "{:x}: repair_window request {} {} {}",
            debug_id, consumed, highest_lost, to
        );
        assert!(req.len() <= BLOB_SIZE);
        sock.send_to(&req, to)?;
    }
    Ok(())
}
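// Illustrative sketch of the backoff predicate used above: `times & (times - 1)`
// is zero exactly when `times` is a power of two, so repair requests go out on
// attempts 1, 2, 4, 8, ... and are skipped otherwise. The helper closure and
// module name here are ours, added for illustration only.
#[cfg(test)]
mod repair_backoff_sketch {
    #[test]
    fn retries_only_on_powers_of_two() {
        let should_retry = |times: usize| times & (times - 1) == 0;
        for times in 1..=16 {
            assert_eq!(
                should_retry(times),
                [1, 2, 4, 8, 16].contains(&times),
                "times = {}",
                times
            );
        }
    }
}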
fn retransmit_all_leader_blocks(
    maybe_leader: Option<NodeInfo>,
    dq: &mut SharedBlobs,
    debug_id: u64,
    recycler: &BlobRecycler,
    consumed: u64,
    received: u64,
    retransmit: &BlobSender,
) -> Result<()> {
    let mut retransmit_queue = VecDeque::new();
    if let Some(leader) = maybe_leader {
        for b in dq {
            let p = b.read().expect("'b' read lock in fn recv_window");
            // TODO: this check isn't safe against adversarial packets;
            // we need to maintain a sequence window
            let leader_id = leader.id;
            trace!(
                "idx: {} addr: {:?} id: {:?} leader: {:?}",
                p.get_index().expect("get_index in fn recv_window"),
                p.meta.addr(),
                p.get_id().expect("get_id in trace! fn recv_window"),
                leader_id
            );
            if p.get_id().expect("get_id in fn recv_window") == leader_id {
                // TODO: we need to copy the retransmitted blob, otherwise we
                // get into races over which thread should do the recycling.
                //
                // A better abstraction would be to recycle when the blob is
                // dropped, via a weak reference to the recycler.
                let nv = recycler.allocate();
                {
                    let mut mnv = nv.write().expect("recycler write lock in fn recv_window");
                    let sz = p.meta.size;
                    mnv.meta.size = sz;
                    mnv.data[..sz].copy_from_slice(&p.data[..sz]);
                }
                retransmit_queue.push_back(nv);
            }
        }
    } else {
        warn!("{:x}: no leader to retransmit from", debug_id);
    }
    if !retransmit_queue.is_empty() {
        debug!(
            "{:x}: RECV_WINDOW {} {}: retransmit {}",
            debug_id,
            consumed,
            received,
            retransmit_queue.len(),
        );
        inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len());
        retransmit.send(retransmit_queue)?;
    }
    Ok(())
}
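// A minimal sketch of the recycling abstraction suggested in the TODO above:
// recycle on drop via a weak reference back to the pool, so no thread has to
// decide who recycles. All names here (PoolHandle, drop_recycler_sketch) are
// hypothetical and not part of this codebase.
#[cfg(test)]
mod drop_recycler_sketch {
    use std::sync::{Arc, Mutex, Weak};

    struct PoolHandle<T> {
        item: Option<T>,
        pool: Weak<Mutex<Vec<T>>>,
    }

    impl<T> Drop for PoolHandle<T> {
        fn drop(&mut self) {
            // Return the item to the pool only if the pool is still alive.
            if let (Some(item), Some(pool)) = (self.item.take(), self.pool.upgrade()) {
                pool.lock().unwrap().push(item);
            }
        }
    }

    #[test]
    fn item_returns_to_pool_on_drop() {
        let pool = Arc::new(Mutex::new(Vec::new()));
        {
            let _handle = PoolHandle {
                item: Some(42u64),
                pool: Arc::downgrade(&pool),
            };
        } // _handle dropped here; 42 goes back to the pool
        assert_eq!(*pool.lock().unwrap(), vec![42u64]);
    }
}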
/// Process a blob: add the blob to the window. If a continuous set of blobs
/// starting from `consumed` is thereby formed, add that continuous
/// range of blobs to a queue to be sent on to the next stage.
///
/// * `debug_id` - this node's id in a useful-for-debug format
/// * `blob` - the blob to be processed into the window and rebroadcast
/// * `pix` - the index of the blob, corresponds to
///           the entry height of this blob
/// * `consume_queue` - output, blobs to be rebroadcast are placed here
/// * `window` - the window we're operating on
/// * `recycler` - where to return the blob once processed, also where
///                to return old blobs from the window
/// * `consumed` - input/output, the entry-height to which this
///                node has populated and rebroadcast entries
fn process_blob(
    debug_id: u64,
    blob: SharedBlob,
    pix: u64,
    consume_queue: &mut SharedBlobs,
    window: &SharedWindow,
    recycler: &BlobRecycler,
    consumed: &mut u64,
) {
    let mut window = window.write().unwrap();
    let w = (pix % WINDOW_SIZE) as usize;

    let is_coding = {
        let blob_r = blob
            .read()
            .expect("blob read lock for flags streamer::window");
        blob_r.is_coding()
    };

    // Insert a newly received blob into a window slot, clearing out and recycling
    // any previous blob unless the incoming blob is a duplicate (based on idx).
    // Returns whether the incoming blob is a duplicate.
    fn insert_blob_is_dup(
        debug_id: u64,
        blob: SharedBlob,
        pix: u64,
        window_slot: &mut Option<SharedBlob>,
        recycler: &BlobRecycler,
        c_or_d: &str,
    ) -> bool {
        if let Some(old) = mem::replace(window_slot, Some(blob)) {
            let is_dup = old.read().unwrap().get_index().unwrap() == pix;
            recycler.recycle(old);
            trace!(
                "{:x}: occupied {} window slot {:}, is_dup: {}",
                debug_id,
                c_or_d,
                pix,
                is_dup
            );
            is_dup
        } else {
            trace!("{:x}: empty {} window slot {:}", debug_id, c_or_d, pix);
            false
        }
    }

    // insert the new blob into the window, overwriting and recycling the old (or duplicate) entry
    let is_duplicate = if is_coding {
        insert_blob_is_dup(
            debug_id,
            blob,
            pix,
            &mut window[w].coding,
            recycler,
            "coding",
        )
    } else {
        insert_blob_is_dup(debug_id, blob, pix, &mut window[w].data, recycler, "data")
    };
    if is_duplicate {
        return;
    }

    #[cfg(feature = "erasure")]
    {
        if erasure::recover(
            debug_id,
            recycler,
            &mut window,
            *consumed,
            (*consumed % WINDOW_SIZE) as usize,
        ).is_err()
        {
            trace!("{:x}: erasure::recover failed", debug_id);
        }
    }

    // push all contiguous blobs onto the consume queue, incrementing consumed
    loop {
        let k = (*consumed % WINDOW_SIZE) as usize;
        trace!("{:x}: k: {} consumed: {}", debug_id, k, *consumed);

        if let Some(blob) = &window[k].data {
            if blob.read().unwrap().get_index().unwrap() < *consumed {
                // window wrap-around, end of received
                break;
            }
        } else {
            // window[k].data is None, end of received
            break;
        }
        consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window"));
        *consumed += 1;
    }
}
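// Illustrative sketch (ours, not part of the pipeline) of the contiguity rule
// process_blob enforces above: `consumed` only advances across an unbroken run
// of filled slots starting at `consumed`, and stops at the first gap.
#[cfg(test)]
mod contiguous_consume_sketch {
    #[test]
    fn consumed_stops_at_first_gap() {
        // Stand-in for window slots: Some(idx) = data present, None = missing.
        let window = [Some(0u64), Some(1), None, Some(3)];
        let mut consumed = 0usize;
        while consumed < window.len() && window[consumed].is_some() {
            consumed += 1;
        }
        // Slot 2 is missing, so only entries 0 and 1 are forwarded.
        assert_eq!(consumed, 2);
    }
}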
fn blob_idx_in_window(debug_id: u64, pix: u64, consumed: u64, received: &mut u64) -> bool {
    // Prevent the receive window from running over.
    // Got a blob which has already been consumed (probably from a repair
    // window request), skip it.
    if pix < consumed {
        trace!(
            "{:x}: received: {} but older than consumed: {} skipping..",
            debug_id,
            pix,
            consumed
        );
        false
    } else {
        // `received` always has to be updated even if we don't accept the packet into
        // the window. The worst case here is the server *starts* outside
        // the window, none of the packets it receives fits in the window,
        // and repair requests (which are based on `received`) are never generated.
        *received = cmp::max(pix, *received);

        if pix >= consumed + WINDOW_SIZE {
            trace!(
                "{:x}: received: {} will overrun window: {} skipping..",
                debug_id,
                pix,
                consumed + WINDOW_SIZE
            );
            false
        } else {
            true
        }
    }
}
fn recv_window(
    debug_id: u64,
    window: &SharedWindow,
    crdt: &Arc<RwLock<Crdt>>,
    recycler: &BlobRecycler,
    consumed: &mut u64,
    received: &mut u64,
    r: &BlobReceiver,
    s: &BlobSender,
    retransmit: &BlobSender,
) -> Result<()> {
    let timer = Duration::from_millis(200);
    let mut dq = r.recv_timeout(timer)?;
    let maybe_leader: Option<NodeInfo> = crdt
        .read()
        .expect("'crdt' read lock in fn recv_window")
        .leader_data()
        .cloned();
    while let Ok(mut nq) = r.try_recv() {
        dq.append(&mut nq)
    }
    let now = Instant::now();
    inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100);
    debug!(
        "{:x}: RECV_WINDOW {} {}: got packets {}",
        debug_id,
        *consumed,
        *received,
        dq.len(),
    );

    retransmit_all_leader_blocks(
        maybe_leader,
        &mut dq,
        debug_id,
        recycler,
        *consumed,
        *received,
        retransmit,
    )?;

    let mut pixs = Vec::new();
    // send a contiguous set of blocks
    let mut consume_queue = VecDeque::new();
    while let Some(b) = dq.pop_front() {
        let (pix, meta_size) = {
            let p = b.write().expect("'b' write lock in fn recv_window");
            (p.get_index()?, p.meta.size)
        };
        pixs.push(pix);

        if !blob_idx_in_window(debug_id, pix, *consumed, received) {
            recycler.recycle(b);
            continue;
        }

        trace!("{:x} window pix: {} size: {}", debug_id, pix, meta_size);

        process_blob(
            debug_id,
            b,
            pix,
            &mut consume_queue,
            window,
            recycler,
            consumed,
        );
    }
    if log_enabled!(Level::Trace) {
        trace!("{}", print_window(debug_id, window, *consumed));
    }
    info!(
        "{:x}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms",
        debug_id,
        *consumed,
        *received,
        consume_queue.len(),
        pixs,
        duration_as_ms(&now.elapsed())
    );
    if !consume_queue.is_empty() {
        debug!(
            "{:x}: RECV_WINDOW {} {}: forwarding consume_queue {}",
            debug_id,
            *consumed,
            *received,
            consume_queue.len(),
        );
        trace!(
            "{:x}: sending consume_queue.len: {}",
            debug_id,
            consume_queue.len()
        );
        inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len());
        s.send(consume_queue)?;
    }
    Ok(())
}
pub fn print_window(debug_id: u64, window: &SharedWindow, consumed: u64) -> String {
    let pointer: Vec<_> = window
        .read()
        .unwrap()
        .iter()
        .enumerate()
        .map(|(i, _v)| {
            if i == (consumed % WINDOW_SIZE) as usize {
                "V"
            } else {
                " "
            }
        })
        .collect();

    let buf: Vec<_> = window
        .read()
        .unwrap()
        .iter()
        .map(|v| {
            if v.data.is_none() && v.coding.is_none() {
                "O"
            } else if v.data.is_some() && v.coding.is_some() {
                "D"
            } else if v.data.is_some() {
                // coding.is_none()
                "d"
            } else {
                // data.is_none()
                "c"
            }
        })
        .collect();
    format!(
        "\n{:x}: WINDOW ({}): {}\n{:x}: WINDOW ({}): {}",
        debug_id,
        consumed,
        pointer.join(""),
        debug_id,
        consumed,
        buf.join("")
    )
}

pub fn default_window() -> SharedWindow {
    Arc::new(RwLock::new(vec![
        WindowSlot::default();
        WINDOW_SIZE as usize
    ]))
}
pub fn index_blobs(
    node_info: &NodeInfo,
    blobs: &[SharedBlob],
    receive_index: &mut u64,
) -> Result<()> {
    // enumerate all the blobs, those are the indices
    trace!("{:x}: INDEX_BLOBS {}", node_info.debug_id(), blobs.len());
    for (i, b) in blobs.iter().enumerate() {
        // only the leader should be broadcasting
        let mut blob = b.write().expect("'blob' write lock in crdt::index_blobs");
        blob.set_id(node_info.id)
            .expect("set_id in pub fn broadcast");
        blob.set_index(*receive_index + i as u64)
            .expect("set_index in pub fn broadcast");
        blob.set_flags(0).unwrap();
    }
    Ok(())
}
/// Initialize a rebroadcast window with the most recent Entry blobs
/// * `node_info` - this node's info, used to set blob ids
/// * `blobs` - up to WINDOW_SIZE most recent blobs
/// * `entry_height` - current entry height
pub fn initialized_window(
    node_info: &NodeInfo,
    blobs: Vec<SharedBlob>,
    entry_height: u64,
) -> SharedWindow {
    let window = default_window();
    let debug_id = node_info.debug_id();

    {
        let mut win = window.write().unwrap();

        trace!(
            "{:x} initialized window entry_height:{} blobs_len:{}",
            debug_id,
            entry_height,
            blobs.len()
        );

        // Index the blobs
        let mut received = entry_height - blobs.len() as u64;
        index_blobs(&node_info, &blobs, &mut received).expect("index blobs for initial window");

        // populate the window, offset by implied index
        let diff = cmp::max(blobs.len() as isize - win.len() as isize, 0) as usize;
        for b in blobs.into_iter().skip(diff) {
            let ix = b.read().unwrap().get_index().expect("blob index");
            let pos = (ix % WINDOW_SIZE) as usize;
            trace!("{:x} caching {} at {}", debug_id, ix, pos);
            assert!(win[pos].data.is_none());
            win[pos].data = Some(b);
        }
    }

    window
}
pub fn window(
    crdt: Arc<RwLock<Crdt>>,
    window: SharedWindow,
    entry_height: u64,
    recycler: BlobRecycler,
    r: BlobReceiver,
    s: BlobSender,
    retransmit: BlobSender,
) -> JoinHandle<()> {
    Builder::new()
        .name("solana-window".to_string())
        .spawn(move || {
            let mut consumed = entry_height;
            let mut received = entry_height;
            let mut last = entry_height;
            let mut times = 0;
            let debug_id = crdt.read().unwrap().debug_id();
            trace!("{:x}: RECV_WINDOW started", debug_id);
            loop {
                if let Err(e) = recv_window(
                    debug_id,
                    &window,
                    &crdt,
                    &recycler,
                    &mut consumed,
                    &mut received,
                    &r,
                    &s,
                    &retransmit,
                ) {
                    match e {
                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
                        _ => {
                            inc_new_counter_info!("streamer-window-error", 1, 1);
                            error!("window error: {:?}", e);
                        }
                    }
                }
                let _ = repair_window(
                    debug_id, &window, &crdt, &recycler, &mut last, &mut times, consumed, received,
                );
            }
        })
        .unwrap()
}
#[cfg(test)]
mod test {
    use crdt::{Crdt, TestNode};
    use logger;
    use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
    use std::collections::VecDeque;
    use std::io;
    use std::io::Write;
    use std::net::UdpSocket;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc::channel;
    use std::sync::{Arc, RwLock};
    use std::time::Duration;
    use streamer::blob_idx_in_window;
    use streamer::calculate_highest_lost_blob_index;
    use streamer::{blob_receiver, receiver, responder, window};
    use streamer::{default_window, BlobReceiver, PacketReceiver, WINDOW_SIZE};

    fn get_msgs(r: PacketReceiver, num: &mut usize) {
        for _t in 0..5 {
            let timer = Duration::new(1, 0);
            match r.recv_timeout(timer) {
                Ok(m) => *num += m.read().unwrap().packets.len(),
                e => info!("error {:?}", e),
            }
            if *num == 10 {
                break;
            }
        }
    }
    #[test]
    pub fn streamer_debug() {
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", Packets::default()).unwrap();
        write!(io::sink(), "{:?}", Blob::default()).unwrap();
    }

    #[test]
    pub fn streamer_send_test() {
        let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
        read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();

        let addr = read.local_addr().unwrap();
        let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let exit = Arc::new(AtomicBool::new(false));
        let pack_recycler = PacketRecycler::default();
        let resp_recycler = BlobRecycler::default();
        let (s_reader, r_reader) = channel();
        let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader);
        let t_responder = {
            let (s_responder, r_responder) = channel();
            let t_responder = responder(
                "streamer_send_test",
                send,
                resp_recycler.clone(),
                r_responder,
            );
            let mut msgs = VecDeque::new();
            for i in 0..10 {
                let b = resp_recycler.allocate();
                {
                    let mut w = b.write().unwrap();
                    w.data[0] = i as u8;
                    w.meta.size = PACKET_DATA_SIZE;
                    w.meta.set_addr(&addr);
                }
                msgs.push_back(b);
            }
            s_responder.send(msgs).expect("send");
            t_responder
        };

        let mut num = 0;
        get_msgs(r_reader, &mut num);
        assert_eq!(num, 10);
        exit.store(true, Ordering::Relaxed);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
    }
    fn get_blobs(r: BlobReceiver, num: &mut usize) {
        for _t in 0..5 {
            let timer = Duration::new(1, 0);
            match r.recv_timeout(timer) {
                Ok(m) => {
                    for (i, v) in m.iter().enumerate() {
                        assert_eq!(v.read().unwrap().get_index().unwrap() as usize, *num + i);
                    }
                    *num += m.len();
                }
                e => info!("error {:?}", e),
            }
            if *num == 10 {
                break;
            }
        }
    }
    #[test]
    pub fn window_send_test() {
        logger::setup();
        let tn = TestNode::new_localhost();
        let exit = Arc::new(AtomicBool::new(false));
        let mut crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new");
        let me_id = crdt_me.my_data().id;
        crdt_me.set_leader(me_id);
        let subs = Arc::new(RwLock::new(crdt_me));

        let resp_recycler = BlobRecycler::default();
        let (s_reader, r_reader) = channel();
        let t_receiver = blob_receiver(
            exit.clone(),
            resp_recycler.clone(),
            tn.sockets.gossip,
            s_reader,
        ).unwrap();
        let (s_window, r_window) = channel();
        let (s_retransmit, r_retransmit) = channel();
        let win = default_window();
        let t_window = window(
            subs,
            win,
            0,
            resp_recycler.clone(),
            r_reader,
            s_window,
            s_retransmit,
        );
        let t_responder = {
            let (s_responder, r_responder) = channel();
            let t_responder = responder(
                "window_send_test",
                tn.sockets.replicate,
                resp_recycler.clone(),
                r_responder,
            );
            let mut msgs = VecDeque::new();
            for v in 0..10 {
                let i = 9 - v;
                let b = resp_recycler.allocate();
                {
                    let mut w = b.write().unwrap();
                    w.set_index(i).unwrap();
                    w.set_id(me_id).unwrap();
                    assert_eq!(i, w.get_index().unwrap());
                    w.meta.size = PACKET_DATA_SIZE;
                    w.meta.set_addr(&tn.data.contact_info.ncp);
                }
                msgs.push_back(b);
            }
            s_responder.send(msgs).expect("send");
            t_responder
        };

        let mut num = 0;
        get_blobs(r_window, &mut num);
        assert_eq!(num, 10);
        let mut q = r_retransmit.recv().unwrap();
        while let Ok(mut nq) = r_retransmit.try_recv() {
            q.append(&mut nq);
        }
        assert_eq!(q.len(), 10);
        exit.store(true, Ordering::Relaxed);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
        t_window.join().expect("join");
    }
    #[test]
    pub fn calculate_highest_lost_blob_index_test() {
        assert_eq!(calculate_highest_lost_blob_index(0, 10, 90), 90);
        assert_eq!(calculate_highest_lost_blob_index(15, 10, 90), 75);
        assert_eq!(calculate_highest_lost_blob_index(90, 10, 90), 10);
        assert_eq!(calculate_highest_lost_blob_index(90, 10, 50), 10);
        assert_eq!(calculate_highest_lost_blob_index(90, 10, 99), 10);
        assert_eq!(calculate_highest_lost_blob_index(90, 10, 101), 11);
        assert_eq!(
            calculate_highest_lost_blob_index(90, 10, 95 + WINDOW_SIZE),
            WINDOW_SIZE + 5
        );
        assert_eq!(
            calculate_highest_lost_blob_index(90, 10, 99 + WINDOW_SIZE),
            WINDOW_SIZE + 9
        );
        assert_eq!(
            calculate_highest_lost_blob_index(90, 10, 100 + WINDOW_SIZE),
            WINDOW_SIZE + 9
        );
        assert_eq!(
            calculate_highest_lost_blob_index(90, 10, 120 + WINDOW_SIZE),
            WINDOW_SIZE + 9
        );
    }
    fn wrap_blob_idx_in_window(
        debug_id: u64,
        pix: u64,
        consumed: u64,
        received: u64,
    ) -> (bool, u64) {
        let mut received = received;
        let is_in_window = blob_idx_in_window(debug_id, pix, consumed, &mut received);
        (is_in_window, received)
    }

    #[test]
    pub fn blob_idx_in_window_test() {
        assert_eq!(
            wrap_blob_idx_in_window(0, 90 + WINDOW_SIZE, 90, 100),
            (false, 90 + WINDOW_SIZE)
        );
        assert_eq!(
            wrap_blob_idx_in_window(0, 91 + WINDOW_SIZE, 90, 100),
            (false, 91 + WINDOW_SIZE)
        );
        assert_eq!(wrap_blob_idx_in_window(0, 89, 90, 100), (false, 100));
        assert_eq!(wrap_blob_idx_in_window(0, 91, 90, 100), (true, 100));
        assert_eq!(wrap_blob_idx_in_window(0, 101, 90, 100), (true, 101));
    }
}