From 2ece27ee3ac8069db97615f22304a94ff40a2fd6 Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Tue, 31 Jul 2018 16:52:50 -0700 Subject: [PATCH] fix leak --- src/streamer.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/streamer.rs b/src/streamer.rs index a6777ca0f1..67da77f7ec 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -173,6 +173,7 @@ pub fn blob_receiver( fn find_next_missing( window: &SharedWindow, crdt: &Arc>, + recycler: &BlobRecycler, consumed: u64, received: u64, ) -> Result)>> { @@ -188,6 +189,8 @@ fn find_next_missing( let blob_idx = blob.read().unwrap().get_index().unwrap(); if blob_idx == pix { mem::replace(&mut window[i].data, Some(blob)); + } else { + recycler.recycle(blob); } } if window[i].data.is_none() { @@ -206,6 +209,7 @@ fn repair_window( debug_id: u64, window: &SharedWindow, crdt: &Arc>, + recycler: &BlobRecycler, last: &mut u64, times: &mut usize, consumed: u64, @@ -223,7 +227,7 @@ fn repair_window( return Ok(()); } - let reqs = find_next_missing(window, crdt, consumed, received)?; + let reqs = find_next_missing(window, crdt, recycler, consumed, received)?; trace!("{:x}: repair_window missing: {}", debug_id, reqs.len()); if !reqs.is_empty() { inc_new_counter!("streamer-repair_window-repair", reqs.len()); @@ -673,7 +677,7 @@ pub fn window( } } let _ = repair_window( - debug_id, &window, &crdt, &mut last, &mut times, consumed, received, + debug_id, &window, &crdt, &recycler, &mut last, &mut times, consumed, received, ); } })