//! The `window` module defines the data structures for storing the tail of the ledger.
//!

use cluster_info::ClusterInfo;
use counter::Counter;
use entry::Entry;
#[cfg(feature = "erasure")]
use erasure;
use leader_scheduler::LeaderScheduler;
use ledger::reconstruct_entries_from_blobs;
use log::Level;
use packet::SharedBlob;
use solana_sdk::pubkey::Pubkey;
use std::cmp;
use std::mem;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, RwLock};
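
// Note (added for clarity, inferred from the code below): the window is a
// fixed-size circular buffer of `WindowSlot`s. A blob with index `pix` is
// stored in slot `pix % window_size`, so the range `consumed..received` can
// wrap around the end of the underlying vector.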

#[derive(Default, Clone)]
pub struct WindowSlot {
    pub data: Option<SharedBlob>,
    pub coding: Option<SharedBlob>,
    pub leader_unknown: bool,
}

impl WindowSlot {
    fn blob_index(&self) -> Option<u64> {
        match self.data {
            Some(ref blob) => blob.read().unwrap().index().ok(),
            None => None,
        }
    }

    fn clear_data(&mut self) {
        self.data.take();
    }
}

type Window = Vec<WindowSlot>;
pub type SharedWindow = Arc<RwLock<Window>>;

#[derive(Debug)]
pub struct WindowIndex {
    pub data: u64,
    pub coding: u64,
}

pub trait WindowUtil {
    /// Finds available slots, clears them, and returns their indices.
    fn clear_slots(&mut self, consumed: u64, received: u64) -> Vec<u64>;

    fn window_size(&self) -> u64;

    fn repair(
        &mut self,
        cluster_info: &Arc<RwLock<ClusterInfo>>,
        id: &Pubkey,
        times: usize,
        consumed: u64,
        received: u64,
        tick_height: u64,
        max_entry_height: u64,
        leader_scheduler_option: &Arc<RwLock<LeaderScheduler>>,
    ) -> Vec<(SocketAddr, Vec<u8>)>;

    fn print(&self, id: &Pubkey, consumed: u64) -> String;

    fn process_blob(
        &mut self,
        id: &Pubkey,
        blob: SharedBlob,
        pix: u64,
        consume_queue: &mut Vec<Entry>,
        consumed: &mut u64,
        tick_height: &mut u64,
        leader_unknown: bool,
        pending_retransmits: &mut bool,
    );

    fn blob_idx_in_window(&self, id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool;
}
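
// Illustrative call sequence (a sketch, not taken verbatim from any caller in
// this file): a receive loop might drive the window roughly like this, where
// `window`, `id`, `blob`, and the bookkeeping counters are assumed to come
// from the surrounding stage:
//
//     let mut consume_queue = Vec::new();
//     if window.blob_idx_in_window(&id, pix, consumed, &mut received) {
//         window.process_blob(
//             &id,
//             blob,
//             pix,
//             &mut consume_queue,
//             &mut consumed,
//             &mut tick_height,
//             leader_unknown,
//             &mut pending_retransmits,
//         );
//     }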

impl WindowUtil for Window {
    fn clear_slots(&mut self, consumed: u64, received: u64) -> Vec<u64> {
        (consumed..received)
            .filter_map(|pix| {
                let i = (pix % self.window_size()) as usize;
                if let Some(blob_idx) = self[i].blob_index() {
                    if blob_idx == pix {
                        return None;
                    }
                }
                self[i].clear_data();
                Some(pix)
            }).collect()
    }

    fn blob_idx_in_window(&self, id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool {
        // Prevent the receive window from running over.
        // A blob which has already been consumed is skipped; it probably
        // came from a repair window request.
        if pix < consumed {
            trace!(
                "{}: received: {} but older than consumed: {} skipping..",
                id,
                pix,
                consumed
            );
            false
        } else {
            // received always has to be updated even if we don't accept the packet into
            // the window. The worst case here is the server *starts* outside
            // the window, none of the packets it receives fits in the window
            // and repair requests (which are based on received) are never generated
            *received = cmp::max(pix, *received);

            if pix >= consumed + self.window_size() {
                trace!(
                    "{}: received: {} will overrun window: {} skipping..",
                    id,
                    pix,
                    consumed + self.window_size()
                );
                false
            } else {
                true
            }
        }
    }
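
    // Worked example (mirrors `test_blob_idx_in_window` below): with a window of
    // 200 slots, consumed = 90 and received = 100, pix = 89 is rejected (already
    // consumed), pix = 91 and pix = 101 are accepted, and pix = 290 is rejected
    // because it would overrun the window; in every accepted or overrun case
    // `received` is raised to at least `pix`.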

    fn window_size(&self) -> u64 {
        self.len() as u64
    }

    fn repair(
        &mut self,
        cluster_info: &Arc<RwLock<ClusterInfo>>,
        id: &Pubkey,
        times: usize,
        consumed: u64,
        received: u64,
        tick_height: u64,
        max_entry_height: u64,
        leader_scheduler_option: &Arc<RwLock<LeaderScheduler>>,
    ) -> Vec<(SocketAddr, Vec<u8>)> {
        let rcluster_info = cluster_info.read().unwrap();
        let mut is_next_leader = false;
        {
            let ls_lock = leader_scheduler_option.read().unwrap();
            if !ls_lock.use_only_bootstrap_leader {
                // Calculate the next leader rotation height and check if we are the leader
                if let Some(next_leader_rotation_height) =
                    ls_lock.max_height_for_leader(tick_height)
                {
                    match ls_lock.get_scheduled_leader(next_leader_rotation_height) {
                        Some((leader_id, _)) if leader_id == *id => is_next_leader = true,
                        // In the case that we are not in the current scope of the leader schedule
                        // window then either:
                        //
                        // 1) The replicate stage hasn't caught up to the "consumed" entries we sent,
                        //    in which case it will eventually catch up
                        //
                        // 2) We are on the border between seed_rotation_intervals, so the
                        //    schedule won't be known until the entry on that cusp is received
                        //    by the replicate stage (which comes after this stage). Hence, the next
                        //    leader at the beginning of that next epoch will not know they are the
                        //    leader until they receive that last "cusp" entry. The leader also won't ask for repairs
                        //    for that entry because "is_next_leader" won't be set here. In this case,
                        //    everybody will be blocking waiting for that "cusp" entry instead of repairing,
                        //    until the leader hits "times" >= the max times in calculate_max_repair().
                        //    The impact of this, along with the similar problem from broadcast for the transitioning
                        //    leader, can be observed in the multinode test, test_full_leader_validator_network(),
                        None => (),
                        _ => (),
                    }
                }
            }
        }

        let num_peers = rcluster_info.table.len() as u64;

        let max_repair = if max_entry_height == 0 {
            calculate_max_repair(
                num_peers,
                consumed,
                received,
                times,
                is_next_leader,
                self.window_size(),
            )
        } else {
            max_entry_height + 1
        };

        let idxs = self.clear_slots(consumed, max_repair);
        let reqs: Vec<_> = idxs
            .into_iter()
            .filter_map(|pix| rcluster_info.window_index_request(pix).ok())
            .collect();

        drop(rcluster_info);

        inc_new_counter_info!("streamer-repair_window-repair", reqs.len());

        if log_enabled!(Level::Trace) {
            trace!(
                "{}: repair_window counter times: {} consumed: {} received: {} max_repair: {} missing: {}",
                id,
                times,
                consumed,
                received,
                max_repair,
                reqs.len()
            );
            for (to, _) in &reqs {
                trace!("{}: repair_window request to {}", id, to);
            }
        }
        reqs
    }

    fn print(&self, id: &Pubkey, consumed: u64) -> String {
        let pointer: Vec<_> = self
            .iter()
            .enumerate()
            .map(|(i, _v)| {
                if i == (consumed % self.window_size()) as usize {
                    "V"
                } else {
                    " "
                }
            }).collect();

        let buf: Vec<_> = self
            .iter()
            .map(|v| {
                if v.data.is_none() && v.coding.is_none() {
                    "O"
                } else if v.data.is_some() && v.coding.is_some() {
                    "D"
                } else if v.data.is_some() {
                    // coding.is_none()
                    "d"
                } else {
                    // data.is_none()
                    "c"
                }
            }).collect();
        format!(
            "\n{}: WINDOW ({}): {}\n{}: WINDOW ({}): {}",
            id,
            consumed,
            pointer.join(""),
            id,
            consumed,
            buf.join("")
        )
    }

    /// process a blob: Add blob to the window. If a continuous set of blobs
    /// starting from consumed is thereby formed, add that continuous
    /// range of blobs to a queue to be sent on to the next stage.
    ///
    /// * `self` - the window we're operating on
    /// * `id` - this node's id
    /// * `blob` - the blob to be processed into the window and rebroadcast
    /// * `pix` - the index of the blob, corresponds to
    ///           the entry height of this blob
    /// * `consume_queue` - output, blobs to be rebroadcast are placed here
    /// * `consumed` - input/output, the entry-height to which this
    ///                node has populated and rebroadcast entries
    fn process_blob(
        &mut self,
        id: &Pubkey,
        blob: SharedBlob,
        pix: u64,
        consume_queue: &mut Vec<Entry>,
        consumed: &mut u64,
        tick_height: &mut u64,
        leader_unknown: bool,
        pending_retransmits: &mut bool,
    ) {
        let w = (pix % self.window_size()) as usize;

        let is_coding = blob.read().unwrap().is_coding();

        // insert a newly received blob into a window slot, clearing out and recycling any previous
        // blob unless the incoming blob is a duplicate (based on idx)
        // returns whether the incoming is a duplicate blob
        fn insert_blob_is_dup(
            id: &Pubkey,
            blob: SharedBlob,
            pix: u64,
            window_slot: &mut Option<SharedBlob>,
            c_or_d: &str,
        ) -> bool {
            if let Some(old) = mem::replace(window_slot, Some(blob)) {
                let is_dup = old.read().unwrap().index().unwrap() == pix;
                trace!(
                    "{}: occupied {} window slot {:}, is_dup: {}",
                    id,
                    c_or_d,
                    pix,
                    is_dup
                );
                is_dup
            } else {
                trace!("{}: empty {} window slot {:}", id, c_or_d, pix);
                false
            }
        }

        // insert the new blob into the window, overwrite and recycle old (or duplicate) entry
        let is_duplicate = if is_coding {
            insert_blob_is_dup(id, blob, pix, &mut self[w].coding, "coding")
        } else {
            insert_blob_is_dup(id, blob, pix, &mut self[w].data, "data")
        };

        if is_duplicate {
            return;
        }

        self[w].leader_unknown = leader_unknown;
        *pending_retransmits = true;

        #[cfg(feature = "erasure")]
        {
            let window_size = self.window_size();
            if erasure::recover(id, self, *consumed, (*consumed % window_size) as usize).is_err() {
                trace!("{}: erasure::recover failed", id);
            }
        }

        // push all contiguous blobs into consumed queue, increment consumed
        loop {
            let k = (*consumed % self.window_size()) as usize;
            trace!("{}: k: {} consumed: {}", id, k, *consumed,);

            let k_data_blob;
            let k_data_slot = &mut self[k].data;
            if let Some(blob) = k_data_slot {
                if blob.read().unwrap().index().unwrap() < *consumed {
                    // window wrap-around, end of received
                    break;
                }
                k_data_blob = (*blob).clone();
            } else {
                // self[k].data is None, end of received
                break;
            }

            // Check that we can get the entries from this blob
            match reconstruct_entries_from_blobs(vec![k_data_blob]) {
                Ok(entries) => {
                    for entry in &entries {
                        *tick_height += entry.is_tick() as u64;
                    }
                    consume_queue.extend(entries);
                }
                Err(_) => {
                    // If the blob can't be deserialized, then remove it from the
                    // window and exit. *k_data_slot cannot be None at this point,
                    // so it's safe to unwrap.
                    k_data_slot.take();
                    break;
                }
            }

            *consumed += 1;
        }
    }
}

fn calculate_max_repair(
    num_peers: u64,
    consumed: u64,
    received: u64,
    times: usize,
    is_next_leader: bool,
    window_size: u64,
) -> u64 {
    // Calculate the highest blob index that this node should have already received
    // via avalanche. The avalanche splits the data stream across nodes, and each node
    // retransmits the data to its peer nodes. So there's a possibility that a blob
    // (with index lower than the current received index) is being retransmitted by a peer node.
    let max_repair = if times >= 8 || is_next_leader {
        // if repair backoff is getting high, or if we are the next leader,
        // don't wait for avalanche
        cmp::max(consumed, received)
    } else {
        cmp::max(consumed, received.saturating_sub(num_peers))
    };

    // This check prevents repairing a blob that would cause the window to roll over. Even if
    // the highest lost blob is actually missing, asking to repair it might cause our
    // current window to move past other missing blobs
    cmp::min(consumed + window_size - 1, max_repair)
}
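
// Worked example (taken from `test_calculate_max_repair` below): with
// window_size = 200, consumed = 10, received = 90 and times = 0, a node with
// 15 peers backs off by `num_peers` and repairs up to 90 - 15 = 75; with
// times = 32 the backoff is skipped and the result is 90.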

pub fn new_window(window_size: usize) -> Window {
    (0..window_size).map(|_| WindowSlot::default()).collect()
}

pub fn default_window() -> Window {
    (0..2048).map(|_| WindowSlot::default()).collect()
}
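
// Usage sketch (not drawn from this file's callers): a `SharedWindow` is just
// a window behind an `Arc<RwLock<..>>`, so callers are expected to build one
// along these lines:
//
//     let shared: SharedWindow = Arc::new(RwLock::new(default_window()));
//     assert_eq!(shared.read().unwrap().window_size(), 2048);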

#[cfg(test)]
mod test {
    use packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
    use solana_sdk::pubkey::Pubkey;
    use std::io;
    use std::io::Write;
    use std::net::UdpSocket;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc::channel;
    use std::sync::Arc;
    use std::time::Duration;
    use streamer::{receiver, responder, PacketReceiver};
    use window::{calculate_max_repair, new_window, Window, WindowUtil};

    fn get_msgs(r: PacketReceiver, num: &mut usize) {
        for _t in 0..5 {
            let timer = Duration::new(1, 0);
            match r.recv_timeout(timer) {
                Ok(m) => *num += m.read().unwrap().packets.len(),
                e => info!("error {:?}", e),
            }
            if *num == 10 {
                break;
            }
        }
    }

    #[test]
    pub fn streamer_debug() {
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", Packets::default()).unwrap();
        write!(io::sink(), "{:?}", Blob::default()).unwrap();
    }

    #[test]
    pub fn streamer_send_test() {
        let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
        read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();

        let addr = read.local_addr().unwrap();
        let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let exit = Arc::new(AtomicBool::new(false));
        let (s_reader, r_reader) = channel();
        let t_receiver = receiver(
            Arc::new(read),
            exit.clone(),
            s_reader,
            "window-streamer-test",
        );
        let t_responder = {
            let (s_responder, r_responder) = channel();
            let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);
            let mut msgs = Vec::new();
            for i in 0..10 {
                let mut b = SharedBlob::default();
                {
                    let mut w = b.write().unwrap();
                    w.data[0] = i as u8;
                    w.meta.size = PACKET_DATA_SIZE;
                    w.meta.set_addr(&addr);
                }
                msgs.push(b);
            }
            s_responder.send(msgs).expect("send");
            t_responder
        };

        let mut num = 0;
        get_msgs(r_reader, &mut num);
        assert_eq!(num, 10);
        exit.store(true, Ordering::Relaxed);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
    }

    #[test]
    pub fn test_calculate_max_repair() {
        const WINDOW_SIZE: u64 = 200;

        assert_eq!(calculate_max_repair(0, 10, 90, 0, false, WINDOW_SIZE), 90);
        assert_eq!(calculate_max_repair(15, 10, 90, 32, false, WINDOW_SIZE), 90);
        assert_eq!(calculate_max_repair(15, 10, 90, 0, false, WINDOW_SIZE), 75);
        assert_eq!(calculate_max_repair(90, 10, 90, 0, false, WINDOW_SIZE), 10);
        assert_eq!(calculate_max_repair(90, 10, 50, 0, false, WINDOW_SIZE), 10);
        assert_eq!(calculate_max_repair(90, 10, 99, 0, false, WINDOW_SIZE), 10);
        assert_eq!(calculate_max_repair(90, 10, 101, 0, false, WINDOW_SIZE), 11);
        assert_eq!(
            calculate_max_repair(90, 10, 95 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
            WINDOW_SIZE + 5
        );
        assert_eq!(
            calculate_max_repair(90, 10, 99 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
            WINDOW_SIZE + 9
        );
        assert_eq!(
            calculate_max_repair(90, 10, 100 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
            WINDOW_SIZE + 9
        );
        assert_eq!(
            calculate_max_repair(90, 10, 120 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
            WINDOW_SIZE + 9
        );
        assert_eq!(
            calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
            WINDOW_SIZE
        );
        assert_eq!(
            calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, true, WINDOW_SIZE),
            50 + WINDOW_SIZE
        );
    }

    fn wrap_blob_idx_in_window(
        window: &Window,
        id: &Pubkey,
        pix: u64,
        consumed: u64,
        received: u64,
    ) -> (bool, u64) {
        let mut received = received;
        let is_in_window = window.blob_idx_in_window(&id, pix, consumed, &mut received);
        (is_in_window, received)
    }

    #[test]
    pub fn test_blob_idx_in_window() {
        let id = Pubkey::default();
        const WINDOW_SIZE: u64 = 200;
        let window = new_window(WINDOW_SIZE as usize);

        assert_eq!(
            wrap_blob_idx_in_window(&window, &id, 90 + WINDOW_SIZE, 90, 100),
            (false, 90 + WINDOW_SIZE)
        );
        assert_eq!(
            wrap_blob_idx_in_window(&window, &id, 91 + WINDOW_SIZE, 90, 100),
            (false, 91 + WINDOW_SIZE)
        );
        assert_eq!(
            wrap_blob_idx_in_window(&window, &id, 89, 90, 100),
            (false, 100)
        );

        assert_eq!(
            wrap_blob_idx_in_window(&window, &id, 91, 90, 100),
            (true, 100)
        );
        assert_eq!(
            wrap_blob_idx_in_window(&window, &id, 101, 90, 100),
            (true, 101)
        );
    }
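
    // Added as an illustrative sketch of `clear_slots` and `window_size`, based
    // only on the definitions earlier in this file: every index in
    // [consumed, received) that isn't already backed by a matching blob is
    // reported as missing, and a freshly created window is entirely empty.
    #[test]
    pub fn test_clear_slots_empty_window_sketch() {
        const WINDOW_SIZE: u64 = 200;
        let mut window = new_window(WINDOW_SIZE as usize);
        assert_eq!(window.window_size(), WINDOW_SIZE);

        // No slot holds a blob yet, so every requested index comes back as missing.
        let missing = window.clear_slots(10, 15);
        assert_eq!(missing, vec![10, 11, 12, 13, 14]);
    }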
}