From 99653a4d045c5cf2d4ca4d5b5449f7399d06a996 Mon Sep 17 00:00:00 2001
From: Rob Walker
Date: Tue, 17 Jul 2018 13:02:38 -0700
Subject: [PATCH] rework erasure to have data and coding blobs side-by-side in window

---
 src/erasure.rs | 440 +++++++++++++++++++++++++++----------------------
 1 file changed, 239 insertions(+), 201 deletions(-)

diff --git a/src/erasure.rs b/src/erasure.rs
index 449361edc..53419b884 100644
--- a/src/erasure.rs
+++ b/src/erasure.rs
@@ -1,12 +1,12 @@
 // Support erasure coding
-
 use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE};
 use std::result;
+use streamer::WindowSlot;
 
 //TODO(sakridge) pick these values
-pub const NUM_CODED: usize = 20;
-pub const MAX_MISSING: usize = 4;
-const NUM_DATA: usize = NUM_CODED - MAX_MISSING;
+pub const NUM_DATA: usize = 16; // number of data blobs
+pub const NUM_CODING: usize = 4; // number of coding blobs, also the maximum number that can go missing
+pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING; // total number of blobs in an erasure set, includes data and coding blobs
 
 #[derive(Debug, PartialEq, Eq)]
 pub enum ErasureError {
@@ -67,13 +67,13 @@ pub fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result<()> {
     if data.len() == 0 {
         return Ok(());
     }
+    let k = data.len() as i32;
     let m = coding.len() as i32;
-    let block_len = data[0].len();
-    let matrix: Vec<i32> = get_matrix(m, data.len() as i32, ERASURE_W);
-    let mut coding_arg = Vec::new();
-    let mut data_arg = Vec::new();
+    let block_len = data[0].len() as i32;
+    let matrix: Vec<i32> = get_matrix(m, k, ERASURE_W);
+    let mut data_arg = Vec::with_capacity(data.len());
     for block in data {
-        if block_len != block.len() {
+        if block_len != block.len() as i32 {
             trace!(
                 "data block size incorrect {} expected {}",
                 block.len(),
@@ -83,8 +83,9 @@ pub fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result<()> {
         }
         data_arg.push(block.as_ptr());
     }
+    let mut coding_arg = Vec::with_capacity(coding.len());
     for mut block in coding {
-        if block_len != block.len() {
+        if block_len != block.len() as i32 {
             trace!(
                 "coding block size incorrect {} expected {}",
                 block.len(),
@@ -97,13 +98,13 @@ pub fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result<()> {
     unsafe {
         jerasure_matrix_encode(
-            data.len() as i32,
+            k,
             m,
             ERASURE_W,
             matrix.as_ptr(),
             data_arg.as_ptr(),
             coding_arg.as_ptr(),
-            data[0].len() as i32,
+            block_len,
         );
     }
     Ok(())
@@ -162,49 +163,57 @@ pub fn decode_blocks(data: &mut [&mut [u8]], coding: &[&[u8]], erasures: &[i32])
 }
 
 // Allocate some coding blobs and insert into the blobs array
-pub fn add_coding_blobs(recycler: &BlobRecycler, blobs: &mut Vec<SharedBlob>, consumed: u64) {
-    let mut added = 0;
-    let blobs_len = blobs.len() as u64;
-    for i in consumed..consumed + blobs_len {
-        let is = i as usize;
-        if is != 0 && ((is + MAX_MISSING) % NUM_CODED) == 0 {
-            for _ in 0..MAX_MISSING {
-                trace!("putting coding at {}", (i - consumed));
-                let new_blob = recycler.allocate();
-                let new_blob_clone = new_blob.clone();
-                let mut new_blob_l = new_blob_clone.write().unwrap();
-                new_blob_l.set_size(0);
-                new_blob_l.set_coding().unwrap();
-                drop(new_blob_l);
-                blobs.insert((i - consumed) as usize, new_blob);
-                added += 1;
-            }
-        }
-    }
-    info!(
-        "add_coding consumed: {} blobs.len(): {} added: {}",
-        consumed,
-        blobs.len(),
-        added
-    );
-}
+//pub fn add_coding_blobs(recycler: &BlobRecycler, blobs: &mut [WindowSlot], consumed: u64) {
+//    let mut added = 0;
+//    let blobs_len = blobs.len() as u64;
+//    for i in consumed..consumed + blobs_len {
+//        let is = i as usize;
+//        if is != 0 && ((is + NUM_CODING) % NUM_DATA) == 0 {
+//            for _ in 0..NUM_CODING {
+//                trace!("putting coding at {}", (i - consumed));
+//                let new_blob = recycler.allocate();
+//                let new_blob_clone = new_blob.clone();
+//                let mut new_blob_l = new_blob_clone.write().unwrap();
+//                new_blob_l.set_size(0);
+//                new_blob_l.set_coding().unwrap();
+//                drop(new_blob_l);
+//                blobs.insert((i - consumed) as usize, new_blob);
+//                added += 1;
+//            }
+//        }
+//    }
+//    info!(
+//        "add_coding consumed: {} blobs.len(): {} added: {}",
+//        consumed,
+//        blobs.len(),
+//        added
+//    );
+//}
 
-// Generate coding blocks in window starting from consumed
+// Generate coding blocks in window starting from consumed,
+// for each block place the coding blobs at the end of the block
+//
+// block-size part of a Window, with each element a WindowSlot..
+// |<======================= NUM_DATA ==============================>|
+//                                               |<==== NUM_CODING ===>|
+// +---+ +---+ +---+ +---+ +---+         +---+ +---+ +---+ +---+ +---+
+// | D | | D | | D | | D | | D |         | D | | D | | D | | D | | D |
+// +---+ +---+ +---+ +---+ +---+  . . .  +---+ +---+ +---+ +---+ +---+
+// |   | |   | |   | |   | |   |         |   | | C | | C | | C | | C |
+// +---+ +---+ +---+ +---+ +---+         +---+ +---+ +---+ +---+ +---+
 pub fn generate_coding(
-    window: &mut Vec<Option<SharedBlob>>,
+    window: &mut [WindowSlot],
+    recycler: &BlobRecycler,
     consumed: usize,
     num_blobs: usize,
 ) -> Result<()> {
-    let mut block_start = consumed - (consumed % NUM_CODED);
+    let mut block_start = consumed - (consumed % NUM_DATA);
 
     for i in consumed..consumed + num_blobs {
-        if (i % NUM_CODED) == (NUM_CODED - 1) {
-            let mut data_blobs = Vec::new();
-            let mut coding_blobs = Vec::new();
-            let mut data_locks = Vec::new();
-            let mut data_ptrs: Vec<&[u8]> = Vec::new();
-            let mut coding_locks = Vec::new();
-            let mut coding_ptrs: Vec<&mut [u8]> = Vec::new();
+        if (i % NUM_DATA) == (NUM_DATA - 1) {
+            let mut data_blobs = Vec::with_capacity(NUM_DATA);
+            let mut data_locks = Vec::with_capacity(NUM_DATA);
+            let mut data_ptrs: Vec<&[u8]> = Vec::with_capacity(NUM_DATA);
 
             info!(
                 "generate_coding start: {} end: {} consumed: {} num_blobs: {}",
@@ -215,13 +224,14 @@ pub fn generate_coding(
             );
             for i in block_start..block_start + NUM_DATA {
                 let n = i % window.len();
-                trace!("window[{}] = {:?}", n, window[n]);
-                if window[n].is_none() {
+                trace!("window[{}] = {:?}", n, window[n].data);
+                if window[n].data.is_none() {
                     trace!("data block is null @ {}", n);
                     return Ok(());
                 }
                 data_blobs.push(
                     window[n]
+                        .data
                         .clone()
                         .expect("'data_blobs' arr in pub fn generate_coding"),
                 );
@@ -240,22 +250,33 @@ pub fn generate_coding(
                 data_ptrs.push(&l.data[..max_data_size]);
             }
 
-            // generate coding ptr array
-            let coding_start = block_start + NUM_DATA;
-            let coding_end = block_start + NUM_CODED;
+            let mut coding_blobs = Vec::with_capacity(NUM_CODING);
+            let mut coding_locks = Vec::with_capacity(NUM_CODING);
+            let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING);
+
+            let coding_start = block_start + NUM_DATA - NUM_CODING;
+            let coding_end = block_start + NUM_DATA;
             for i in coding_start..coding_end {
                 let n = i % window.len();
-                if window[n].is_none() {
-                    trace!("coding block is null @ {}", n);
-                    return Ok(());
+                if window[n].coding.is_none() {
+                    window[n].coding = Some(recycler.allocate());
                 }
-                let w_l = window[n].clone().unwrap();
+
+                let w_l = window[n].coding.clone().unwrap();
                 w_l.write().unwrap().set_size(max_data_size);
+                w_l.write()
+                    .unwrap()
+                    .set_index(window[n].data.clone().unwrap().read().unwrap().get_index().unwrap());
+                w_l.write()
+                    .unwrap()
+                    .set_id(window[n].data.clone().unwrap().read().unwrap().get_id().unwrap());
+
                 if w_l.write().unwrap().set_coding().is_err() {
                     return Err(ErasureError::EncodeError);
                 }
                 coding_blobs.push(
                     window[n]
+                        .coding
                         .clone()
                         .expect("'coding_blobs' arr in pub fn generate_coding"),
                 );
@@ -280,7 +301,7 @@ pub fn generate_coding(
                 coding_start,
                 coding_end
             );
-            block_start += NUM_CODED;
+            block_start += NUM_DATA;
         }
     }
     Ok(())
@@ -291,8 +312,8 @@ pub fn generate_coding(
 // to allocate new ones. Returns err if not enough
 // coding blocks are present to restore
 pub fn recover(
-    re: &BlobRecycler,
-    window: &mut Vec<Option<SharedBlob>>,
+    recycler: &BlobRecycler,
+    window: &mut [WindowSlot],
     consumed: usize,
     received: usize,
 ) -> Result<()> {
@@ -300,8 +321,8 @@ pub fn recover(
     if received <= consumed {
         return Ok(());
     }
-    let num_blocks = (received - consumed) / NUM_CODED;
-    let mut block_start = consumed - (consumed % NUM_CODED);
+    let num_blocks = (received - consumed) / NUM_DATA;
+    let mut block_start = consumed - (consumed % NUM_DATA);
 
     if num_blocks > 0 {
         debug!(
@@ -315,9 +336,9 @@ pub fn recover(
             break;
         }
         let mut data_missing = 0;
-        let mut coded_missing = 0;
-        let coding_start = block_start + NUM_DATA;
-        let coding_end = block_start + NUM_CODED;
+        let mut coding_missing = 0;
+        let coding_start = block_start + NUM_DATA - NUM_CODING;
+        let coding_end = block_start + NUM_DATA;
         trace!(
             "recover: block_start: {} coding_start: {} coding_end: {}",
             block_start,
@@ -326,98 +347,113 @@ pub fn recover(
         );
         for i in block_start..coding_end {
             let n = i % window.len();
-            if window[n].is_none() {
-                if i >= coding_start {
-                    coded_missing += 1;
-                } else {
-                    data_missing += 1;
-                }
+            if window[n].coding.is_none() && i >= coding_start {
+                coding_missing += 1;
+            }
+            if window[n].data.is_none() {
+                data_missing += 1;
             }
         }
-        if (data_missing + coded_missing) != NUM_CODED && (data_missing + coded_missing) != 0 {
+
+        // if we're not missing data, or if we're missing more blobs than coding can recover, skip
+        if data_missing == 0 || (data_missing + coding_missing) > NUM_CODING {
             debug!(
-                "1: start: {} recovering: data: {} coding: {}",
-                block_start, data_missing, coded_missing
+                "1: start: {} skipping recovery data: {} coding: {}",
+                block_start, data_missing, coding_missing
+            );
+            block_start += NUM_DATA;
+            continue;
+        }
+        debug!(
+            "2: recovering: data: {} coding: {}",
+            data_missing, coding_missing
+        );
+        let mut blobs: Vec<SharedBlob> = Vec::with_capacity(NUM_DATA + NUM_CODING);
+        let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING);
+        let mut erasures: Vec<i32> = Vec::with_capacity(NUM_CODING);
+        let mut meta = None;
+        let mut size = None;
+
+        // add the data blobs we have into the recovery blob vector
+        for i in block_start..coding_end {
+            let j = i % window.len();
+            let mut b = &mut window[j];
+            if b.data.is_some() {
+                if meta.is_none() {
+                    let bl = b.data.clone().unwrap();
+                    meta = Some(bl.read().unwrap().meta.clone());
+                }
+                blobs.push(b.data.clone().expect("'blobs' arr in pb fn recover"));
+            } else {
+                let n = recycler.allocate();
+                b.data = Some(n.clone());
+                // mark the missing memory
+                blobs.push(n);
+                erasures.push((i - block_start) as i32);
+            }
+        }
+        for i in coding_start..coding_end {
+            let j = i % window.len();
+            let mut b = &mut window[j];
+            if b.coding.is_some() {
+                if size.is_none() {
+                    let bl = b.coding.clone().unwrap();
+                    size = Some(bl.read().unwrap().meta.size - BLOB_HEADER_SIZE);
+                }
+                blobs.push(b.coding.clone().expect("'blobs' arr in pb fn recover"));
+            } else {
+                let n = recycler.allocate();
+                b.coding = Some(n.clone());
+                //mark the missing memory
+                blobs.push(n);
+                erasures.push((i - block_start + NUM_DATA) as i32);
+            }
+        }
+        erasures.push(-1);
+        trace!(
+            "erasures: {:?} data_size: {} header_size: {}",
+            erasures,
+            size.unwrap(),
+            BLOB_HEADER_SIZE
+        );
+        //lock everything
+        for b in &blobs {
+            locks.push(b.write().expect("'locks' arr in pb fn recover"));
+        }
+        {
+            let mut coding_ptrs: Vec<&[u8]> = Vec::with_capacity(NUM_CODING);
+            let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA);
+            for (i, l) in locks.iter_mut().enumerate() {
+                if i >= NUM_DATA {
+                    trace!("pushing coding: {}", i);
+                    coding_ptrs.push(&l.data()[..size.unwrap()]);
+                } else {
+                    trace!("pushing data: {}", i);
+                    data_ptrs.push(&mut l.data[..size.unwrap()]);
+                }
+            }
+            trace!(
+                "coding_ptrs.len: {} data_ptrs.len {}",
+                coding_ptrs.len(),
+                data_ptrs.len()
+            );
+            decode_blocks(data_ptrs.as_mut_slice(), &coding_ptrs, &erasures)?;
+        }
+        for i in &erasures[..erasures.len() - 1] {
+            let idx = *i as usize;
+            let data_size = locks[idx].get_data_size().unwrap() - BLOB_HEADER_SIZE as u64;
+            locks[idx].meta = meta.clone().unwrap();
+            locks[idx].set_size(data_size as usize);
+            trace!(
+                "erasures[{}] size: {} data[0]: {}",
+                *i,
+                data_size,
+                locks[idx].data()[0]
             );
         }
-        if data_missing > 0 {
-            if (data_missing + coded_missing) <= MAX_MISSING {
-                debug!(
-                    "2: recovering: data: {} coding: {}",
-                    data_missing, coded_missing
-                );
-                let mut blobs: Vec<SharedBlob> = Vec::new();
-                let mut locks = Vec::new();
-                let mut erasures: Vec<i32> = Vec::new();
-                let mut meta = None;
-                let mut size = None;
-                for i in block_start..coding_end {
-                    let j = i % window.len();
-                    let mut b = &mut window[j];
-                    if b.is_some() {
-                        if i >= NUM_DATA && size.is_none() {
-                            let bl = b.clone().unwrap();
-                            size = Some(bl.read().unwrap().meta.size - BLOB_HEADER_SIZE);
-                        }
-                        if meta.is_none() {
-                            let bl = b.clone().unwrap();
-                            meta = Some(bl.read().unwrap().meta.clone());
-                        }
-                        blobs.push(b.clone().expect("'blobs' arr in pb fn recover"));
-                        continue;
-                    }
-                    let n = re.allocate();
-                    *b = Some(n.clone());
-                    //mark the missing memory
-                    blobs.push(n);
-                    erasures.push((i - block_start) as i32);
-                }
-                erasures.push(-1);
-                trace!(
-                    "erasures: {:?} data_size: {} header_size: {}",
-                    erasures,
-                    size.unwrap(),
-                    BLOB_HEADER_SIZE
-                );
-                //lock everything
-                for b in &blobs {
-                    locks.push(b.write().expect("'locks' arr in pb fn recover"));
-                }
-                {
-                    let mut coding_ptrs: Vec<&[u8]> = Vec::new();
-                    let mut data_ptrs: Vec<&mut [u8]> = Vec::new();
-                    for (i, l) in locks.iter_mut().enumerate() {
-                        if i >= NUM_DATA {
-                            trace!("pushing coding: {}", i);
-                            coding_ptrs.push(&l.data()[..size.unwrap()]);
-                        } else {
-                            trace!("pushing data: {}", i);
-                            data_ptrs.push(&mut l.data[..size.unwrap()]);
-                        }
-                    }
-                    trace!(
-                        "coding_ptrs.len: {} data_ptrs.len {}",
-                        coding_ptrs.len(),
-                        data_ptrs.len()
-                    );
-                    decode_blocks(data_ptrs.as_mut_slice(), &coding_ptrs, &erasures)?;
-                }
-                for i in &erasures[..erasures.len() - 1] {
-                    let idx = *i as usize;
-                    let data_size = locks[idx].get_data_size().unwrap() - BLOB_HEADER_SIZE as u64;
-                    locks[idx].meta = meta.clone().unwrap();
-                    locks[idx].set_size(data_size as usize);
-                    trace!(
-                        "erasures[{}] size: {} data[0]: {}",
-                        *i,
-                        data_size,
-                        locks[idx].data()[0]
-                    );
-                }
-            }
-        }
-        block_start += NUM_CODED;
+        block_start += NUM_DATA;
     }
+
     Ok(())
 }
 
@@ -426,10 +462,11 @@ mod test {
     use crdt;
     use erasure;
     use logger;
-    use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE};
+    use packet::{BlobRecycler, BLOB_HEADER_SIZE};
     use signature::KeyPair;
     use signature::KeyPairUtil;
-    use std::sync::{Arc, RwLock};
+    // use std::sync::{Arc, RwLock};
+    use streamer::WindowSlot;
 
     #[test]
     pub fn test_coding() {
@@ -481,11 +518,11 @@ mod test {
         assert_eq!(v_orig, vs[0]);
     }
 
-    fn print_window(window: &Vec<Option<SharedBlob>>) {
+    fn print_window(window: &[WindowSlot]) {
         for (i, w) in window.iter().enumerate() {
             print!("window({}): ", i);
-            if w.is_some() {
-                let window_l1 = w.clone().unwrap();
+            if w.data.is_some() {
+                let window_l1 = w.data.clone().unwrap();
                 let window_l2 = window_l1.read().unwrap();
                 print!(
                     "index: {:?} meta.size: {} data: ",
@@ -507,8 +544,11 @@ mod test {
         blob_recycler: &BlobRecycler,
         offset: usize,
         num_blobs: usize,
-    ) -> (Vec<Option<SharedBlob>>, usize) {
-        let mut window = vec![None; 32];
+    ) -> [WindowSlot; 32] {
+        // WindowSlot is not Copy, so an `[elem; 32]` literal won't compile; build via Default
+        let mut window: [WindowSlot; 32] = Default::default();
         let mut blobs = Vec::new();
         for i in 0..num_blobs {
             let b = blob_recycler.allocate();
@@ -520,8 +560,6 @@ mod test {
             }
             blobs.push(b_);
         }
-        erasure::add_coding_blobs(blob_recycler, &mut blobs, offset as u64);
-        let blobs_len = blobs.len();
 
         let d = crdt::NodeInfo::new(
             KeyPair::new().pubkey(),
@@ -534,9 +572,9 @@ mod test {
         assert!(crdt::Crdt::index_blobs(&d, &blobs, &mut (offset as u64)).is_ok());
         for b in blobs {
             let idx = b.read().unwrap().get_index().unwrap() as usize;
-            window[idx] = Some(b);
+            window[idx].data = Some(b);
         }
-        (window, blobs_len)
+        window
     }
 
     #[test]
@@ -548,12 +586,12 @@ mod test {
         // Generate a window
         let offset = 1;
        let num_blobs = erasure::NUM_DATA + 2;
-        let (mut window, blobs_len) = generate_window(data_len, &blob_recycler, 0, num_blobs);
+        let mut window = generate_window(data_len, &blob_recycler, 0, num_blobs);
         println!("** after-gen-window:");
         print_window(&window);
 
         // Generate the coding blocks
-        assert!(erasure::generate_coding(&mut window, offset, blobs_len).is_ok());
+        assert!(erasure::generate_coding(&mut window, &blob_recycler, offset, num_blobs).is_ok());
         println!("** after-gen-coding:");
         print_window(&window);
 
@@ -563,7 +601,7 @@ mod test {
         window[erase_offset] = None;
 
         // Recover it from coding
-        assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + blobs_len).is_ok());
+        assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok());
         println!("** after-recover:");
         print_window(&window);
 
@@ -583,40 +621,40 @@ mod test {
         assert_eq!(window_l2.get_index().unwrap(), erase_offset as u64);
     }
 
-    //TODO This needs to be reworked
-    #[test]
-    #[ignore]
-    pub fn test_window_recover() {
-        logger::setup();
-        let blob_recycler = BlobRecycler::default();
-        let offset = 4;
-        let data_len = 16;
-        let num_blobs = erasure::NUM_DATA + 2;
-        let (mut window, blobs_len) = generate_window(data_len, &blob_recycler, offset, num_blobs);
-        println!("** after-gen:");
-        print_window(&window);
-        assert!(erasure::generate_coding(&mut window, offset, blobs_len).is_ok());
-        println!("** after-coding:");
-        print_window(&window);
-        let refwindow = window[offset + 1].clone();
-        window[offset + 1] = None;
-        window[offset + 2] = None;
-        window[offset + erasure::NUM_CODED + 3] = None;
-        window[offset + (2 * erasure::NUM_CODED) + 0] = None;
-        window[offset + (2 * erasure::NUM_CODED) + 1] = None;
-        window[offset + (2 * erasure::NUM_CODED) + 2] = None;
-        let window_l0 = &(window[offset + (3 * erasure::NUM_CODED)]).clone().unwrap();
-        window_l0.write().unwrap().data[0] = 55;
-        println!("** after-nulling:");
-        print_window(&window);
-        assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + blobs_len).is_ok());
-        println!("** after-restore:");
-        print_window(&window);
-        let window_l = window[offset + 1].clone().unwrap();
-        let ref_l = refwindow.clone().unwrap();
-        assert_eq!(
-            window_l.read().unwrap().data()[..data_len],
-            ref_l.read().unwrap().data()[..data_len]
-        );
-    }
+    // //TODO This needs to be reworked
+    // #[test]
+    // #[ignore]
+    // pub fn test_window_recover() {
+    //     logger::setup();
+    //     let blob_recycler = BlobRecycler::default();
+    //     let offset = 4;
+    //     let data_len = 16;
+    //     let num_blobs = erasure::NUM_DATA + 2;
+    //     let (mut window, blobs_len) = generate_window(data_len, &blob_recycler, offset, num_blobs);
+    //     println!("** after-gen:");
+    //     print_window(&window);
+    //     assert!(erasure::generate_coding(&mut window, offset, blobs_len).is_ok());
+    //     println!("** after-coding:");
+    //     print_window(&window);
+    //     let refwindow = window[offset + 1].clone();
+    //     window[offset + 1] = None;
+    //     window[offset + 2] = None;
+    //     window[offset + erasure::SET_SIZE + 3] = None;
+    //     window[offset + (2 * erasure::SET_SIZE) + 0] = None;
+    //     window[offset + (2 * erasure::SET_SIZE) + 1] = None;
+    //     window[offset + (2 * erasure::SET_SIZE) + 2] = None;
+    //     let window_l0 = &(window[offset + (3 * erasure::SET_SIZE)]).clone().unwrap();
+    //     window_l0.write().unwrap().data[0] = 55;
+    //     println!("** after-nulling:");
+    //     print_window(&window);
+    //     assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + blobs_len).is_ok());
+    //     println!("** after-restore:");
+    //     print_window(&window);
+    //     let window_l = window[offset + 1].clone().unwrap();
+    //     let ref_l = refwindow.clone().unwrap();
+    //     assert_eq!(
+    //         window_l.read().unwrap().data()[..data_len],
+    //         ref_l.read().unwrap().data()[..data_len]
+    //     );
+    // }
 }
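
Note on the data structure this patch leans on: WindowSlot is imported from streamer.rs but its definition never appears in the diff. From the way generate_coding() and recover() use it, it must look roughly like the sketch below. This is not part of the patch: the field names come from the diff, while the SharedBlob stand-in and the derives are assumptions made only for illustration.

// Rough shape of the WindowSlot the patch relies on (the real definition lives in
// src/streamer.rs and is not shown in this diff). Field names are taken from the diff;
// the derives and the SharedBlob stand-in are assumptions for illustration only.
use std::sync::{Arc, RwLock};

// Stand-in for packet::SharedBlob (an Arc around a lockable blob in the real codebase).
type SharedBlob = Arc<RwLock<Vec<u8>>>;

#[derive(Clone, Default)]
pub struct WindowSlot {
    pub data: Option<SharedBlob>,   // the data blob stored at this window index
    pub coding: Option<SharedBlob>, // coding blob; only the last NUM_CODING slots of a set carry one
}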
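The constants and the block diagram in the patch fix all of the index arithmetic: every NUM_DATA consecutive window slots form one erasure set, the coding blobs ride beside the last NUM_CODING data blobs of that set, and recover() hands jerasure a -1-terminated erasure list in which data blocks are numbered 0..NUM_DATA and coding blocks NUM_DATA..ERASURE_SET_SIZE. The standalone sketch below only illustrates that arithmetic; it is not code from the patch, and block_start_for / coding_range_for are hypothetical helper names.

// Standalone illustration of the index math used by generate_coding() and recover().
// The constants mirror src/erasure.rs; the helper names are hypothetical.
const NUM_DATA: usize = 16; // data blobs per erasure set
const NUM_CODING: usize = 4; // coding blobs per erasure set (max recoverable losses)

// First slot of the erasure set a given window index belongs to.
fn block_start_for(index: usize) -> usize {
    index - (index % NUM_DATA)
}

// Slots of that set whose WindowSlot also holds a coding blob:
// the last NUM_CODING slots of the NUM_DATA-slot block.
fn coding_range_for(index: usize) -> std::ops::Range<usize> {
    let start = block_start_for(index);
    (start + NUM_DATA - NUM_CODING)..(start + NUM_DATA)
}

fn main() {
    // Slot 37 falls in the block starting at 32; coding blobs sit in slots 44..48.
    assert_eq!(block_start_for(37), 32);
    assert_eq!(coding_range_for(37), 44..48);

    // jerasure expects a -1 terminated erasure list where data blocks are numbered
    // 0..NUM_DATA and coding blocks NUM_DATA..NUM_DATA+NUM_CODING, which is exactly
    // how recover() builds `erasures` before calling decode_blocks().
    let missing_data = [3usize];   // third data blob of the set is gone
    let missing_coding = [1usize]; // second coding blob of the set is gone
    let mut erasures: Vec<i32> = Vec::new();
    erasures.extend(missing_data.iter().map(|&i| i as i32));
    erasures.extend(missing_coding.iter().map(|&i| (i + NUM_DATA) as i32));
    erasures.push(-1);
    assert_eq!(erasures, vec![3, 17, -1]);
}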