Trace recycle() calls (#968)

* trace recycle() calls fixes #810
This commit is contained in:
Rob Walker 2018-09-05 05:07:02 +09:00 committed by GitHub
parent 2752bde683
commit 226d3b9471
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 37 additions and 28 deletions

View File

@ -114,7 +114,7 @@ impl BankingStage {
signal_sender.send(Signal::Transactions(transactions))?;
debug!("done process_transactions");
packet_recycler.recycle(msgs);
packet_recycler.recycle(msgs, "process_transactions");
}
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());

View File

@ -49,7 +49,7 @@ fn sink(
let timer = Duration::new(1, 0);
if let Ok(msgs) = r.recv_timeout(timer) {
rvs.fetch_add(msgs.read().unwrap().packets.len(), Ordering::Relaxed);
recycler.recycle(msgs);
recycler.recycle(msgs, "sink");
}
})
}

View File

@ -69,7 +69,7 @@ fn broadcast(
x.read().unwrap().get_index().unwrap(),
pos
);
recycler.recycle(x);
recycler.recycle(x, "broadcast-data");
}
if let Some(x) = mem::replace(&mut win[pos].coding, None) {
trace!(
@ -78,7 +78,7 @@ fn broadcast(
x.read().unwrap().get_index().unwrap(),
pos
);
recycler.recycle(x);
recycler.recycle(x, "broadcast-coding");
}
trace!("{:x} null {}", debug_id, pos);

View File

@ -586,7 +586,7 @@ impl Crdt {
trace!("broadcast results {}", errs.len());
for e in errs {
if let Err(e) = &e {
eprintln!("broadcast result {:?}", e);
trace!("broadcast result {:?}", e);
}
e?;
if transmit_index.data < received_index {
@ -1196,7 +1196,7 @@ impl Crdt {
) {
resps.push(resp);
}
blob_recycler.recycle(req);
blob_recycler.recycle(req, "run_listen");
}
response_sender.send(resps)?;
Ok(())

View File

@ -380,7 +380,7 @@ fn is_missing(
blob_idx,
);
// recycle it
recycler.recycle(blob);
recycler.recycle(blob, "is_missing");
true
}
} else {
@ -814,7 +814,7 @@ mod test {
blobs.push(blob);
}
for blob in blobs {
blob_recycler.recycle(blob);
blob_recycler.recycle(blob, "pollute_recycler");
}
}
@ -910,7 +910,10 @@ mod test {
let refwindow = window[erase_offset].data.clone();
window[erase_offset].data = None;
blob_recycler.recycle(window[erase_offset].coding.clone().unwrap());
blob_recycler.recycle(
window[erase_offset].coding.clone().unwrap(),
"window_recover_basic",
);
window[erase_offset].coding = None;
print_window(&window);

View File

@ -178,7 +178,8 @@ pub enum BlobError {
}
pub struct Recycler<T> {
gc: Arc<Mutex<Vec<Arc<RwLock<T>>>>>,
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
gc: Arc<Mutex<Vec<(Arc<RwLock<T>>, &'static str)>>>,
}
impl<T: Default> Default for Recycler<T> {
@ -202,7 +203,7 @@ impl<T: Default + Reset> Recycler<T> {
let mut gc = self.gc.lock().expect("recycler lock in pb fn allocate");
loop {
if let Some(x) = gc.pop() {
if let Some((x, who)) = gc.pop() {
// Only return the item if this recycler is the last reference to it.
// Remove this check once `T` holds a Weak reference back to this
// recycler and implements `Drop`. At the time of this writing, Weak can't
@ -215,6 +216,11 @@ impl<T: Default + Reset> Recycler<T> {
// to retransmmit_request
//
// warn!("Recycled item still in use. Booting it.");
trace!(
"Recycled item from \"{}\" still in use. {} Booting it.",
who,
Arc::strong_count(&x)
);
continue;
}
@ -228,9 +234,9 @@ impl<T: Default + Reset> Recycler<T> {
}
}
}
pub fn recycle(&self, x: Arc<RwLock<T>>) {
pub fn recycle(&self, x: Arc<RwLock<T>>, who: &'static str) {
let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
gc.push(x);
gc.push((x, who));
}
}
@ -487,7 +493,7 @@ impl Blob {
Err(e)?;
}
}
re.recycle(r);
re.recycle(r, "send_to");
}
Ok(())
}
@ -509,7 +515,7 @@ mod tests {
pub fn packet_recycler_test() {
let r = PacketRecycler::default();
let p = r.allocate();
r.recycle(p);
r.recycle(p, "recycler_test");
assert_eq!(r.gc.lock().unwrap().len(), 1);
let _ = r.allocate();
assert_eq!(r.gc.lock().unwrap().len(), 0);
@ -527,7 +533,7 @@ mod tests {
// that is still referenced outside the recycler.
let r = Recycler::<u8>::default();
let x0 = r.allocate();
r.recycle(x0.clone());
r.recycle(x0.clone(), "leaked_recyclable:1");
assert_eq!(Arc::strong_count(&x0), 2);
assert_eq!(r.gc.lock().unwrap().len(), 1);
@ -542,8 +548,8 @@ mod tests {
let r = Recycler::<u8>::default();
let x0 = r.allocate();
let x1 = r.allocate();
r.recycle(x0); // <-- allocate() of this will require locking the recycler's stack.
r.recycle(x1.clone()); // <-- allocate() of this will cause it to be dropped and recurse.
r.recycle(x0, "leaked_recyclable_recursion:1"); // <-- allocate() of this will require locking the recycler's stack.
r.recycle(x1.clone(), "leaked_recyclable_recursion:2"); // <-- allocate() of this will cause it to be dropped and recurse.
assert_eq!(Arc::strong_count(&x1), 2);
assert_eq!(r.gc.lock().unwrap().len(), 2);
@ -555,7 +561,7 @@ mod tests {
pub fn blob_recycler_test() {
let r = BlobRecycler::default();
let p = r.allocate();
r.recycle(p);
r.recycle(p, "blob_recycler_test");
assert_eq!(r.gc.lock().unwrap().len(), 1);
let _ = r.allocate();
assert_eq!(r.gc.lock().unwrap().len(), 0);
@ -580,7 +586,7 @@ mod tests {
assert_eq!(m.meta.addr(), saddr);
}
r.recycle(p);
r.recycle(p, "packet_send_recv");
}
#[test]
@ -636,7 +642,7 @@ mod tests {
let mut rv = Blob::recv_from(&r, &reader).unwrap();
let rp = rv.pop_front().unwrap();
assert_eq!(rp.write().unwrap().meta.size, 1024);
r.recycle(rp);
r.recycle(rp, "blob_ip6_send_recv");
}
#[test]

View File

@ -44,7 +44,7 @@ impl ReplicateStage {
let res = bank.process_entries(entries.clone());
for blob in blobs {
blob_recycler.recycle(blob);
blob_recycler.recycle(blob, "replicate_requests");
}
{

View File

@ -64,7 +64,7 @@ impl RequestStage {
//don't wake up the other side if there is nothing
blob_sender.send(blobs)?;
}
packet_recycler.recycle(msgs);
packet_recycler.recycle(msgs, "process_request_packets");
}
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());

View File

@ -33,7 +33,7 @@ fn retransmit(
}
}
for b in dq {
recycler.recycle(b);
recycler.recycle(b, "retransmit");
}
Ok(())
}

View File

@ -34,7 +34,7 @@ fn recv_loop(
}
Err(_) => {
if exit.load(Ordering::Relaxed) {
re.recycle(msgs);
re.recycle(msgs, "recv_loop");
return Ok(());
}
}

View File

@ -59,7 +59,7 @@ fn find_next_missing(
if blob_idx == pix {
mem::replace(&mut window[i].data, Some(blob));
} else {
recycler.recycle(blob);
recycler.recycle(blob, "find_next_missing");
}
}
if window[i].data.is_none() {
@ -299,7 +299,7 @@ fn process_blob(
) -> bool {
if let Some(old) = mem::replace(window_slot, Some(blob)) {
let is_dup = old.read().unwrap().get_index().unwrap() == pix;
recycler.recycle(old);
recycler.recycle(old, "insert_blob_is_dup");
trace!(
"{:x}: occupied {} window slot {:}, is_dup: {}",
debug_id,
@ -458,7 +458,7 @@ fn recv_window(
pixs.push(pix);
if !blob_idx_in_window(debug_id, pix, *consumed, received) {
recycler.recycle(b);
recycler.recycle(b, "recv_window");
continue;
}