diff --git a/src/bin/bench-streamer.rs b/src/bin/bench-streamer.rs index d03e90cb9a..7e58d23e0e 100644 --- a/src/bin/bench-streamer.rs +++ b/src/bin/bench-streamer.rs @@ -40,12 +40,7 @@ fn producer(addr: &SocketAddr, recycler: &PacketRecycler, exit: Arc) }) } -fn sink( - recycler: PacketRecycler, - exit: Arc, - rvs: Arc, - r: PacketReceiver, -) -> JoinHandle<()> { +fn sink(exit: Arc, rvs: Arc, r: PacketReceiver) -> JoinHandle<()> { spawn(move || loop { if exit.load(Ordering::Relaxed) { return; @@ -53,7 +48,6 @@ fn sink( let timer = Duration::new(1, 0); if let Ok(msgs) = r.recv_timeout(timer) { rvs.fetch_add(msgs.read().packets.len(), Ordering::Relaxed); - recycler.recycle(msgs, "sink"); } }) } @@ -101,7 +95,7 @@ fn main() -> Result<()> { let rvs = Arc::new(AtomicUsize::new(0)); let sink_threads: Vec<_> = read_channels .into_iter() - .map(|r_reader| sink(pack_recycler.clone(), exit.clone(), rvs.clone(), r_reader)) + .map(|r_reader| sink(exit.clone(), rvs.clone(), r_reader)) .collect(); let start = SystemTime::now(); let start_val = rvs.load(Ordering::Relaxed); diff --git a/src/erasure.rs b/src/erasure.rs index f1bac050ff..0ba8233a54 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -350,13 +350,7 @@ pub fn generate_coding( // true if slot is empty // true if slot is stale (i.e. has the wrong index), old blob is flushed // false if slot has a blob with the right index -fn is_missing( - id: &Pubkey, - idx: u64, - window_slot: &mut Option, - recycler: &BlobRecycler, - c_or_d: &str, -) -> bool { +fn is_missing(id: &Pubkey, idx: u64, window_slot: &mut Option, c_or_d: &str) -> bool { if let Some(blob) = window_slot.take() { let blob_idx = blob.read().get_index().unwrap(); if blob_idx == idx { @@ -372,8 +366,6 @@ fn is_missing( c_or_d, blob_idx, ); - // recycle it - recycler.recycle(blob, "is_missing"); true } } else { @@ -392,7 +384,6 @@ fn find_missing( block_start_idx: u64, block_start: usize, window: &mut [WindowSlot], - recycler: &BlobRecycler, ) -> (usize, usize) { let mut data_missing = 0; let mut coding_missing = 0; @@ -404,11 +395,11 @@ fn find_missing( let idx = (i - block_start) as u64 + block_start_idx; let n = i % window.len(); - if is_missing(id, idx, &mut window[n].data, recycler, "data") { + if is_missing(id, idx, &mut window[n].data, "data") { data_missing += 1; } - if i >= coding_start && is_missing(id, idx, &mut window[n].coding, recycler, "coding") { + if i >= coding_start && is_missing(id, idx, &mut window[n].coding, "coding") { coding_missing += 1; } } @@ -444,8 +435,7 @@ pub fn recover( block_end ); - let (data_missing, coding_missing) = - find_missing(id, block_start_idx, block_start, window, recycler); + let (data_missing, coding_missing) = find_missing(id, block_start_idx, block_start, window); // if we're not missing data, or if we have too much missin but have enough coding if data_missing == 0 { @@ -796,9 +786,6 @@ mod test { } blobs.push(blob); } - for blob in blobs { - blob_recycler.recycle(blob, "pollute_recycler"); - } } #[test] @@ -893,11 +880,6 @@ mod test { // Create a hole in the window let refwindow = window[erase_offset].data.clone(); window[erase_offset].data = None; - - blob_recycler.recycle( - window[erase_offset].coding.clone().unwrap(), - "window_recover_basic", - ); window[erase_offset].coding = None; print_window(&window); diff --git a/src/packet.rs b/src/packet.rs index 1f7ef1821c..c59d4a56c1 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -460,8 +460,6 @@ mod tests { assert_eq!(m.meta.size, PACKET_DATA_SIZE); assert_eq!(m.meta.addr(), saddr); } - - r.recycle(p, "packet_send_recv"); } #[test] diff --git a/src/recycler.rs b/src/recycler.rs index 2234b70792..f7a738d0bf 100644 --- a/src/recycler.rs +++ b/src/recycler.rs @@ -88,9 +88,6 @@ impl Recycler { landfill: self.landfill.clone(), } } - pub fn recycle(&self, r: Recyclable, _name: &str) { - drop(r) - } } #[cfg(test)] diff --git a/src/request_stage.rs b/src/request_stage.rs index a6b41e2f9e..3afb53689b 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -1,7 +1,7 @@ //! The `request_stage` processes thin client Request messages. use bincode::deserialize; -use packet::{to_blobs, BlobRecycler, PacketRecycler, Packets, SharedPackets}; +use packet::{to_blobs, BlobRecycler, Packets, SharedPackets}; use rayon::prelude::*; use request::Request; use request_processor::RequestProcessor; @@ -35,7 +35,6 @@ impl RequestStage { request_processor: &RequestProcessor, packet_receiver: &Receiver, blob_sender: &BlobSender, - packet_recycler: &PacketRecycler, blob_recycler: &BlobRecycler, ) -> Result<()> { let (batch, batch_len) = streamer::recv_batch(packet_receiver)?; @@ -63,7 +62,6 @@ impl RequestStage { //don't wake up the other side if there is nothing blob_sender.send(blobs)?; } - packet_recycler.recycle(msgs, "process_request_packets"); } let total_time_s = timing::duration_as_s(&proc_start.elapsed()); let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); @@ -81,7 +79,6 @@ impl RequestStage { request_processor: RequestProcessor, packet_receiver: Receiver, ) -> (Self, BlobReceiver) { - let packet_recycler = PacketRecycler::default(); let request_processor = Arc::new(request_processor); let request_processor_ = request_processor.clone(); let blob_recycler = BlobRecycler::default(); @@ -93,7 +90,6 @@ impl RequestStage { &request_processor_, &packet_receiver, &blob_sender, - &packet_recycler, &blob_recycler, ) { match e { diff --git a/src/window.rs b/src/window.rs index 741ddb13bd..4932e18ca1 100644 --- a/src/window.rs +++ b/src/window.rs @@ -33,10 +33,8 @@ impl WindowSlot { } } - fn clear_data(&mut self, recycler: &BlobRecycler) { - if let Some(blob) = self.data.take() { - recycler.recycle(blob, "WindowSlot::clear_data"); - } + fn clear_data(&mut self) { + self.data.take(); } } @@ -51,12 +49,11 @@ pub struct WindowIndex { pub trait WindowUtil { /// Finds available slots, clears them, and returns their indices. - fn clear_slots(&mut self, recycler: &BlobRecycler, consumed: u64, received: u64) -> Vec; + fn clear_slots(&mut self, consumed: u64, received: u64) -> Vec; fn repair( &mut self, crdt: &Arc>, - recycler: &BlobRecycler, id: &Pubkey, times: usize, consumed: u64, @@ -79,7 +76,7 @@ pub trait WindowUtil { } impl WindowUtil for Window { - fn clear_slots(&mut self, recycler: &BlobRecycler, consumed: u64, received: u64) -> Vec { + fn clear_slots(&mut self, consumed: u64, received: u64) -> Vec { (consumed..received) .filter_map(|pix| { let i = (pix % WINDOW_SIZE) as usize; @@ -88,7 +85,7 @@ impl WindowUtil for Window { return None; } } - self[i].clear_data(recycler); + self[i].clear_data(); Some(pix) }).collect() } @@ -96,7 +93,6 @@ impl WindowUtil for Window { fn repair( &mut self, crdt: &Arc>, - recycler: &BlobRecycler, id: &Pubkey, times: usize, consumed: u64, @@ -105,7 +101,7 @@ impl WindowUtil for Window { let num_peers = crdt.read().unwrap().table.len() as u64; let highest_lost = calculate_highest_lost_blob_index(num_peers, consumed, received); - let idxs = self.clear_slots(recycler, consumed, highest_lost); + let idxs = self.clear_slots(consumed, highest_lost); let reqs: Vec<_> = idxs .into_iter() .filter_map(|pix| crdt.read().unwrap().window_index_request(pix).ok()) @@ -177,8 +173,6 @@ impl WindowUtil for Window { /// * `pix` - the index of the blob, corresponds to /// the entry height of this blob /// * `consume_queue` - output, blobs to be rebroadcast are placed here - /// * `recycler` - where to return the blob once processed, also where - /// to return old blobs from the window /// * `consumed` - input/output, the entry-height to which this /// node has populated and rebroadcast entries fn process_blob( @@ -204,12 +198,10 @@ impl WindowUtil for Window { blob: SharedBlob, pix: u64, window_slot: &mut Option, - recycler: &BlobRecycler, c_or_d: &str, ) -> bool { if let Some(old) = mem::replace(window_slot, Some(blob)) { let is_dup = old.read().get_index().unwrap() == pix; - recycler.recycle(old, "insert_blob_is_dup"); trace!( "{}: occupied {} window slot {:}, is_dup: {}", id, @@ -226,9 +218,9 @@ impl WindowUtil for Window { // insert the new blob into the window, overwrite and recycle old (or duplicate) entry let is_duplicate = if is_coding { - insert_blob_is_dup(id, blob, pix, &mut self[w].coding, recycler, "coding") + insert_blob_is_dup(id, blob, pix, &mut self[w].coding, "coding") } else { - insert_blob_is_dup(id, blob, pix, &mut self[w].data, recycler, "data") + insert_blob_is_dup(id, blob, pix, &mut self[w].data, "data") }; if is_duplicate { diff --git a/src/window_service.rs b/src/window_service.rs index 5df080e93e..064884c8c4 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -192,7 +192,6 @@ fn recv_window( pixs.push(pix); if !blob_idx_in_window(&id, pix, *consumed, received) { - recycler.recycle(b, "recv_window"); continue; } @@ -282,7 +281,7 @@ pub fn window_service( } let mut window = window.write().unwrap(); - let reqs = window.repair(&crdt, &recycler, &id, times, consumed, received); + let reqs = window.repair(&crdt, &id, times, consumed, received); for (to, req) in reqs { repair_socket.send_to(&req, to).unwrap_or_else(|e| { info!("{} repair req send_to({}) error {:?}", id, to, e);