fix broadcast of erasure coding blobs

erasure coding blobs were being counted as window slots, skewing transmit_index

erasure coding blobs were being skipped over for broadcast, because they're
  only generated when the last data blob in an erasure block is added to the
  window.... rewind the index to pick up and broadcast those coding blobs
This commit is contained in:
Rob Walker 2018-07-25 15:02:39 -07:00
parent 297896bc49
commit a6a2a745ae
1 changed files with 65 additions and 9 deletions

View File

@ -17,6 +17,12 @@ use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt};
use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy};
use counter::Counter;
#[cfg(feature = "erasure")]
use erasure;
#[cfg(feature = "erasure")]
use rand::Rng;
use hash::Hash;
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
use pnet_datalink as datalink;
@ -556,27 +562,71 @@ impl Crdt {
inc_new_counter!("crdt-broadcast-not_enough_peers_error", 1);
Err(CrdtError::NoPeers)?;
}
trace!("broadcast nodes {}", broadcast_table.len());
trace!(
"{:x} transmit_index: {} received_index: {} broadcast_len: {}",
me.debug_id(),
*transmit_index,
received_index,
broadcast_table.len()
);
// enumerate all the blobs in the window, those are the indices
// transmit them to nodes, starting from a different node
let mut orders = Vec::new();
let mut orders = Vec::with_capacity((received_index - *transmit_index) as usize);
let window_l = window.write().unwrap();
let mut br_idx = *transmit_index as usize % broadcast_table.len();
#[cfg(feature = "erasure")]
let mut coding_index = None;
for idx in *transmit_index..received_index {
let w_idx = idx as usize % window_l.len();
let br_idx = idx as usize % broadcast_table.len();
assert!(window_l[w_idx].data.is_some());
orders.push((window_l[w_idx].data.clone(), &broadcast_table[br_idx]));
trace!(
"{:x} broadcast order data w_idx {} br_idx {}",
me.debug_id(),
w_idx,
br_idx
);
br_idx += 1;
br_idx %= broadcast_table.len();
#[cfg(feature = "erasure")]
{
// remember first place we saw coding
// if we find a coding blob, it means that a full block has been
// erasure coded, and it's safe to rewind to start of the coding
// blob before current idx
if coding_index.is_none() && window_l[w_idx].coding.is_some() {
coding_index = Some(idx - (idx % erasure::NUM_DATA as u64));
}
}
}
// recall how many orders were actual data blobs
let mut data_orders_len = orders.len();
if window_l[w_idx].coding.is_some() {
orders.push((window_l[w_idx].coding.clone(), &broadcast_table[br_idx]));
br_idx += 1;
br_idx %= broadcast_table.len();
#[cfg(feature = "erasure")]
{
// if we have_coding, we've encoded a full erasure block, so rewind a bit...
if let Some(coding_index) = coding_index {
for idx in coding_index..received_index {
let w_idx = idx as usize % window_l.len();
// skip over empty slots
if window_l[w_idx].coding.is_none() {
continue;
}
let br_idx = thread_rng().gen_range(0, broadcast_table.len());
orders.push((window_l[w_idx].coding.clone(), &broadcast_table[br_idx]));
trace!(
"{:x} broadcast order coding w_idx: {} br_idx :{}",
me.debug_id(),
w_idx,
br_idx,
);
}
}
}
@ -610,14 +660,20 @@ impl Crdt {
e
})
.collect();
trace!("broadcast results {}", errs.len());
for e in errs {
if data_orders_len == 0 {
break;
}
if let Err(e) = &e {
error!("broadcast result {:?}", e);
}
e?;
*transmit_index += 1;
data_orders_len -= 1;
}
Ok(())
}