Split window into two modules
This commit is contained in:
parent
5025d89c88
commit
7f669094de
|
@ -57,6 +57,7 @@ pub mod tvu;
|
|||
pub mod vote_stage;
|
||||
pub mod wallet;
|
||||
pub mod window;
|
||||
pub mod window_service;
|
||||
pub mod write_stage;
|
||||
extern crate bincode;
|
||||
extern crate bs58;
|
||||
|
|
|
@ -14,7 +14,8 @@ use std::sync::{Arc, RwLock};
|
|||
use std::thread::{self, Builder, JoinHandle};
|
||||
use std::time::Duration;
|
||||
use streamer::BlobReceiver;
|
||||
use window::{self, SharedWindow};
|
||||
use window::SharedWindow;
|
||||
use window_service::window_service;
|
||||
|
||||
fn retransmit(
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
|
@ -95,7 +96,7 @@ impl RetransmitStage {
|
|||
retransmit_receiver,
|
||||
);
|
||||
let (blob_sender, blob_receiver) = channel();
|
||||
let t_window = window::window_service(
|
||||
let t_window = window_service(
|
||||
crdt.clone(),
|
||||
window,
|
||||
entry_height,
|
||||
|
|
562
src/window.rs
562
src/window.rs
|
@ -8,19 +8,13 @@ use erasure;
|
|||
use ledger::Block;
|
||||
use log::Level;
|
||||
use packet::{BlobRecycler, SharedBlob, SharedBlobs};
|
||||
use rand::{thread_rng, Rng};
|
||||
use result::{Error, Result};
|
||||
use result::Result;
|
||||
use signature::Pubkey;
|
||||
use std::cmp;
|
||||
use std::mem;
|
||||
use std::net::{SocketAddr, UdpSocket};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::mpsc::RecvTimeoutError;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::{Builder, JoinHandle};
|
||||
use std::time::{Duration, Instant};
|
||||
use streamer::{BlobReceiver, BlobSender};
|
||||
use timing::duration_as_ms;
|
||||
|
||||
pub const WINDOW_SIZE: u64 = 2 * 1024;
|
||||
|
||||
|
@ -299,128 +293,7 @@ fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u6
|
|||
cmp::min(consumed + WINDOW_SIZE - 1, highest_lost)
|
||||
}
|
||||
|
||||
pub const MAX_REPAIR_BACKOFF: usize = 128;
|
||||
|
||||
fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool {
|
||||
//exponential backoff
|
||||
if *last != consumed {
|
||||
//start with a 50% chance of asking for repairs
|
||||
*times = 1;
|
||||
}
|
||||
*last = consumed;
|
||||
*times += 1;
|
||||
|
||||
// Experiment with capping repair request duration.
|
||||
// Once nodes are too far behind they can spend many
|
||||
// seconds without asking for repair
|
||||
if *times > MAX_REPAIR_BACKOFF {
|
||||
// 50% chance that a request will fire between 64 - 128 tries
|
||||
*times = MAX_REPAIR_BACKOFF / 2;
|
||||
}
|
||||
|
||||
//if we get lucky, make the request, which should exponentially get less likely
|
||||
thread_rng().gen_range(0, *times as u64) == 0
|
||||
}
|
||||
|
||||
fn add_block_to_retransmit_queue(
|
||||
b: &SharedBlob,
|
||||
leader_id: Pubkey,
|
||||
recycler: &BlobRecycler,
|
||||
retransmit_queue: &mut Vec<SharedBlob>,
|
||||
) {
|
||||
let p = b
|
||||
.read()
|
||||
.expect("'b' read lock in fn add_block_to_retransmit_queue");
|
||||
//TODO this check isn't safe against adverserial packets
|
||||
//we need to maintain a sequence window
|
||||
trace!(
|
||||
"idx: {} addr: {:?} id: {:?} leader: {:?}",
|
||||
p.get_index()
|
||||
.expect("get_index in fn add_block_to_retransmit_queue"),
|
||||
p.get_id()
|
||||
.expect("get_id in trace! fn add_block_to_retransmit_queue"),
|
||||
p.meta.addr(),
|
||||
leader_id
|
||||
);
|
||||
if p.get_id()
|
||||
.expect("get_id in fn add_block_to_retransmit_queue") == leader_id
|
||||
{
|
||||
//TODO
|
||||
//need to copy the retransmitted blob
|
||||
//otherwise we get into races with which thread
|
||||
//should do the recycling
|
||||
//
|
||||
//a better abstraction would be to recycle when the blob
|
||||
//is dropped via a weakref to the recycler
|
||||
let nv = recycler.allocate();
|
||||
{
|
||||
let mut mnv = nv
|
||||
.write()
|
||||
.expect("recycler write lock in fn add_block_to_retransmit_queue");
|
||||
let sz = p.meta.size;
|
||||
mnv.meta.size = sz;
|
||||
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
|
||||
}
|
||||
retransmit_queue.push(nv);
|
||||
}
|
||||
}
|
||||
|
||||
fn retransmit_all_leader_blocks(
|
||||
window: &SharedWindow,
|
||||
maybe_leader: Option<NodeInfo>,
|
||||
dq: &[SharedBlob],
|
||||
id: &Pubkey,
|
||||
recycler: &BlobRecycler,
|
||||
consumed: u64,
|
||||
received: u64,
|
||||
retransmit: &BlobSender,
|
||||
pending_retransmits: &mut bool,
|
||||
) -> Result<()> {
|
||||
let mut retransmit_queue: Vec<SharedBlob> = Vec::new();
|
||||
if let Some(leader) = maybe_leader {
|
||||
let leader_id = leader.id;
|
||||
for b in dq {
|
||||
add_block_to_retransmit_queue(b, leader_id, recycler, &mut retransmit_queue);
|
||||
}
|
||||
|
||||
if *pending_retransmits {
|
||||
for w in window
|
||||
.write()
|
||||
.expect("Window write failed in retransmit_all_leader_blocks")
|
||||
.iter_mut()
|
||||
{
|
||||
*pending_retransmits = false;
|
||||
if w.leader_unknown {
|
||||
if let Some(b) = w.clone().data {
|
||||
add_block_to_retransmit_queue(
|
||||
&b,
|
||||
leader_id,
|
||||
recycler,
|
||||
&mut retransmit_queue,
|
||||
);
|
||||
w.leader_unknown = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!("{}: no leader to retransmit from", id);
|
||||
}
|
||||
if !retransmit_queue.is_empty() {
|
||||
trace!(
|
||||
"{}: RECV_WINDOW {} {}: retransmit {}",
|
||||
id,
|
||||
consumed,
|
||||
received,
|
||||
retransmit_queue.len(),
|
||||
);
|
||||
inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len());
|
||||
retransmit.send(retransmit_queue)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool {
|
||||
pub fn blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool {
|
||||
// Prevent receive window from running over
|
||||
// Got a blob which has already been consumed, skip it
|
||||
// probably from a repair window request
|
||||
|
@ -453,99 +326,6 @@ fn blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: &mut u64)
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
|
||||
fn recv_window(
|
||||
window: &SharedWindow,
|
||||
id: &Pubkey,
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
recycler: &BlobRecycler,
|
||||
consumed: &mut u64,
|
||||
received: &mut u64,
|
||||
r: &BlobReceiver,
|
||||
s: &BlobSender,
|
||||
retransmit: &BlobSender,
|
||||
pending_retransmits: &mut bool,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::from_millis(200);
|
||||
let mut dq = r.recv_timeout(timer)?;
|
||||
let maybe_leader: Option<NodeInfo> = crdt
|
||||
.read()
|
||||
.expect("'crdt' read lock in fn recv_window")
|
||||
.leader_data()
|
||||
.cloned();
|
||||
let leader_unknown = maybe_leader.is_none();
|
||||
while let Ok(mut nq) = r.try_recv() {
|
||||
dq.append(&mut nq)
|
||||
}
|
||||
let now = Instant::now();
|
||||
inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100);
|
||||
trace!(
|
||||
"{}: RECV_WINDOW {} {}: got packets {}",
|
||||
id,
|
||||
*consumed,
|
||||
*received,
|
||||
dq.len(),
|
||||
);
|
||||
|
||||
retransmit_all_leader_blocks(
|
||||
window,
|
||||
maybe_leader,
|
||||
&dq,
|
||||
id,
|
||||
recycler,
|
||||
*consumed,
|
||||
*received,
|
||||
retransmit,
|
||||
pending_retransmits,
|
||||
)?;
|
||||
|
||||
let mut pixs = Vec::new();
|
||||
//send a contiguous set of blocks
|
||||
let mut consume_queue = Vec::new();
|
||||
for b in dq {
|
||||
let (pix, meta_size) = {
|
||||
let p = b.write().unwrap();
|
||||
(p.get_index()?, p.meta.size)
|
||||
};
|
||||
pixs.push(pix);
|
||||
|
||||
if !blob_idx_in_window(&id, pix, *consumed, received) {
|
||||
recycler.recycle(b, "recv_window");
|
||||
continue;
|
||||
}
|
||||
|
||||
trace!("{} window pix: {} size: {}", id, pix, meta_size);
|
||||
|
||||
window.write().unwrap().process_blob(
|
||||
id,
|
||||
b,
|
||||
pix,
|
||||
&mut consume_queue,
|
||||
recycler,
|
||||
consumed,
|
||||
leader_unknown,
|
||||
pending_retransmits,
|
||||
);
|
||||
}
|
||||
if log_enabled!(Level::Trace) {
|
||||
trace!("{}", window.read().unwrap().print(id, *consumed));
|
||||
trace!(
|
||||
"{}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms",
|
||||
id,
|
||||
*consumed,
|
||||
*received,
|
||||
consume_queue.len(),
|
||||
pixs,
|
||||
duration_as_ms(&now.elapsed())
|
||||
);
|
||||
}
|
||||
if !consume_queue.is_empty() {
|
||||
inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len());
|
||||
s.send(consume_queue)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn default_window() -> SharedWindow {
|
||||
Arc::new(RwLock::new(vec![
|
||||
WindowSlot::default();
|
||||
|
@ -624,72 +404,6 @@ pub fn new_window_from_entries(
|
|||
initialized_window(&node_info, blobs, entry_height)
|
||||
}
|
||||
|
||||
pub fn window_service(
|
||||
crdt: Arc<RwLock<Crdt>>,
|
||||
window: SharedWindow,
|
||||
entry_height: u64,
|
||||
recycler: BlobRecycler,
|
||||
r: BlobReceiver,
|
||||
s: BlobSender,
|
||||
retransmit: BlobSender,
|
||||
repair_socket: Arc<UdpSocket>,
|
||||
) -> JoinHandle<()> {
|
||||
Builder::new()
|
||||
.name("solana-window".to_string())
|
||||
.spawn(move || {
|
||||
let mut consumed = entry_height;
|
||||
let mut received = entry_height;
|
||||
let mut last = entry_height;
|
||||
let mut times = 0;
|
||||
let id = crdt.read().unwrap().id;
|
||||
let mut pending_retransmits = false;
|
||||
trace!("{}: RECV_WINDOW started", id);
|
||||
loop {
|
||||
if let Err(e) = recv_window(
|
||||
&window,
|
||||
&id,
|
||||
&crdt,
|
||||
&recycler,
|
||||
&mut consumed,
|
||||
&mut received,
|
||||
&r,
|
||||
&s,
|
||||
&retransmit,
|
||||
&mut pending_retransmits,
|
||||
) {
|
||||
match e {
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||
_ => {
|
||||
inc_new_counter_info!("streamer-window-error", 1, 1);
|
||||
error!("window error: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if received <= consumed {
|
||||
continue;
|
||||
}
|
||||
|
||||
//exponential backoff
|
||||
if !repair_backoff(&mut last, &mut times, consumed) {
|
||||
trace!("{} !repair_backoff() times = {}", id, times);
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut window = window.write().unwrap();
|
||||
let reqs = window.repair(&crdt, &recycler, &id, times, consumed, received);
|
||||
for (to, req) in reqs {
|
||||
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
|
||||
info!("{} repair req send_to({}) error {:?}", id, to, e);
|
||||
0
|
||||
});
|
||||
}
|
||||
}
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crdt::{Crdt, Node};
|
||||
|
@ -705,8 +419,7 @@ mod test {
|
|||
use std::time::Duration;
|
||||
use streamer::{blob_receiver, receiver, responder, BlobReceiver, PacketReceiver};
|
||||
use window::{
|
||||
blob_idx_in_window, calculate_highest_lost_blob_index, default_window, repair_backoff,
|
||||
window_service, WINDOW_SIZE,
|
||||
blob_idx_in_window, calculate_highest_lost_blob_index, default_window, WINDOW_SIZE,
|
||||
};
|
||||
|
||||
fn get_msgs(r: PacketReceiver, num: &mut usize) {
|
||||
|
@ -775,245 +488,6 @@ mod test {
|
|||
t_responder.join().expect("join");
|
||||
}
|
||||
|
||||
fn get_blobs(r: BlobReceiver, num: &mut usize) {
|
||||
for _t in 0..5 {
|
||||
let timer = Duration::new(1, 0);
|
||||
match r.recv_timeout(timer) {
|
||||
Ok(m) => {
|
||||
for (i, v) in m.iter().enumerate() {
|
||||
assert_eq!(v.read().unwrap().get_index().unwrap() as usize, *num + i);
|
||||
}
|
||||
*num += m.len();
|
||||
}
|
||||
e => info!("error {:?}", e),
|
||||
}
|
||||
if *num == 10 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn window_send_test() {
|
||||
logger::setup();
|
||||
let tn = Node::new_localhost();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let mut crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new");
|
||||
let me_id = crdt_me.my_data().id;
|
||||
crdt_me.set_leader(me_id);
|
||||
let subs = Arc::new(RwLock::new(crdt_me));
|
||||
|
||||
let resp_recycler = BlobRecycler::default();
|
||||
let (s_reader, r_reader) = channel();
|
||||
let t_receiver = blob_receiver(
|
||||
Arc::new(tn.sockets.gossip),
|
||||
exit.clone(),
|
||||
resp_recycler.clone(),
|
||||
s_reader,
|
||||
);
|
||||
let (s_window, r_window) = channel();
|
||||
let (s_retransmit, r_retransmit) = channel();
|
||||
let win = default_window();
|
||||
let t_window = window_service(
|
||||
subs,
|
||||
win,
|
||||
0,
|
||||
resp_recycler.clone(),
|
||||
r_reader,
|
||||
s_window,
|
||||
s_retransmit,
|
||||
Arc::new(tn.sockets.repair),
|
||||
);
|
||||
let t_responder = {
|
||||
let (s_responder, r_responder) = channel();
|
||||
let t_responder = responder(
|
||||
"window_send_test",
|
||||
Arc::new(tn.sockets.replicate),
|
||||
resp_recycler.clone(),
|
||||
r_responder,
|
||||
);
|
||||
let mut msgs = Vec::new();
|
||||
for v in 0..10 {
|
||||
let i = 9 - v;
|
||||
let b = resp_recycler.allocate();
|
||||
{
|
||||
let mut w = b.write().unwrap();
|
||||
w.set_index(i).unwrap();
|
||||
w.set_id(me_id).unwrap();
|
||||
assert_eq!(i, w.get_index().unwrap());
|
||||
w.meta.size = PACKET_DATA_SIZE;
|
||||
w.meta.set_addr(&tn.info.contact_info.ncp);
|
||||
}
|
||||
msgs.push(b);
|
||||
}
|
||||
s_responder.send(msgs).expect("send");
|
||||
t_responder
|
||||
};
|
||||
|
||||
let mut num = 0;
|
||||
get_blobs(r_window, &mut num);
|
||||
assert_eq!(num, 10);
|
||||
let mut q = r_retransmit.recv().unwrap();
|
||||
while let Ok(mut nq) = r_retransmit.try_recv() {
|
||||
q.append(&mut nq);
|
||||
}
|
||||
assert_eq!(q.len(), 10);
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
t_receiver.join().expect("join");
|
||||
t_responder.join().expect("join");
|
||||
t_window.join().expect("join");
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn window_send_no_leader_test() {
|
||||
logger::setup();
|
||||
let tn = Node::new_localhost();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new");
|
||||
let me_id = crdt_me.my_data().id;
|
||||
let subs = Arc::new(RwLock::new(crdt_me));
|
||||
|
||||
let resp_recycler = BlobRecycler::default();
|
||||
let (s_reader, r_reader) = channel();
|
||||
let t_receiver = blob_receiver(
|
||||
Arc::new(tn.sockets.gossip),
|
||||
exit.clone(),
|
||||
resp_recycler.clone(),
|
||||
s_reader,
|
||||
);
|
||||
let (s_window, _r_window) = channel();
|
||||
let (s_retransmit, r_retransmit) = channel();
|
||||
let win = default_window();
|
||||
let t_window = window_service(
|
||||
subs.clone(),
|
||||
win,
|
||||
0,
|
||||
resp_recycler.clone(),
|
||||
r_reader,
|
||||
s_window,
|
||||
s_retransmit,
|
||||
Arc::new(tn.sockets.repair),
|
||||
);
|
||||
let t_responder = {
|
||||
let (s_responder, r_responder) = channel();
|
||||
let t_responder = responder(
|
||||
"window_send_test",
|
||||
Arc::new(tn.sockets.replicate),
|
||||
resp_recycler.clone(),
|
||||
r_responder,
|
||||
);
|
||||
let mut msgs = Vec::new();
|
||||
for v in 0..10 {
|
||||
let i = 9 - v;
|
||||
let b = resp_recycler.allocate();
|
||||
{
|
||||
let mut w = b.write().unwrap();
|
||||
w.set_index(i).unwrap();
|
||||
w.set_id(me_id).unwrap();
|
||||
assert_eq!(i, w.get_index().unwrap());
|
||||
w.meta.size = PACKET_DATA_SIZE;
|
||||
w.meta.set_addr(&tn.info.contact_info.ncp);
|
||||
}
|
||||
msgs.push(b);
|
||||
}
|
||||
s_responder.send(msgs).expect("send");
|
||||
t_responder
|
||||
};
|
||||
|
||||
assert!(r_retransmit.recv_timeout(Duration::new(3, 0)).is_err());
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
t_receiver.join().expect("join");
|
||||
t_responder.join().expect("join");
|
||||
t_window.join().expect("join");
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn window_send_late_leader_test() {
|
||||
logger::setup();
|
||||
let tn = Node::new_localhost();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new");
|
||||
let me_id = crdt_me.my_data().id;
|
||||
let subs = Arc::new(RwLock::new(crdt_me));
|
||||
|
||||
let resp_recycler = BlobRecycler::default();
|
||||
let (s_reader, r_reader) = channel();
|
||||
let t_receiver = blob_receiver(
|
||||
Arc::new(tn.sockets.gossip),
|
||||
exit.clone(),
|
||||
resp_recycler.clone(),
|
||||
s_reader,
|
||||
);
|
||||
let (s_window, _r_window) = channel();
|
||||
let (s_retransmit, r_retransmit) = channel();
|
||||
let win = default_window();
|
||||
let t_window = window_service(
|
||||
subs.clone(),
|
||||
win,
|
||||
0,
|
||||
resp_recycler.clone(),
|
||||
r_reader,
|
||||
s_window,
|
||||
s_retransmit,
|
||||
Arc::new(tn.sockets.repair),
|
||||
);
|
||||
let t_responder = {
|
||||
let (s_responder, r_responder) = channel();
|
||||
let t_responder = responder(
|
||||
"window_send_test",
|
||||
Arc::new(tn.sockets.replicate),
|
||||
resp_recycler.clone(),
|
||||
r_responder,
|
||||
);
|
||||
let mut msgs = Vec::new();
|
||||
for v in 0..10 {
|
||||
let i = 9 - v;
|
||||
let b = resp_recycler.allocate();
|
||||
{
|
||||
let mut w = b.write().unwrap();
|
||||
w.set_index(i).unwrap();
|
||||
w.set_id(me_id).unwrap();
|
||||
assert_eq!(i, w.get_index().unwrap());
|
||||
w.meta.size = PACKET_DATA_SIZE;
|
||||
w.meta.set_addr(&tn.info.contact_info.ncp);
|
||||
}
|
||||
msgs.push(b);
|
||||
}
|
||||
s_responder.send(msgs).expect("send");
|
||||
|
||||
assert!(r_retransmit.recv_timeout(Duration::new(3, 0)).is_err());
|
||||
|
||||
subs.write().unwrap().set_leader(me_id);
|
||||
|
||||
let mut msgs1 = Vec::new();
|
||||
for v in 1..5 {
|
||||
let i = 9 + v;
|
||||
let b = resp_recycler.allocate();
|
||||
{
|
||||
let mut w = b.write().unwrap();
|
||||
w.set_index(i).unwrap();
|
||||
w.set_id(me_id).unwrap();
|
||||
assert_eq!(i, w.get_index().unwrap());
|
||||
w.meta.size = PACKET_DATA_SIZE;
|
||||
w.meta.set_addr(&tn.info.contact_info.ncp);
|
||||
}
|
||||
msgs1.push(b);
|
||||
}
|
||||
s_responder.send(msgs1).expect("send");
|
||||
t_responder
|
||||
};
|
||||
|
||||
let mut q = r_retransmit.recv().unwrap();
|
||||
while let Ok(mut nq) = r_retransmit.try_recv() {
|
||||
q.append(&mut nq);
|
||||
}
|
||||
assert!(q.len() > 10);
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
t_receiver.join().expect("join");
|
||||
t_responder.join().expect("join");
|
||||
t_window.join().expect("join");
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn calculate_highest_lost_blob_index_test() {
|
||||
assert_eq!(calculate_highest_lost_blob_index(0, 10, 90), 90);
|
||||
|
@ -1061,32 +535,4 @@ mod test {
|
|||
assert_eq!(wrap_blob_idx_in_window(&id, 91, 90, 100), (true, 100));
|
||||
assert_eq!(wrap_blob_idx_in_window(&id, 101, 90, 100), (true, 101));
|
||||
}
|
||||
#[test]
|
||||
pub fn test_repair_backoff() {
|
||||
let num_tests = 100;
|
||||
let res: usize = (0..num_tests)
|
||||
.map(|_| {
|
||||
let mut last = 0;
|
||||
let mut times = 0;
|
||||
let total: usize = (0..127)
|
||||
.map(|x| {
|
||||
let rv = repair_backoff(&mut last, &mut times, 1) as usize;
|
||||
assert_eq!(times, x + 2);
|
||||
rv
|
||||
})
|
||||
.sum();
|
||||
assert_eq!(times, 128);
|
||||
assert_eq!(last, 1);
|
||||
repair_backoff(&mut last, &mut times, 1);
|
||||
assert_eq!(times, 64);
|
||||
repair_backoff(&mut last, &mut times, 2);
|
||||
assert_eq!(times, 2);
|
||||
assert_eq!(last, 2);
|
||||
total
|
||||
})
|
||||
.sum();
|
||||
let avg = res / num_tests;
|
||||
assert!(avg >= 3);
|
||||
assert!(avg <= 5);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,585 @@
|
|||
//! The `window_service` provides a thread for maintaining a window (tail of the ledger).
|
||||
//!
|
||||
use counter::Counter;
|
||||
use crdt::{Crdt, NodeInfo};
|
||||
#[cfg(feature = "erasure")]
|
||||
use erasure;
|
||||
use log::Level;
|
||||
use packet::{BlobRecycler, SharedBlob};
|
||||
use rand::{thread_rng, Rng};
|
||||
use result::{Error, Result};
|
||||
use signature::Pubkey;
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::mpsc::RecvTimeoutError;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::{Builder, JoinHandle};
|
||||
use std::time::{Duration, Instant};
|
||||
use streamer::{BlobReceiver, BlobSender};
|
||||
use timing::duration_as_ms;
|
||||
use window::{blob_idx_in_window, SharedWindow, WindowUtil};
|
||||
|
||||
pub const MAX_REPAIR_BACKOFF: usize = 128;
|
||||
|
||||
fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool {
|
||||
//exponential backoff
|
||||
if *last != consumed {
|
||||
//start with a 50% chance of asking for repairs
|
||||
*times = 1;
|
||||
}
|
||||
*last = consumed;
|
||||
*times += 1;
|
||||
|
||||
// Experiment with capping repair request duration.
|
||||
// Once nodes are too far behind they can spend many
|
||||
// seconds without asking for repair
|
||||
if *times > MAX_REPAIR_BACKOFF {
|
||||
// 50% chance that a request will fire between 64 - 128 tries
|
||||
*times = MAX_REPAIR_BACKOFF / 2;
|
||||
}
|
||||
|
||||
//if we get lucky, make the request, which should exponentially get less likely
|
||||
thread_rng().gen_range(0, *times as u64) == 0
|
||||
}
|
||||
|
||||
fn add_block_to_retransmit_queue(
|
||||
b: &SharedBlob,
|
||||
leader_id: Pubkey,
|
||||
recycler: &BlobRecycler,
|
||||
retransmit_queue: &mut Vec<SharedBlob>,
|
||||
) {
|
||||
let p = b
|
||||
.read()
|
||||
.expect("'b' read lock in fn add_block_to_retransmit_queue");
|
||||
//TODO this check isn't safe against adverserial packets
|
||||
//we need to maintain a sequence window
|
||||
trace!(
|
||||
"idx: {} addr: {:?} id: {:?} leader: {:?}",
|
||||
p.get_index()
|
||||
.expect("get_index in fn add_block_to_retransmit_queue"),
|
||||
p.get_id()
|
||||
.expect("get_id in trace! fn add_block_to_retransmit_queue"),
|
||||
p.meta.addr(),
|
||||
leader_id
|
||||
);
|
||||
if p.get_id()
|
||||
.expect("get_id in fn add_block_to_retransmit_queue") == leader_id
|
||||
{
|
||||
//TODO
|
||||
//need to copy the retransmitted blob
|
||||
//otherwise we get into races with which thread
|
||||
//should do the recycling
|
||||
//
|
||||
//a better abstraction would be to recycle when the blob
|
||||
//is dropped via a weakref to the recycler
|
||||
let nv = recycler.allocate();
|
||||
{
|
||||
let mut mnv = nv
|
||||
.write()
|
||||
.expect("recycler write lock in fn add_block_to_retransmit_queue");
|
||||
let sz = p.meta.size;
|
||||
mnv.meta.size = sz;
|
||||
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
|
||||
}
|
||||
retransmit_queue.push(nv);
|
||||
}
|
||||
}
|
||||
|
||||
fn retransmit_all_leader_blocks(
|
||||
window: &SharedWindow,
|
||||
maybe_leader: Option<NodeInfo>,
|
||||
dq: &[SharedBlob],
|
||||
id: &Pubkey,
|
||||
recycler: &BlobRecycler,
|
||||
consumed: u64,
|
||||
received: u64,
|
||||
retransmit: &BlobSender,
|
||||
pending_retransmits: &mut bool,
|
||||
) -> Result<()> {
|
||||
let mut retransmit_queue: Vec<SharedBlob> = Vec::new();
|
||||
if let Some(leader) = maybe_leader {
|
||||
let leader_id = leader.id;
|
||||
for b in dq {
|
||||
add_block_to_retransmit_queue(b, leader_id, recycler, &mut retransmit_queue);
|
||||
}
|
||||
|
||||
if *pending_retransmits {
|
||||
for w in window
|
||||
.write()
|
||||
.expect("Window write failed in retransmit_all_leader_blocks")
|
||||
.iter_mut()
|
||||
{
|
||||
*pending_retransmits = false;
|
||||
if w.leader_unknown {
|
||||
if let Some(b) = w.clone().data {
|
||||
add_block_to_retransmit_queue(
|
||||
&b,
|
||||
leader_id,
|
||||
recycler,
|
||||
&mut retransmit_queue,
|
||||
);
|
||||
w.leader_unknown = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!("{}: no leader to retransmit from", id);
|
||||
}
|
||||
if !retransmit_queue.is_empty() {
|
||||
trace!(
|
||||
"{}: RECV_WINDOW {} {}: retransmit {}",
|
||||
id,
|
||||
consumed,
|
||||
received,
|
||||
retransmit_queue.len(),
|
||||
);
|
||||
inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len());
|
||||
retransmit.send(retransmit_queue)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
|
||||
fn recv_window(
|
||||
window: &SharedWindow,
|
||||
id: &Pubkey,
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
recycler: &BlobRecycler,
|
||||
consumed: &mut u64,
|
||||
received: &mut u64,
|
||||
r: &BlobReceiver,
|
||||
s: &BlobSender,
|
||||
retransmit: &BlobSender,
|
||||
pending_retransmits: &mut bool,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::from_millis(200);
|
||||
let mut dq = r.recv_timeout(timer)?;
|
||||
let maybe_leader: Option<NodeInfo> = crdt
|
||||
.read()
|
||||
.expect("'crdt' read lock in fn recv_window")
|
||||
.leader_data()
|
||||
.cloned();
|
||||
let leader_unknown = maybe_leader.is_none();
|
||||
while let Ok(mut nq) = r.try_recv() {
|
||||
dq.append(&mut nq)
|
||||
}
|
||||
let now = Instant::now();
|
||||
inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100);
|
||||
trace!(
|
||||
"{}: RECV_WINDOW {} {}: got packets {}",
|
||||
id,
|
||||
*consumed,
|
||||
*received,
|
||||
dq.len(),
|
||||
);
|
||||
|
||||
retransmit_all_leader_blocks(
|
||||
window,
|
||||
maybe_leader,
|
||||
&dq,
|
||||
id,
|
||||
recycler,
|
||||
*consumed,
|
||||
*received,
|
||||
retransmit,
|
||||
pending_retransmits,
|
||||
)?;
|
||||
|
||||
let mut pixs = Vec::new();
|
||||
//send a contiguous set of blocks
|
||||
let mut consume_queue = Vec::new();
|
||||
for b in dq {
|
||||
let (pix, meta_size) = {
|
||||
let p = b.write().unwrap();
|
||||
(p.get_index()?, p.meta.size)
|
||||
};
|
||||
pixs.push(pix);
|
||||
|
||||
if !blob_idx_in_window(&id, pix, *consumed, received) {
|
||||
recycler.recycle(b, "recv_window");
|
||||
continue;
|
||||
}
|
||||
|
||||
trace!("{} window pix: {} size: {}", id, pix, meta_size);
|
||||
|
||||
window.write().unwrap().process_blob(
|
||||
id,
|
||||
b,
|
||||
pix,
|
||||
&mut consume_queue,
|
||||
recycler,
|
||||
consumed,
|
||||
leader_unknown,
|
||||
pending_retransmits,
|
||||
);
|
||||
}
|
||||
if log_enabled!(Level::Trace) {
|
||||
trace!("{}", window.read().unwrap().print(id, *consumed));
|
||||
trace!(
|
||||
"{}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms",
|
||||
id,
|
||||
*consumed,
|
||||
*received,
|
||||
consume_queue.len(),
|
||||
pixs,
|
||||
duration_as_ms(&now.elapsed())
|
||||
);
|
||||
}
|
||||
if !consume_queue.is_empty() {
|
||||
inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len());
|
||||
s.send(consume_queue)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn window_service(
|
||||
crdt: Arc<RwLock<Crdt>>,
|
||||
window: SharedWindow,
|
||||
entry_height: u64,
|
||||
recycler: BlobRecycler,
|
||||
r: BlobReceiver,
|
||||
s: BlobSender,
|
||||
retransmit: BlobSender,
|
||||
repair_socket: Arc<UdpSocket>,
|
||||
) -> JoinHandle<()> {
|
||||
Builder::new()
|
||||
.name("solana-window".to_string())
|
||||
.spawn(move || {
|
||||
let mut consumed = entry_height;
|
||||
let mut received = entry_height;
|
||||
let mut last = entry_height;
|
||||
let mut times = 0;
|
||||
let id = crdt.read().unwrap().id;
|
||||
let mut pending_retransmits = false;
|
||||
trace!("{}: RECV_WINDOW started", id);
|
||||
loop {
|
||||
if let Err(e) = recv_window(
|
||||
&window,
|
||||
&id,
|
||||
&crdt,
|
||||
&recycler,
|
||||
&mut consumed,
|
||||
&mut received,
|
||||
&r,
|
||||
&s,
|
||||
&retransmit,
|
||||
&mut pending_retransmits,
|
||||
) {
|
||||
match e {
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||
_ => {
|
||||
inc_new_counter_info!("streamer-window-error", 1, 1);
|
||||
error!("window error: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if received <= consumed {
|
||||
continue;
|
||||
}
|
||||
|
||||
//exponential backoff
|
||||
if !repair_backoff(&mut last, &mut times, consumed) {
|
||||
trace!("{} !repair_backoff() times = {}", id, times);
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut window = window.write().unwrap();
|
||||
let reqs = window.repair(&crdt, &recycler, &id, times, consumed, received);
|
||||
for (to, req) in reqs {
|
||||
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
|
||||
info!("{} repair req send_to({}) error {:?}", id, to, e);
|
||||
0
|
||||
});
|
||||
}
|
||||
}
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crdt::{Crdt, Node};
|
||||
use logger;
|
||||
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
|
||||
use signature::Pubkey;
|
||||
use std::io;
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::Duration;
|
||||
use streamer::{blob_receiver, receiver, responder, BlobReceiver, PacketReceiver};
|
||||
use window::{default_window, WINDOW_SIZE};
|
||||
use window_service::{repair_backoff, window_service};
|
||||
|
||||
fn get_blobs(r: BlobReceiver, num: &mut usize) {
|
||||
for _t in 0..5 {
|
||||
let timer = Duration::new(1, 0);
|
||||
match r.recv_timeout(timer) {
|
||||
Ok(m) => {
|
||||
for (i, v) in m.iter().enumerate() {
|
||||
assert_eq!(v.read().unwrap().get_index().unwrap() as usize, *num + i);
|
||||
}
|
||||
*num += m.len();
|
||||
}
|
||||
e => info!("error {:?}", e),
|
||||
}
|
||||
if *num == 10 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// With this node set as leader, ten blobs sent in descending index order (9 down
/// to 0) must come out of the window service in ascending order, and ten blobs must
/// show up on the retransmit channel.
#[test]
pub fn window_send_test() {
    logger::setup();
    let tn = Node::new_localhost();
    let exit = Arc::new(AtomicBool::new(false));

    // The local node is its own leader in this scenario.
    let mut crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new");
    let me_id = crdt_me.my_data().id;
    crdt_me.set_leader(me_id);
    let shared_crdt = Arc::new(RwLock::new(crdt_me));

    let recycler = BlobRecycler::default();

    // gossip socket -> blob_receiver -> window_service
    let (s_reader, r_reader) = channel();
    let t_receiver = blob_receiver(
        Arc::new(tn.sockets.gossip),
        exit.clone(),
        recycler.clone(),
        s_reader,
    );
    let (s_window, r_window) = channel();
    let (s_retransmit, r_retransmit) = channel();
    let t_window = window_service(
        shared_crdt,
        default_window(),
        0,
        recycler.clone(),
        r_reader,
        s_window,
        s_retransmit,
        Arc::new(tn.sockets.repair),
    );

    // The responder's sender lives only inside this block, so the channel closes as
    // soon as the batch below has been handed over.
    let t_responder = {
        let (s_responder, r_responder) = channel();
        let handle = responder(
            "window_send_test",
            Arc::new(tn.sockets.replicate),
            recycler.clone(),
            r_responder,
        );
        let mut outgoing = Vec::new();
        for i in (0..10).rev() {
            let blob = recycler.allocate();
            {
                let mut guard = blob.write().unwrap();
                guard.set_index(i).unwrap();
                guard.set_id(me_id).unwrap();
                assert_eq!(i, guard.get_index().unwrap());
                guard.meta.size = PACKET_DATA_SIZE;
                guard.meta.set_addr(&tn.info.contact_info.ncp);
            }
            outgoing.push(blob);
        }
        s_responder.send(outgoing).expect("send");
        handle
    };

    let mut num = 0;
    get_blobs(r_window, &mut num);
    assert_eq!(num, 10);

    // Gather everything that reached the retransmit channel.
    let mut retransmitted = r_retransmit.recv().unwrap();
    while let Ok(more) = r_retransmit.try_recv() {
        retransmitted.extend(more);
    }
    assert_eq!(retransmitted.len(), 10);

    exit.store(true, Ordering::Relaxed);
    t_receiver.join().expect("join");
    t_responder.join().expect("join");
    t_window.join().expect("join");
}
|
||||
|
||||
/// Without a leader set on the crdt, blobs fed through the window service must not
/// produce anything on the retransmit channel within three seconds.
#[test]
pub fn window_send_no_leader_test() {
    logger::setup();
    let tn = Node::new_localhost();
    let exit = Arc::new(AtomicBool::new(false));

    // No set_leader call here: the leader stays unknown for the whole test.
    let crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new");
    let me_id = crdt_me.my_data().id;
    let shared_crdt = Arc::new(RwLock::new(crdt_me));

    let recycler = BlobRecycler::default();

    // gossip socket -> blob_receiver -> window_service
    let (s_reader, r_reader) = channel();
    let t_receiver = blob_receiver(
        Arc::new(tn.sockets.gossip),
        exit.clone(),
        recycler.clone(),
        s_reader,
    );
    // The consume receiver is kept alive but never read.
    let (s_window, _r_window) = channel();
    let (s_retransmit, r_retransmit) = channel();
    let t_window = window_service(
        shared_crdt.clone(),
        default_window(),
        0,
        recycler.clone(),
        r_reader,
        s_window,
        s_retransmit,
        Arc::new(tn.sockets.repair),
    );

    // The responder's sender lives only inside this block, so the channel closes as
    // soon as the batch below has been handed over.
    let t_responder = {
        let (s_responder, r_responder) = channel();
        let handle = responder(
            "window_send_test",
            Arc::new(tn.sockets.replicate),
            recycler.clone(),
            r_responder,
        );
        let mut outgoing = Vec::new();
        for i in (0..10).rev() {
            let blob = recycler.allocate();
            {
                let mut guard = blob.write().unwrap();
                guard.set_index(i).unwrap();
                guard.set_id(me_id).unwrap();
                assert_eq!(i, guard.get_index().unwrap());
                guard.meta.size = PACKET_DATA_SIZE;
                guard.meta.set_addr(&tn.info.contact_info.ncp);
            }
            outgoing.push(blob);
        }
        s_responder.send(outgoing).expect("send");
        handle
    };

    let waited = r_retransmit.recv_timeout(Duration::new(3, 0));
    assert!(waited.is_err());

    exit.store(true, Ordering::Relaxed);
    t_receiver.join().expect("join");
    t_responder.join().expect("join");
    t_window.join().expect("join");
}
|
||||
|
||||
#[test]
pub fn window_send_late_leader_test() {
    // Scenario: ten blobs are sent before any leader is set (no retransmit is
    // expected), then this node is set as leader and four more blobs follow;
    // after that the retransmit channel must yield more than ten blobs.
    logger::setup();
    let tn = Node::new_localhost();
    let exit = Arc::new(AtomicBool::new(false));
    let crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new");
    let me_id = crdt_me.my_data().id;
    let subs = Arc::new(RwLock::new(crdt_me));

    let resp_recycler = BlobRecycler::default();

    // `blob_receiver` is handed the gossip socket and the sending half of the
    // reader channel; the window service below consumes the receiving half.
    let (s_reader, r_reader) = channel();
    let t_receiver = blob_receiver(
        Arc::new(tn.sockets.gossip),
        Arc::clone(&exit),
        resp_recycler.clone(),
        s_reader,
    );

    // `_r_window` is bound (not discarded) so the receiving half of the window
    // channel lives until the end of the test.
    let (s_window, _r_window) = channel();
    let (s_retransmit, r_retransmit) = channel();
    let win = default_window();
    let t_window = window_service(
        Arc::clone(&subs),
        win,
        0,
        resp_recycler.clone(),
        r_reader,
        s_window,
        s_retransmit,
        Arc::new(tn.sockets.repair),
    );

    let t_responder = {
        let (s_responder, r_responder) = channel();
        let handle = responder(
            "window_send_test",
            Arc::new(tn.sockets.replicate),
            resp_recycler.clone(),
            r_responder,
        );

        // First batch: indices 9 down to 0, sent while no leader is set.
        let mut early = Vec::new();
        for idx in (0..10).rev() {
            let blob = resp_recycler.allocate();
            {
                let mut guard = blob.write().unwrap();
                guard.set_index(idx).unwrap();
                guard.set_id(me_id).unwrap();
                assert_eq!(idx, guard.get_index().unwrap());
                guard.meta.size = PACKET_DATA_SIZE;
                guard.meta.set_addr(&tn.info.contact_info.ncp);
            }
            early.push(blob);
        }
        s_responder.send(early).expect("send");

        // With no leader, nothing may be retransmitted within three seconds.
        assert!(r_retransmit
            .recv_timeout(Duration::from_secs(3))
            .is_err());

        // Make this node the leader.
        subs.write().unwrap().set_leader(me_id);

        // Second batch: indices 10 through 13, sent after the leader is set.
        let mut late = Vec::new();
        for idx in 10..14 {
            let blob = resp_recycler.allocate();
            {
                let mut guard = blob.write().unwrap();
                guard.set_index(idx).unwrap();
                guard.set_id(me_id).unwrap();
                assert_eq!(idx, guard.get_index().unwrap());
                guard.meta.size = PACKET_DATA_SIZE;
                guard.meta.set_addr(&tn.info.contact_info.ncp);
            }
            late.push(blob);
        }
        s_responder.send(late).expect("send");
        handle
    };

    // Block for the first retransmit batch, then drain whatever else is
    // already queued without waiting.
    let mut retransmitted = r_retransmit.recv().unwrap();
    while let Ok(mut more) = r_retransmit.try_recv() {
        retransmitted.append(&mut more);
    }
    assert!(retransmitted.len() > 10);

    // Signal shutdown, then wait for every spawned thread.
    exit.store(true, Ordering::Relaxed);
    t_receiver.join().expect("join");
    t_responder.join().expect("join");
    t_window.join().expect("join");
}
|
||||
|
||||
#[test]
pub fn test_repair_backoff() {
    // Runs the same backoff sequence `num_tests` times and checks both the
    // exact evolution of the `times` / `last` counters and the average number
    // of `true` results returned over the first 127 calls of each run.
    let num_tests = 100;
    let mut res: usize = 0;

    for _ in 0..num_tests {
        let mut last = 0;
        let mut times = 0;
        let mut total: usize = 0;

        // 127 calls with the same `consumed` value: `times` must read 2 after
        // the first call and grow by one per call.
        for x in 0..127 {
            let rv = repair_backoff(&mut last, &mut times, 1) as usize;
            assert_eq!(times, x + 2);
            total += rv;
        }
        assert_eq!(times, 128);
        assert_eq!(last, 1);

        // One more call with the same `consumed` drops `times` to 64.
        repair_backoff(&mut last, &mut times, 1);
        assert_eq!(times, 64);

        // A different `consumed` value puts `times` back at 2 and updates `last`.
        repair_backoff(&mut last, &mut times, 2);
        assert_eq!(times, 2);
        assert_eq!(last, 2);

        res += total;
    }

    // Integer average of `true` results per run must land in [3, 5].
    let avg = res / num_tests;
    assert!(avg >= 3);
    assert!(avg <= 5);
}
|
||||
}
|
Loading…
Reference in New Issue