Boot Recycler::recycle()

This commit is contained in:
Greg Fitzgerald 2018-09-20 16:28:45 -06:00
parent 5174be5fe7
commit 6073cd57fa
7 changed files with 16 additions and 58 deletions

View File

@ -40,12 +40,7 @@ fn producer(addr: &SocketAddr, recycler: &PacketRecycler, exit: Arc<AtomicBool>)
}) })
} }
fn sink( fn sink(exit: Arc<AtomicBool>, rvs: Arc<AtomicUsize>, r: PacketReceiver) -> JoinHandle<()> {
recycler: PacketRecycler,
exit: Arc<AtomicBool>,
rvs: Arc<AtomicUsize>,
r: PacketReceiver,
) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
return; return;
@ -53,7 +48,6 @@ fn sink(
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
if let Ok(msgs) = r.recv_timeout(timer) { if let Ok(msgs) = r.recv_timeout(timer) {
rvs.fetch_add(msgs.read().packets.len(), Ordering::Relaxed); rvs.fetch_add(msgs.read().packets.len(), Ordering::Relaxed);
recycler.recycle(msgs, "sink");
} }
}) })
} }
@ -101,7 +95,7 @@ fn main() -> Result<()> {
let rvs = Arc::new(AtomicUsize::new(0)); let rvs = Arc::new(AtomicUsize::new(0));
let sink_threads: Vec<_> = read_channels let sink_threads: Vec<_> = read_channels
.into_iter() .into_iter()
.map(|r_reader| sink(pack_recycler.clone(), exit.clone(), rvs.clone(), r_reader)) .map(|r_reader| sink(exit.clone(), rvs.clone(), r_reader))
.collect(); .collect();
let start = SystemTime::now(); let start = SystemTime::now();
let start_val = rvs.load(Ordering::Relaxed); let start_val = rvs.load(Ordering::Relaxed);

View File

@ -350,13 +350,7 @@ pub fn generate_coding(
// true if slot is empty // true if slot is empty
// true if slot is stale (i.e. has the wrong index), old blob is flushed // true if slot is stale (i.e. has the wrong index), old blob is flushed
// false if slot has a blob with the right index // false if slot has a blob with the right index
fn is_missing( fn is_missing(id: &Pubkey, idx: u64, window_slot: &mut Option<SharedBlob>, c_or_d: &str) -> bool {
id: &Pubkey,
idx: u64,
window_slot: &mut Option<SharedBlob>,
recycler: &BlobRecycler,
c_or_d: &str,
) -> bool {
if let Some(blob) = window_slot.take() { if let Some(blob) = window_slot.take() {
let blob_idx = blob.read().get_index().unwrap(); let blob_idx = blob.read().get_index().unwrap();
if blob_idx == idx { if blob_idx == idx {
@ -372,8 +366,6 @@ fn is_missing(
c_or_d, c_or_d,
blob_idx, blob_idx,
); );
// recycle it
recycler.recycle(blob, "is_missing");
true true
} }
} else { } else {
@ -392,7 +384,6 @@ fn find_missing(
block_start_idx: u64, block_start_idx: u64,
block_start: usize, block_start: usize,
window: &mut [WindowSlot], window: &mut [WindowSlot],
recycler: &BlobRecycler,
) -> (usize, usize) { ) -> (usize, usize) {
let mut data_missing = 0; let mut data_missing = 0;
let mut coding_missing = 0; let mut coding_missing = 0;
@ -404,11 +395,11 @@ fn find_missing(
let idx = (i - block_start) as u64 + block_start_idx; let idx = (i - block_start) as u64 + block_start_idx;
let n = i % window.len(); let n = i % window.len();
if is_missing(id, idx, &mut window[n].data, recycler, "data") { if is_missing(id, idx, &mut window[n].data, "data") {
data_missing += 1; data_missing += 1;
} }
if i >= coding_start && is_missing(id, idx, &mut window[n].coding, recycler, "coding") { if i >= coding_start && is_missing(id, idx, &mut window[n].coding, "coding") {
coding_missing += 1; coding_missing += 1;
} }
} }
@ -444,8 +435,7 @@ pub fn recover(
block_end block_end
); );
let (data_missing, coding_missing) = let (data_missing, coding_missing) = find_missing(id, block_start_idx, block_start, window);
find_missing(id, block_start_idx, block_start, window, recycler);
// if we're not missing data, or if we have too much missin but have enough coding // if we're not missing data, or if we have too much missin but have enough coding
if data_missing == 0 { if data_missing == 0 {
@ -796,9 +786,6 @@ mod test {
} }
blobs.push(blob); blobs.push(blob);
} }
for blob in blobs {
blob_recycler.recycle(blob, "pollute_recycler");
}
} }
#[test] #[test]
@ -893,11 +880,6 @@ mod test {
// Create a hole in the window // Create a hole in the window
let refwindow = window[erase_offset].data.clone(); let refwindow = window[erase_offset].data.clone();
window[erase_offset].data = None; window[erase_offset].data = None;
blob_recycler.recycle(
window[erase_offset].coding.clone().unwrap(),
"window_recover_basic",
);
window[erase_offset].coding = None; window[erase_offset].coding = None;
print_window(&window); print_window(&window);

View File

@ -460,8 +460,6 @@ mod tests {
assert_eq!(m.meta.size, PACKET_DATA_SIZE); assert_eq!(m.meta.size, PACKET_DATA_SIZE);
assert_eq!(m.meta.addr(), saddr); assert_eq!(m.meta.addr(), saddr);
} }
r.recycle(p, "packet_send_recv");
} }
#[test] #[test]

View File

@ -88,9 +88,6 @@ impl<T: Default + Reset> Recycler<T> {
landfill: self.landfill.clone(), landfill: self.landfill.clone(),
} }
} }
pub fn recycle(&self, r: Recyclable<T>, _name: &str) {
drop(r)
}
} }
#[cfg(test)] #[cfg(test)]

View File

@ -1,7 +1,7 @@
//! The `request_stage` processes thin client Request messages. //! The `request_stage` processes thin client Request messages.
use bincode::deserialize; use bincode::deserialize;
use packet::{to_blobs, BlobRecycler, PacketRecycler, Packets, SharedPackets}; use packet::{to_blobs, BlobRecycler, Packets, SharedPackets};
use rayon::prelude::*; use rayon::prelude::*;
use request::Request; use request::Request;
use request_processor::RequestProcessor; use request_processor::RequestProcessor;
@ -35,7 +35,6 @@ impl RequestStage {
request_processor: &RequestProcessor, request_processor: &RequestProcessor,
packet_receiver: &Receiver<SharedPackets>, packet_receiver: &Receiver<SharedPackets>,
blob_sender: &BlobSender, blob_sender: &BlobSender,
packet_recycler: &PacketRecycler,
blob_recycler: &BlobRecycler, blob_recycler: &BlobRecycler,
) -> Result<()> { ) -> Result<()> {
let (batch, batch_len) = streamer::recv_batch(packet_receiver)?; let (batch, batch_len) = streamer::recv_batch(packet_receiver)?;
@ -63,7 +62,6 @@ impl RequestStage {
//don't wake up the other side if there is nothing //don't wake up the other side if there is nothing
blob_sender.send(blobs)?; blob_sender.send(blobs)?;
} }
packet_recycler.recycle(msgs, "process_request_packets");
} }
let total_time_s = timing::duration_as_s(&proc_start.elapsed()); let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
@ -81,7 +79,6 @@ impl RequestStage {
request_processor: RequestProcessor, request_processor: RequestProcessor,
packet_receiver: Receiver<SharedPackets>, packet_receiver: Receiver<SharedPackets>,
) -> (Self, BlobReceiver) { ) -> (Self, BlobReceiver) {
let packet_recycler = PacketRecycler::default();
let request_processor = Arc::new(request_processor); let request_processor = Arc::new(request_processor);
let request_processor_ = request_processor.clone(); let request_processor_ = request_processor.clone();
let blob_recycler = BlobRecycler::default(); let blob_recycler = BlobRecycler::default();
@ -93,7 +90,6 @@ impl RequestStage {
&request_processor_, &request_processor_,
&packet_receiver, &packet_receiver,
&blob_sender, &blob_sender,
&packet_recycler,
&blob_recycler, &blob_recycler,
) { ) {
match e { match e {

View File

@ -33,10 +33,8 @@ impl WindowSlot {
} }
} }
fn clear_data(&mut self, recycler: &BlobRecycler) { fn clear_data(&mut self) {
if let Some(blob) = self.data.take() { self.data.take();
recycler.recycle(blob, "WindowSlot::clear_data");
}
} }
} }
@ -51,12 +49,11 @@ pub struct WindowIndex {
pub trait WindowUtil { pub trait WindowUtil {
/// Finds available slots, clears them, and returns their indices. /// Finds available slots, clears them, and returns their indices.
fn clear_slots(&mut self, recycler: &BlobRecycler, consumed: u64, received: u64) -> Vec<u64>; fn clear_slots(&mut self, consumed: u64, received: u64) -> Vec<u64>;
fn repair( fn repair(
&mut self, &mut self,
crdt: &Arc<RwLock<Crdt>>, crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
id: &Pubkey, id: &Pubkey,
times: usize, times: usize,
consumed: u64, consumed: u64,
@ -79,7 +76,7 @@ pub trait WindowUtil {
} }
impl WindowUtil for Window { impl WindowUtil for Window {
fn clear_slots(&mut self, recycler: &BlobRecycler, consumed: u64, received: u64) -> Vec<u64> { fn clear_slots(&mut self, consumed: u64, received: u64) -> Vec<u64> {
(consumed..received) (consumed..received)
.filter_map(|pix| { .filter_map(|pix| {
let i = (pix % WINDOW_SIZE) as usize; let i = (pix % WINDOW_SIZE) as usize;
@ -88,7 +85,7 @@ impl WindowUtil for Window {
return None; return None;
} }
} }
self[i].clear_data(recycler); self[i].clear_data();
Some(pix) Some(pix)
}).collect() }).collect()
} }
@ -96,7 +93,6 @@ impl WindowUtil for Window {
fn repair( fn repair(
&mut self, &mut self,
crdt: &Arc<RwLock<Crdt>>, crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
id: &Pubkey, id: &Pubkey,
times: usize, times: usize,
consumed: u64, consumed: u64,
@ -105,7 +101,7 @@ impl WindowUtil for Window {
let num_peers = crdt.read().unwrap().table.len() as u64; let num_peers = crdt.read().unwrap().table.len() as u64;
let highest_lost = calculate_highest_lost_blob_index(num_peers, consumed, received); let highest_lost = calculate_highest_lost_blob_index(num_peers, consumed, received);
let idxs = self.clear_slots(recycler, consumed, highest_lost); let idxs = self.clear_slots(consumed, highest_lost);
let reqs: Vec<_> = idxs let reqs: Vec<_> = idxs
.into_iter() .into_iter()
.filter_map(|pix| crdt.read().unwrap().window_index_request(pix).ok()) .filter_map(|pix| crdt.read().unwrap().window_index_request(pix).ok())
@ -177,8 +173,6 @@ impl WindowUtil for Window {
/// * `pix` - the index of the blob, corresponds to /// * `pix` - the index of the blob, corresponds to
/// the entry height of this blob /// the entry height of this blob
/// * `consume_queue` - output, blobs to be rebroadcast are placed here /// * `consume_queue` - output, blobs to be rebroadcast are placed here
/// * `recycler` - where to return the blob once processed, also where
/// to return old blobs from the window
/// * `consumed` - input/output, the entry-height to which this /// * `consumed` - input/output, the entry-height to which this
/// node has populated and rebroadcast entries /// node has populated and rebroadcast entries
fn process_blob( fn process_blob(
@ -204,12 +198,10 @@ impl WindowUtil for Window {
blob: SharedBlob, blob: SharedBlob,
pix: u64, pix: u64,
window_slot: &mut Option<SharedBlob>, window_slot: &mut Option<SharedBlob>,
recycler: &BlobRecycler,
c_or_d: &str, c_or_d: &str,
) -> bool { ) -> bool {
if let Some(old) = mem::replace(window_slot, Some(blob)) { if let Some(old) = mem::replace(window_slot, Some(blob)) {
let is_dup = old.read().get_index().unwrap() == pix; let is_dup = old.read().get_index().unwrap() == pix;
recycler.recycle(old, "insert_blob_is_dup");
trace!( trace!(
"{}: occupied {} window slot {:}, is_dup: {}", "{}: occupied {} window slot {:}, is_dup: {}",
id, id,
@ -226,9 +218,9 @@ impl WindowUtil for Window {
// insert the new blob into the window, overwrite and recycle old (or duplicate) entry // insert the new blob into the window, overwrite and recycle old (or duplicate) entry
let is_duplicate = if is_coding { let is_duplicate = if is_coding {
insert_blob_is_dup(id, blob, pix, &mut self[w].coding, recycler, "coding") insert_blob_is_dup(id, blob, pix, &mut self[w].coding, "coding")
} else { } else {
insert_blob_is_dup(id, blob, pix, &mut self[w].data, recycler, "data") insert_blob_is_dup(id, blob, pix, &mut self[w].data, "data")
}; };
if is_duplicate { if is_duplicate {

View File

@ -192,7 +192,6 @@ fn recv_window(
pixs.push(pix); pixs.push(pix);
if !blob_idx_in_window(&id, pix, *consumed, received) { if !blob_idx_in_window(&id, pix, *consumed, received) {
recycler.recycle(b, "recv_window");
continue; continue;
} }
@ -282,7 +281,7 @@ pub fn window_service(
} }
let mut window = window.write().unwrap(); let mut window = window.write().unwrap();
let reqs = window.repair(&crdt, &recycler, &id, times, consumed, received); let reqs = window.repair(&crdt, &id, times, consumed, received);
for (to, req) in reqs { for (to, req) in reqs {
repair_socket.send_to(&req, to).unwrap_or_else(|e| { repair_socket.send_to(&req, to).unwrap_or_else(|e| {
info!("{} repair req send_to({}) error {:?}", id, to, e); info!("{} repair req send_to({}) error {:?}", id, to, e);