Generate coding for the current blob set not just the first coding set
This commit is contained in:
parent
d1eaecde9a
commit
5711fb9969
165
src/erasure.rs
165
src/erasure.rs
|
@ -178,80 +178,88 @@ pub fn add_coding_blobs(recycler: &BlobRecycler, blobs: &mut Vec<SharedBlob>, co
|
|||
}
|
||||
|
||||
// Generate coding blocks in window starting from consumed
|
||||
pub fn generate_coding(window: &mut Vec<Option<SharedBlob>>, consumed: usize) -> Result<()> {
|
||||
let mut data_blobs = Vec::new();
|
||||
let mut coding_blobs = Vec::new();
|
||||
let mut data_locks = Vec::new();
|
||||
let mut data_ptrs: Vec<&[u8]> = Vec::new();
|
||||
let mut coding_locks = Vec::new();
|
||||
let mut coding_ptrs: Vec<&mut [u8]> = Vec::new();
|
||||
pub fn generate_coding(window: &mut Vec<Option<SharedBlob>>, consumed: usize, num_blobs: usize) -> Result<()> {
|
||||
|
||||
let block_start = consumed - (consumed % NUM_CODED);
|
||||
info!(
|
||||
"generate_coding start: {} end: {}",
|
||||
block_start,
|
||||
block_start + NUM_DATA
|
||||
);
|
||||
for i in block_start..block_start + NUM_DATA {
|
||||
let n = i % window.len();
|
||||
trace!("window[{}] = {:?}", n, window[n]);
|
||||
if window[n].is_none() {
|
||||
trace!("data block is null @ {}", n);
|
||||
return Ok(());
|
||||
}
|
||||
data_blobs.push(
|
||||
window[n]
|
||||
.clone()
|
||||
.expect("'data_blobs' arr in pub fn generate_coding"),
|
||||
);
|
||||
}
|
||||
let mut max_data_size = 0;
|
||||
for b in &data_blobs {
|
||||
let lck = b.write().expect("'b' write lock in pub fn generate_coding");
|
||||
if lck.meta.size > max_data_size {
|
||||
max_data_size = lck.meta.size;
|
||||
}
|
||||
data_locks.push(lck);
|
||||
}
|
||||
for (i, l) in data_locks.iter_mut().enumerate() {
|
||||
trace!("i: {} data: {}", i, l.data[0]);
|
||||
data_ptrs.push(&l.data()[..max_data_size]);
|
||||
}
|
||||
let mut block_start = consumed - (consumed % NUM_CODED);
|
||||
|
||||
// generate coding ptr array
|
||||
let coding_start = block_start + NUM_DATA;
|
||||
let coding_end = block_start + NUM_CODED;
|
||||
for i in coding_start..coding_end {
|
||||
let n = i % window.len();
|
||||
if window[n].is_none() {
|
||||
trace!("coding block is null @ {}", n);
|
||||
return Ok(());
|
||||
}
|
||||
let w_l = window[n].clone().unwrap();
|
||||
w_l.write().unwrap().meta.size = max_data_size;
|
||||
let flags = w_l.write().unwrap().get_flags().unwrap();
|
||||
if w_l.write().unwrap().set_flags(flags | BLOB_FLAG_IS_CODING).is_err() {
|
||||
return Err(ErasureError::EncodeError);
|
||||
}
|
||||
coding_blobs.push(
|
||||
window[n]
|
||||
.clone()
|
||||
.expect("'coding_blobs' arr in pub fn generate_coding"),
|
||||
);
|
||||
}
|
||||
for b in &coding_blobs {
|
||||
coding_locks.push(
|
||||
b.write()
|
||||
.expect("'coding_locks' arr in pub fn generate_coding"),
|
||||
);
|
||||
}
|
||||
for (i, l) in coding_locks.iter_mut().enumerate() {
|
||||
trace!("i: {} coding: {}", i, l.data[0]);
|
||||
coding_ptrs.push(&mut l.data_mut()[..max_data_size]);
|
||||
}
|
||||
let num_blocks = num_blobs / NUM_CODED;
|
||||
|
||||
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
|
||||
trace!("consumed: {}", consumed);
|
||||
for _ in 0..num_blocks {
|
||||
|
||||
let mut data_blobs = Vec::new();
|
||||
let mut coding_blobs = Vec::new();
|
||||
let mut data_locks = Vec::new();
|
||||
let mut data_ptrs: Vec<&[u8]> = Vec::new();
|
||||
let mut coding_locks = Vec::new();
|
||||
let mut coding_ptrs: Vec<&mut [u8]> = Vec::new();
|
||||
|
||||
info!(
|
||||
"generate_coding start: {} end: {}",
|
||||
block_start,
|
||||
block_start + NUM_DATA
|
||||
);
|
||||
for i in block_start..block_start + NUM_DATA {
|
||||
let n = i % window.len();
|
||||
trace!("window[{}] = {:?}", n, window[n]);
|
||||
if window[n].is_none() {
|
||||
trace!("data block is null @ {}", n);
|
||||
return Ok(());
|
||||
}
|
||||
data_blobs.push(
|
||||
window[n]
|
||||
.clone()
|
||||
.expect("'data_blobs' arr in pub fn generate_coding"),
|
||||
);
|
||||
}
|
||||
let mut max_data_size = 0;
|
||||
for b in &data_blobs {
|
||||
let lck = b.write().expect("'b' write lock in pub fn generate_coding");
|
||||
if lck.meta.size > max_data_size {
|
||||
max_data_size = lck.meta.size;
|
||||
}
|
||||
data_locks.push(lck);
|
||||
}
|
||||
for (i, l) in data_locks.iter_mut().enumerate() {
|
||||
trace!("i: {} data: {}", i, l.data[0]);
|
||||
data_ptrs.push(&l.data()[..max_data_size]);
|
||||
}
|
||||
|
||||
// generate coding ptr array
|
||||
let coding_start = block_start + NUM_DATA;
|
||||
let coding_end = block_start + NUM_CODED;
|
||||
for i in coding_start..coding_end {
|
||||
let n = i % window.len();
|
||||
if window[n].is_none() {
|
||||
trace!("coding block is null @ {}", n);
|
||||
return Ok(());
|
||||
}
|
||||
let w_l = window[n].clone().unwrap();
|
||||
w_l.write().unwrap().meta.size = max_data_size;
|
||||
let flags = w_l.write().unwrap().get_flags().unwrap();
|
||||
if w_l.write().unwrap().set_flags(flags | BLOB_FLAG_IS_CODING).is_err() {
|
||||
return Err(ErasureError::EncodeError);
|
||||
}
|
||||
coding_blobs.push(
|
||||
window[n]
|
||||
.clone()
|
||||
.expect("'coding_blobs' arr in pub fn generate_coding"),
|
||||
);
|
||||
}
|
||||
for b in &coding_blobs {
|
||||
coding_locks.push(
|
||||
b.write()
|
||||
.expect("'coding_locks' arr in pub fn generate_coding"),
|
||||
);
|
||||
}
|
||||
for (i, l) in coding_locks.iter_mut().enumerate() {
|
||||
trace!("i: {} coding: {}", i, l.data[0]);
|
||||
coding_ptrs.push(&mut l.data_mut()[..max_data_size]);
|
||||
}
|
||||
|
||||
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
|
||||
debug!("consumed: {}", consumed);
|
||||
block_start += NUM_CODED;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -416,10 +424,11 @@ mod test {
|
|||
data_len: usize,
|
||||
blob_recycler: &BlobRecycler,
|
||||
offset: usize,
|
||||
num_blobs: usize,
|
||||
) -> Vec<Option<SharedBlob>> {
|
||||
let mut window = vec![None; 16];
|
||||
let mut blobs = Vec::new();
|
||||
for i in 0..erasure::NUM_DATA + 2 {
|
||||
for i in 0..num_blobs {
|
||||
let b = blob_recycler.allocate();
|
||||
let b_ = b.clone();
|
||||
let mut w = b.write().unwrap();
|
||||
|
@ -441,7 +450,7 @@ mod test {
|
|||
let crdt = Arc::new(RwLock::new(crdt::Crdt::new(d.clone())));
|
||||
|
||||
assert!(crdt::Crdt::index_blobs(&crdt, &blobs, &mut (offset as u64)).is_ok());
|
||||
for (i, b) in blobs.into_iter().enumerate() {
|
||||
for b in blobs {
|
||||
let idx = b.read().unwrap().get_index().unwrap() as usize;
|
||||
window[idx] = Some(b);
|
||||
}
|
||||
|
@ -456,12 +465,13 @@ mod test {
|
|||
|
||||
// Generate a window
|
||||
let offset = 1;
|
||||
let mut window = generate_window(data_len, &blob_recycler, 0);
|
||||
let num_blobs = erasure::NUM_DATA + 2;
|
||||
let mut window = generate_window(data_len, &blob_recycler, 0, num_blobs);
|
||||
println!("** after-gen-window:");
|
||||
print_window(&window);
|
||||
|
||||
// Generate the coding blocks
|
||||
assert!(erasure::generate_coding(&mut window, offset).is_ok());
|
||||
assert!(erasure::generate_coding(&mut window, offset, num_blobs).is_ok());
|
||||
println!("** after-gen-coding:");
|
||||
print_window(&window);
|
||||
|
||||
|
@ -494,10 +504,11 @@ mod test {
|
|||
let blob_recycler = BlobRecycler::default();
|
||||
let offset = 4;
|
||||
let data_len = 16;
|
||||
let mut window = generate_window(data_len, &blob_recycler, offset);
|
||||
let num_blobs = erasure::NUM_DATA + 2;
|
||||
let mut window = generate_window(data_len, &blob_recycler, offset, num_blobs);
|
||||
println!("** after-gen:");
|
||||
print_window(&window);
|
||||
assert!(erasure::generate_coding(&mut window, offset).is_ok());
|
||||
assert!(erasure::generate_coding(&mut window, offset, num_blobs).is_ok());
|
||||
println!("** after-coding:");
|
||||
print_window(&window);
|
||||
let refwindow = window[offset + 1].clone();
|
||||
|
|
|
@ -305,6 +305,19 @@ fn recv_window(
|
|||
}
|
||||
}
|
||||
}
|
||||
print_window(locked_window, *consumed);
|
||||
trace!("sending contq.len: {}", contq.len());
|
||||
if !contq.is_empty() {
|
||||
trace!("sending contq.len: {}", contq.len());
|
||||
s.send(contq)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_window(
|
||||
locked_window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
|
||||
consumed: usize,
|
||||
) {
|
||||
{
|
||||
let buf: Vec<_> = locked_window
|
||||
.read()
|
||||
|
@ -312,8 +325,7 @@ fn recv_window(
|
|||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, v)| {
|
||||
if i == (*consumed % WINDOW_SIZE) {
|
||||
assert!(v.is_none());
|
||||
if i == (consumed % WINDOW_SIZE) {
|
||||
"_"
|
||||
} else if v.is_none() {
|
||||
"0"
|
||||
|
@ -322,14 +334,8 @@ fn recv_window(
|
|||
}
|
||||
})
|
||||
.collect();
|
||||
trace!("WINDOW: {}", buf.join(""));
|
||||
info!("WINDOW ({}): {}", consumed, buf.join(""));
|
||||
}
|
||||
trace!("sending contq.len: {}", contq.len());
|
||||
if !contq.is_empty() {
|
||||
trace!("sending contq.len: {}", contq.len());
|
||||
s.send(contq)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn default_window() -> Arc<RwLock<Vec<Option<SharedBlob>>>> {
|
||||
|
@ -393,7 +399,11 @@ fn broadcast(
|
|||
while let Ok(mut nq) = r.try_recv() {
|
||||
dq.append(&mut nq);
|
||||
}
|
||||
let mut blobs = dq.into_iter().collect();
|
||||
let mut blobs: Vec<_> = dq.into_iter().collect();
|
||||
|
||||
let blobs_len = blobs.len();
|
||||
info!("broadcast blobs.len: {}", blobs_len);
|
||||
print_window(window, *transmit_index as usize);
|
||||
|
||||
// Insert the coding blobs into the blob stream
|
||||
#[cfg(feature = "erasure")]
|
||||
|
@ -431,7 +441,7 @@ fn broadcast(
|
|||
// Fill in the coding blob data from the window data blobs
|
||||
#[cfg(feature = "erasure")]
|
||||
{
|
||||
if erasure::generate_coding(&mut window.write().unwrap(), *transmit_index as usize).is_err()
|
||||
if erasure::generate_coding(&mut window.write().unwrap(), *transmit_index as usize, blobs_len).is_err()
|
||||
{
|
||||
return Err(Error::GenericError);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue