rework broadcast to understand a separate transmit index for coding blobs

This commit is contained in:
Rob Walker 2018-07-25 20:21:56 -07:00
parent e0cdcb0973
commit cbb8dee360
3 changed files with 63 additions and 63 deletions

View File

@ -17,10 +17,6 @@ use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt}; use byteorder::{LittleEndian, ReadBytesExt};
use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy}; use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy};
use counter::Counter; use counter::Counter;
#[cfg(feature = "erasure")]
use erasure;
use hash::Hash; use hash::Hash;
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
use pnet_datalink as datalink; use pnet_datalink as datalink;
@ -37,7 +33,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{sleep, Builder, JoinHandle}; use std::thread::{sleep, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer::{BlobReceiver, BlobSender, Window}; use streamer::{BlobReceiver, BlobSender, Window, WindowIndex};
use timing::timestamp; use timing::timestamp;
use transaction::Vote; use transaction::Vote;
@ -552,7 +548,7 @@ impl Crdt {
broadcast_table: &[NodeInfo], broadcast_table: &[NodeInfo],
window: &Window, window: &Window,
s: &UdpSocket, s: &UdpSocket,
transmit_index: &mut u64, transmit_index: &mut WindowIndex,
received_index: u64, received_index: u64,
) -> Result<()> { ) -> Result<()> {
if broadcast_table.is_empty() { if broadcast_table.is_empty() {
@ -561,7 +557,7 @@ impl Crdt {
Err(CrdtError::NoPeers)?; Err(CrdtError::NoPeers)?;
} }
trace!( trace!(
"{:x} transmit_index: {} received_index: {} broadcast_len: {}", "{:x} transmit_index: {:?} received_index: {} broadcast_len: {}",
me.debug_id(), me.debug_id(),
*transmit_index, *transmit_index,
received_index, received_index,
@ -570,15 +566,12 @@ impl Crdt {
// enumerate all the blobs in the window, those are the indices // enumerate all the blobs in the window, those are the indices
// transmit them to nodes, starting from a different node // transmit them to nodes, starting from a different node
let mut orders = Vec::with_capacity((received_index - *transmit_index) as usize); let mut orders = Vec::with_capacity((received_index - transmit_index.data) as usize);
let window_l = window.write().unwrap(); let window_l = window.write().unwrap();
#[cfg(feature = "erasure")] let mut br_idx = transmit_index.data as usize % broadcast_table.len();
let mut coding_index = None;
let mut br_idx = *transmit_index as usize % broadcast_table.len(); for idx in transmit_index.data..received_index {
for idx in *transmit_index..received_index {
let w_idx = idx as usize % window_l.len(); let w_idx = idx as usize % window_l.len();
trace!( trace!(
@ -591,46 +584,26 @@ impl Crdt {
orders.push((window_l[w_idx].data.clone(), &broadcast_table[br_idx])); orders.push((window_l[w_idx].data.clone(), &broadcast_table[br_idx]));
br_idx += 1; br_idx += 1;
br_idx %= broadcast_table.len(); br_idx %= broadcast_table.len();
#[cfg(feature = "erasure")]
{
// remember first place we saw coding
// if we find a coding blob, it means that a full block has been
// erasure coded, and it's safe to rewind to start of the coding
// blob before current idx
if coding_index.is_none() && window_l[w_idx].coding.is_some() {
coding_index = Some(idx - (idx % erasure::NUM_DATA as u64));
}
}
} }
// recall how many orders were actual data blobs
let mut data_orders_len = orders.len();
#[cfg(feature = "erasure")] for idx in transmit_index.coding..received_index {
{ let w_idx = idx as usize % window_l.len();
// if we have_coding, we've encoded a full erasure block, so rewind a bit...
if let Some(coding_index) = coding_index {
for idx in coding_index..received_index {
let w_idx = idx as usize % window_l.len();
// skip over empty slots // skip over empty slots
if window_l[w_idx].coding.is_none() { if window_l[w_idx].coding.is_none() {
continue; continue;
}
trace!(
"{:x} broadcast order coding w_idx: {} br_idx :{}",
me.debug_id(),
w_idx,
br_idx,
);
orders.push((window_l[w_idx].coding.clone(), &broadcast_table[br_idx]));
br_idx += 1;
br_idx %= broadcast_table.len();
}
} }
trace!(
"{:x} broadcast order coding w_idx: {} br_idx :{}",
me.debug_id(),
w_idx,
br_idx,
);
orders.push((window_l[w_idx].coding.clone(), &broadcast_table[br_idx]));
br_idx += 1;
br_idx %= broadcast_table.len();
} }
trace!("broadcast orders table {}", orders.len()); trace!("broadcast orders table {}", orders.len());
@ -666,16 +639,15 @@ impl Crdt {
trace!("broadcast results {}", errs.len()); trace!("broadcast results {}", errs.len());
for e in errs { for e in errs {
if data_orders_len == 0 {
break;
}
if let Err(e) = &e { if let Err(e) = &e {
error!("broadcast result {:?}", e); eprintln!("broadcast result {:?}", e);
} }
e?; e?;
*transmit_index += 1; if transmit_index.data < received_index {
data_orders_len -= 1; transmit_index.data += 1;
}
} }
transmit_index.coding = transmit_index.data;
Ok(()) Ok(())
} }

View File

@ -209,9 +209,15 @@ pub fn generate_coding(
debug_id: u64, debug_id: u64,
window: &mut [WindowSlot], window: &mut [WindowSlot],
recycler: &BlobRecycler, recycler: &BlobRecycler,
start_idx: usize, receive_index: u64,
num_blobs: usize, num_blobs: usize,
transmit_index_coding: &mut u64,
) -> Result<()> { ) -> Result<()> {
// beginning of the coding blobs of the block that receive_index points into
let coding_index_start =
receive_index - (receive_index % NUM_DATA as u64) + (NUM_DATA - NUM_CODING) as u64;
let start_idx = receive_index as usize % window.len();
let mut block_start = start_idx - (start_idx % NUM_DATA); let mut block_start = start_idx - (start_idx % NUM_DATA);
loop { loop {
@ -256,10 +262,12 @@ pub fn generate_coding(
} }
} }
// getting ready to do erasure coding, means that we're potentially going back in time,
// tell our caller we've inserted coding blocks starting at coding_index_start
*transmit_index_coding = cmp::min(*transmit_index_coding, coding_index_start);
let mut coding_blobs = Vec::with_capacity(NUM_CODING); let mut coding_blobs = Vec::with_capacity(NUM_CODING);
let coding_start = block_end - NUM_CODING; let coding_start = block_end - NUM_CODING;
for i in coding_start..block_end { for i in coding_start..block_end {
let n = i % window.len(); let n = i % window.len();
assert!(window[n].coding.is_none()); assert!(window[n].coding.is_none());
@ -613,7 +621,7 @@ mod test {
assert!( assert!(
erasure::generate_coding_blocks( erasure::generate_coding_blocks(
coding_blocks_slices.as_mut_slice(), coding_blocks_slices.as_mut_slice(),
v_slices.as_slice() v_slices.as_slice(),
).is_ok() ).is_ok()
); );
} }
@ -797,9 +805,19 @@ mod test {
print_window(&window); print_window(&window);
// Generate the coding blocks // Generate the coding blocks
let mut index = (erasure::NUM_DATA + 2) as u64;
assert!( assert!(
erasure::generate_coding(0, &mut window, &blob_recycler, offset, num_blobs).is_ok() erasure::generate_coding(
0,
&mut window,
&blob_recycler,
offset as u64,
num_blobs,
&mut index
).is_ok()
); );
assert_eq!(index, (erasure::NUM_DATA - erasure::NUM_CODING) as u64);
println!("** after-gen-coding:"); println!("** after-gen-coding:");
print_window(&window); print_window(&window);

View File

@ -38,6 +38,12 @@ pub enum WindowError {
GenericError, GenericError,
} }
#[derive(Debug)]
pub struct WindowIndex {
pub data: u64,
pub coding: u64,
}
fn recv_loop( fn recv_loop(
sock: &UdpSocket, sock: &UdpSocket,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
@ -674,7 +680,7 @@ fn broadcast(
recycler: &BlobRecycler, recycler: &BlobRecycler,
r: &BlobReceiver, r: &BlobReceiver,
sock: &UdpSocket, sock: &UdpSocket,
transmit_index: &mut u64, transmit_index: &mut WindowIndex,
receive_index: &mut u64, receive_index: &mut u64,
) -> Result<()> { ) -> Result<()> {
let debug_id = node_info.debug_id(); let debug_id = node_info.debug_id();
@ -747,8 +753,9 @@ fn broadcast(
debug_id, debug_id,
&mut window.write().unwrap(), &mut window.write().unwrap(),
recycler, recycler,
(*receive_index % WINDOW_SIZE) as usize, *receive_index,
blobs_len, blobs_len,
&mut transmit_index.coding,
)?; )?;
} }
@ -787,7 +794,10 @@ pub fn broadcaster(
Builder::new() Builder::new()
.name("solana-broadcaster".to_string()) .name("solana-broadcaster".to_string())
.spawn(move || { .spawn(move || {
let mut transmit_index = entry_height; let mut transmit_index = WindowIndex {
data: entry_height,
coding: entry_height,
};
let mut receive_index = entry_height; let mut receive_index = entry_height;
let me = crdt.read().unwrap().my_data().clone(); let me = crdt.read().unwrap().my_data().clone();
loop { loop {