diff --git a/src/crdt.rs b/src/crdt.rs index 1293ceb931..082b66e9c3 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -17,10 +17,6 @@ use bincode::{deserialize, serialize}; use byteorder::{LittleEndian, ReadBytesExt}; use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy}; use counter::Counter; - -#[cfg(feature = "erasure")] -use erasure; - use hash::Hash; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; use pnet_datalink as datalink; @@ -37,7 +33,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder, JoinHandle}; use std::time::Duration; -use streamer::{BlobReceiver, BlobSender, Window}; +use streamer::{BlobReceiver, BlobSender, Window, WindowIndex}; use timing::timestamp; use transaction::Vote; @@ -552,7 +548,7 @@ impl Crdt { broadcast_table: &[NodeInfo], window: &Window, s: &UdpSocket, - transmit_index: &mut u64, + transmit_index: &mut WindowIndex, received_index: u64, ) -> Result<()> { if broadcast_table.is_empty() { @@ -561,7 +557,7 @@ impl Crdt { Err(CrdtError::NoPeers)?; } trace!( - "{:x} transmit_index: {} received_index: {} broadcast_len: {}", + "{:x} transmit_index: {:?} received_index: {} broadcast_len: {}", me.debug_id(), *transmit_index, received_index, @@ -570,15 +566,12 @@ impl Crdt { // enumerate all the blobs in the window, those are the indices // transmit them to nodes, starting from a different node - let mut orders = Vec::with_capacity((received_index - *transmit_index) as usize); + let mut orders = Vec::with_capacity((received_index - transmit_index.data) as usize); let window_l = window.write().unwrap(); - #[cfg(feature = "erasure")] - let mut coding_index = None; + let mut br_idx = transmit_index.data as usize % broadcast_table.len(); - let mut br_idx = *transmit_index as usize % broadcast_table.len(); - - for idx in *transmit_index..received_index { + for idx in transmit_index.data..received_index { let w_idx = idx as usize % window_l.len(); trace!( @@ -591,46 +584,26 @@ impl Crdt { orders.push((window_l[w_idx].data.clone(), &broadcast_table[br_idx])); br_idx += 1; br_idx %= broadcast_table.len(); - - #[cfg(feature = "erasure")] - { - // remember first place we saw coding - // if we find a coding blob, it means that a full block has been - // erasure coded, and it's safe to rewind to start of the coding - // blob before current idx - if coding_index.is_none() && window_l[w_idx].coding.is_some() { - coding_index = Some(idx - (idx % erasure::NUM_DATA as u64)); - } - } } - // recall how many orders were actual data blobs - let mut data_orders_len = orders.len(); - #[cfg(feature = "erasure")] - { - // if we have_coding, we've encoded a full erasure block, so rewind a bit... - if let Some(coding_index) = coding_index { - for idx in coding_index..received_index { - let w_idx = idx as usize % window_l.len(); + for idx in transmit_index.coding..received_index { + let w_idx = idx as usize % window_l.len(); - // skip over empty slots - if window_l[w_idx].coding.is_none() { - continue; - } - - trace!( - "{:x} broadcast order coding w_idx: {} br_idx :{}", - me.debug_id(), - w_idx, - br_idx, - ); - - orders.push((window_l[w_idx].coding.clone(), &broadcast_table[br_idx])); - - br_idx += 1; - br_idx %= broadcast_table.len(); - } + // skip over empty slots + if window_l[w_idx].coding.is_none() { + continue; } + + trace!( + "{:x} broadcast order coding w_idx: {} br_idx :{}", + me.debug_id(), + w_idx, + br_idx, + ); + + orders.push((window_l[w_idx].coding.clone(), &broadcast_table[br_idx])); + br_idx += 1; + br_idx %= broadcast_table.len(); } trace!("broadcast orders table {}", orders.len()); @@ -666,16 +639,15 @@ impl Crdt { trace!("broadcast results {}", errs.len()); for e in errs { - if data_orders_len == 0 { - break; - } if let Err(e) = &e { - error!("broadcast result {:?}", e); + eprintln!("broadcast result {:?}", e); } e?; - *transmit_index += 1; - data_orders_len -= 1; + if transmit_index.data < received_index { + transmit_index.data += 1; + } } + transmit_index.coding = transmit_index.data; Ok(()) } diff --git a/src/erasure.rs b/src/erasure.rs index b627728d8d..bf034dd081 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -209,9 +209,15 @@ pub fn generate_coding( debug_id: u64, window: &mut [WindowSlot], recycler: &BlobRecycler, - start_idx: usize, + receive_index: u64, num_blobs: usize, + transmit_index_coding: &mut u64, ) -> Result<()> { + // beginning of the coding blobs of the block that receive_index points into + let coding_index_start = + receive_index - (receive_index % NUM_DATA as u64) + (NUM_DATA - NUM_CODING) as u64; + + let start_idx = receive_index as usize % window.len(); let mut block_start = start_idx - (start_idx % NUM_DATA); loop { @@ -256,10 +262,12 @@ pub fn generate_coding( } } + // getting ready to do erasure coding, means that we're potentially going back in time, + // tell our caller we've inserted coding blocks starting at coding_index_start + *transmit_index_coding = cmp::min(*transmit_index_coding, coding_index_start); + let mut coding_blobs = Vec::with_capacity(NUM_CODING); - let coding_start = block_end - NUM_CODING; - for i in coding_start..block_end { let n = i % window.len(); assert!(window[n].coding.is_none()); @@ -613,7 +621,7 @@ mod test { assert!( erasure::generate_coding_blocks( coding_blocks_slices.as_mut_slice(), - v_slices.as_slice() + v_slices.as_slice(), ).is_ok() ); } @@ -797,9 +805,19 @@ mod test { print_window(&window); // Generate the coding blocks + let mut index = (erasure::NUM_DATA + 2) as u64; assert!( - erasure::generate_coding(0, &mut window, &blob_recycler, offset, num_blobs).is_ok() + erasure::generate_coding( + 0, + &mut window, + &blob_recycler, + offset as u64, + num_blobs, + &mut index + ).is_ok() ); + assert_eq!(index, (erasure::NUM_DATA - erasure::NUM_CODING) as u64); + println!("** after-gen-coding:"); print_window(&window); diff --git a/src/streamer.rs b/src/streamer.rs index e2c460bd29..54c819e7de 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -38,6 +38,12 @@ pub enum WindowError { GenericError, } +#[derive(Debug)] +pub struct WindowIndex { + pub data: u64, + pub coding: u64, +} + fn recv_loop( sock: &UdpSocket, exit: &Arc, @@ -674,7 +680,7 @@ fn broadcast( recycler: &BlobRecycler, r: &BlobReceiver, sock: &UdpSocket, - transmit_index: &mut u64, + transmit_index: &mut WindowIndex, receive_index: &mut u64, ) -> Result<()> { let debug_id = node_info.debug_id(); @@ -747,8 +753,9 @@ fn broadcast( debug_id, &mut window.write().unwrap(), recycler, - (*receive_index % WINDOW_SIZE) as usize, + *receive_index, blobs_len, + &mut transmit_index.coding, )?; } @@ -787,7 +794,10 @@ pub fn broadcaster( Builder::new() .name("solana-broadcaster".to_string()) .spawn(move || { - let mut transmit_index = entry_height; + let mut transmit_index = WindowIndex { + data: entry_height, + coding: entry_height, + }; let mut receive_index = entry_height; let me = crdt.read().unwrap().my_data().clone(); loop {