From 4a534d6abb5ee1c3ac8bd6bb34e23b10cec50ba7 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 25 Jun 2018 16:50:58 -0600 Subject: [PATCH] Don't clone() Arc before recycling This might fix an awful bug where the streamer reuses a Blob before the current user is done with it. Recycler should probably assert ref count is one? * Also don't collect() an iterator before iterating over it. --- src/streamer.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/streamer.rs b/src/streamer.rs index abd25ff4d..958724b82 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -6,6 +6,7 @@ use erasure; use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE}; use result::{Error, Result}; use std::collections::VecDeque; +use std::mem; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; @@ -452,10 +453,7 @@ fn broadcast( // We could receive more blobs than window slots so // break them up into window-sized chunks to process - let blobs_chunked: Vec<_> = blobs_vec - .chunks(WINDOW_SIZE) - .map(|x| x.to_vec()) - .collect(); + let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE).map(|x| x.to_vec()); print_window(window, *receive_index as usize); @@ -476,17 +474,15 @@ fn broadcast( for b in &blobs { let ix = b.read().unwrap().get_index().expect("blob index"); let pos = (ix as usize) % WINDOW_SIZE; - if let Some(x) = &win[pos] { + if let Some(x) = mem::replace(&mut win[pos], None) { trace!( "popped {} at {}", x.read().unwrap().get_index().unwrap(), pos ); - recycler.recycle(x.clone()); + recycler.recycle(x); } trace!("null {}", pos); - win[pos] = None; - assert!(win[pos].is_none()); } while let Some(b) = blobs.pop() { let ix = b.read().unwrap().get_index().expect("blob index");