From dd005fb50eddb303cbec62b46740d642fb26381d Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Sun, 14 Apr 2019 18:12:37 -0700 Subject: [PATCH] fix broadcast to *always* call erasure generation, simplify generator, test slot reset better (#3764) --- Cargo.lock | 1 - core/src/blocktree.rs | 4 +- core/src/broadcast_stage.rs | 13 ++-- core/src/erasure.rs | 141 ++++++++++++++++++++---------------- 4 files changed, 88 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 99366ed59..ef2850d06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2387,7 +2387,6 @@ version = "0.13.0" dependencies = [ "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "solana 0.13.0", "solana-sdk 0.13.0", ] diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index a2be77cb6..931607183 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -2760,7 +2760,7 @@ pub mod tests { assert_eq!(erasure_meta.coding, 0x0); let mut coding_generator = CodingGenerator::new(); - let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]).unwrap(); + let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]); for shared_coding_blob in coding_blobs { let blob = shared_coding_blob.read().unwrap(); @@ -2799,7 +2799,7 @@ pub mod tests { for (set_index, data_blobs) in data_blobs.chunks_exact(NUM_DATA).enumerate() { let focused_index = (set_index + 1) * NUM_DATA - 1; - let coding_blobs = coding_generator.next(&data_blobs).unwrap(); + let coding_blobs = coding_generator.next(&data_blobs); assert_eq!(coding_blobs.len(), NUM_CODING); let deleted_data = data_blobs[NUM_DATA - 1].clone(); diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index b1070f6db..a2df995e4 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -119,6 +119,9 @@ impl Broadcast { blocktree.write_shared_blobs(&blobs)?; + #[cfg(feature = "erasure")] + let coding = self.coding_generator.next(&blobs); + let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); let broadcast_start = Instant::now(); @@ -126,18 +129,14 @@ impl Broadcast { // Send out data ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?; + #[cfg(feature = "erasure")] + ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?; + inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); // generate and transmit any erasure coding blobs. if erasure isn't supported, just send everything again #[cfg(not(feature = "erasure"))] ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?; - #[cfg(feature = "erasure")] - { - let coding = self.coding_generator.next(&blobs)?; - - // send out erasures - ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?; - } let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed()); diff --git a/core/src/erasure.rs b/core/src/erasure.rs index 7838abb51..e2150f946 100644 --- a/core/src/erasure.rs +++ b/core/src/erasure.rs @@ -243,7 +243,7 @@ impl CodingGenerator { } // must be called with consecutive data blobs from previous invocation - pub fn next(&mut self, next_data: &[SharedBlob]) -> Result> { + pub fn next(&mut self, next_data: &[SharedBlob]) -> Vec { let mut next_coding = Vec::with_capacity((self.leftover.len() + next_data.len()) / NUM_DATA * NUM_CODING); @@ -291,24 +291,27 @@ impl CodingGenerator { coding_blob.set_size(max_data_size); coding_blob.set_coding(); - coding_blobs.push(Arc::new(RwLock::new(coding_blob))); + coding_blobs.push(coding_blob); } - { - let mut coding_locks: Vec<_> = - coding_blobs.iter().map(|b| b.write().unwrap()).collect(); - - let mut coding_ptrs: Vec<_> = coding_locks + if { + let mut coding_ptrs: Vec<_> = coding_blobs .iter_mut() - .map(|l| &mut l.data_mut()[..max_data_size]) + .map(|blob| &mut blob.data_mut()[..max_data_size]) .collect(); - generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; + generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs) + } + .is_ok() + { + next_coding.append(&mut coding_blobs); } - next_coding.append(&mut coding_blobs); } - Ok(next_coding) + next_coding + .into_iter() + .map(|blob| Arc::new(RwLock::new(blob))) + .collect() } } @@ -407,6 +410,54 @@ pub mod test { assert_eq!(v_orig, vs[0]); } + fn test_toss_and_recover( + data_blobs: &[SharedBlob], + coding_blobs: &[SharedBlob], + block_start_idx: usize, + ) { + let size = coding_blobs[0].read().unwrap().size(); + + // toss one data and one coding + let erasures: Vec = vec![0, NUM_DATA as i32, -1]; + + let mut blobs: Vec = Vec::with_capacity(ERASURE_SET_SIZE); + + blobs.push(SharedBlob::default()); // empty data, erasure at zero + for blob in &data_blobs[block_start_idx + 1..block_start_idx + NUM_DATA] { + // skip first blob + blobs.push(blob.clone()); + } + blobs.push(SharedBlob::default()); // empty coding, erasure at zero + for blob in &coding_blobs[1..NUM_CODING] { + blobs.push(blob.clone()); + } + + let corrupt = decode_blobs(&blobs, &erasures, size, block_start_idx as u64, 0).unwrap(); + + assert!(!corrupt); + + assert_eq!( + blobs[1].read().unwrap().meta, + data_blobs[block_start_idx + 1].read().unwrap().meta + ); + assert_eq!( + blobs[1].read().unwrap().data(), + data_blobs[block_start_idx + 1].read().unwrap().data() + ); + assert_eq!( + blobs[0].read().unwrap().meta, + data_blobs[block_start_idx].read().unwrap().meta + ); + assert_eq!( + blobs[0].read().unwrap().data(), + data_blobs[block_start_idx].read().unwrap().data() + ); + assert_eq!( + blobs[NUM_DATA].read().unwrap().data(), + coding_blobs[0].read().unwrap().data() + ); + } + #[test] fn test_erasure_generate_coding() { solana_logger::setup(); @@ -415,7 +466,7 @@ pub mod test { let mut coding_generator = CodingGenerator::new(); let blobs = Vec::new(); for _ in 0..NUM_DATA * 2 { - let coding = coding_generator.next(&blobs).unwrap(); + let coding = coding_generator.next(&blobs); assert_eq!(coding.len(), 0); } @@ -423,55 +474,18 @@ pub mod test { let data_blobs = generate_test_blobs(0, NUM_DATA * 2); for (i, blob) in data_blobs.iter().cloned().enumerate() { - let coding = coding_generator.next(&[blob]).unwrap(); - - if !coding.is_empty() { + let coding_blobs = coding_generator.next(&[blob]); + if !coding_blobs.is_empty() { assert_eq!(i % NUM_DATA, NUM_DATA - 1); - assert_eq!(coding.len(), NUM_CODING); + assert_eq!(coding_blobs.len(), NUM_CODING); - let size = coding[0].read().unwrap().size(); - - // toss one data and one coding - let erasures: Vec = vec![0, NUM_DATA as i32, -1]; - - let block_start_idx = i - (i % NUM_DATA); - let mut blobs: Vec = Vec::with_capacity(ERASURE_SET_SIZE); - - blobs.push(SharedBlob::default()); // empty data, erasure at zero - for blob in &data_blobs[block_start_idx + 1..block_start_idx + NUM_DATA] { - // skip first blob - blobs.push(blob.clone()); + for j in 0..NUM_CODING { + assert_eq!( + coding_blobs[j].read().unwrap().index(), + ((i / NUM_DATA) * NUM_DATA + j) as u64 + ); } - blobs.push(SharedBlob::default()); // empty coding, erasure at zero - for blob in &coding[1..NUM_CODING] { - blobs.push(blob.clone()); - } - - let corrupt = - decode_blobs(&blobs, &erasures, size, block_start_idx as u64, 0).unwrap(); - - assert!(!corrupt); - - assert_eq!( - blobs[1].read().unwrap().meta, - data_blobs[block_start_idx + 1].read().unwrap().meta - ); - assert_eq!( - blobs[1].read().unwrap().data(), - data_blobs[block_start_idx + 1].read().unwrap().data() - ); - assert_eq!( - blobs[0].read().unwrap().meta, - data_blobs[block_start_idx].read().unwrap().meta - ); - assert_eq!( - blobs[0].read().unwrap().data(), - data_blobs[block_start_idx].read().unwrap().data() - ); - assert_eq!( - blobs[NUM_DATA].read().unwrap().data(), - coding[0].read().unwrap().data() - ); + test_toss_and_recover(&data_blobs, &coding_blobs, i - (i % NUM_DATA)); } } } @@ -489,9 +503,14 @@ pub mod test { data_blobs[i].write().unwrap().set_slot(1); } - let coding = coding_generator.next(&data_blobs[1..]).unwrap(); + let coding_blobs = coding_generator.next(&data_blobs[0..NUM_DATA - 1]); + assert_eq!(coding_blobs.len(), 0); - assert_eq!(coding.len(), NUM_CODING); + let coding_blobs = coding_generator.next(&data_blobs[NUM_DATA..]); + + assert_eq!(coding_blobs.len(), NUM_CODING); + + test_toss_and_recover(&data_blobs, &coding_blobs, NUM_DATA); } #[test] @@ -699,7 +718,7 @@ pub mod test { ); let mut coding_generator = CodingGenerator::new(); - let mut coding_blobs = coding_generator.next(&blobs).unwrap(); + let mut coding_blobs = coding_generator.next(&blobs); blobs.drain(erasure_spec.num_data..); coding_blobs.drain(erasure_spec.num_coding..);