fix broadcast to *always* call erasure generation, simplify generator, test slot reset better (#3764)

This commit is contained in:
Rob Walker 2019-04-14 18:12:37 -07:00 committed by GitHub
parent 542bafeb71
commit dd005fb50e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 88 additions and 71 deletions

1
Cargo.lock generated
View File

@ -2387,7 +2387,6 @@ version = "0.13.0"
dependencies = [
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"solana 0.13.0",
"solana-sdk 0.13.0",
]

View File

@ -2760,7 +2760,7 @@ pub mod tests {
assert_eq!(erasure_meta.coding, 0x0);
let mut coding_generator = CodingGenerator::new();
let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]).unwrap();
let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]);
for shared_coding_blob in coding_blobs {
let blob = shared_coding_blob.read().unwrap();
@ -2799,7 +2799,7 @@ pub mod tests {
for (set_index, data_blobs) in data_blobs.chunks_exact(NUM_DATA).enumerate() {
let focused_index = (set_index + 1) * NUM_DATA - 1;
let coding_blobs = coding_generator.next(&data_blobs).unwrap();
let coding_blobs = coding_generator.next(&data_blobs);
assert_eq!(coding_blobs.len(), NUM_CODING);
let deleted_data = data_blobs[NUM_DATA - 1].clone();

View File

@ -119,6 +119,9 @@ impl Broadcast {
blocktree.write_shared_blobs(&blobs)?;
#[cfg(feature = "erasure")]
let coding = self.coding_generator.next(&blobs);
let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
let broadcast_start = Instant::now();
@ -126,18 +129,14 @@ impl Broadcast {
// Send out data
ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?;
#[cfg(feature = "erasure")]
ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?;
inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
// generate and transmit any erasure coding blobs. if erasure isn't supported, just send everything again
#[cfg(not(feature = "erasure"))]
ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?;
#[cfg(feature = "erasure")]
{
let coding = self.coding_generator.next(&blobs)?;
// send out erasures
ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?;
}
let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());

View File

@ -243,7 +243,7 @@ impl CodingGenerator {
}
// must be called with consecutive data blobs from previous invocation
pub fn next(&mut self, next_data: &[SharedBlob]) -> Result<Vec<SharedBlob>> {
pub fn next(&mut self, next_data: &[SharedBlob]) -> Vec<SharedBlob> {
let mut next_coding =
Vec::with_capacity((self.leftover.len() + next_data.len()) / NUM_DATA * NUM_CODING);
@ -291,24 +291,27 @@ impl CodingGenerator {
coding_blob.set_size(max_data_size);
coding_blob.set_coding();
coding_blobs.push(Arc::new(RwLock::new(coding_blob)));
coding_blobs.push(coding_blob);
}
{
let mut coding_locks: Vec<_> =
coding_blobs.iter().map(|b| b.write().unwrap()).collect();
let mut coding_ptrs: Vec<_> = coding_locks
if {
let mut coding_ptrs: Vec<_> = coding_blobs
.iter_mut()
.map(|l| &mut l.data_mut()[..max_data_size])
.map(|blob| &mut blob.data_mut()[..max_data_size])
.collect();
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)
}
.is_ok()
{
next_coding.append(&mut coding_blobs);
}
next_coding.append(&mut coding_blobs);
}
Ok(next_coding)
next_coding
.into_iter()
.map(|blob| Arc::new(RwLock::new(blob)))
.collect()
}
}
@ -407,6 +410,54 @@ pub mod test {
assert_eq!(v_orig, vs[0]);
}
fn test_toss_and_recover(
data_blobs: &[SharedBlob],
coding_blobs: &[SharedBlob],
block_start_idx: usize,
) {
let size = coding_blobs[0].read().unwrap().size();
// toss one data and one coding
let erasures: Vec<i32> = vec![0, NUM_DATA as i32, -1];
let mut blobs: Vec<SharedBlob> = Vec::with_capacity(ERASURE_SET_SIZE);
blobs.push(SharedBlob::default()); // empty data, erasure at zero
for blob in &data_blobs[block_start_idx + 1..block_start_idx + NUM_DATA] {
// skip first blob
blobs.push(blob.clone());
}
blobs.push(SharedBlob::default()); // empty coding, erasure at zero
for blob in &coding_blobs[1..NUM_CODING] {
blobs.push(blob.clone());
}
let corrupt = decode_blobs(&blobs, &erasures, size, block_start_idx as u64, 0).unwrap();
assert!(!corrupt);
assert_eq!(
blobs[1].read().unwrap().meta,
data_blobs[block_start_idx + 1].read().unwrap().meta
);
assert_eq!(
blobs[1].read().unwrap().data(),
data_blobs[block_start_idx + 1].read().unwrap().data()
);
assert_eq!(
blobs[0].read().unwrap().meta,
data_blobs[block_start_idx].read().unwrap().meta
);
assert_eq!(
blobs[0].read().unwrap().data(),
data_blobs[block_start_idx].read().unwrap().data()
);
assert_eq!(
blobs[NUM_DATA].read().unwrap().data(),
coding_blobs[0].read().unwrap().data()
);
}
#[test]
fn test_erasure_generate_coding() {
solana_logger::setup();
@ -415,7 +466,7 @@ pub mod test {
let mut coding_generator = CodingGenerator::new();
let blobs = Vec::new();
for _ in 0..NUM_DATA * 2 {
let coding = coding_generator.next(&blobs).unwrap();
let coding = coding_generator.next(&blobs);
assert_eq!(coding.len(), 0);
}
@ -423,55 +474,18 @@ pub mod test {
let data_blobs = generate_test_blobs(0, NUM_DATA * 2);
for (i, blob) in data_blobs.iter().cloned().enumerate() {
let coding = coding_generator.next(&[blob]).unwrap();
if !coding.is_empty() {
let coding_blobs = coding_generator.next(&[blob]);
if !coding_blobs.is_empty() {
assert_eq!(i % NUM_DATA, NUM_DATA - 1);
assert_eq!(coding.len(), NUM_CODING);
assert_eq!(coding_blobs.len(), NUM_CODING);
let size = coding[0].read().unwrap().size();
// toss one data and one coding
let erasures: Vec<i32> = vec![0, NUM_DATA as i32, -1];
let block_start_idx = i - (i % NUM_DATA);
let mut blobs: Vec<SharedBlob> = Vec::with_capacity(ERASURE_SET_SIZE);
blobs.push(SharedBlob::default()); // empty data, erasure at zero
for blob in &data_blobs[block_start_idx + 1..block_start_idx + NUM_DATA] {
// skip first blob
blobs.push(blob.clone());
for j in 0..NUM_CODING {
assert_eq!(
coding_blobs[j].read().unwrap().index(),
((i / NUM_DATA) * NUM_DATA + j) as u64
);
}
blobs.push(SharedBlob::default()); // empty coding, erasure at zero
for blob in &coding[1..NUM_CODING] {
blobs.push(blob.clone());
}
let corrupt =
decode_blobs(&blobs, &erasures, size, block_start_idx as u64, 0).unwrap();
assert!(!corrupt);
assert_eq!(
blobs[1].read().unwrap().meta,
data_blobs[block_start_idx + 1].read().unwrap().meta
);
assert_eq!(
blobs[1].read().unwrap().data(),
data_blobs[block_start_idx + 1].read().unwrap().data()
);
assert_eq!(
blobs[0].read().unwrap().meta,
data_blobs[block_start_idx].read().unwrap().meta
);
assert_eq!(
blobs[0].read().unwrap().data(),
data_blobs[block_start_idx].read().unwrap().data()
);
assert_eq!(
blobs[NUM_DATA].read().unwrap().data(),
coding[0].read().unwrap().data()
);
test_toss_and_recover(&data_blobs, &coding_blobs, i - (i % NUM_DATA));
}
}
}
@ -489,9 +503,14 @@ pub mod test {
data_blobs[i].write().unwrap().set_slot(1);
}
let coding = coding_generator.next(&data_blobs[1..]).unwrap();
let coding_blobs = coding_generator.next(&data_blobs[0..NUM_DATA - 1]);
assert_eq!(coding_blobs.len(), 0);
assert_eq!(coding.len(), NUM_CODING);
let coding_blobs = coding_generator.next(&data_blobs[NUM_DATA..]);
assert_eq!(coding_blobs.len(), NUM_CODING);
test_toss_and_recover(&data_blobs, &coding_blobs, NUM_DATA);
}
#[test]
@ -699,7 +718,7 @@ pub mod test {
);
let mut coding_generator = CodingGenerator::new();
let mut coding_blobs = coding_generator.next(&blobs).unwrap();
let mut coding_blobs = coding_generator.next(&blobs);
blobs.drain(erasure_spec.num_data..);
coding_blobs.drain(erasure_spec.num_coding..);