Retransmit blobs from leader from window (#975)

- Some nodes don't have leader information while leader is broadcasting
  blobs to those nodes. Such blobs are not retransmitted. This change
  retransmits the blobs once the leader's identity is known.
This commit is contained in:
Pankaj Garg 2018-08-14 21:51:37 -07:00 committed by GitHub
parent dccae18b53
commit 982afa87a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 234 additions and 29 deletions

View File

@ -713,7 +713,8 @@ mod test {
let mut window = vec![
WindowSlot {
data: None,
coding: None
coding: None,
leader_unknown: false,
};
WINDOW_SIZE
];

View File

@ -9,6 +9,7 @@ use ledger::Block;
use log::Level;
use packet::{BlobRecycler, SharedBlob, SharedBlobs, BLOB_SIZE};
use result::{Error, Result};
use signature::Pubkey;
use std::cmp;
use std::collections::VecDeque;
use std::mem;
@ -27,6 +28,7 @@ pub const WINDOW_SIZE: u64 = 2 * 1024;
/// One slot of the receive window: the data blob for this index, its
/// optional erasure-coding blob, and a flag recording whether the blob
/// arrived before the leader's identity was known.
pub struct WindowSlot {
pub data: Option<SharedBlob>,
pub coding: Option<SharedBlob>,
// Set when the blob was stored while `maybe_leader` was None; such
// slots are retransmitted later, once the leader is known (see
// retransmit_all_leader_blocks), and the flag is then cleared.
pub leader_unknown: bool,
}
/// Window shared across threads: a vector of slots behind an RwLock.
pub type SharedWindow = Arc<RwLock<Vec<WindowSlot>>>;
@ -143,6 +145,49 @@ fn repair_window(
Ok(())
}
/// If blob `b` was sent by `leader_id`, copy it into a freshly
/// allocated blob and append the copy to `retransmit_queue`.
///
/// Blobs from any other sender are ignored. The copy (rather than a
/// reference) is deliberate — see the TODO below about recycling races.
fn add_block_to_retransmit_queue(
b: &SharedBlob,
leader_id: Pubkey,
recycler: &BlobRecycler,
retransmit_queue: &mut VecDeque<SharedBlob>,
) {
let p = b
.read()
.expect("'b' read lock in fn add_block_to_retransmit_queue");
//TODO this check isn't safe against adversarial packets
//we need to maintain a sequence window
trace!(
"idx: {} addr: {:?} id: {:?} leader: {:?}",
p.get_index()
.expect("get_index in fn add_block_to_retransmit_queue"),
p.get_id()
.expect("get_id in trace! fn add_block_to_retransmit_queue"),
p.meta.addr(),
leader_id
);
// Only blobs whose sender id matches the current leader are
// eligible for retransmission.
if p.get_id()
.expect("get_id in fn add_block_to_retransmit_queue") == leader_id
{
//TODO
//need to copy the retransmitted blob
//otherwise we get into races with which thread
//should do the recycling
//
//a better abstraction would be to recycle when the blob
//is dropped via a weakref to the recycler
let nv = recycler.allocate();
{
// Copy metadata size and payload bytes into the new blob;
// scope the write lock so it is released before queuing.
let mut mnv = nv
.write()
.expect("recycler write lock in fn add_block_to_retransmit_queue");
let sz = p.meta.size;
mnv.meta.size = sz;
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
}
retransmit_queue.push_back(nv);
}
}
fn retransmit_all_leader_blocks(
maybe_leader: Option<NodeInfo>,
dq: &mut SharedBlobs,
@ -151,37 +196,34 @@ fn retransmit_all_leader_blocks(
consumed: u64,
received: u64,
retransmit: &BlobSender,
window: &SharedWindow,
pending_retransmits: &mut bool,
) -> Result<()> {
let mut retransmit_queue = VecDeque::new();
let mut retransmit_queue: VecDeque<SharedBlob> = VecDeque::new();
if let Some(leader) = maybe_leader {
let leader_id = leader.id;
for b in dq {
let p = b.read().expect("'b' read lock in fn recv_window");
//TODO this check isn't safe against adverserial packets
//we need to maintain a sequence window
let leader_id = leader.id;
trace!(
"idx: {} addr: {:?} id: {:?} leader: {:?}",
p.get_index().expect("get_index in fn recv_window"),
p.get_id().expect("get_id in trace! fn recv_window"),
p.meta.addr(),
leader_id
);
if p.get_id().expect("get_id in fn recv_window") == leader_id {
//TODO
//need to copy the retransmitted blob
//otherwise we get into races with which thread
//should do the recycling
//
//a better abstraction would be to recycle when the blob
//is dropped via a weakref to the recycler
let nv = recycler.allocate();
{
let mut mnv = nv.write().expect("recycler write lock in fn recv_window");
let sz = p.meta.size;
mnv.meta.size = sz;
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
add_block_to_retransmit_queue(b, leader_id, recycler, &mut retransmit_queue);
}
if *pending_retransmits {
for w in window
.write()
.expect("Window write failed in retransmit_all_leader_blocks")
.iter_mut()
{
*pending_retransmits = false;
if w.leader_unknown {
if let Some(b) = w.clone().data {
add_block_to_retransmit_queue(
&b,
leader_id,
recycler,
&mut retransmit_queue,
);
w.leader_unknown = false;
}
}
retransmit_queue.push_back(nv);
}
}
} else {
@ -223,6 +265,8 @@ fn process_blob(
window: &SharedWindow,
recycler: &BlobRecycler,
consumed: &mut u64,
leader_unknown: bool,
pending_retransmits: &mut bool,
) {
let mut window = window.write().unwrap();
let w = (pix % WINDOW_SIZE) as usize;
@ -280,6 +324,9 @@ fn process_blob(
return;
}
window[w].leader_unknown = leader_unknown;
*pending_retransmits = true;
#[cfg(feature = "erasure")]
{
if erasure::recover(
@ -346,6 +393,7 @@ fn blob_idx_in_window(debug_id: u64, pix: u64, consumed: u64, received: &mut u64
}
}
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
fn recv_window(
debug_id: u64,
window: &SharedWindow,
@ -356,6 +404,7 @@ fn recv_window(
r: &BlobReceiver,
s: &BlobSender,
retransmit: &BlobSender,
pending_retransmits: &mut bool,
) -> Result<()> {
let timer = Duration::from_millis(200);
let mut dq = r.recv_timeout(timer)?;
@ -364,6 +413,7 @@ fn recv_window(
.expect("'crdt' read lock in fn recv_window")
.leader_data()
.cloned();
let leader_unknown = maybe_leader.is_none();
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq)
}
@ -385,6 +435,8 @@ fn recv_window(
*consumed,
*received,
retransmit,
window,
pending_retransmits,
)?;
let mut pixs = Vec::new();
@ -412,6 +464,8 @@ fn recv_window(
window,
recycler,
consumed,
leader_unknown,
pending_retransmits,
);
}
if log_enabled!(Level::Trace) {
@ -588,6 +642,7 @@ pub fn window(
let mut last = entry_height;
let mut times = 0;
let debug_id = crdt.read().unwrap().debug_id();
let mut pending_retransmits = false;
trace!("{:x}: RECV_WINDOW started", debug_id);
loop {
if let Err(e) = recv_window(
@ -600,6 +655,7 @@ pub fn window(
&r,
&s,
&retransmit,
&mut pending_retransmits,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
@ -785,6 +841,154 @@ mod test {
t_window.join().expect("join");
}
#[test]
/// Verifies that blobs received while the node has NO leader
/// information are not retransmitted: ten blobs are sent from `me_id`,
/// but since the crdt never learns a leader, the retransmit channel
/// must stay empty (recv_timeout returns Err).
pub fn window_send_no_leader_test() {
logger::setup();
let tn = TestNode::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
// Note: no set_leader() call — the leader stays unknown for the
// whole test.
let crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new");
let me_id = crdt_me.my_data().id;
let subs = Arc::new(RwLock::new(crdt_me));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
// Receives blobs off the gossip socket and feeds them to the window
// thread.
let t_receiver = blob_receiver(
exit.clone(),
resp_recycler.clone(),
tn.sockets.gossip,
s_reader,
).unwrap();
let (s_window, _r_window) = channel();
let (s_retransmit, r_retransmit) = channel();
let win = default_window();
let t_window = window(
subs.clone(),
win,
0,
resp_recycler.clone(),
r_reader,
s_window,
s_retransmit,
);
let t_responder = {
let (s_responder, r_responder) = channel();
let t_responder = responder(
"window_send_test",
tn.sockets.replicate,
resp_recycler.clone(),
r_responder,
);
// Build ten blobs with indices 9..=0 (descending), all claiming to
// come from me_id, and send them into the window's receive path.
let mut msgs = VecDeque::new();
for v in 0..10 {
let i = 9 - v;
let b = resp_recycler.allocate();
{
let mut w = b.write().unwrap();
w.set_index(i).unwrap();
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.data.contact_info.ncp);
}
msgs.push_back(b);
}
s_responder.send(msgs).expect("send");
t_responder
};
// With no leader known, nothing may appear on the retransmit
// channel; expect a timeout.
assert!(r_retransmit.recv_timeout(Duration::new(3, 0)).is_err());
// Shut down all worker threads and wait for them.
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
t_window.join().expect("join");
}
#[test]
/// Verifies deferred retransmission once the leader becomes known
/// late: ten blobs arrive while the leader is unknown (nothing is
/// retransmitted), then the leader is set and four more blobs arrive.
/// The window thread must retransmit the new blobs AND the previously
/// buffered ones, so more than 10 blobs appear on the retransmit
/// channel in total.
pub fn window_send_late_leader_test() {
logger::setup();
let tn = TestNode::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
let crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new");
let me_id = crdt_me.my_data().id;
let subs = Arc::new(RwLock::new(crdt_me));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = blob_receiver(
exit.clone(),
resp_recycler.clone(),
tn.sockets.gossip,
s_reader,
).unwrap();
let (s_window, _r_window) = channel();
let (s_retransmit, r_retransmit) = channel();
let win = default_window();
let t_window = window(
subs.clone(),
win,
0,
resp_recycler.clone(),
r_reader,
s_window,
s_retransmit,
);
let t_responder = {
let (s_responder, r_responder) = channel();
let t_responder = responder(
"window_send_test",
tn.sockets.replicate,
resp_recycler.clone(),
r_responder,
);
// Phase 1: ten blobs (indices 9..=0) sent while the leader is
// still unknown; they are buffered in the window, not retransmitted.
let mut msgs = VecDeque::new();
for v in 0..10 {
let i = 9 - v;
let b = resp_recycler.allocate();
{
let mut w = b.write().unwrap();
w.set_index(i).unwrap();
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.data.contact_info.ncp);
}
msgs.push_back(b);
}
s_responder.send(msgs).expect("send");
// No leader yet, so the retransmit channel must time out empty.
assert!(r_retransmit.recv_timeout(Duration::new(3, 0)).is_err());
// Phase 2: the leader becomes known.
subs.write().unwrap().set_leader(me_id);
// Send four more blobs (indices 10..=13) from the now-known leader.
let mut msgs1 = VecDeque::new();
for v in 1..5 {
let i = 9 + v;
let b = resp_recycler.allocate();
{
let mut w = b.write().unwrap();
w.set_index(i).unwrap();
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.data.contact_info.ncp);
}
msgs1.push_back(b);
}
s_responder.send(msgs1).expect("send");
t_responder
};
// Drain everything retransmitted: the 10 buffered blobs plus the
// new ones must all show up, i.e. strictly more than 10.
let mut q = r_retransmit.recv().unwrap();
while let Ok(mut nq) = r_retransmit.try_recv() {
q.append(&mut nq);
}
assert!(q.len() > 10);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
t_window.join().expect("join");
}
#[test]
pub fn calculate_highest_lost_blob_index_test() {
assert_eq!(calculate_highest_lost_blob_index(0, 10, 90), 90);

2
tests/multinode.rs Executable file → Normal file
View File

@ -524,7 +524,7 @@ fn test_multi_node_dynamic_network() {
Ok(val) => val
.parse()
.expect(&format!("env var {} is not parse-able as usize", key)),
Err(_) => 100,
Err(_) => 170,
};
let purge_key = "SOLANA_DYNAMIC_NODES_PURGE_LAG";