This commit is contained in:
Rob Walker 2018-07-31 16:52:50 -07:00
parent a58df52205
commit 2ece27ee3a
1 changed files with 6 additions and 2 deletions

View File

@ -173,6 +173,7 @@ pub fn blob_receiver(
fn find_next_missing(
window: &SharedWindow,
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
consumed: u64,
received: u64,
) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
@ -188,6 +189,8 @@ fn find_next_missing(
let blob_idx = blob.read().unwrap().get_index().unwrap();
if blob_idx == pix {
mem::replace(&mut window[i].data, Some(blob));
} else {
recycler.recycle(blob);
}
}
if window[i].data.is_none() {
@ -206,6 +209,7 @@ fn repair_window(
debug_id: u64,
window: &SharedWindow,
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
last: &mut u64,
times: &mut usize,
consumed: u64,
@ -223,7 +227,7 @@ fn repair_window(
return Ok(());
}
let reqs = find_next_missing(window, crdt, consumed, received)?;
let reqs = find_next_missing(window, crdt, recycler, consumed, received)?;
trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
if !reqs.is_empty() {
inc_new_counter!("streamer-repair_window-repair", reqs.len());
@ -673,7 +677,7 @@ pub fn window(
}
}
let _ = repair_window(
debug_id, &window, &crdt, &mut last, &mut times, consumed, received,
debug_id, &window, &crdt, &recycler, &mut last, &mut times, consumed, received,
);
}
})