From cb16fe84cdfb19865e19c95073bfaa3f7f1cdb6a Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Tue, 5 Jun 2018 09:50:50 -0700 Subject: [PATCH] Rework to fix coding blob insertion --- src/erasure.rs | 191 +++++++++++++++++++++++++------------------------ 1 file changed, 99 insertions(+), 92 deletions(-) diff --git a/src/erasure.rs b/src/erasure.rs index ccdba609d3..af8d56c859 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -4,8 +4,8 @@ use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE}; use std::result; //TODO(sakridge) pick these values -const NUM_CODED: usize = 3; -const MAX_MISSING: usize = 1; +const NUM_CODED: usize = 20; +const MAX_MISSING: usize = 4; const NUM_DATA: usize = NUM_CODED - MAX_MISSING; #[derive(Debug, PartialEq, Eq)] @@ -155,26 +155,29 @@ pub fn decode_blocks(data: &mut [&mut [u8]], coding: &[&[u8]], erasures: &[i32]) // Allocate some coding blobs and insert into the blobs array pub fn add_coding_blobs(recycler: &BlobRecycler, blobs: &mut Vec, consumed: u64) { - let num_data_segments = blobs.len() / NUM_DATA; - info!( - "add_coding num_data: {} blobs.len(): {}", - num_data_segments, - blobs.len() - ); - for i in 0..num_data_segments { - let idx = (i * NUM_CODED) + NUM_DATA - (consumed as usize) % NUM_CODED; - for j in idx..idx + MAX_MISSING { - trace!("putting coding at {}", j); - if j <= blobs.len() { + let mut added = 0; + let blobs_len = blobs.len() as u64; + for i in consumed..consumed + blobs_len { + let is = i as usize; + if is != 0 && ((is + MAX_MISSING) % NUM_CODED) == 0 { + for _ in 0..MAX_MISSING { + trace!("putting coding at {}", (i - consumed)); let new_blob = recycler.allocate(); let new_blob_clone = new_blob.clone(); let mut new_blob_l = new_blob_clone.write().unwrap(); new_blob_l.meta.size = new_blob_l.data().len(); drop(new_blob_l); - blobs.insert(j, new_blob); + blobs.insert((i - consumed) as usize, new_blob); + added += 1; } } } + info!( + "add_coding consumed: {} blobs.len(): {} added: {}", + consumed, + blobs.len(), + added + ); } // Generate coding blocks in window starting from consumed @@ -182,83 +185,86 @@ pub fn generate_coding(window: &mut Vec>, consumed: usize, nu let mut block_start = consumed - (consumed % NUM_CODED); - let num_blocks = num_blobs / NUM_CODED; + for i in consumed..consumed + num_blobs { + if i != 0 && (i % (NUM_CODED - 1)) == 0 { - for _ in 0..num_blocks { + let mut data_blobs = Vec::new(); + let mut coding_blobs = Vec::new(); + let mut data_locks = Vec::new(); + let mut data_ptrs: Vec<&[u8]> = Vec::new(); + let mut coding_locks = Vec::new(); + let mut coding_ptrs: Vec<&mut [u8]> = Vec::new(); - let mut data_blobs = Vec::new(); - let mut coding_blobs = Vec::new(); - let mut data_locks = Vec::new(); - let mut data_ptrs: Vec<&[u8]> = Vec::new(); - let mut coding_locks = Vec::new(); - let mut coding_ptrs: Vec<&mut [u8]> = Vec::new(); - - info!( - "generate_coding start: {} end: {}", - block_start, - block_start + NUM_DATA - ); - for i in block_start..block_start + NUM_DATA { - let n = i % window.len(); - trace!("window[{}] = {:?}", n, window[n]); - if window[n].is_none() { - trace!("data block is null @ {}", n); - return Ok(()); - } - data_blobs.push( - window[n] - .clone() - .expect("'data_blobs' arr in pub fn generate_coding"), + info!( + "generate_coding start: {} end: {} consumed: {} num_blobs: {}", + block_start, + block_start + NUM_DATA, + consumed, + num_blobs ); - } - let mut max_data_size = 0; - for b in &data_blobs { - let lck = b.write().expect("'b' write lock in pub fn generate_coding"); - if lck.meta.size > max_data_size { - max_data_size = lck.meta.size; + for i in block_start..block_start + NUM_DATA { + let n = i % window.len(); + trace!("window[{}] = {:?}", n, window[n]); + if window[n].is_none() { + trace!("data block is null @ {}", n); + return Ok(()); + } + data_blobs.push( + window[n] + .clone() + .expect("'data_blobs' arr in pub fn generate_coding"), + ); + } + let mut max_data_size = 0; + for b in &data_blobs { + let lck = b.write().expect("'b' write lock in pub fn generate_coding"); + if lck.meta.size > max_data_size { + max_data_size = lck.meta.size; + } + data_locks.push(lck); + } + trace!("max_data_size: {}", max_data_size); + for (i, l) in data_locks.iter_mut().enumerate() { + trace!("i: {} data: {}", i, l.data[0]); + data_ptrs.push(&l.data[..max_data_size]); } - data_locks.push(lck); - } - trace!("max_data_size: {}", max_data_size); - for (i, l) in data_locks.iter_mut().enumerate() { - trace!("i: {} data: {}", i, l.data[0]); - data_ptrs.push(&l.data[..max_data_size]); - } - // generate coding ptr array - let coding_start = block_start + NUM_DATA; - let coding_end = block_start + NUM_CODED; - for i in coding_start..coding_end { - let n = i % window.len(); - if window[n].is_none() { - trace!("coding block is null @ {}", n); - return Ok(()); + // generate coding ptr array + let coding_start = block_start + NUM_DATA; + let coding_end = block_start + NUM_CODED; + for i in coding_start..coding_end { + let n = i % window.len(); + if window[n].is_none() { + trace!("coding block is null @ {}", n); + return Ok(()); + } + let w_l = window[n].clone().unwrap(); + w_l.write().unwrap().set_size(max_data_size); + if w_l.write().unwrap().set_coding().is_err() { + return Err(ErasureError::EncodeError); + } + coding_blobs.push( + window[n] + .clone() + .expect("'coding_blobs' arr in pub fn generate_coding"), + ); } - let w_l = window[n].clone().unwrap(); - w_l.write().unwrap().set_size(max_data_size); - if w_l.write().unwrap().set_coding().is_err() { - return Err(ErasureError::EncodeError); + for b in &coding_blobs { + coding_locks.push( + b.write() + .expect("'coding_locks' arr in pub fn generate_coding"), + ); + } + for (i, l) in coding_locks.iter_mut().enumerate() { + trace!("i: {} coding: {} size: {}", i, l.data[0], max_data_size); + coding_ptrs.push(&mut l.data_mut()[..max_data_size]); } - coding_blobs.push( - window[n] - .clone() - .expect("'coding_blobs' arr in pub fn generate_coding"), - ); - } - for b in &coding_blobs { - coding_locks.push( - b.write() - .expect("'coding_locks' arr in pub fn generate_coding"), - ); - } - for (i, l) in coding_locks.iter_mut().enumerate() { - trace!("i: {} coding: {} size: {}", i, l.data[0], max_data_size); - coding_ptrs.push(&mut l.data_mut()[..max_data_size]); - } - generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; - debug!("consumed: {}", consumed); - block_start += NUM_CODED; + generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; + debug!("consumed: {} data: {}:{} coding: {}:{}", consumed, + block_start, block_start + NUM_DATA, coding_start, coding_end); + block_start += NUM_CODED; + } } Ok(()) } @@ -460,8 +466,8 @@ mod test { blob_recycler: &BlobRecycler, offset: usize, num_blobs: usize, - ) -> Vec> { - let mut window = vec![None; 16]; + ) -> (Vec>, usize) { + let mut window = vec![None; 32]; let mut blobs = Vec::new(); for i in 0..num_blobs { let b = blob_recycler.allocate(); @@ -474,6 +480,7 @@ mod test { blobs.push(b_); } erasure::add_coding_blobs(blob_recycler, &mut blobs, offset as u64); + let blobs_len = blobs.len(); let d = crdt::ReplicatedData::new( KeyPair::new().pubkey(), @@ -490,7 +497,7 @@ mod test { let idx = b.read().unwrap().get_index().unwrap() as usize; window[idx] = Some(b); } - window + (window, blobs_len) } #[test] @@ -502,12 +509,12 @@ mod test { // Generate a window let offset = 1; let num_blobs = erasure::NUM_DATA + 2; - let mut window = generate_window(data_len, &blob_recycler, 0, num_blobs); + let (mut window, blobs_len) = generate_window(data_len, &blob_recycler, 0, num_blobs); println!("** after-gen-window:"); print_window(&window); // Generate the coding blocks - assert!(erasure::generate_coding(&mut window, offset, num_blobs).is_ok()); + assert!(erasure::generate_coding(&mut window, offset, blobs_len).is_ok()); println!("** after-gen-coding:"); print_window(&window); @@ -517,7 +524,7 @@ mod test { window[erase_offset] = None; // Recover it from coding - assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok()); + assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + blobs_len).is_ok()); println!("** after-recover:"); print_window(&window); @@ -546,10 +553,10 @@ mod test { let offset = 4; let data_len = 16; let num_blobs = erasure::NUM_DATA + 2; - let mut window = generate_window(data_len, &blob_recycler, offset, num_blobs); + let (mut window, blobs_len) = generate_window(data_len, &blob_recycler, offset, num_blobs); println!("** after-gen:"); print_window(&window); - assert!(erasure::generate_coding(&mut window, offset, num_blobs).is_ok()); + assert!(erasure::generate_coding(&mut window, offset, blobs_len).is_ok()); println!("** after-coding:"); print_window(&window); let refwindow = window[offset + 1].clone(); @@ -563,7 +570,7 @@ mod test { window_l0.write().unwrap().data[0] = 55; println!("** after-nulling:"); print_window(&window); - assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok()); + assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + blobs_len).is_ok()); println!("** after-restore:"); print_window(&window); let window_l = window[offset + 1].clone().unwrap();