Clear old blobs before putting in the new one

Otherwise we will just warn about overrun and not insert new blob
Also, break if the index we find is less than consumed otherwise
we can infinite loop
This commit is contained in:
Stephen Akridge 2018-06-27 21:11:16 -07:00 committed by Greg Fitzgerald
parent 1c9e7dbc45
commit 8effa4e3e0
1 changed files with 22 additions and 15 deletions

View File

@ -294,17 +294,10 @@ fn recv_window(
trace!("window w: {} size: {}", w, meta_size);
{
let mut window = locked_window.write().unwrap();
if window[w].is_none() {
window[w] = Some(b);
} else if let Some(cblob) = &window[w] {
if cblob.read().unwrap().get_index().unwrap() != pix as u64 {
warn!("overrun blob at index {:}", w);
} else {
debug!("duplicate blob at index {:}", w);
}
}
// recycle old references
for ix in *consumed..pix {
// Search the window for old blobs in the window
// of consumed to received and clear any old ones
for ix in *consumed..(pix + 1) {
let k = (ix % WINDOW_SIZE) as usize;
if let Some(b) = &mut window[k] {
if b.read().unwrap().get_index().unwrap() >= *consumed as u64 {
@ -315,6 +308,18 @@ fn recv_window(
recycler.recycle(b);
}
}
// Insert the new blob into the window
// spot should be free because we cleared it above
if window[w].is_none() {
window[w] = Some(b);
} else if let Some(cblob) = &window[w] {
if cblob.read().unwrap().get_index().unwrap() != pix as u64 {
warn!("overrun blob at index {:}", w);
} else {
debug!("duplicate blob at index {:}", w);
}
}
loop {
let k = (*consumed % WINDOW_SIZE) as usize;
trace!("k: {} consumed: {}", k, *consumed);
@ -324,11 +329,13 @@ fn recv_window(
}
let mut is_coding = false;
if let &Some(ref cblob) = &window[k] {
if cblob
let cblob_r = cblob
.read()
.expect("blob read lock for flags streamer::window")
.is_coding()
{
.expect("blob read lock for flogs streamer::window");
if cblob_r.get_index().unwrap() < *consumed {
break;
}
if cblob_r.is_coding() {
is_coding = true;
}
}