Rework to fix coding blob insertion

This commit is contained in:
Stephen Akridge 2018-06-05 09:50:50 -07:00 committed by Greg Fitzgerald
parent ec3569aa39
commit cb16fe84cd
1 changed files with 99 additions and 92 deletions

View File

@ -4,8 +4,8 @@ use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE};
use std::result;
//TODO(sakridge) pick these values
const NUM_CODED: usize = 3;
const MAX_MISSING: usize = 1;
const NUM_CODED: usize = 20;
const MAX_MISSING: usize = 4;
const NUM_DATA: usize = NUM_CODED - MAX_MISSING;
#[derive(Debug, PartialEq, Eq)]
@ -155,26 +155,29 @@ pub fn decode_blocks(data: &mut [&mut [u8]], coding: &[&[u8]], erasures: &[i32])
// Allocate some coding blobs and insert into the blobs array
pub fn add_coding_blobs(recycler: &BlobRecycler, blobs: &mut Vec<SharedBlob>, consumed: u64) {
let num_data_segments = blobs.len() / NUM_DATA;
info!(
"add_coding num_data: {} blobs.len(): {}",
num_data_segments,
blobs.len()
);
for i in 0..num_data_segments {
let idx = (i * NUM_CODED) + NUM_DATA - (consumed as usize) % NUM_CODED;
for j in idx..idx + MAX_MISSING {
trace!("putting coding at {}", j);
if j <= blobs.len() {
let mut added = 0;
let blobs_len = blobs.len() as u64;
for i in consumed..consumed + blobs_len {
let is = i as usize;
if is != 0 && ((is + MAX_MISSING) % NUM_CODED) == 0 {
for _ in 0..MAX_MISSING {
trace!("putting coding at {}", (i - consumed));
let new_blob = recycler.allocate();
let new_blob_clone = new_blob.clone();
let mut new_blob_l = new_blob_clone.write().unwrap();
new_blob_l.meta.size = new_blob_l.data().len();
drop(new_blob_l);
blobs.insert(j, new_blob);
blobs.insert((i - consumed) as usize, new_blob);
added += 1;
}
}
}
info!(
"add_coding consumed: {} blobs.len(): {} added: {}",
consumed,
blobs.len(),
added
);
}
// Generate coding blocks in window starting from consumed
@ -182,83 +185,86 @@ pub fn generate_coding(window: &mut Vec<Option<SharedBlob>>, consumed: usize, nu
let mut block_start = consumed - (consumed % NUM_CODED);
let num_blocks = num_blobs / NUM_CODED;
for i in consumed..consumed + num_blobs {
if i != 0 && (i % (NUM_CODED - 1)) == 0 {
for _ in 0..num_blocks {
let mut data_blobs = Vec::new();
let mut coding_blobs = Vec::new();
let mut data_locks = Vec::new();
let mut data_ptrs: Vec<&[u8]> = Vec::new();
let mut coding_locks = Vec::new();
let mut coding_ptrs: Vec<&mut [u8]> = Vec::new();
let mut data_blobs = Vec::new();
let mut coding_blobs = Vec::new();
let mut data_locks = Vec::new();
let mut data_ptrs: Vec<&[u8]> = Vec::new();
let mut coding_locks = Vec::new();
let mut coding_ptrs: Vec<&mut [u8]> = Vec::new();
info!(
"generate_coding start: {} end: {}",
block_start,
block_start + NUM_DATA
);
for i in block_start..block_start + NUM_DATA {
let n = i % window.len();
trace!("window[{}] = {:?}", n, window[n]);
if window[n].is_none() {
trace!("data block is null @ {}", n);
return Ok(());
}
data_blobs.push(
window[n]
.clone()
.expect("'data_blobs' arr in pub fn generate_coding"),
info!(
"generate_coding start: {} end: {} consumed: {} num_blobs: {}",
block_start,
block_start + NUM_DATA,
consumed,
num_blobs
);
}
let mut max_data_size = 0;
for b in &data_blobs {
let lck = b.write().expect("'b' write lock in pub fn generate_coding");
if lck.meta.size > max_data_size {
max_data_size = lck.meta.size;
for i in block_start..block_start + NUM_DATA {
let n = i % window.len();
trace!("window[{}] = {:?}", n, window[n]);
if window[n].is_none() {
trace!("data block is null @ {}", n);
return Ok(());
}
data_blobs.push(
window[n]
.clone()
.expect("'data_blobs' arr in pub fn generate_coding"),
);
}
let mut max_data_size = 0;
for b in &data_blobs {
let lck = b.write().expect("'b' write lock in pub fn generate_coding");
if lck.meta.size > max_data_size {
max_data_size = lck.meta.size;
}
data_locks.push(lck);
}
trace!("max_data_size: {}", max_data_size);
for (i, l) in data_locks.iter_mut().enumerate() {
trace!("i: {} data: {}", i, l.data[0]);
data_ptrs.push(&l.data[..max_data_size]);
}
data_locks.push(lck);
}
trace!("max_data_size: {}", max_data_size);
for (i, l) in data_locks.iter_mut().enumerate() {
trace!("i: {} data: {}", i, l.data[0]);
data_ptrs.push(&l.data[..max_data_size]);
}
// generate coding ptr array
let coding_start = block_start + NUM_DATA;
let coding_end = block_start + NUM_CODED;
for i in coding_start..coding_end {
let n = i % window.len();
if window[n].is_none() {
trace!("coding block is null @ {}", n);
return Ok(());
// generate coding ptr array
let coding_start = block_start + NUM_DATA;
let coding_end = block_start + NUM_CODED;
for i in coding_start..coding_end {
let n = i % window.len();
if window[n].is_none() {
trace!("coding block is null @ {}", n);
return Ok(());
}
let w_l = window[n].clone().unwrap();
w_l.write().unwrap().set_size(max_data_size);
if w_l.write().unwrap().set_coding().is_err() {
return Err(ErasureError::EncodeError);
}
coding_blobs.push(
window[n]
.clone()
.expect("'coding_blobs' arr in pub fn generate_coding"),
);
}
let w_l = window[n].clone().unwrap();
w_l.write().unwrap().set_size(max_data_size);
if w_l.write().unwrap().set_coding().is_err() {
return Err(ErasureError::EncodeError);
for b in &coding_blobs {
coding_locks.push(
b.write()
.expect("'coding_locks' arr in pub fn generate_coding"),
);
}
for (i, l) in coding_locks.iter_mut().enumerate() {
trace!("i: {} coding: {} size: {}", i, l.data[0], max_data_size);
coding_ptrs.push(&mut l.data_mut()[..max_data_size]);
}
coding_blobs.push(
window[n]
.clone()
.expect("'coding_blobs' arr in pub fn generate_coding"),
);
}
for b in &coding_blobs {
coding_locks.push(
b.write()
.expect("'coding_locks' arr in pub fn generate_coding"),
);
}
for (i, l) in coding_locks.iter_mut().enumerate() {
trace!("i: {} coding: {} size: {}", i, l.data[0], max_data_size);
coding_ptrs.push(&mut l.data_mut()[..max_data_size]);
}
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
debug!("consumed: {}", consumed);
block_start += NUM_CODED;
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
debug!("consumed: {} data: {}:{} coding: {}:{}", consumed,
block_start, block_start + NUM_DATA, coding_start, coding_end);
block_start += NUM_CODED;
}
}
Ok(())
}
@ -460,8 +466,8 @@ mod test {
blob_recycler: &BlobRecycler,
offset: usize,
num_blobs: usize,
) -> Vec<Option<SharedBlob>> {
let mut window = vec![None; 16];
) -> (Vec<Option<SharedBlob>>, usize) {
let mut window = vec![None; 32];
let mut blobs = Vec::new();
for i in 0..num_blobs {
let b = blob_recycler.allocate();
@ -474,6 +480,7 @@ mod test {
blobs.push(b_);
}
erasure::add_coding_blobs(blob_recycler, &mut blobs, offset as u64);
let blobs_len = blobs.len();
let d = crdt::ReplicatedData::new(
KeyPair::new().pubkey(),
@ -490,7 +497,7 @@ mod test {
let idx = b.read().unwrap().get_index().unwrap() as usize;
window[idx] = Some(b);
}
window
(window, blobs_len)
}
#[test]
@ -502,12 +509,12 @@ mod test {
// Generate a window
let offset = 1;
let num_blobs = erasure::NUM_DATA + 2;
let mut window = generate_window(data_len, &blob_recycler, 0, num_blobs);
let (mut window, blobs_len) = generate_window(data_len, &blob_recycler, 0, num_blobs);
println!("** after-gen-window:");
print_window(&window);
// Generate the coding blocks
assert!(erasure::generate_coding(&mut window, offset, num_blobs).is_ok());
assert!(erasure::generate_coding(&mut window, offset, blobs_len).is_ok());
println!("** after-gen-coding:");
print_window(&window);
@ -517,7 +524,7 @@ mod test {
window[erase_offset] = None;
// Recover it from coding
assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok());
assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + blobs_len).is_ok());
println!("** after-recover:");
print_window(&window);
@ -546,10 +553,10 @@ mod test {
let offset = 4;
let data_len = 16;
let num_blobs = erasure::NUM_DATA + 2;
let mut window = generate_window(data_len, &blob_recycler, offset, num_blobs);
let (mut window, blobs_len) = generate_window(data_len, &blob_recycler, offset, num_blobs);
println!("** after-gen:");
print_window(&window);
assert!(erasure::generate_coding(&mut window, offset, num_blobs).is_ok());
assert!(erasure::generate_coding(&mut window, offset, blobs_len).is_ok());
println!("** after-coding:");
print_window(&window);
let refwindow = window[offset + 1].clone();
@ -563,7 +570,7 @@ mod test {
window_l0.write().unwrap().data[0] = 55;
println!("** after-nulling:");
print_window(&window);
assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok());
assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + blobs_len).is_ok());
println!("** after-restore:");
print_window(&window);
let window_l = window[offset + 1].clone().unwrap();