Send coding blobs in broadcast(); assorted fixups

This commit is contained in:
Rob Walker 2018-07-18 10:10:34 -07:00
parent 5d20d1ddbf
commit d713e3c2cf
3 changed files with 53 additions and 70 deletions

View File

@@ -575,13 +575,22 @@ impl Crdt {
// transmit them to nodes, starting from a different node
let mut orders = Vec::new();
let window_l = window.write().unwrap();
for i in *transmit_index..received_index {
let is = i as usize;
let k = is % window_l.len();
assert!(window_l[k].data.is_some());
let mut br_idx = *transmit_index as usize % broadcast_table.len();
let pos = is % broadcast_table.len();
orders.push((window_l[k].data.clone(), &broadcast_table[pos]));
for idx in *transmit_index..received_index {
let w_idx = idx as usize % window_l.len();
assert!(window_l[w_idx].data.is_some());
orders.push((window_l[w_idx].data.clone(), &broadcast_table[br_idx]));
br_idx += 1;
br_idx %= broadcast_table.len();
if window_l[w_idx].coding.is_some() {
orders.push((window_l[w_idx].coding.clone(), &broadcast_table[br_idx]));
br_idx += 1;
br_idx %= broadcast_table.len();
}
}
trace!("broadcast orders table {}", orders.len());

View File

@@ -1,5 +1,6 @@
// Support erasure coding
use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE};
use std::cmp;
use std::result;
use streamer::WindowSlot;
@@ -201,20 +202,11 @@ pub fn generate_coding(
trace!("data block is null @ {}", n);
return Ok(());
}
let data = window[n].data.clone().unwrap();
{
let data_rl = data.read().unwrap();
if data_rl.meta.size > max_data_size {
max_data_size = data_rl.meta.size;
}
}
data_blobs.push(
window[n]
.data
.clone()
.expect("'data_blobs' arr in pub fn generate_coding"),
);
let data = window[n].data.clone().unwrap();
max_data_size = cmp::max(data.read().unwrap().meta.size, max_data_size);
data_blobs.push(data);
}
let mut coding_blobs = Vec::with_capacity(NUM_CODING);

View File

@@ -24,7 +24,7 @@ pub type PacketSender = Sender<SharedPackets>;
pub type BlobSender = Sender<SharedBlobs>;
pub type BlobReceiver = Receiver<SharedBlobs>;
#[derive(Clone)]
#[derive(Clone, Default)]
pub struct WindowSlot {
pub data: Option<SharedBlob>,
pub coding: Option<SharedBlob>,
@@ -371,17 +371,37 @@ fn process_blob(
}
}
// Insert the new blob into the window
// spot should be free because we cleared it above
if window[w].data.is_none() {
window[w].data = Some(b);
} else if let Some(cblob) = &window[w].data {
if cblob.read().unwrap().get_index().unwrap() != pix as u64 {
warn!("{:x}: overrun blob at index {:}", debug_id, w);
} else {
debug!("{:x}: duplicate blob at index {:}", debug_id, w);
let is_coding = {
let blob_r = b.read().expect("blob read lock for flogs streamer::window");
blob_r.is_coding()
};
// insert the new blob into the window if it's coding or data
if is_coding {
// Insert the new blob into the window
// spot should be free because we cleared it above
if window[w].coding.is_none() {
window[w].coding = Some(b);
} else if let Some(blob) = &window[w].coding {
if blob.read().unwrap().get_index().unwrap() != pix as u64 {
warn!("{:x}: overrun blob at index {:}", debug_id, w);
} else {
debug!("{:x}: duplicate blob at index {:}", debug_id, w);
}
}
} else {
if window[w].data.is_none() {
window[w].data = Some(b);
} else if let Some(blob) = &window[w].data {
if blob.read().unwrap().get_index().unwrap() != pix as u64 {
warn!("{:x}: overrun blob at index {:}", debug_id, w);
} else {
debug!("{:x}: duplicate blob at index {:}", debug_id, w);
}
}
}
// push all contiguous blobs into consumed queue, increment consumed
loop {
let k = (*consumed % WINDOW_SIZE) as usize;
trace!("k: {} consumed: {}", k, *consumed);
@@ -389,43 +409,8 @@ fn process_blob(
if window[k].data.is_none() {
break;
}
let mut is_coding = false;
if let Some(ref cblob) = window[k].data {
let cblob_r = cblob
.read()
.expect("blob read lock for flogs streamer::window");
if cblob_r.get_index().unwrap() < *consumed {
break;
}
if cblob_r.is_coding() {
is_coding = true;
}
}
if !is_coding {
consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window"));
*consumed += 1;
} else {
// #[cfg(feature = "erasure")]
// {
// let block_start = *consumed - (*consumed % erasure::NUM_DATA as u64);
// let coding_end = block_start + erasure::NUM_DATA as u64;
// // We've received all this block's data blobs, go and null out the window now
// for j in block_start..*consumed {
// if let Some(b) = mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None) {
// recycler.recycle(b);
// }
// }
// for j in *consumed..coding_end {
// window[(j % WINDOW_SIZE) as usize] = None;
// }
//
// *consumed += erasure::MAX_MISSING as u64;
// debug!(
// "skipping processing coding blob k: {} consumed: {}",
// k, *consumed
// );
// }
}
consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window"));
*consumed += 1;
}
}
@@ -550,10 +535,7 @@ fn print_window(debug_id: u64, locked_window: &Window, consumed: u64) {
pub fn default_window() -> Window {
Arc::new(RwLock::new(vec![
WindowSlot {
data: None,
coding: None,
};
WindowSlot::default();
WINDOW_SIZE as usize
]))
}