This commit is contained in:
Rob Walker 2018-07-19 12:28:58 -07:00
parent ddb24ebb61
commit 1eec8bf57f
2 changed files with 53 additions and 37 deletions

View File

@ -303,15 +303,13 @@ pub fn recover(
start: usize,
num_blobs: usize,
) -> Result<()> {
let num_blocks = num_blobs / NUM_DATA;
let num_blocks = (num_blobs / NUM_DATA) + 1;
let mut block_start = start - (start % NUM_DATA);
if num_blocks > 0 {
debug!(
"num_blocks: {} start: {} num_blobs: {} block_start: {}",
num_blocks, start, num_blobs, block_start
);
}
debug!(
"num_blocks: {} start: {} num_blobs: {} block_start: {}",
num_blocks, start, num_blobs, block_start
);
for _ in 0..num_blocks {
let mut data_missing = 0;

View File

@ -298,6 +298,40 @@ fn retransmit_all_leader_blocks(
Ok(())
}
/// make space in window for newly received blobs that come after
/// consumed, before received, clear any old ones
fn reset_slots(
window: &mut [WindowSlot],
recycler: &BlobRecycler,
consumed: u64,
received: u64,
debug_id: u64,
) {
for ix in consumed..received {
let k = (ix % WINDOW_SIZE) as usize;
let mut old = false;
if let Some(b) = &window[k].data {
old = b.read().unwrap().get_index().unwrap() < consumed;
}
if old {
if let Some(b) = mem::replace(&mut window[k].data, None) {
debug!("{:x}: recycling data blob at index {:}", debug_id, k);
recycler.recycle(b);
}
}
if let Some(b) = &window[k].coding {
old = b.read().unwrap().get_index().unwrap() < consumed;
}
if old {
if let Some(b) = mem::replace(&mut window[k].coding, None) {
debug!("{:x}: recycling coding blob at index {:}", debug_id, k);
recycler.recycle(b);
}
}
}
}
/// process a blob: Add blob to the window. If a continuous set of blobs
/// starting from consumed is thereby formed, add that continuous
/// range of blobs to a queue to be sent on to the next stage.
@ -326,30 +360,11 @@ fn process_blob(
) {
let mut window = window.write().unwrap();
// Search the window for old blobs in the window
// of consumed to received and clear any old ones
for ix in *consumed..(received + 1) {
let k = (ix % WINDOW_SIZE) as usize;
let mut old = false;
if let Some(b) = &window[k].data {
old = b.read().unwrap().get_index().unwrap() < *consumed;
}
if old {
if let Some(b) = mem::replace(&mut window[k].data, None) {
debug!("{:x}: recycling data blob at index {:}", debug_id, k);
recycler.recycle(b);
}
}
if let Some(b) = &window[k].coding {
old = b.read().unwrap().get_index().unwrap() < *consumed;
}
if old {
if let Some(b) = mem::replace(&mut window[k].coding, None) {
debug!("{:x}: recycling coding blob at index {:}", debug_id, k);
recycler.recycle(b);
}
}
if pix == received {
// When pix == received, we've *just* updated received, which means
// possibly new slots between consumed and received have been exposed,
// so clean up old blobs between consumed and received
reset_slots(&mut window, recycler, *consumed, received, debug_id);
}
let is_coding = {
@ -405,17 +420,20 @@ fn process_blob(
// }
// push all contiguous blobs into consumed queue, increment consumed
while *consumed < received {
loop {
let k = (*consumed % WINDOW_SIZE) as usize;
trace!("k: {} consumed: {}", k, *consumed);
trace!("k: {} consumed: {} received: {}", k, *consumed, received);
if window[k].data.is_none() {
if let Some(blob) = &window[k].data {
if blob.read().unwrap().get_index().unwrap() < *consumed {
// window wrap-around, end of received
break;
}
} else {
// window[k].data is None, end of received
break;
}
if let Some(blob) = &window[w].data {
assert!(blob.read().unwrap().meta.size < BLOB_SIZE);
}
consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window"));
*consumed += 1;
}