diff --git a/src/crdt.rs b/src/crdt.rs index 862fc7bff..b1880c92e 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -575,13 +575,22 @@ impl Crdt { // transmit them to nodes, starting from a different node let mut orders = Vec::new(); let window_l = window.write().unwrap(); - for i in *transmit_index..received_index { - let is = i as usize; - let k = is % window_l.len(); - assert!(window_l[k].data.is_some()); + let mut br_idx = *transmit_index as usize % broadcast_table.len(); - let pos = is % broadcast_table.len(); - orders.push((window_l[k].data.clone(), &broadcast_table[pos])); + for idx in *transmit_index..received_index { + let w_idx = idx as usize % window_l.len(); + assert!(window_l[w_idx].data.is_some()); + + orders.push((window_l[w_idx].data.clone(), &broadcast_table[br_idx])); + + br_idx += 1; + br_idx %= broadcast_table.len(); + + if window_l[w_idx].coding.is_some() { + orders.push((window_l[w_idx].coding.clone(), &broadcast_table[br_idx])); + br_idx += 1; + br_idx %= broadcast_table.len(); + } } trace!("broadcast orders table {}", orders.len()); diff --git a/src/erasure.rs b/src/erasure.rs index c0466026c..343c275a1 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -1,5 +1,6 @@ // Support erasure coding use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE}; +use std::cmp; use std::result; use streamer::WindowSlot; @@ -201,20 +202,11 @@ pub fn generate_coding( trace!("data block is null @ {}", n); return Ok(()); } - let data = window[n].data.clone().unwrap(); - { - let data_rl = data.read().unwrap(); - if data_rl.meta.size > max_data_size { - max_data_size = data_rl.meta.size; - } - } - data_blobs.push( - window[n] - .data - .clone() - .expect("'data_blobs' arr in pub fn generate_coding"), - ); + let data = window[n].data.clone().unwrap(); + max_data_size = cmp::max(data.read().unwrap().meta.size, max_data_size); + + data_blobs.push(data); } let mut coding_blobs = Vec::with_capacity(NUM_CODING); diff --git a/src/streamer.rs b/src/streamer.rs index c6062899e..fb38542e3 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -24,7 +24,7 @@ pub type PacketSender = Sender; pub type BlobSender = Sender; pub type BlobReceiver = Receiver; -#[derive(Clone)] +#[derive(Clone, Default)] pub struct WindowSlot { pub data: Option, pub coding: Option, @@ -371,17 +371,37 @@ fn process_blob( } } - // Insert the new blob into the window - // spot should be free because we cleared it above - if window[w].data.is_none() { - window[w].data = Some(b); - } else if let Some(cblob) = &window[w].data { - if cblob.read().unwrap().get_index().unwrap() != pix as u64 { - warn!("{:x}: overrun blob at index {:}", debug_id, w); - } else { - debug!("{:x}: duplicate blob at index {:}", debug_id, w); + let is_coding = { + let blob_r = b.read().expect("blob read lock for flogs streamer::window"); + blob_r.is_coding() + }; + + // insert the new blob into the window if it's coding or data + if is_coding { + // Insert the new blob into the window + // spot should be free because we cleared it above + if window[w].coding.is_none() { + window[w].coding = Some(b); + } else if let Some(blob) = &window[w].coding { + if blob.read().unwrap().get_index().unwrap() != pix as u64 { + warn!("{:x}: overrun blob at index {:}", debug_id, w); + } else { + debug!("{:x}: duplicate blob at index {:}", debug_id, w); + } + } + } else { + if window[w].data.is_none() { + window[w].data = Some(b); + } else if let Some(blob) = &window[w].data { + if blob.read().unwrap().get_index().unwrap() != pix as u64 { + warn!("{:x}: overrun blob at index {:}", debug_id, w); + } else { + debug!("{:x}: duplicate blob at index {:}", debug_id, w); + } } } + + // push all contiguous blobs into consumed queue, increment consumed loop { let k = (*consumed % WINDOW_SIZE) as usize; trace!("k: {} consumed: {}", k, *consumed); @@ -389,43 +409,8 @@ fn process_blob( if window[k].data.is_none() { break; } - let mut is_coding = false; - if let Some(ref cblob) = window[k].data { - let cblob_r = cblob - .read() - .expect("blob read lock for flogs streamer::window"); - if cblob_r.get_index().unwrap() < *consumed { - break; - } - if cblob_r.is_coding() { - is_coding = true; - } - } - if !is_coding { - consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window")); - *consumed += 1; - } else { - // #[cfg(feature = "erasure")] - // { - // let block_start = *consumed - (*consumed % erasure::NUM_DATA as u64); - // let coding_end = block_start + erasure::NUM_DATA as u64; - // // We've received all this block's data blobs, go and null out the window now - // for j in block_start..*consumed { - // if let Some(b) = mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None) { - // recycler.recycle(b); - // } - // } - // for j in *consumed..coding_end { - // window[(j % WINDOW_SIZE) as usize] = None; - // } - // - // *consumed += erasure::MAX_MISSING as u64; - // debug!( - // "skipping processing coding blob k: {} consumed: {}", - // k, *consumed - // ); - // } - } + consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window")); + *consumed += 1; } } @@ -550,10 +535,7 @@ fn print_window(debug_id: u64, locked_window: &Window, consumed: u64) { pub fn default_window() -> Window { Arc::new(RwLock::new(vec![ - WindowSlot { - data: None, - coding: None, - }; + WindowSlot::default(); WINDOW_SIZE as usize ])) }