From 226d3b94715db30af2498c9fbdf6c9e70b1e0885 Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Wed, 5 Sep 2018 05:07:02 +0900 Subject: [PATCH] Trace recycle() calls (#968) * trace recycle() calls fixes #810 --- src/banking_stage.rs | 2 +- src/bin/bench-streamer.rs | 2 +- src/broadcast_stage.rs | 4 ++-- src/crdt.rs | 4 ++-- src/erasure.rs | 9 ++++++--- src/packet.rs | 30 ++++++++++++++++++------------ src/replicate_stage.rs | 2 +- src/request_stage.rs | 2 +- src/retransmit_stage.rs | 2 +- src/streamer.rs | 2 +- src/window.rs | 6 +++--- 11 files changed, 37 insertions(+), 28 deletions(-) diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 6bb46e91c8..8fa9bfb0c8 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -114,7 +114,7 @@ impl BankingStage { signal_sender.send(Signal::Transactions(transactions))?; debug!("done process_transactions"); - packet_recycler.recycle(msgs); + packet_recycler.recycle(msgs, "process_transactions"); } let total_time_s = timing::duration_as_s(&proc_start.elapsed()); let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); diff --git a/src/bin/bench-streamer.rs b/src/bin/bench-streamer.rs index b5c43793b1..d5d8c9b5f9 100644 --- a/src/bin/bench-streamer.rs +++ b/src/bin/bench-streamer.rs @@ -49,7 +49,7 @@ fn sink( let timer = Duration::new(1, 0); if let Ok(msgs) = r.recv_timeout(timer) { rvs.fetch_add(msgs.read().unwrap().packets.len(), Ordering::Relaxed); - recycler.recycle(msgs); + recycler.recycle(msgs, "sink"); } }) } diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index 1057f72558..97a599c630 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -69,7 +69,7 @@ fn broadcast( x.read().unwrap().get_index().unwrap(), pos ); - recycler.recycle(x); + recycler.recycle(x, "broadcast-data"); } if let Some(x) = mem::replace(&mut win[pos].coding, None) { trace!( @@ -78,7 +78,7 @@ fn broadcast( x.read().unwrap().get_index().unwrap(), pos ); - recycler.recycle(x); + recycler.recycle(x, "broadcast-coding"); } trace!("{:x} null {}", debug_id, pos); diff --git a/src/crdt.rs b/src/crdt.rs index d30fd64cfb..d4ad10ec1d 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -586,7 +586,7 @@ impl Crdt { trace!("broadcast results {}", errs.len()); for e in errs { if let Err(e) = &e { - eprintln!("broadcast result {:?}", e); + trace!("broadcast result {:?}", e); } e?; if transmit_index.data < received_index { @@ -1196,7 +1196,7 @@ impl Crdt { ) { resps.push(resp); } - blob_recycler.recycle(req); + blob_recycler.recycle(req, "run_listen"); } response_sender.send(resps)?; Ok(()) diff --git a/src/erasure.rs b/src/erasure.rs index abca544eac..4127060358 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -380,7 +380,7 @@ fn is_missing( blob_idx, ); // recycle it - recycler.recycle(blob); + recycler.recycle(blob, "is_missing"); true } } else { @@ -814,7 +814,7 @@ mod test { blobs.push(blob); } for blob in blobs { - blob_recycler.recycle(blob); + blob_recycler.recycle(blob, "pollute_recycler"); } } @@ -910,7 +910,10 @@ mod test { let refwindow = window[erase_offset].data.clone(); window[erase_offset].data = None; - blob_recycler.recycle(window[erase_offset].coding.clone().unwrap()); + blob_recycler.recycle( + window[erase_offset].coding.clone().unwrap(), + "window_recover_basic", + ); window[erase_offset].coding = None; print_window(&window); diff --git a/src/packet.rs b/src/packet.rs index 06b16058df..9e649d1063 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -178,7 +178,8 @@ pub enum BlobError { } pub struct Recycler { - gc: Arc>>>>, + #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] + gc: Arc>, &'static str)>>>, } impl Default for Recycler { @@ -202,7 +203,7 @@ impl Recycler { let mut gc = self.gc.lock().expect("recycler lock in pb fn allocate"); loop { - if let Some(x) = gc.pop() { + if let Some((x, who)) = gc.pop() { // Only return the item if this recycler is the last reference to it. // Remove this check once `T` holds a Weak reference back to this // recycler and implements `Drop`. At the time of this writing, Weak can't @@ -215,6 +216,11 @@ impl Recycler { // to retransmmit_request // // warn!("Recycled item still in use. Booting it."); + trace!( + "Recycled item from \"{}\" still in use. {} Booting it.", + who, + Arc::strong_count(&x) + ); continue; } @@ -228,9 +234,9 @@ impl Recycler { } } } - pub fn recycle(&self, x: Arc>) { + pub fn recycle(&self, x: Arc>, who: &'static str) { let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle"); - gc.push(x); + gc.push((x, who)); } } @@ -487,7 +493,7 @@ impl Blob { Err(e)?; } } - re.recycle(r); + re.recycle(r, "send_to"); } Ok(()) } @@ -509,7 +515,7 @@ mod tests { pub fn packet_recycler_test() { let r = PacketRecycler::default(); let p = r.allocate(); - r.recycle(p); + r.recycle(p, "recycler_test"); assert_eq!(r.gc.lock().unwrap().len(), 1); let _ = r.allocate(); assert_eq!(r.gc.lock().unwrap().len(), 0); @@ -527,7 +533,7 @@ mod tests { // that is still referenced outside the recycler. let r = Recycler::::default(); let x0 = r.allocate(); - r.recycle(x0.clone()); + r.recycle(x0.clone(), "leaked_recyclable:1"); assert_eq!(Arc::strong_count(&x0), 2); assert_eq!(r.gc.lock().unwrap().len(), 1); @@ -542,8 +548,8 @@ mod tests { let r = Recycler::::default(); let x0 = r.allocate(); let x1 = r.allocate(); - r.recycle(x0); // <-- allocate() of this will require locking the recycler's stack. - r.recycle(x1.clone()); // <-- allocate() of this will cause it to be dropped and recurse. + r.recycle(x0, "leaked_recyclable_recursion:1"); // <-- allocate() of this will require locking the recycler's stack. + r.recycle(x1.clone(), "leaked_recyclable_recursion:2"); // <-- allocate() of this will cause it to be dropped and recurse. assert_eq!(Arc::strong_count(&x1), 2); assert_eq!(r.gc.lock().unwrap().len(), 2); @@ -555,7 +561,7 @@ mod tests { pub fn blob_recycler_test() { let r = BlobRecycler::default(); let p = r.allocate(); - r.recycle(p); + r.recycle(p, "blob_recycler_test"); assert_eq!(r.gc.lock().unwrap().len(), 1); let _ = r.allocate(); assert_eq!(r.gc.lock().unwrap().len(), 0); @@ -580,7 +586,7 @@ mod tests { assert_eq!(m.meta.addr(), saddr); } - r.recycle(p); + r.recycle(p, "packet_send_recv"); } #[test] @@ -636,7 +642,7 @@ mod tests { let mut rv = Blob::recv_from(&r, &reader).unwrap(); let rp = rv.pop_front().unwrap(); assert_eq!(rp.write().unwrap().meta.size, 1024); - r.recycle(rp); + r.recycle(rp, "blob_ip6_send_recv"); } #[test] diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 917eba5042..3e33ce40c7 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -44,7 +44,7 @@ impl ReplicateStage { let res = bank.process_entries(entries.clone()); for blob in blobs { - blob_recycler.recycle(blob); + blob_recycler.recycle(blob, "replicate_requests"); } { diff --git a/src/request_stage.rs b/src/request_stage.rs index fb380eba4f..893613b4d8 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -64,7 +64,7 @@ impl RequestStage { //don't wake up the other side if there is nothing blob_sender.send(blobs)?; } - packet_recycler.recycle(msgs); + packet_recycler.recycle(msgs, "process_request_packets"); } let total_time_s = timing::duration_as_s(&proc_start.elapsed()); let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index c3515bbee9..da5c4f48da 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -33,7 +33,7 @@ fn retransmit( } } for b in dq { - recycler.recycle(b); + recycler.recycle(b, "retransmit"); } Ok(()) } diff --git a/src/streamer.rs b/src/streamer.rs index 646ed60557..2db2ffd269 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -34,7 +34,7 @@ fn recv_loop( } Err(_) => { if exit.load(Ordering::Relaxed) { - re.recycle(msgs); + re.recycle(msgs, "recv_loop"); return Ok(()); } } diff --git a/src/window.rs b/src/window.rs index d67824af04..4c9a2c0ab9 100644 --- a/src/window.rs +++ b/src/window.rs @@ -59,7 +59,7 @@ fn find_next_missing( if blob_idx == pix { mem::replace(&mut window[i].data, Some(blob)); } else { - recycler.recycle(blob); + recycler.recycle(blob, "find_next_missing"); } } if window[i].data.is_none() { @@ -299,7 +299,7 @@ fn process_blob( ) -> bool { if let Some(old) = mem::replace(window_slot, Some(blob)) { let is_dup = old.read().unwrap().get_index().unwrap() == pix; - recycler.recycle(old); + recycler.recycle(old, "insert_blob_is_dup"); trace!( "{:x}: occupied {} window slot {:}, is_dup: {}", debug_id, @@ -458,7 +458,7 @@ fn recv_window( pixs.push(pix); if !blob_idx_in_window(debug_id, pix, *consumed, received) { - recycler.recycle(b); + recycler.recycle(b, "recv_window"); continue; }