Chunk blobs into window size to avoid window overrun

Fixes #447
This commit is contained in:
Stephen Akridge 2018-06-25 13:29:53 -07:00 committed by Greg Fitzgerald
parent 1919ec247b
commit b48a8c0555
1 changed files with 58 additions and 49 deletions

View File

@ -446,63 +446,72 @@ fn broadcast(
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq);
}
let mut blobs: Vec<_> = dq.into_iter().collect();
// flatten deque to vec
let blobs_vec: Vec<_> = dq.into_iter().collect();
// We could receive more blobs than window slots so
// break them up into window-sized chunks to process
let blobs_chunked: Vec<_> = blobs_vec
.chunks(WINDOW_SIZE)
.map(|x| x.to_vec())
.collect();
print_window(window, *receive_index as usize);
// Insert the coding blobs into the blob stream
#[cfg(feature = "erasure")]
erasure::add_coding_blobs(recycler, &mut blobs, *receive_index);
for mut blobs in blobs_chunked {
// Insert the coding blobs into the blob stream
#[cfg(feature = "erasure")]
erasure::add_coding_blobs(recycler, &mut blobs, *receive_index);
let blobs_len = blobs.len();
info!("broadcast blobs.len: {}", blobs_len);
let blobs_len = blobs.len();
debug!("broadcast blobs.len: {}", blobs_len);
// Index the blobs
Crdt::index_blobs(crdt, &blobs, receive_index)?;
// keep the cache of blobs that are broadcast
{
let mut win = window.write().unwrap();
for b in &blobs {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix as usize) % WINDOW_SIZE;
if let Some(x) = &win[pos] {
trace!(
"popped {} at {}",
x.read().unwrap().get_index().unwrap(),
pos
);
recycler.recycle(x.clone());
}
trace!("null {}", pos);
win[pos] = None;
assert!(win[pos].is_none());
}
while let Some(b) = blobs.pop() {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix as usize) % WINDOW_SIZE;
trace!("caching {} at {}", ix, pos);
assert!(win[pos].is_none());
win[pos] = Some(b);
}
}
// Fill in the coding blob data from the window data blobs
#[cfg(feature = "erasure")]
{
if erasure::generate_coding(
&mut window.write().unwrap(),
*receive_index as usize,
blobs_len,
).is_err()
// Index the blobs
Crdt::index_blobs(crdt, &blobs, receive_index)?;
// keep the cache of blobs that are broadcast
{
return Err(Error::GenericError);
let mut win = window.write().unwrap();
assert!(blobs.len() <= win.len());
for b in &blobs {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix as usize) % WINDOW_SIZE;
if let Some(x) = &win[pos] {
trace!(
"popped {} at {}",
x.read().unwrap().get_index().unwrap(),
pos
);
recycler.recycle(x.clone());
}
trace!("null {}", pos);
win[pos] = None;
assert!(win[pos].is_none());
}
while let Some(b) = blobs.pop() {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix as usize) % WINDOW_SIZE;
trace!("caching {} at {}", ix, pos);
assert!(win[pos].is_none());
win[pos] = Some(b);
}
}
// Fill in the coding blob data from the window data blobs
#[cfg(feature = "erasure")]
{
erasure::generate_coding(
&mut window.write().unwrap(),
*receive_index as usize,
blobs_len,
).map_err(|_| Error::GenericError)?;
}
*receive_index += blobs_len as u64;
// Send blobs out from the window
Crdt::broadcast(crdt, &window, &sock, transmit_index, *receive_index)?;
}
*receive_index += blobs_len as u64;
// Send blobs out from the window
Crdt::broadcast(crdt, &window, &sock, transmit_index, *receive_index)?;
Ok(())
}