Don't clone() Arc before recycling
This might fix an awful bug where the streamer reuses a Blob before the current user is done with it. Recycler should probably assert ref count is one? * Also don't collect() an iterator before iterating over it.
This commit is contained in:
parent
b48a8c0555
commit
4a534d6abb
|
@ -6,6 +6,7 @@ use erasure;
|
||||||
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE};
|
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE};
|
||||||
use result::{Error, Result};
|
use result::{Error, Result};
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
|
use std::mem;
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
|
@ -452,10 +453,7 @@ fn broadcast(
|
||||||
|
|
||||||
// We could receive more blobs than window slots so
|
// We could receive more blobs than window slots so
|
||||||
// break them up into window-sized chunks to process
|
// break them up into window-sized chunks to process
|
||||||
let blobs_chunked: Vec<_> = blobs_vec
|
let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE).map(|x| x.to_vec());
|
||||||
.chunks(WINDOW_SIZE)
|
|
||||||
.map(|x| x.to_vec())
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
print_window(window, *receive_index as usize);
|
print_window(window, *receive_index as usize);
|
||||||
|
|
||||||
|
@ -476,17 +474,15 @@ fn broadcast(
|
||||||
for b in &blobs {
|
for b in &blobs {
|
||||||
let ix = b.read().unwrap().get_index().expect("blob index");
|
let ix = b.read().unwrap().get_index().expect("blob index");
|
||||||
let pos = (ix as usize) % WINDOW_SIZE;
|
let pos = (ix as usize) % WINDOW_SIZE;
|
||||||
if let Some(x) = &win[pos] {
|
if let Some(x) = mem::replace(&mut win[pos], None) {
|
||||||
trace!(
|
trace!(
|
||||||
"popped {} at {}",
|
"popped {} at {}",
|
||||||
x.read().unwrap().get_index().unwrap(),
|
x.read().unwrap().get_index().unwrap(),
|
||||||
pos
|
pos
|
||||||
);
|
);
|
||||||
recycler.recycle(x.clone());
|
recycler.recycle(x);
|
||||||
}
|
}
|
||||||
trace!("null {}", pos);
|
trace!("null {}", pos);
|
||||||
win[pos] = None;
|
|
||||||
assert!(win[pos].is_none());
|
|
||||||
}
|
}
|
||||||
while let Some(b) = blobs.pop() {
|
while let Some(b) = blobs.pop() {
|
||||||
let ix = b.read().unwrap().get_index().expect("blob index");
|
let ix = b.read().unwrap().get_index().expect("blob index");
|
||||||
|
|
Loading…
Reference in New Issue