From 431692d9d090636b8d504a1c9756f68db3289d0b Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 18 Sep 2018 08:02:57 -0700 Subject: [PATCH] Use a Drop trait to keep track of lifetimes for recycled objects. * Move recycler instances to the point of allocation * sinks no longer need to call `recycle` * Remove the recycler arguments from all the apis that no longer need them --- src/banking_stage.rs | 15 +-- src/bin/bench-streamer.rs | 15 +-- src/bin/bench-tps.rs | 10 +- src/blob_fetch_stage.rs | 15 +-- src/broadcast_stage.rs | 38 ++---- src/budget_contract.rs | 8 +- src/crdt.rs | 41 +++--- src/entry.rs | 2 +- src/erasure.rs | 70 +++++----- src/fetch_stage.rs | 15 +-- src/fullnode.rs | 15 +-- src/ledger.rs | 2 +- src/lib.rs | 1 + src/ncp.rs | 30 +---- src/packet.rs | 267 +++++--------------------------------- src/record_stage.rs | 13 +- src/recycler.rs | 173 ++++++++++++++++++++++++ src/replicate_stage.rs | 36 +---- src/request_stage.rs | 6 +- src/retransmit_stage.rs | 35 +---- src/rpu.rs | 31 +---- src/sigverify.rs | 44 +++---- src/streamer.rs | 58 +++------ src/thin_client.rs | 10 +- src/tpu.rs | 11 +- src/tvu.rs | 30 ++--- src/vote_stage.rs | 9 +- src/window.rs | 51 +++----- src/window_service.rs | 71 +++------- src/write_stage.rs | 6 +- tests/data_replicator.rs | 16 +-- tests/multinode.rs | 3 - 32 files changed, 414 insertions(+), 733 deletions(-) create mode 100644 src/recycler.rs diff --git a/src/banking_stage.rs b/src/banking_stage.rs index ab72cc413..ea7f26754 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -6,7 +6,7 @@ use bank::Bank; use bincode::deserialize; use counter::Counter; use log::Level; -use packet::{PacketRecycler, Packets, SharedPackets}; +use packet::{Packets, SharedPackets}; use rayon::prelude::*; use record_stage::Signal; use result::{Error, Result}; @@ -34,18 +34,12 @@ impl BankingStage { pub fn new( bank: Arc, verified_receiver: Receiver)>>, - packet_recycler: PacketRecycler, ) -> (Self, Receiver) { let (signal_sender, signal_receiver) = channel(); let thread_hdl = Builder::new() .name("solana-banking-stage".to_string()) .spawn(move || loop { - if let Err(e) = Self::process_packets( - &bank, - &verified_receiver, - &signal_sender, - &packet_recycler, - ) { + if let Err(e) = Self::process_packets(&bank, &verified_receiver, &signal_sender) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), @@ -75,7 +69,6 @@ impl BankingStage { bank: &Arc, verified_receiver: &Receiver)>>, signal_sender: &Sender, - packet_recycler: &PacketRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); let recv_start = Instant::now(); @@ -92,7 +85,7 @@ impl BankingStage { let count = mms.iter().map(|x| x.1.len()).sum(); let proc_start = Instant::now(); for (msgs, vers) in mms { - let transactions = Self::deserialize_transactions(&msgs.read().unwrap()); + let transactions = Self::deserialize_transactions(&msgs.read()); reqs_len += transactions.len(); let transactions = transactions .into_iter() @@ -113,8 +106,6 @@ impl BankingStage { return Err(Error::SendError); } debug!("done process_transactions"); - - packet_recycler.recycle(msgs, "process_transactions"); } let total_time_s = timing::duration_as_s(&proc_start.elapsed()); let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); diff --git a/src/bin/bench-streamer.rs b/src/bin/bench-streamer.rs index 2faf30a67..d03e90cb9 100644 --- a/src/bin/bench-streamer.rs +++ b/src/bin/bench-streamer.rs @@ -20,8 +20,8 @@ fn producer(addr: &SocketAddr, recycler: &PacketRecycler, exit: Arc) let send = UdpSocket::bind("0.0.0.0:0").unwrap(); let msgs = recycler.allocate(); let msgs_ = msgs.clone(); - msgs.write().unwrap().packets.resize(10, Packet::default()); - for w in &mut msgs.write().unwrap().packets { + msgs.write().packets.resize(10, Packet::default()); + for w in &mut msgs.write().packets { w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&addr); } @@ -30,7 +30,7 @@ fn producer(addr: &SocketAddr, recycler: &PacketRecycler, exit: Arc) return; } let mut num = 0; - for p in &msgs_.read().unwrap().packets { + for p in &msgs_.read().packets { let a = p.meta.addr(); assert!(p.meta.size < BLOB_SIZE); send.send_to(&p.data[..p.meta.size], &a).unwrap(); @@ -52,7 +52,7 @@ fn sink( } let timer = Duration::new(1, 0); if let Ok(msgs) = r.recv_timeout(timer) { - rvs.fetch_add(msgs.read().unwrap().packets.len(), Ordering::Relaxed); + rvs.fetch_add(msgs.read().packets.len(), Ordering::Relaxed); recycler.recycle(msgs, "sink"); } }) @@ -91,12 +91,7 @@ fn main() -> Result<()> { let (s_reader, r_reader) = channel(); read_channels.push(r_reader); - read_threads.push(receiver( - Arc::new(read), - exit.clone(), - pack_recycler.clone(), - s_reader, - )); + read_threads.push(receiver(Arc::new(read), exit.clone(), s_reader)); } let t_producer1 = producer(&addr, &pack_recycler, exit.clone()); diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index c70758f0b..770ba71a3 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -17,7 +17,6 @@ use solana::hash::Hash; use solana::logger; use solana::metrics; use solana::ncp::Ncp; -use solana::packet::BlobRecycler; use solana::service::Service; use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil}; use solana::thin_client::{poll_gossip_for_leader, ThinClient}; @@ -695,14 +694,7 @@ fn converge( spy_crdt.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_crdt)); let window = Arc::new(RwLock::new(default_window())); - let ncp = Ncp::new( - &spy_ref, - window, - BlobRecycler::default(), - None, - gossip_socket, - exit_signal.clone(), - ); + let ncp = Ncp::new(&spy_ref, window, None, gossip_socket, exit_signal.clone()); let mut v: Vec = vec![]; // wait for the network to converge, 30 seconds should be plenty for _ in 0..30 { diff --git a/src/blob_fetch_stage.rs b/src/blob_fetch_stage.rs index b0cd99716..54badfa50 100644 --- a/src/blob_fetch_stage.rs +++ b/src/blob_fetch_stage.rs @@ -1,6 +1,5 @@ //! The `blob_fetch_stage` pulls blobs from UDP sockets and sends it to a channel. -use packet::BlobRecycler; use service::Service; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; @@ -15,24 +14,18 @@ pub struct BlobFetchStage { } impl BlobFetchStage { - pub fn new( - socket: Arc, - exit: Arc, - recycler: &BlobRecycler, - ) -> (Self, BlobReceiver) { - Self::new_multi_socket(vec![socket], exit, recycler) + pub fn new(socket: Arc, exit: Arc) -> (Self, BlobReceiver) { + Self::new_multi_socket(vec![socket], exit) } pub fn new_multi_socket( sockets: Vec>, exit: Arc, - recycler: &BlobRecycler, ) -> (Self, BlobReceiver) { let (sender, receiver) = channel(); let thread_hdls: Vec<_> = sockets .into_iter() - .map(|socket| { - streamer::blob_receiver(socket, exit.clone(), recycler.clone(), sender.clone()) - }).collect(); + .map(|socket| streamer::blob_receiver(socket, exit.clone(), sender.clone())) + .collect(); (BlobFetchStage { exit, thread_hdls }, receiver) } diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index 58658c43e..8801a0426 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -56,7 +56,7 @@ fn broadcast( let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); // flatten deque to vec - let blobs_vec: Vec<_> = dq.into_iter().collect(); + let blobs_vec: SharedBlobs = dq.into_iter().collect(); let blobs_chunking = Instant::now(); // We could receive more blobs than window slots so @@ -80,36 +80,24 @@ fn broadcast( { let mut win = window.write().unwrap(); assert!(blobs.len() <= win.len()); - for b in &blobs { - let ix = b.read().unwrap().get_index().expect("blob index"); + for b in blobs.iter() { + let ix = b.read().get_index().expect("blob index"); let pos = (ix % WINDOW_SIZE) as usize; if let Some(x) = win[pos].data.take() { - trace!( - "{} popped {} at {}", - id, - x.read().unwrap().get_index().unwrap(), - pos - ); - recycler.recycle(x, "broadcast-data"); + trace!("{} popped {} at {}", id, x.read().get_index().unwrap(), pos); } if let Some(x) = win[pos].coding.take() { - trace!( - "{} popped {} at {}", - id, - x.read().unwrap().get_index().unwrap(), - pos - ); - recycler.recycle(x, "broadcast-coding"); + trace!("{} popped {} at {}", id, x.read().get_index().unwrap(), pos); } trace!("{} null {}", id, pos); } - while let Some(b) = blobs.pop() { - let ix = b.read().unwrap().get_index().expect("blob index"); + for b in blobs.iter() { + let ix = b.read().get_index().expect("blob index"); let pos = (ix % WINDOW_SIZE) as usize; trace!("{} caching {} at {}", id, ix, pos); assert!(win[pos].data.is_none()); - win[pos].data = Some(b); + win[pos].data = Some(b.clone()); } } @@ -253,10 +241,10 @@ impl BroadcastStage { crdt: Arc>, window: SharedWindow, entry_height: u64, - recycler: BlobRecycler, receiver: Receiver>, exit_sender: Arc, ) -> Self { + let recycler = BlobRecycler::default(); let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) .spawn(move || { @@ -282,7 +270,6 @@ mod tests { use crdt::{Crdt, Node}; use entry::Entry; use mint::Mint; - use packet::BlobRecycler; use recorder::Recorder; use service::Service; use signature::{Keypair, KeypairUtil, Pubkey}; @@ -318,7 +305,6 @@ mod tests { crdt.insert(&broadcast_buddy.info); crdt.set_leader_rotation_interval(leader_rotation_interval); let crdt = Arc::new(RwLock::new(crdt)); - let blob_recycler = BlobRecycler::default(); // Make dummy initial entries let mint = Mint::new(10000); @@ -326,8 +312,7 @@ mod tests { let entry_height = entries.len() as u64; // Setup a window - let window = - new_window_from_entries(&entries, entry_height, &leader_info.info, &blob_recycler); + let window = new_window_from_entries(&entries, entry_height, &leader_info.info); let shared_window = Arc::new(RwLock::new(window)); @@ -339,7 +324,6 @@ mod tests { crdt.clone(), shared_window.clone(), entry_height, - blob_recycler.clone(), entry_receiver, exit_sender, ); @@ -359,7 +343,7 @@ mod tests { let window = shared_window.read().unwrap(); window.iter().fold(0, |m, w_slot| { if let Some(ref blob) = w_slot.data { - cmp::max(m, blob.read().unwrap().get_index().unwrap()) + cmp::max(m, blob.read().get_index().unwrap()) } else { m } diff --git a/src/budget_contract.rs b/src/budget_contract.rs index 7c44a8887..b093108f7 100644 --- a/src/budget_contract.rs +++ b/src/budget_contract.rs @@ -439,7 +439,7 @@ mod test { ]); let date = DateTime::::from_utc(NaiveDate::from_ymd(2016, 7, 8).and_hms(9, 10, 11), Utc); - let dateIso8601 = "2016-07-08T09:10:11Z"; + let date_is_08601 = "2016-07-08T09:10:11Z"; let tx = Transaction::budget_new(&keypair, to, 192, Hash::default()); assert_eq!( @@ -477,9 +477,9 @@ mod test { date, Hash::default(), ); - let mut expectedUserdata = vec![1, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0]; - expectedUserdata.extend(dateIso8601.as_bytes()); - assert_eq!(tx.userdata, expectedUserdata,); + let mut expected_userdata = vec![1, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0]; + expected_userdata.extend(date_is_08601.as_bytes()); + assert_eq!(tx.userdata, expected_userdata,); // ApplySignature let tx = Transaction::budget_new_signature(&keypair, keypair.pubkey(), to, Hash::default()); diff --git a/src/crdt.rs b/src/crdt.rs index c132ed83c..aadf1191e 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -570,7 +570,7 @@ impl Crdt { // only leader should be broadcasting assert!(me.leader_id != v.id); let bl = b.unwrap(); - let blob = bl.read().expect("blob read lock in streamer::broadcast"); + let blob = bl.read(); //TODO profile this, may need multiple sockets for par_iter trace!( "{}: BROADCAST idx: {} sz: {} to {},{} coding: {}", @@ -622,10 +622,9 @@ impl Crdt { (s.my_data().clone(), s.table.values().cloned().collect()) }; blob.write() - .unwrap() .set_id(me.id) .expect("set_id in pub fn retransmit"); - let rblob = blob.read().unwrap(); + let rblob = blob.read(); let orders: Vec<_> = table .iter() .filter(|v| { @@ -880,10 +879,10 @@ impl Crdt { /// randomly pick a node and ask them for updates asynchronously pub fn gossip( obj: Arc>, - blob_recycler: BlobRecycler, blob_sender: BlobSender, exit: Arc, ) -> JoinHandle<()> { + let blob_recycler = BlobRecycler::default(); Builder::new() .name("solana-gossip".to_string()) .spawn(move || loop { @@ -913,8 +912,8 @@ impl Crdt { blob_recycler: &BlobRecycler, ) -> Option { let pos = (ix as usize) % window.read().unwrap().len(); - if let Some(blob) = &window.read().unwrap()[pos].data { - let mut wblob = blob.write().unwrap(); + if let Some(ref mut blob) = &mut window.write().unwrap()[pos].data { + let mut wblob = blob.write(); let blob_ix = wblob.get_index().expect("run_window_request get_index"); if blob_ix == ix { let num_retransmits = wblob.meta.num_retransmits; @@ -937,7 +936,7 @@ impl Crdt { // copy to avoid doing IO inside the lock { - let mut outblob = out.write().unwrap(); + let mut outblob = out.write(); let sz = wblob.meta.size; outblob.meta.size = sz; outblob.data[..sz].copy_from_slice(&wblob.data[..sz]); @@ -1177,16 +1176,11 @@ impl Crdt { } let mut resps = Vec::new(); for req in reqs { - if let Some(resp) = Self::handle_blob( - obj, - window, - ledger_window, - blob_recycler, - &req.read().unwrap(), - ) { + if let Some(resp) = + Self::handle_blob(obj, window, ledger_window, blob_recycler, &req.read()) + { resps.push(resp); } - blob_recycler.recycle(req, "run_listen"); } response_sender.send(resps)?; Ok(()) @@ -1195,12 +1189,12 @@ impl Crdt { me: Arc>, window: SharedWindow, ledger_path: Option<&str>, - blob_recycler: BlobRecycler, requests_receiver: BlobReceiver, response_sender: BlobSender, exit: Arc, ) -> JoinHandle<()> { let mut ledger_window = ledger_path.map(|p| LedgerWindow::open(p).unwrap()); + let blob_recycler = BlobRecycler::default(); Builder::new() .name("solana-listen".to_string()) @@ -1650,10 +1644,9 @@ mod tests { // check that the service works // and that it eventually produces a request for both nodes let (sender, reader) = channel(); - let recycler = BlobRecycler::default(); let exit = Arc::new(AtomicBool::new(false)); let obj = Arc::new(RwLock::new(crdt)); - let thread = Crdt::gossip(obj, recycler, sender, exit.clone()); + let thread = Crdt::gossip(obj, sender, exit.clone()); let mut one = false; let mut two = false; for _ in 0..30 { @@ -1664,9 +1657,9 @@ mod tests { } assert!(rv.len() > 0); for i in rv.iter() { - if i.read().unwrap().meta.addr() == nxt1.contact_info.ncp { + if i.read().meta.addr() == nxt1.contact_info.ncp { one = true; - } else if i.read().unwrap().meta.addr() == nxt2.contact_info.ncp { + } else if i.read().meta.addr() == nxt2.contact_info.ncp { two = true; } else { //unexpected request @@ -1774,7 +1767,7 @@ mod tests { ); assert!(rv.is_none()); let out = recycler.allocate(); - out.write().unwrap().meta.size = 200; + out.write().meta.size = 200; window.write().unwrap()[0].data = Some(out); let rv = Crdt::run_window_request( &me, @@ -1788,7 +1781,7 @@ mod tests { assert!(rv.is_some()); let v = rv.unwrap(); //test we copied the blob - assert_eq!(v.read().unwrap().meta.size, 200); + assert_eq!(v.read().meta.size, 200); let len = window.read().unwrap().len() as u64; let rv = Crdt::run_window_request( &me, @@ -1859,7 +1852,7 @@ mod tests { assert!(rv.is_none()); let blob = recycler.allocate(); let blob_size = 200; - blob.write().unwrap().meta.size = blob_size; + blob.write().meta.size = blob_size; window.write().unwrap()[0].data = Some(blob); let num_requests: u32 = 64; @@ -1873,7 +1866,7 @@ mod tests { 0, &recycler, ).unwrap(); - let blob = shared_blob.read().unwrap(); + let blob = shared_blob.read(); // Test we copied the blob assert_eq!(blob.meta.size, blob_size); diff --git a/src/entry.rs b/src/entry.rs index 6025a7ec2..03d569a5b 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -84,7 +84,7 @@ impl Entry { ) -> SharedBlob { let blob = blob_recycler.allocate(); { - let mut blob_w = blob.write().unwrap(); + let mut blob_w = blob.write(); let pos = { let mut out = Cursor::new(blob_w.data_mut()); serialize_into(&mut out, &self).expect("failed to serialize output"); diff --git a/src/erasure.rs b/src/erasure.rs index aebf41ac6..f1bac050f 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -75,7 +75,7 @@ pub const ERASURE_W: i32 = 32; // There are some alignment restrictions, blocks should be aligned by 16 bytes // which means their size should be >= 16 bytes pub fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result<()> { - if data.len() == 0 { + if data.is_empty() { return Ok(()); } let k = data.len() as i32; @@ -130,7 +130,7 @@ pub fn decode_blocks( coding: &mut [&mut [u8]], erasures: &[i32], ) -> Result<()> { - if data.len() == 0 { + if data.is_empty() { return Ok(()); } let block_len = data[0].len(); @@ -247,7 +247,7 @@ pub fn generate_coding( trace!("{} window[{}] = {:?}", id, n, window[n].data); if let Some(b) = &window[n].data { - max_data_size = cmp::max(b.read().unwrap().meta.size, max_data_size); + max_data_size = cmp::max(b.read().meta.size, max_data_size); } else { trace!("{} data block is null @ {}", id, n); return Ok(()); @@ -266,7 +266,7 @@ pub fn generate_coding( if let Some(b) = &window[n].data { // make sure extra bytes in each blob are zero-d out for generation of // coding blobs - let mut b_wl = b.write().unwrap(); + let mut b_wl = b.write(); for i in b_wl.meta.size..max_data_size { b_wl.data[i] = 0; } @@ -288,13 +288,13 @@ pub fn generate_coding( window[n].coding = Some(recycler.allocate()); let coding = window[n].coding.clone().unwrap(); - let mut coding_wl = coding.write().unwrap(); + let mut coding_wl = coding.write(); for i in 0..max_data_size { coding_wl.data[i] = 0; } // copy index and id from the data blob if let Some(data) = &window[n].data { - let data_rl = data.read().unwrap(); + let data_rl = data.read(); let index = data_rl.get_index().unwrap(); let id = data_rl.get_id().unwrap(); @@ -316,10 +316,7 @@ pub fn generate_coding( coding_blobs.push(coding.clone()); } - let data_locks: Vec<_> = data_blobs - .iter() - .map(|b| b.read().expect("'data_locks' of data_blobs")) - .collect(); + let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read()).collect(); let data_ptrs: Vec<_> = data_locks .iter() @@ -329,10 +326,7 @@ pub fn generate_coding( &l.data[..max_data_size] }).collect(); - let mut coding_locks: Vec<_> = coding_blobs - .iter() - .map(|b| b.write().expect("'coding_locks' of coding_blobs")) - .collect(); + let mut coding_locks: Vec<_> = coding_blobs.iter().map(|b| b.write()).collect(); let mut coding_ptrs: Vec<_> = coding_locks .iter_mut() @@ -364,7 +358,7 @@ fn is_missing( c_or_d: &str, ) -> bool { if let Some(blob) = window_slot.take() { - let blob_idx = blob.read().unwrap().get_index().unwrap(); + let blob_idx = blob.read().get_index().unwrap(); if blob_idx == idx { trace!("recover {}: idx: {} good {}", id, idx, c_or_d); // put it back @@ -489,7 +483,7 @@ pub fn recover( if let Some(b) = window[j].data.clone() { if meta.is_none() { - meta = Some(b.read().unwrap().meta.clone()); + meta = Some(b.read().meta.clone()); trace!("recover {} meta at {} {:?}", id, j, meta); } blobs.push(b); @@ -505,7 +499,7 @@ pub fn recover( let j = i % window.len(); if let Some(b) = window[j].coding.clone() { if size.is_none() { - size = Some(b.read().unwrap().meta.size - BLOB_HEADER_SIZE); + size = Some(b.read().meta.size - BLOB_HEADER_SIZE); trace!( "{} recover size {} from {}", id, @@ -529,7 +523,7 @@ pub fn recover( let j = i % window.len(); if let Some(b) = &window[j].data { - let mut b_wl = b.write().unwrap(); + let mut b_wl = b.write(); for i in b_wl.meta.size..size { b_wl.data[i] = 0; } @@ -541,7 +535,7 @@ pub fn recover( trace!("erasures[]: {} {:?} data_size: {}", id, erasures, size,); //lock everything for write for b in &blobs { - locks.push(b.write().expect("'locks' arr in pb fn recover")); + locks.push(b.write()); } { @@ -680,7 +674,7 @@ mod test { print!("window({:>w$}): ", i, w = 2); if w.data.is_some() { let window_l1 = w.data.clone().unwrap(); - let window_l2 = window_l1.read().unwrap(); + let window_l2 = window_l1.read(); print!( "data index: {:?} meta.size: {} data: ", window_l2.get_index(), @@ -692,11 +686,11 @@ mod test { } else { print!("data null "); } - println!(""); + println!(); print!("window({:>w$}): ", i, w = 2); if w.coding.is_some() { let window_l1 = w.coding.clone().unwrap(); - let window_l2 = window_l1.read().unwrap(); + let window_l2 = window_l1.read(); print!( "coding index: {:?} meta.size: {} data: ", window_l2.get_index(), @@ -708,7 +702,7 @@ mod test { } else { print!("coding null"); } - println!(""); + println!(); } } @@ -730,7 +724,7 @@ mod test { for i in 0..num_blobs { let b = blob_recycler.allocate(); let b_ = b.clone(); - let mut w = b.write().unwrap(); + let mut w = b.write(); // generate a random length, multiple of 4 between 8 and 32 let data_len = if i == 3 { BLOB_DATA_SIZE @@ -762,7 +756,7 @@ mod test { ); assert!(index_blobs(&d, &blobs, &mut (offset as u64)).is_ok()); for b in blobs { - let idx = b.read().unwrap().get_index().unwrap() as usize % WINDOW_SIZE; + let idx = b.read().get_index().unwrap() as usize % WINDOW_SIZE; window[idx].data = Some(b); } @@ -773,11 +767,11 @@ mod test { for i in 0..num_blobs { if let Some(b) = &window[i].data { let size = { - let b_l = b.read().unwrap(); + let b_l = b.read(); b_l.meta.size } as usize; - let mut b_l = b.write().unwrap(); + let mut b_l = b.write(); for i in size..BLOB_SIZE { b_l.data[i] = thread_rng().gen(); } @@ -790,7 +784,7 @@ mod test { for _ in 0..WINDOW_SIZE * 10 { let blob = blob_recycler.allocate(); { - let mut b_l = blob.write().unwrap(); + let mut b_l = blob.write(); for i in 0..BLOB_SIZE { b_l.data[i] = thread_rng().gen(); @@ -821,7 +815,7 @@ mod test { for slot in &window { if let Some(blob) = &slot.data { - let blob_r = blob.read().unwrap(); + let blob_r = blob.read(); assert!(!blob_r.is_coding()); } } @@ -875,9 +869,9 @@ mod test { // Check the result, block is here to drop locks let window_l = window[erase_offset].data.clone().unwrap(); - let window_l2 = window_l.read().unwrap(); + let window_l2 = window_l.read(); let ref_l = refwindow.clone().unwrap(); - let ref_l2 = ref_l.read().unwrap(); + let ref_l2 = ref_l.read(); assert_eq!(window_l2.meta.size, ref_l2.meta.size); assert_eq!( @@ -924,9 +918,9 @@ mod test { { // Check the result, block is here to drop locks let window_l = window[erase_offset].data.clone().unwrap(); - let window_l2 = window_l.read().unwrap(); + let window_l2 = window_l.read(); let ref_l = refwindow.clone().unwrap(); - let ref_l2 = ref_l.read().unwrap(); + let ref_l2 = ref_l.read(); assert_eq!(window_l2.meta.size, ref_l2.meta.size); assert_eq!( window_l2.data[..window_l2.meta.size], @@ -947,10 +941,7 @@ mod test { // Create a hole in the window by making the blob's index stale let refwindow = window[offset].data.clone(); if let Some(blob) = &window[erase_offset].data { - blob.write() - .unwrap() - .set_index(erase_offset as u64) - .unwrap(); // this also writes to refwindow... + blob.write().set_index(erase_offset as u64).unwrap(); // this also writes to refwindow... } print_window(&window); @@ -970,7 +961,6 @@ mod test { // fix refwindow, we wrote to it above... if let Some(blob) = &refwindow { blob.write() - .unwrap() .set_index((erase_offset + WINDOW_SIZE) as u64) .unwrap(); // this also writes to refwindow... } @@ -978,9 +968,9 @@ mod test { { // Check the result, block is here to drop locks let window_l = window[erase_offset].data.clone().unwrap(); - let window_l2 = window_l.read().unwrap(); + let window_l2 = window_l.read(); let ref_l = refwindow.clone().unwrap(); - let ref_l2 = ref_l.read().unwrap(); + let ref_l2 = ref_l.read(); assert_eq!(window_l2.meta.size, ref_l2.meta.size); assert_eq!( window_l2.data[..window_l2.meta.size], diff --git a/src/fetch_stage.rs b/src/fetch_stage.rs index 1fcdb7cee..f1f5615ac 100644 --- a/src/fetch_stage.rs +++ b/src/fetch_stage.rs @@ -1,6 +1,5 @@ //! The `fetch_stage` batches input from a UDP socket and sends it to a channel. -use packet::PacketRecycler; use service::Service; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; @@ -15,25 +14,19 @@ pub struct FetchStage { } impl FetchStage { - pub fn new( - sockets: Vec, - exit: Arc, - recycler: &PacketRecycler, - ) -> (Self, PacketReceiver) { + pub fn new(sockets: Vec, exit: Arc) -> (Self, PacketReceiver) { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); - Self::new_multi_socket(tx_sockets, exit, recycler) + Self::new_multi_socket(tx_sockets, exit) } pub fn new_multi_socket( sockets: Vec>, exit: Arc, - recycler: &PacketRecycler, ) -> (Self, PacketReceiver) { let (sender, receiver) = channel(); let thread_hdls: Vec<_> = sockets .into_iter() - .map(|socket| { - streamer::receiver(socket, exit.clone(), recycler.clone(), sender.clone()) - }).collect(); + .map(|socket| streamer::receiver(socket, exit.clone(), sender.clone())) + .collect(); (FetchStage { exit, thread_hdls }, receiver) } diff --git a/src/fullnode.rs b/src/fullnode.rs index c6d79bd9d..0fa5a3019 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -7,7 +7,6 @@ use drone::DRONE_PORT; use entry::Entry; use ledger::read_ledger; use ncp::Ncp; -use packet::BlobRecycler; use rpc::{JsonRpcService, RPC_PORT}; use rpu::Rpu; use service::Service; @@ -91,7 +90,6 @@ pub struct Fullnode { broadcast_socket: UdpSocket, requests_socket: UdpSocket, respond_socket: UdpSocket, - blob_recycler: BlobRecycler, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -235,8 +233,6 @@ impl Fullnode { } let exit = Arc::new(AtomicBool::new(false)); let bank = Arc::new(bank); - let mut blob_recycler = BlobRecycler::default(); - blob_recycler.set_name("fullnode::Blob"); let rpu = Some(Rpu::new( &bank, @@ -248,7 +244,6 @@ impl Fullnode { .respond .try_clone() .expect("Failed to clone respond socket"), - &blob_recycler, )); // TODO: this code assumes this node is the leader @@ -263,8 +258,7 @@ impl Fullnode { exit.clone(), ); - let window = - window::new_window_from_entries(ledger_tail, entry_height, &node.info, &blob_recycler); + let window = window::new_window_from_entries(ledger_tail, entry_height, &node.info); let shared_window = Arc::new(RwLock::new(window)); let mut crdt = Crdt::new(node.info).expect("Crdt::new"); @@ -276,7 +270,6 @@ impl Fullnode { let ncp = Ncp::new( &crdt, shared_window.clone(), - blob_recycler.clone(), Some(ledger_path), node.sockets.gossip, exit.clone(), @@ -295,7 +288,6 @@ impl Fullnode { entry_height, crdt.clone(), shared_window.clone(), - blob_recycler.clone(), node.sockets .replicate .iter() @@ -330,7 +322,6 @@ impl Fullnode { .iter() .map(|s| s.try_clone().expect("Failed to clone transaction sockets")) .collect(), - &blob_recycler, ledger_path, sigverify_disabled, entry_height, @@ -344,7 +335,6 @@ impl Fullnode { crdt.clone(), shared_window.clone(), entry_height, - blob_recycler.clone(), entry_receiver, tpu_exit, ); @@ -363,7 +353,6 @@ impl Fullnode { ncp, rpc_service, node_role, - blob_recycler: blob_recycler.clone(), ledger_path: ledger_path.to_owned(), exit, replicate_socket: node.sockets.replicate, @@ -406,7 +395,6 @@ impl Fullnode { self.respond_socket .try_clone() .expect("Failed to clone respond socket"), - &self.blob_recycler, )); } @@ -416,7 +404,6 @@ impl Fullnode { entry_height, self.crdt.clone(), self.shared_window.clone(), - self.blob_recycler.clone(), self.replicate_socket .iter() .map(|s| s.try_clone().expect("Failed to clone replicate sockets")) diff --git a/src/ledger.rs b/src/ledger.rs index 58469cbab..26ebcb020 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -456,7 +456,7 @@ pub fn reconstruct_entries_from_blobs(blobs: Vec) -> Result>, window: SharedWindow, - blob_recycler: BlobRecycler, ledger_path: Option<&str>, gossip_socket: UdpSocket, exit: Arc, @@ -32,29 +30,19 @@ impl Ncp { &crdt.read().unwrap().id.as_ref()[..4], gossip_socket.local_addr().unwrap() ); - let t_receiver = streamer::blob_receiver( - gossip_socket.clone(), - exit.clone(), - blob_recycler.clone(), - request_sender, - ); + let t_receiver = + streamer::blob_receiver(gossip_socket.clone(), exit.clone(), request_sender); let (response_sender, response_receiver) = channel(); - let t_responder = streamer::responder( - "ncp", - gossip_socket, - blob_recycler.clone(), - response_receiver, - ); + let t_responder = streamer::responder("ncp", gossip_socket, response_receiver); let t_listen = Crdt::listen( crdt.clone(), window, ledger_path, - blob_recycler.clone(), request_receiver, response_sender.clone(), exit.clone(), ); - let t_gossip = Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit.clone()); + let t_gossip = Crdt::gossip(crdt.clone(), response_sender, exit.clone()); let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; Ncp { exit, thread_hdls } } @@ -80,7 +68,6 @@ impl Service for Ncp { mod tests { use crdt::{Crdt, Node}; use ncp::Ncp; - use packet::BlobRecycler; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; @@ -93,14 +80,7 @@ mod tests { let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new"); let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); - let d = Ncp::new( - &c, - w, - BlobRecycler::default(), - None, - tn.sockets.gossip, - exit.clone(), - ); + let d = Ncp::new(&c, w, None, tn.sockets.gossip, exit.clone()); d.close().expect("thread join"); } } diff --git a/src/packet.rs b/src/packet.rs index 206c522f0..1f7ef1821 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -4,6 +4,7 @@ use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use counter::Counter; use log::Level; use recvmmsg::{recv_mmsg, NUM_RCVMMSGS}; +use recycler; use result::{Error, Result}; use serde::Serialize; use signature::Pubkey; @@ -11,14 +12,13 @@ use std::fmt; use std::io; use std::mem::size_of; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::atomic::AtomicUsize; -pub type SharedPackets = Arc>; -pub type SharedBlob = Arc>; +pub type SharedPackets = recycler::Recyclable; +pub type SharedBlob = recycler::Recyclable; pub type SharedBlobs = Vec; -pub type PacketRecycler = Recycler; -pub type BlobRecycler = Recycler; +pub type PacketRecycler = recycler::Recycler; +pub type BlobRecycler = recycler::Recycler; pub const NUM_PACKETS: usize = 1024 * 8; pub const BLOB_SIZE: usize = (64 * 1024 - 128); // wikipedia says there should be 20b for ipv4 headers @@ -63,14 +63,7 @@ impl Default for Packet { } } -pub trait Reset { - // Reset trait is an object that can re-initialize important parts - // of itself, similar to Default, but not necessarily a full clear - // also, we do it in-place. - fn reset(&mut self); -} - -impl Reset for Packet { +impl recycler::Reset for Packet { fn reset(&mut self) { self.meta = Meta::default(); } @@ -130,7 +123,7 @@ impl Default for Packets { } } -impl Reset for Packets { +impl recycler::Reset for Packets { fn reset(&mut self) { for i in 0..self.packets.len() { self.packets[i].reset(); @@ -165,7 +158,7 @@ impl Default for Blob { } } -impl Reset for Blob { +impl recycler::Reset for Blob { fn reset(&mut self) { self.meta = Meta::default(); self.data[..BLOB_HEADER_SIZE].copy_from_slice(&[0u8; BLOB_HEADER_SIZE]); @@ -178,118 +171,6 @@ pub enum BlobError { BadState, } -pub struct Recycler { - #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] - gc: Arc>, &'static str)>>>, - allocated_count: Arc, - recycled_count: Arc, - reuse_count: Arc, - skipped_count: Arc, - name: String, -} - -impl Default for Recycler { - fn default() -> Recycler { - Recycler { - gc: Arc::new(Mutex::new(vec![])), - allocated_count: Arc::new(AtomicUsize::new(0)), - recycled_count: Arc::new(AtomicUsize::new(0)), - reuse_count: Arc::new(AtomicUsize::new(0)), - skipped_count: Arc::new(AtomicUsize::new(0)), - name: format!("? sz: {}", size_of::()).to_string(), - } - } -} - -impl Clone for Recycler { - fn clone(&self) -> Recycler { - Recycler { - gc: self.gc.clone(), - allocated_count: self.allocated_count.clone(), - recycled_count: self.recycled_count.clone(), - reuse_count: self.reuse_count.clone(), - skipped_count: self.skipped_count.clone(), - name: self.name.clone(), - } - } -} - -fn inc_counter(x: &AtomicUsize) { - x.fetch_add(1, Ordering::Relaxed); -} - -impl Recycler { - pub fn set_name(&mut self, name: &'static str) { - self.name = name.to_string(); - } - - pub fn allocate(&self) -> Arc> { - let mut gc = self.gc.lock().expect("recycler lock in pb fn allocate"); - let gc_count = gc.len(); - - loop { - if let Some((x, who)) = gc.pop() { - // Only return the item if this recycler is the last reference to it. - // Remove this check once `T` holds a Weak reference back to this - // recycler and implements `Drop`. At the time of this writing, Weak can't - // be passed across threads ('alloc' is a nightly-only API), and so our - // reference-counted recyclables are awkwardly being recycled by hand, - // which allows this race condition to exist. - if Arc::strong_count(&x) > 1 { - // Commenting out this message, is annoying for known use case of - // validator hanging onto a blob in the window, but also sending it over - // to retransmmit_request - // - // warn!("Recycled item still in use. Booting it."); - trace!( - "{} Recycled item from \"{}\" still in use. {} Booting it.", - self.name, - who, - Arc::strong_count(&x) - ); - inc_counter(&self.skipped_count); - continue; - } - - { - let mut w = x.write().unwrap(); - w.reset(); - } - inc_counter(&self.reuse_count); - return x; - } else { - inc_counter(&self.allocated_count); - if self.allocated_count.load(Ordering::Relaxed) % 2048 == 0 { - self.print_stats(gc_count); - } - return Arc::new(RwLock::new(Default::default())); - } - } - } - - fn print_stats(&self, gc_count: usize) { - info!( - "{} recycler stats: allocated: {} reused: {} skipped: {} recycled: {} gc_count: {}", - self.name, - self.allocated_count.load(Ordering::Relaxed), - self.reuse_count.load(Ordering::Relaxed), - self.skipped_count.load(Ordering::Relaxed), - self.recycled_count.load(Ordering::Relaxed), - gc_count - ); - } - - pub fn recycle(&self, x: Arc>, who: &'static str) { - let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle"); - inc_counter(&self.recycled_count); - if self.recycled_count.load(Ordering::Relaxed) % 2048 == 0 { - self.print_stats(0); - } - - gc.push((x, who)); - } -} - impl Packets { fn run_read_from(&mut self, socket: &UdpSocket) -> Result { self.packets.resize(NUM_PACKETS, Packet::default()); @@ -348,12 +229,9 @@ pub fn to_packets_chunked( ) -> Vec { let mut out = vec![]; for x in xs.chunks(chunks) { - let p = r.allocate(); - p.write() - .unwrap() - .packets - .resize(x.len(), Default::default()); - for (i, o) in x.iter().zip(p.write().unwrap().packets.iter_mut()) { + let mut p = r.allocate(); + p.write().packets.resize(x.len(), Default::default()); + for (i, o) in x.iter().zip(p.write().packets.iter_mut()) { let v = serialize(&i).expect("serialize request"); let len = v.len(); o.data[..len].copy_from_slice(&v); @@ -375,7 +253,7 @@ pub fn to_blob( ) -> Result { let blob = blob_recycler.allocate(); { - let mut b = blob.write().unwrap(); + let mut b = blob.write(); let v = serialize(&resp)?; let len = v.len(); assert!(len <= BLOB_SIZE); @@ -492,7 +370,7 @@ impl Blob { } pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> { - let mut p = r.write().expect("'r' write lock in pub fn recv_from"); + let mut p = r.write(); trace!("receiving on {}", socket.local_addr().unwrap()); let (nrecv, from) = socket.recv_from(&mut p.data)?; @@ -517,14 +395,12 @@ impl Blob { match Blob::recv_blob(socket, &r) { Err(_) if i > 0 => { trace!("got {:?} messages on {}", i, socket.local_addr().unwrap()); - re.recycle(r, "Bob::recv_from::i>0"); break; } Err(e) => { if e.kind() != io::ErrorKind::WouldBlock { info!("recv_from err {:?}", e); } - re.recycle(r, "Blob::recv_from::empty"); return Err(Error::IO(e)); } Ok(()) => if i == 0 { @@ -535,10 +411,10 @@ impl Blob { } Ok(v) } - pub fn send_to(re: &BlobRecycler, socket: &UdpSocket, v: SharedBlobs) -> Result<()> { + pub fn send_to(socket: &UdpSocket, v: SharedBlobs) -> Result<()> { for r in v { { - let p = r.read().expect("'r' read lock in pub fn send_to"); + let p = r.read(); let a = p.meta.addr(); if let Err(e) = socket.send_to(&p.data[..p.meta.size], &a) { warn!( @@ -548,7 +424,6 @@ impl Blob { Err(e)?; } } - re.recycle(r, "send_to"); } Ok(()) } @@ -557,87 +432,15 @@ impl Blob { #[cfg(test)] mod tests { use packet::{ - to_packets, Blob, BlobRecycler, Meta, Packet, PacketRecycler, Packets, Recycler, Reset, - BLOB_HEADER_SIZE, NUM_PACKETS, PACKET_DATA_SIZE, + to_packets, Blob, BlobRecycler, Meta, Packet, PacketRecycler, Packets, BLOB_HEADER_SIZE, + NUM_PACKETS, PACKET_DATA_SIZE, }; + use recycler::Reset; use request::Request; use std::io; use std::io::Write; use std::net::UdpSocket; - use std::sync::Arc; - #[test] - pub fn packet_recycler_test() { - let r = PacketRecycler::default(); - let p = r.allocate(); - r.recycle(p, "recycler_test"); - assert_eq!(r.gc.lock().unwrap().len(), 1); - let _ = r.allocate(); - assert_eq!(r.gc.lock().unwrap().len(), 0); - } - - impl Reset for u8 { - fn reset(&mut self) { - *self = Default::default(); - } - } - - #[test] - pub fn test_leaked_recyclable() { - // Ensure that the recycler won't return an item - // that is still referenced outside the recycler. - let r = Recycler::::default(); - let x0 = r.allocate(); - r.recycle(x0.clone(), "leaked_recyclable:1"); - assert_eq!(Arc::strong_count(&x0), 2); - assert_eq!(r.gc.lock().unwrap().len(), 1); - - let x1 = r.allocate(); - assert_eq!(Arc::strong_count(&x1), 1); - assert_eq!(r.gc.lock().unwrap().len(), 0); - } - - #[test] - pub fn test_leaked_recyclable_recursion() { - // In the case of a leaked recyclable, ensure the recycler drops its lock before recursing. - let r = Recycler::::default(); - let x0 = r.allocate(); - let x1 = r.allocate(); - r.recycle(x0, "leaked_recyclable_recursion:1"); // <-- allocate() of this will require locking the recycler's stack. - r.recycle(x1.clone(), "leaked_recyclable_recursion:2"); // <-- allocate() of this will cause it to be dropped and recurse. - assert_eq!(Arc::strong_count(&x1), 2); - assert_eq!(r.gc.lock().unwrap().len(), 2); - - r.allocate(); // Ensure lock is released before recursing. - assert_eq!(r.gc.lock().unwrap().len(), 0); - } - - #[test] - pub fn test_recycling_is_happening() { - // Test the case in allocate() which should return a re-used object and not allocate a new - // one. - let r = PacketRecycler::default(); - let x0 = r.allocate(); - { - x0.write().unwrap().packets.resize(1, Packet::default()); - } - r.recycle(x0, "recycle"); - let x1 = r.allocate(); - assert_ne!( - x1.read().unwrap().packets.len(), - Packets::default().packets.len() - ); - } - - #[test] - pub fn blob_recycler_test() { - let r = BlobRecycler::default(); - let p = r.allocate(); - r.recycle(p, "blob_recycler_test"); - assert_eq!(r.gc.lock().unwrap().len(), 1); - let _ = r.allocate(); - assert_eq!(r.gc.lock().unwrap().len(), 0); - } #[test] pub fn packet_send_recv() { let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); @@ -646,14 +449,14 @@ mod tests { let saddr = sender.local_addr().unwrap(); let r = PacketRecycler::default(); let p = r.allocate(); - p.write().unwrap().packets.resize(10, Packet::default()); - for m in p.write().unwrap().packets.iter_mut() { + p.write().packets.resize(10, Packet::default()); + for m in p.write().packets.iter_mut() { m.meta.set_addr(&addr); m.meta.size = PACKET_DATA_SIZE; } - p.read().unwrap().send_to(&sender).unwrap(); - p.write().unwrap().recv_from(&reader).unwrap(); - for m in p.write().unwrap().packets.iter_mut() { + p.read().send_to(&sender).unwrap(); + p.write().recv_from(&reader).unwrap(); + for m in p.write().packets.iter_mut() { assert_eq!(m.meta.size, PACKET_DATA_SIZE); assert_eq!(m.meta.addr(), saddr); } @@ -667,16 +470,16 @@ mod tests { let re = PacketRecycler::default(); let rv = to_packets(&re, &vec![tx.clone(); 1]); assert_eq!(rv.len(), 1); - assert_eq!(rv[0].read().unwrap().packets.len(), 1); + assert_eq!(rv[0].read().packets.len(), 1); let rv = to_packets(&re, &vec![tx.clone(); NUM_PACKETS]); assert_eq!(rv.len(), 1); - assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); + assert_eq!(rv[0].read().packets.len(), NUM_PACKETS); let rv = to_packets(&re, &vec![tx.clone(); NUM_PACKETS + 1]); assert_eq!(rv.len(), 2); - assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); - assert_eq!(rv[1].read().unwrap().packets.len(), 1); + assert_eq!(rv[0].read().packets.len(), NUM_PACKETS); + assert_eq!(rv[1].read().packets.len(), 1); } #[test] @@ -687,15 +490,15 @@ mod tests { let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); let r = BlobRecycler::default(); let p = r.allocate(); - p.write().unwrap().meta.set_addr(&addr); - p.write().unwrap().meta.size = 1024; + p.write().meta.set_addr(&addr); + p.write().meta.size = 1024; let v = vec![p]; - Blob::send_to(&r, &sender, v).unwrap(); + Blob::send_to(&sender, v).unwrap(); trace!("send_to"); let rv = Blob::recv_from(&r, &reader).unwrap(); trace!("recv_from"); assert_eq!(rv.len(), 1); - assert_eq!(rv[0].write().unwrap().meta.size, 1024); + assert_eq!(rv[0].read().meta.size, 1024); } #[cfg(all(feature = "ipv6", test))] @@ -706,14 +509,14 @@ mod tests { let sender = UdpSocket::bind("[::1]:0").expect("bind"); let r = BlobRecycler::default(); let p = r.allocate(); - p.write().unwrap().meta.set_addr(&addr); - p.write().unwrap().meta.size = 1024; + p.as_mut().meta.set_addr(&addr); + p.as_mut().meta.size = 1024; let mut v = VecDeque::default(); v.push_back(p); Blob::send_to(&r, &sender, &mut v).unwrap(); let mut rv = Blob::recv_from(&r, &reader).unwrap(); let rp = rv.pop_front().unwrap(); - assert_eq!(rp.write().unwrap().meta.size, 1024); + assert_eq!(rp.as_mut().meta.size, 1024); r.recycle(rp, "blob_ip6_send_recv"); } diff --git a/src/record_stage.rs b/src/record_stage.rs index 3edbb7a66..e2da86655 100644 --- a/src/record_stage.rs +++ b/src/record_stage.rs @@ -39,7 +39,8 @@ impl RecordStage { .name("solana-record-stage".to_string()) .spawn(move || { let mut recorder = Recorder::new(start_hash); - let _ = Self::process_signals(&mut recorder, &signal_receiver, bank, &entry_sender); + let _ = + Self::process_signals(&mut recorder, &signal_receiver, &bank, &entry_sender); }).unwrap(); (RecordStage { thread_hdl }, entry_receiver) @@ -65,7 +66,7 @@ impl RecordStage { start_time, tick_duration, &signal_receiver, - bank.clone(), + &bank, &entry_sender, ).is_err() { @@ -92,7 +93,7 @@ impl RecordStage { let txs_len = txs.len(); let entries = recorder.record(txs); - for entry in entries.iter() { + for entry in &entries { if !entry.has_more { bank.register_entry_id(&entry.id); } @@ -110,12 +111,12 @@ impl RecordStage { fn process_signals( recorder: &mut Recorder, receiver: &Receiver, - bank: Arc, + bank: &Arc, sender: &Sender>, ) -> Result<(), ()> { loop { match receiver.recv() { - Ok(signal) => Self::process_signal(signal, &bank, recorder, sender)?, + Ok(signal) => Self::process_signal(signal, bank, recorder, sender)?, Err(RecvError) => return Err(()), } } @@ -126,7 +127,7 @@ impl RecordStage { start_time: Instant, tick_duration: Duration, receiver: &Receiver, - bank: Arc, + bank: &Arc, sender: &Sender>, ) -> Result<(), ()> { loop { diff --git a/src/recycler.rs b/src/recycler.rs new file mode 100644 index 000000000..2234b7079 --- /dev/null +++ b/src/recycler.rs @@ -0,0 +1,173 @@ +use std::fmt; +use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; + +/// A function that leaves the given type in the same state as Default, +/// but starts with an existing type instead of allocating a new one. +pub trait Reset { + fn reset(&mut self); +} + +/// An value that's returned to its heap once dropped. +pub struct Recyclable { + val: Arc>, + landfill: Arc>>>>, +} + +impl Recyclable { + pub fn read(&self) -> RwLockReadGuard { + self.val.read().unwrap() + } + pub fn write(&self) -> RwLockWriteGuard { + self.val.write().unwrap() + } +} + +impl Drop for Recyclable { + fn drop(&mut self) { + if Arc::strong_count(&self.val) == 1 { + // this isn't thread safe, it will allow some concurrent drops to leak and not recycle + // if that happens the allocator will end up allocating from the heap + self.landfill.lock().unwrap().push(self.val.clone()); + } + } +} + +impl Clone for Recyclable { + fn clone(&self) -> Self { + Recyclable { + val: self.val.clone(), + landfill: self.landfill.clone(), + } + } +} + +impl fmt::Debug for Recyclable { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Recyclable {:?}", &self.read()) + } +} + +/// An object to minimize memory allocations. Use `allocate()` +/// to get recyclable values of type `T`. When those recyclables +/// are dropped, they're returned to the recycler. The next time +/// `allocate()` is called, the value will be pulled from the +/// recycler instead being allocated from memory. + +pub struct Recycler { + landfill: Arc>>>>, +} +impl Clone for Recycler { + fn clone(&self) -> Self { + Recycler { + landfill: self.landfill.clone(), + } + } +} + +impl Default for Recycler { + fn default() -> Self { + Recycler { + landfill: Arc::new(Mutex::new(vec![])), + } + } +} + +impl Recycler { + pub fn allocate(&self) -> Recyclable { + let val = self + .landfill + .lock() + .unwrap() + .pop() + .map(|val| { + val.write().unwrap().reset(); + val + }).unwrap_or_else(|| Arc::new(RwLock::new(Default::default()))); + Recyclable { + val, + landfill: self.landfill.clone(), + } + } + pub fn recycle(&self, r: Recyclable, _name: &str) { + drop(r) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::mem; + use std::sync::mpsc::channel; + + #[derive(Default)] + struct Foo { + x: u8, + } + + impl Reset for Foo { + fn reset(&mut self) { + self.x = 0; + } + } + + #[test] + fn test_allocate() { + let recycler: Recycler = Recycler::default(); + let r = recycler.allocate(); + assert_eq!(r.read().x, 0); + } + + #[test] + fn test_recycle() { + let recycler: Recycler = Recycler::default(); + + { + let foo = recycler.allocate(); + foo.write().x = 1; + } + assert_eq!(recycler.landfill.lock().unwrap().len(), 1); + + let foo = recycler.allocate(); + assert_eq!(foo.read().x, 0); + assert_eq!(recycler.landfill.lock().unwrap().len(), 0); + } + #[test] + fn test_channel() { + let recycler: Recycler = Recycler::default(); + let (sender, receiver) = channel(); + { + let foo = recycler.allocate(); + foo.write().x = 1; + sender.send(foo).unwrap(); + assert_eq!(recycler.landfill.lock().unwrap().len(), 0); + } + { + let foo = receiver.recv().unwrap(); + assert_eq!(foo.read().x, 1); + assert_eq!(recycler.landfill.lock().unwrap().len(), 0); + } + assert_eq!(recycler.landfill.lock().unwrap().len(), 1); + } + #[test] + fn test_window() { + let recycler: Recycler = Recycler::default(); + let mut window = vec![None]; + let (sender, receiver) = channel(); + { + // item is in the window while its in the pipeline + // which is used to serve requests from other threads + let item = recycler.allocate(); + item.write().x = 1; + window[0] = Some(item); + sender.send(window[0].clone().unwrap()).unwrap(); + } + { + let foo = receiver.recv().unwrap(); + assert_eq!(foo.read().x, 1); + let old = mem::replace(&mut window[0], None).unwrap(); + assert_eq!(old.read().x, 1); + } + // only one thing should be in the landfill at the end + assert_eq!(recycler.landfill.lock().unwrap().len(), 1); + } +} diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index d147cad2d..bcb35863b 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -5,7 +5,6 @@ use counter::Counter; use crdt::Crdt; use ledger::{reconstruct_entries_from_blobs, Block, LedgerWriter}; use log::Level; -use packet::BlobRecycler; use result::{Error, Result}; use service::Service; use signature::Keypair; @@ -30,7 +29,6 @@ impl ReplicateStage { fn replicate_requests( bank: &Arc, crdt: &Arc>, - blob_recycler: &BlobRecycler, window_receiver: &BlobReceiver, ledger_writer: Option<&mut LedgerWriter>, ) -> Result<()> { @@ -40,14 +38,10 @@ impl ReplicateStage { while let Ok(mut more) = window_receiver.try_recv() { blobs.append(&mut more); } - let entries = reconstruct_entries_from_blobs(blobs.clone())?; + let entries = reconstruct_entries_from_blobs(blobs)?; let res = bank.process_entries(entries.clone()); - for blob in blobs { - blob_recycler.recycle(blob, "replicate_requests"); - } - { let mut wcrdt = crdt.write().unwrap(); wcrdt.insert_votes(&entries.votes()); @@ -70,41 +64,25 @@ impl ReplicateStage { keypair: Arc, bank: Arc, crdt: Arc>, - blob_recycler: BlobRecycler, window_receiver: BlobReceiver, ledger_path: Option<&str>, exit: Arc, ) -> Self { let (vote_blob_sender, vote_blob_receiver) = channel(); let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); - let t_responder = responder( - "replicate_stage", - Arc::new(send), - blob_recycler.clone(), - vote_blob_receiver, - ); + let t_responder = responder("replicate_stage", Arc::new(send), vote_blob_receiver); - let vote_stage = VoteStage::new( - keypair, - bank.clone(), - crdt.clone(), - blob_recycler.clone(), - vote_blob_sender, - exit, - ); + let vote_stage = + VoteStage::new(keypair, bank.clone(), crdt.clone(), vote_blob_sender, exit); let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap()); let t_replicate = Builder::new() .name("solana-replicate-stage".to_string()) .spawn(move || loop { - if let Err(e) = Self::replicate_requests( - &bank, - &crdt, - &blob_recycler, - &window_receiver, - ledger_writer.as_mut(), - ) { + if let Err(e) = + Self::replicate_requests(&bank, &crdt, &window_receiver, ledger_writer.as_mut()) + { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), diff --git a/src/request_stage.rs b/src/request_stage.rs index 90e962a37..a6b41e2f9 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -49,7 +49,7 @@ impl RequestStage { let mut reqs_len = 0; let proc_start = Instant::now(); for msgs in batch { - let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap()) + let reqs: Vec<_> = Self::deserialize_requests(&msgs.read()) .into_iter() .filter_map(|x| x) .collect(); @@ -80,11 +80,11 @@ impl RequestStage { pub fn new( request_processor: RequestProcessor, packet_receiver: Receiver, - packet_recycler: PacketRecycler, - blob_recycler: BlobRecycler, ) -> (Self, BlobReceiver) { + let packet_recycler = PacketRecycler::default(); let request_processor = Arc::new(request_processor); let request_processor_ = request_processor.clone(); + let blob_recycler = BlobRecycler::default(); let (blob_sender, blob_receiver) = channel(); let thread_hdl = Builder::new() .name("solana-request-stage".to_string()) diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index 1bba4d08c..1f18aad19 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -3,7 +3,6 @@ use counter::Counter; use crdt::Crdt; use log::Level; -use packet::BlobRecycler; use result::{Error, Result}; use service::Service; use std::net::UdpSocket; @@ -17,24 +16,14 @@ use streamer::BlobReceiver; use window::SharedWindow; use window_service::window_service; -fn retransmit( - crdt: &Arc>, - recycler: &BlobRecycler, - r: &BlobReceiver, - sock: &UdpSocket, -) -> Result<()> { +fn retransmit(crdt: &Arc>, r: &BlobReceiver, sock: &UdpSocket) -> Result<()> { let timer = Duration::new(1, 0); let mut dq = r.recv_timeout(timer)?; while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq); } - { - for b in &dq { - Crdt::retransmit(&crdt, b, sock)?; - } - } - for b in dq { - recycler.recycle(b, "retransmit"); + for b in &mut dq { + Crdt::retransmit(&crdt, b, sock)?; } Ok(()) } @@ -47,18 +36,13 @@ fn retransmit( /// * `crdt` - This structure needs to be updated and populated by the bank and via gossip. /// * `recycler` - Blob recycler. /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. -fn retransmitter( - sock: Arc, - crdt: Arc>, - recycler: BlobRecycler, - r: BlobReceiver, -) -> JoinHandle<()> { +fn retransmitter(sock: Arc, crdt: Arc>, r: BlobReceiver) -> JoinHandle<()> { Builder::new() .name("solana-retransmitter".to_string()) .spawn(move || { trace!("retransmitter started"); loop { - if let Err(e) = retransmit(&crdt, &recycler, &r, &sock) { + if let Err(e) = retransmit(&crdt, &r, &sock) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), @@ -83,23 +67,16 @@ impl RetransmitStage { entry_height: u64, retransmit_socket: Arc, repair_socket: Arc, - blob_recycler: &BlobRecycler, fetch_stage_receiver: BlobReceiver, ) -> (Self, BlobReceiver) { let (retransmit_sender, retransmit_receiver) = channel(); - let t_retransmit = retransmitter( - retransmit_socket, - crdt.clone(), - blob_recycler.clone(), - retransmit_receiver, - ); + let t_retransmit = retransmitter(retransmit_socket, crdt.clone(), retransmit_receiver); let (blob_sender, blob_receiver) = channel(); let t_window = window_service( crdt.clone(), window, entry_height, - blob_recycler.clone(), fetch_stage_receiver, blob_sender, retransmit_sender, diff --git a/src/rpu.rs b/src/rpu.rs index 6381ef766..766468cad 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -24,7 +24,6 @@ //! ``` use bank::Bank; -use packet::{BlobRecycler, PacketRecycler}; use request_processor::RequestProcessor; use request_stage::RequestStage; use service::Service; @@ -42,37 +41,15 @@ pub struct Rpu { } impl Rpu { - pub fn new( - bank: &Arc, - requests_socket: UdpSocket, - respond_socket: UdpSocket, - blob_recycler: &BlobRecycler, - ) -> Self { + pub fn new(bank: &Arc, requests_socket: UdpSocket, respond_socket: UdpSocket) -> Self { let exit = Arc::new(AtomicBool::new(false)); - let mut packet_recycler = PacketRecycler::default(); - packet_recycler.set_name("rpu::Packet"); let (packet_sender, packet_receiver) = channel(); - let t_receiver = streamer::receiver( - Arc::new(requests_socket), - exit.clone(), - packet_recycler.clone(), - packet_sender, - ); + let t_receiver = streamer::receiver(Arc::new(requests_socket), exit.clone(), packet_sender); let request_processor = RequestProcessor::new(bank.clone()); - let (request_stage, blob_receiver) = RequestStage::new( - request_processor, - packet_receiver, - packet_recycler.clone(), - blob_recycler.clone(), - ); + let (request_stage, blob_receiver) = RequestStage::new(request_processor, packet_receiver); - let t_responder = streamer::responder( - "rpu", - Arc::new(respond_socket), - blob_recycler.clone(), - blob_receiver, - ); + let t_responder = streamer::responder("rpu", Arc::new(respond_socket), blob_receiver); let thread_hdls = vec![t_receiver, t_responder]; diff --git a/src/sigverify.rs b/src/sigverify.rs index aa797d440..6c520e0d0 100644 --- a/src/sigverify.rs +++ b/src/sigverify.rs @@ -72,10 +72,7 @@ fn verify_packet_disabled(_packet: &Packet) -> u8 { } fn batch_size(batches: &[SharedPackets]) -> usize { - batches - .iter() - .map(|p| p.read().unwrap().packets.len()) - .sum() + batches.iter().map(|p| p.read().packets.len()).sum() } #[cfg(not(feature = "cuda"))] @@ -89,14 +86,8 @@ pub fn ed25519_verify_cpu(batches: &[SharedPackets]) -> Vec> { info!("CPU ECDSA for {}", batch_size(batches)); let rv = batches .into_par_iter() - .map(|p| { - p.read() - .expect("'p' read lock in ed25519_verify") - .packets - .par_iter() - .map(verify_packet) - .collect() - }).collect(); + .map(|p| p.read().packets.par_iter().map(verify_packet).collect()) + .collect(); inc_new_counter_info!("ed25519_verify_cpu", count); rv } @@ -109,7 +100,6 @@ pub fn ed25519_verify_disabled(batches: &[SharedPackets]) -> Vec> { .into_par_iter() .map(|p| { p.read() - .expect("'p' read lock in ed25519_verify") .packets .par_iter() .map(verify_packet_disabled) @@ -151,11 +141,7 @@ pub fn ed25519_verify(batches: &[SharedPackets]) -> Vec> { let mut rvs = Vec::new(); for packets in batches { - locks.push( - packets - .read() - .expect("'packets' read lock in pub fn ed25519_verify"), - ); + locks.push(packets.read()); } let mut num = 0; for p in locks { @@ -209,9 +195,8 @@ pub fn ed25519_verify(batches: &[SharedPackets]) -> Vec> { #[cfg(test)] mod tests { use bincode::serialize; - use packet::{Packet, Packets, SharedPackets}; + use packet::{Packet, PacketRecycler}; use sigverify; - use std::sync::RwLock; use transaction::Transaction; use transaction::{memfind, test_tx}; @@ -242,13 +227,18 @@ mod tests { } // generate packet vector - let mut packets = Packets::default(); - packets.packets = Vec::new(); - for _ in 0..n { - packets.packets.push(packet.clone()); - } - let shared_packets = SharedPackets::new(RwLock::new(packets)); - let batches = vec![shared_packets.clone(), shared_packets.clone()]; + let packet_recycler = PacketRecycler::default(); + let batches: Vec<_> = (0..2) + .map(|_| { + let packets = packet_recycler.allocate(); + packets.write().packets.resize(0, Default::default()); + for _ in 0..n { + packets.write().packets.push(packet.clone()); + } + assert_eq!(packets.read().packets.len(), n); + packets + }).collect(); + assert_eq!(batches.len(), 2); // verify packets let ans = sigverify::ed25519_verify(&batches); diff --git a/src/streamer.rs b/src/streamer.rs index 619f6e793..825cd543d 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -23,10 +23,7 @@ fn recv_loop( loop { let msgs = re.allocate(); loop { - let result = msgs - .write() - .expect("write lock in fn recv_loop") - .recv_from(sock); + let result = msgs.write().recv_from(sock); match result { Ok(()) => { channel.send(msgs)?; @@ -34,7 +31,6 @@ fn recv_loop( } Err(_) => { if exit.load(Ordering::Relaxed) { - re.recycle(msgs, "recv_loop"); return Ok(()); } } @@ -46,10 +42,10 @@ fn recv_loop( pub fn receiver( sock: Arc, exit: Arc, - recycler: PacketRecycler, packet_sender: PacketSender, ) -> JoinHandle<()> { let res = sock.set_read_timeout(Some(Duration::new(1, 0))); + let recycler = PacketRecycler::default(); if res.is_err() { panic!("streamer::receiver set_read_timeout error"); } @@ -61,10 +57,10 @@ pub fn receiver( }).unwrap() } -fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> { +fn recv_send(sock: &UdpSocket, r: &BlobReceiver) -> Result<()> { let timer = Duration::new(1, 0); let msgs = r.recv_timeout(timer)?; - Blob::send_to(recycler, sock, msgs)?; + Blob::send_to(sock, msgs)?; Ok(()) } @@ -72,11 +68,11 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize)> let timer = Duration::new(1, 0); let msgs = recvr.recv_timeout(timer)?; trace!("got msgs"); - let mut len = msgs.read().unwrap().packets.len(); + let mut len = msgs.read().packets.len(); let mut batch = vec![msgs]; while let Ok(more) = recvr.try_recv() { trace!("got more msgs"); - len += more.read().unwrap().packets.len(); + len += more.read().packets.len(); batch.push(more); if len > 100_000 { @@ -87,16 +83,11 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize)> Ok((batch, len)) } -pub fn responder( - name: &'static str, - sock: Arc, - recycler: BlobRecycler, - r: BlobReceiver, -) -> JoinHandle<()> { +pub fn responder(name: &'static str, sock: Arc, r: BlobReceiver) -> JoinHandle<()> { Builder::new() .name(format!("solana-responder-{}", name)) .spawn(move || loop { - if let Err(e) = recv_send(&sock, &recycler, &r) { + if let Err(e) = recv_send(&sock, &r) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), @@ -117,17 +108,13 @@ fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Resu Ok(()) } -pub fn blob_receiver( - sock: Arc, - exit: Arc, - recycler: BlobRecycler, - s: BlobSender, -) -> JoinHandle<()> { +pub fn blob_receiver(sock: Arc, exit: Arc, s: BlobSender) -> JoinHandle<()> { //DOCUMENTED SIDE-EFFECT //1 second timeout on socket read let timer = Duration::new(1, 0); sock.set_read_timeout(Some(timer)) .expect("set socket timeout"); + let recycler = BlobRecycler::default(); Builder::new() .name("solana-blob_receiver".to_string()) .spawn(move || loop { @@ -140,7 +127,7 @@ pub fn blob_receiver( #[cfg(test)] mod test { - use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE}; + use packet::{Blob, BlobRecycler, Packet, Packets, PACKET_DATA_SIZE}; use std::io; use std::io::Write; use std::net::UdpSocket; @@ -155,8 +142,8 @@ mod test { for _t in 0..5 { let timer = Duration::new(1, 0); match r.recv_timeout(timer) { - Ok(m) => *num += m.read().unwrap().packets.len(), - e => info!("error {:?}", e), + Ok(m) => *num += m.read().packets.len(), + _ => info!("get_msgs error"), } if *num == 10 { break; @@ -177,28 +164,17 @@ mod test { let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); - let pack_recycler = PacketRecycler::default(); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = receiver( - Arc::new(read), - exit.clone(), - pack_recycler.clone(), - s_reader, - ); + let t_receiver = receiver(Arc::new(read), exit.clone(), s_reader); let t_responder = { let (s_responder, r_responder) = channel(); - let t_responder = responder( - "streamer_send_test", - Arc::new(send), - resp_recycler.clone(), - r_responder, - ); + let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); let mut msgs = Vec::new(); for i in 0..10 { - let b = resp_recycler.allocate(); + let mut b = resp_recycler.allocate(); { - let mut w = b.write().unwrap(); + let mut w = b.write(); w.data[0] = i as u8; w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&addr); diff --git a/src/thin_client.rs b/src/thin_client.rs index 0ac399db8..5cae907f2 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -9,7 +9,6 @@ use crdt::{Crdt, CrdtError, NodeInfo}; use hash::Hash; use log::Level; use ncp::Ncp; -use packet::BlobRecycler; use request::{Request, Response}; use result::{Error, Result}; use signature::{Keypair, Pubkey, Signature}; @@ -375,14 +374,7 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option) -> R let my_addr = gossip_socket.local_addr().unwrap(); let crdt = Arc::new(RwLock::new(Crdt::new(node).expect("Crdt::new"))); let window = Arc::new(RwLock::new(vec![])); - let ncp = Ncp::new( - &crdt.clone(), - window, - BlobRecycler::default(), - None, - gossip_socket, - exit.clone(), - ); + let ncp = Ncp::new(&crdt.clone(), window, None, gossip_socket, exit.clone()); let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp); crdt.write().unwrap().insert(&leader_entry_point); diff --git a/src/tpu.rs b/src/tpu.rs index 49d5eaa7c..40e10e79e 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -30,7 +30,6 @@ use banking_stage::BankingStage; use crdt::Crdt; use entry::Entry; use fetch_stage::FetchStage; -use packet::{BlobRecycler, PacketRecycler}; use record_stage::RecordStage; use service::Service; use signature::Keypair; @@ -63,23 +62,18 @@ impl Tpu { crdt: &Arc>, tick_duration: Option, transactions_sockets: Vec, - blob_recycler: &BlobRecycler, ledger_path: &str, sigverify_disabled: bool, entry_height: u64, ) -> (Self, Receiver>, Arc) { let exit = Arc::new(AtomicBool::new(false)); - let mut packet_recycler = PacketRecycler::default(); - packet_recycler.set_name("tpu::Packet"); - let (fetch_stage, packet_receiver) = - FetchStage::new(transactions_sockets, exit.clone(), &packet_recycler); + let (fetch_stage, packet_receiver) = FetchStage::new(transactions_sockets, exit.clone()); let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver, sigverify_disabled); - let (banking_stage, signal_receiver) = - BankingStage::new(bank.clone(), verified_receiver, packet_recycler.clone()); + let (banking_stage, signal_receiver) = BankingStage::new(bank.clone(), verified_receiver); let (record_stage, entry_receiver) = match tick_duration { Some(tick_duration) => { @@ -92,7 +86,6 @@ impl Tpu { keypair, bank.clone(), crdt.clone(), - blob_recycler.clone(), ledger_path, entry_receiver, entry_height, diff --git a/src/tvu.rs b/src/tvu.rs index 7d1431cd0..9d4d97006 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -39,7 +39,6 @@ use bank::Bank; use blob_fetch_stage::BlobFetchStage; use crdt::Crdt; -use packet::BlobRecycler; use replicate_stage::ReplicateStage; use retransmit_stage::RetransmitStage; use service::Service; @@ -75,7 +74,6 @@ impl Tvu { entry_height: u64, crdt: Arc>, window: SharedWindow, - blob_recycler: BlobRecycler, replicate_sockets: Vec, repair_socket: UdpSocket, retransmit_socket: UdpSocket, @@ -87,7 +85,7 @@ impl Tvu { replicate_sockets.into_iter().map(Arc::new).collect(); blob_sockets.push(repair_socket.clone()); let (fetch_stage, blob_fetch_receiver) = - BlobFetchStage::new_multi_socket(blob_sockets, exit.clone(), &blob_recycler); + BlobFetchStage::new_multi_socket(blob_sockets, exit.clone()); //TODO //the packets coming out of blob_receiver need to be sent to the GPU and verified //then sent to the window, which does the erasure coding reconstruction @@ -97,7 +95,6 @@ impl Tvu { entry_height, Arc::new(retransmit_socket), repair_socket, - &blob_recycler, blob_fetch_receiver, ); @@ -105,7 +102,6 @@ impl Tvu { keypair, bank.clone(), crdt, - blob_recycler, blob_window_receiver, ledger_path, exit, @@ -163,11 +159,10 @@ pub mod tests { crdt: Arc>, gossip: UdpSocket, exit: Arc, - ) -> (Ncp, SharedWindow, BlobRecycler) { + ) -> (Ncp, SharedWindow) { let window = Arc::new(RwLock::new(window::default_window())); - let recycler = BlobRecycler::default(); - let ncp = Ncp::new(&crdt, window.clone(), recycler.clone(), None, gossip, exit); - (ncp, window, recycler) + let ncp = Ncp::new(&crdt, window.clone(), None, gossip, exit); + (ncp, window) } /// Test that message sent from leader to target1 and replicated to target2 @@ -207,19 +202,13 @@ pub mod tests { .map(Arc::new) .collect(); - let t_receiver = streamer::blob_receiver( - blob_sockets[0].clone(), - exit.clone(), - recycler.clone(), - s_reader, - ); + let t_receiver = streamer::blob_receiver(blob_sockets[0].clone(), exit.clone(), s_reader); // simulate leader sending messages let (s_responder, r_responder) = channel(); let t_responder = streamer::responder( "test_replicate", Arc::new(leader.sockets.requests), - recycler.clone(), r_responder, ); @@ -241,7 +230,6 @@ pub mod tests { 0, cref1, dr_1.1, - dr_1.2, target1.sockets.replicate, target1.sockets.repair, target1.sockets.retransmit, @@ -276,9 +264,9 @@ pub mod tests { alice_ref_balance -= transfer_amount; for entry in vec![entry0, entry1] { - let b = recycler.allocate(); + let mut b = recycler.allocate(); { - let mut w = b.write().unwrap(); + let mut w = b.write(); w.set_index(blob_id).unwrap(); blob_id += 1; w.set_id(leader_id).unwrap(); @@ -299,8 +287,8 @@ pub mod tests { // receive retransmitted messages let timer = Duration::new(1, 0); - while let Ok(msg) = r_reader.recv_timeout(timer) { - trace!("msg: {:?}", msg); + while let Ok(_msg) = r_reader.recv_timeout(timer) { + trace!("got msg"); } let alice_balance = bank.get_balance(&mint.keypair().pubkey()); diff --git a/src/vote_stage.rs b/src/vote_stage.rs index 0faee9ec1..57317a8cb 100644 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -47,7 +47,7 @@ pub fn create_new_signed_vote_blob( }?; let tx = Transaction::budget_new_vote(&keypair, vote, *last_id, 0); { - let mut blob = shared_blob.write().unwrap(); + let mut blob = shared_blob.write(); let bytes = serialize(&tx)?; let len = bytes.len(); blob.data[..len].copy_from_slice(&bytes); @@ -174,10 +174,10 @@ impl VoteStage { keypair: Arc, bank: Arc, crdt: Arc>, - blob_recycler: BlobRecycler, vote_blob_sender: BlobSender, exit: Arc, ) -> Self { + let blob_recycler = BlobRecycler::default(); let thread_hdl = spawn(move || { Self::run( &keypair, @@ -230,7 +230,6 @@ pub mod tests { use instruction::Vote; use logger; use mint::Mint; - use packet::BlobRecycler; use service::Service; use signature::{Keypair, KeypairUtil}; use std::sync::atomic::AtomicBool; @@ -249,7 +248,6 @@ pub mod tests { let node = Node::new_localhost(); let mut crdt = Crdt::new(node.info.clone()).expect("Crdt::new"); crdt.set_leader(node.info.id); - let blob_recycler = BlobRecycler::default(); let (sender, receiver) = channel(); let exit = Arc::new(AtomicBool::new(false)); @@ -257,7 +255,6 @@ pub mod tests { Arc::new(keypair), bank.clone(), Arc::new(RwLock::new(crdt)), - blob_recycler.clone(), sender, exit.clone(), ); @@ -382,7 +379,7 @@ pub mod tests { // vote should be valid let blob = &vote_blob.unwrap()[0]; - let tx = deserialize(&(blob.read().unwrap().data)).unwrap(); + let tx = deserialize(&(blob.read().data)).unwrap(); assert!(bank.process_transaction(&tx).is_ok()); } diff --git a/src/window.rs b/src/window.rs index b3f61dbad..741ddb13b 100644 --- a/src/window.rs +++ b/src/window.rs @@ -18,7 +18,7 @@ use std::sync::{Arc, RwLock}; pub const WINDOW_SIZE: u64 = 2 * 1024; -#[derive(Clone, Default)] +#[derive(Default, Clone)] pub struct WindowSlot { pub data: Option, pub coding: Option, @@ -28,7 +28,7 @@ pub struct WindowSlot { impl WindowSlot { fn blob_index(&self) -> Option { match self.data { - Some(ref blob) => blob.read().unwrap().get_index().ok(), + Some(ref blob) => blob.read().get_index().ok(), None => None, } } @@ -194,12 +194,7 @@ impl WindowUtil for Window { ) { let w = (pix % WINDOW_SIZE) as usize; - let is_coding = { - let blob_r = blob - .read() - .expect("blob read lock for flogs streamer::window"); - blob_r.is_coding() - }; + let is_coding = blob.read().is_coding(); // insert a newly received blob into a window slot, clearing out and recycling any previous // blob unless the incoming blob is a duplicate (based on idx) @@ -213,7 +208,7 @@ impl WindowUtil for Window { c_or_d: &str, ) -> bool { if let Some(old) = mem::replace(window_slot, Some(blob)) { - let is_dup = old.read().unwrap().get_index().unwrap() == pix; + let is_dup = old.read().get_index().unwrap() == pix; recycler.recycle(old, "insert_blob_is_dup"); trace!( "{}: occupied {} window slot {:}, is_dup: {}", @@ -263,7 +258,7 @@ impl WindowUtil for Window { trace!("{}: k: {} consumed: {}", id, k, *consumed,); if let Some(blob) = &self[k].data { - if blob.read().unwrap().get_index().unwrap() < *consumed { + if blob.read().get_index().unwrap() < *consumed { // window wrap-around, end of received break; } @@ -271,7 +266,10 @@ impl WindowUtil for Window { // self[k].data is None, end of received break; } - consume_queue.push(self[k].data.clone().expect("clone in fn recv_window")); + let slot = self[k].clone(); + if let Some(r) = slot.data { + consume_queue.push(r) + } *consumed += 1; } } @@ -324,7 +322,7 @@ pub fn blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: &mut u } pub fn default_window() -> Window { - vec![WindowSlot::default(); WINDOW_SIZE as usize] + (0..WINDOW_SIZE).map(|_| WindowSlot::default()).collect() } pub fn index_blobs( @@ -336,7 +334,7 @@ pub fn index_blobs( trace!("{}: INDEX_BLOBS {}", node_info.id, blobs.len()); for (i, b) in blobs.iter().enumerate() { // only leader should be broadcasting - let mut blob = b.write().expect("'blob' write lock in crdt::index_blobs"); + let mut blob = b.write(); blob.set_id(node_info.id) .expect("set_id in pub fn broadcast"); blob.set_index(*receive_index + i as u64) @@ -373,7 +371,7 @@ pub fn initialized_window( // populate the window, offset by implied index let diff = cmp::max(blobs.len() as isize - window.len() as isize, 0) as usize; for b in blobs.into_iter().skip(diff) { - let ix = b.read().unwrap().get_index().expect("blob index"); + let ix = b.read().get_index().expect("blob index"); let pos = (ix % WINDOW_SIZE) as usize; trace!("{} caching {} at {}", id, ix, pos); assert!(window[pos].data.is_none()); @@ -387,16 +385,16 @@ pub fn new_window_from_entries( ledger_tail: &[Entry], entry_height: u64, node_info: &NodeInfo, - blob_recycler: &BlobRecycler, ) -> Window { // convert to blobs + let blob_recycler = BlobRecycler::default(); let blobs = ledger_tail.to_blobs(&blob_recycler); initialized_window(&node_info, blobs, entry_height) } #[cfg(test)] mod test { - use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE}; + use packet::{Blob, BlobRecycler, Packet, Packets, PACKET_DATA_SIZE}; use signature::Pubkey; use std::io; use std::io::Write; @@ -412,7 +410,7 @@ mod test { for _t in 0..5 { let timer = Duration::new(1, 0); match r.recv_timeout(timer) { - Ok(m) => *num += m.read().unwrap().packets.len(), + Ok(m) => *num += m.read().packets.len(), e => info!("error {:?}", e), } if *num == 10 { @@ -434,28 +432,17 @@ mod test { let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); - let pack_recycler = PacketRecycler::default(); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = receiver( - Arc::new(read), - exit.clone(), - pack_recycler.clone(), - s_reader, - ); + let t_receiver = receiver(Arc::new(read), exit.clone(), s_reader); let t_responder = { let (s_responder, r_responder) = channel(); - let t_responder = responder( - "streamer_send_test", - Arc::new(send), - resp_recycler.clone(), - r_responder, - ); + let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); let mut msgs = Vec::new(); for i in 0..10 { - let b = resp_recycler.allocate(); + let mut b = resp_recycler.allocate(); { - let mut w = b.write().unwrap(); + let mut w = b.write(); w.data[0] = i as u8; w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&addr); diff --git a/src/window_service.rs b/src/window_service.rs index c32aa4822..5df080e93 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -46,9 +46,7 @@ fn add_block_to_retransmit_queue( recycler: &BlobRecycler, retransmit_queue: &mut Vec, ) { - let p = b - .read() - .expect("'b' read lock in fn add_block_to_retransmit_queue"); + let p = b.read(); //TODO this check isn't safe against adverserial packets //we need to maintain a sequence window trace!( @@ -73,9 +71,7 @@ fn add_block_to_retransmit_queue( //is dropped via a weakref to the recycler let nv = recycler.allocate(); { - let mut mnv = nv - .write() - .expect("recycler write lock in fn add_block_to_retransmit_queue"); + let mut mnv = nv.write(); let sz = p.meta.size; mnv.meta.size = sz; mnv.data[..sz].copy_from_slice(&p.data[..sz]); @@ -110,9 +106,9 @@ fn retransmit_all_leader_blocks( { *pending_retransmits = false; if w.leader_unknown { - if let Some(b) = w.clone().data { + if let Some(ref b) = w.data { add_block_to_retransmit_queue( - &b, + b, leader_id, recycler, &mut retransmit_queue, @@ -190,7 +186,7 @@ fn recv_window( let mut consume_queue = Vec::new(); for b in dq { let (pix, meta_size) = { - let p = b.write().unwrap(); + let p = b.read(); (p.get_index()?, p.meta.size) }; pixs.push(pix); @@ -236,7 +232,6 @@ pub fn window_service( crdt: Arc>, window: SharedWindow, entry_height: u64, - recycler: BlobRecycler, r: BlobReceiver, s: BlobSender, retransmit: BlobSender, @@ -251,6 +246,7 @@ pub fn window_service( let mut times = 0; let id = crdt.read().unwrap().id; let mut pending_retransmits = false; + let recycler = BlobRecycler::default(); trace!("{}: RECV_WINDOW started", id); loop { if let Err(e) = recv_window( @@ -317,7 +313,7 @@ mod test { match r.recv_timeout(timer) { Ok(m) => { for (i, v) in m.iter().enumerate() { - assert_eq!(v.read().unwrap().get_index().unwrap() as usize, *num + i); + assert_eq!(v.read().get_index().unwrap() as usize, *num + i); } *num += m.len(); } @@ -341,12 +337,7 @@ mod test { let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = blob_receiver( - Arc::new(tn.sockets.gossip), - exit.clone(), - resp_recycler.clone(), - s_reader, - ); + let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); let (s_window, r_window) = channel(); let (s_retransmit, r_retransmit) = channel(); let win = Arc::new(RwLock::new(default_window())); @@ -354,7 +345,6 @@ mod test { subs, win, 0, - resp_recycler.clone(), r_reader, s_window, s_retransmit, @@ -365,18 +355,13 @@ mod test { let blob_sockets: Vec> = tn.sockets.replicate.into_iter().map(Arc::new).collect(); - let t_responder = responder( - "window_send_test", - blob_sockets[0].clone(), - resp_recycler.clone(), - r_responder, - ); + let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); let mut msgs = Vec::new(); for v in 0..10 { let i = 9 - v; let b = resp_recycler.allocate(); { - let mut w = b.write().unwrap(); + let mut w = b.write(); w.set_index(i).unwrap(); w.set_id(me_id).unwrap(); assert_eq!(i, w.get_index().unwrap()); @@ -414,12 +399,7 @@ mod test { let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = blob_receiver( - Arc::new(tn.sockets.gossip), - exit.clone(), - resp_recycler.clone(), - s_reader, - ); + let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); let (s_window, _r_window) = channel(); let (s_retransmit, r_retransmit) = channel(); let win = Arc::new(RwLock::new(default_window())); @@ -427,7 +407,6 @@ mod test { subs.clone(), win, 0, - resp_recycler.clone(), r_reader, s_window, s_retransmit, @@ -437,18 +416,13 @@ mod test { let (s_responder, r_responder) = channel(); let blob_sockets: Vec> = tn.sockets.replicate.into_iter().map(Arc::new).collect(); - let t_responder = responder( - "window_send_test", - blob_sockets[0].clone(), - resp_recycler.clone(), - r_responder, - ); + let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); let mut msgs = Vec::new(); for v in 0..10 { let i = 9 - v; let b = resp_recycler.allocate(); { - let mut w = b.write().unwrap(); + let mut w = b.write(); w.set_index(i).unwrap(); w.set_id(me_id).unwrap(); assert_eq!(i, w.get_index().unwrap()); @@ -479,12 +453,7 @@ mod test { let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = blob_receiver( - Arc::new(tn.sockets.gossip), - exit.clone(), - resp_recycler.clone(), - s_reader, - ); + let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); let (s_window, _r_window) = channel(); let (s_retransmit, r_retransmit) = channel(); let win = Arc::new(RwLock::new(default_window())); @@ -492,7 +461,6 @@ mod test { subs.clone(), win, 0, - resp_recycler.clone(), r_reader, s_window, s_retransmit, @@ -502,18 +470,13 @@ mod test { let (s_responder, r_responder) = channel(); let blob_sockets: Vec> = tn.sockets.replicate.into_iter().map(Arc::new).collect(); - let t_responder = responder( - "window_send_test", - blob_sockets[0].clone(), - resp_recycler.clone(), - r_responder, - ); + let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); let mut msgs = Vec::new(); for v in 0..10 { let i = 9 - v; let b = resp_recycler.allocate(); { - let mut w = b.write().unwrap(); + let mut w = b.write(); w.set_index(i).unwrap(); w.set_id(me_id).unwrap(); assert_eq!(i, w.get_index().unwrap()); @@ -533,7 +496,7 @@ mod test { let i = 9 + v; let b = resp_recycler.allocate(); { - let mut w = b.write().unwrap(); + let mut w = b.write(); w.set_index(i).unwrap(); w.set_id(me_id).unwrap(); assert_eq!(i, w.get_index().unwrap()); diff --git a/src/write_stage.rs b/src/write_stage.rs index 26acb7367..1515ae780 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -176,7 +176,6 @@ impl WriteStage { keypair: Arc, bank: Arc, crdt: Arc>, - blob_recycler: BlobRecycler, ledger_path: &str, entry_receiver: Receiver>, entry_height: u64, @@ -186,7 +185,6 @@ impl WriteStage { let t_responder = responder( "write_stage_vote_sender", Arc::new(send), - blob_recycler.clone(), vote_blob_receiver, ); let (entry_sender, entry_receiver_forward) = channel(); @@ -205,6 +203,7 @@ impl WriteStage { leader_rotation_interval = rcrdt.get_leader_rotation_interval(); } let mut entry_height = entry_height; + let blob_recycler = BlobRecycler::default(); loop { info!("write_stage entry height: {}", entry_height); // Note that entry height is not zero indexed, it starts at 1, so the @@ -296,7 +295,6 @@ mod tests { use crdt::{Crdt, Node}; use entry::Entry; use ledger::{genesis, read_ledger}; - use packet::BlobRecycler; use recorder::Recorder; use service::Service; use signature::{Keypair, KeypairUtil, Pubkey}; @@ -337,7 +335,6 @@ mod tests { let crdt = Arc::new(RwLock::new(crdt)); let bank = Bank::new_default(true); let bank = Arc::new(bank); - let blob_recycler = BlobRecycler::default(); // Make a ledger let (_, leader_ledger_path) = genesis("test_leader_rotation_exit", 10_000); @@ -352,7 +349,6 @@ mod tests { leader_keypair, bank.clone(), crdt.clone(), - blob_recycler, &leader_ledger_path, entry_receiver, entry_height, diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index 39846e635..da90aa93f 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -21,14 +21,7 @@ fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new"); let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); - let d = Ncp::new( - &c.clone(), - w, - BlobRecycler::default(), - None, - tn.sockets.gossip, - exit, - ); + let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit); (c, d, tn.sockets.replicate.pop().unwrap()) } @@ -166,9 +159,10 @@ pub fn crdt_retransmit() -> result::Result<()> { sleep(Duration::new(1, 0)); } assert!(done); - let mut b = Blob::default(); - b.meta.size = 10; - Crdt::retransmit(&c1, &Arc::new(RwLock::new(b)), &tn1)?; + let r = BlobRecycler::default(); + let b = r.allocate(); + b.write().meta.size = 10; + Crdt::retransmit(&c1, &b, &tn1)?; let res: Vec<_> = [tn1, tn2, tn3] .into_par_iter() .map(|s| { diff --git a/tests/multinode.rs b/tests/multinode.rs index b94fcb04a..283355f1b 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -13,7 +13,6 @@ use solana::ledger::LedgerWriter; use solana::logger; use solana::mint::Mint; use solana::ncp::Ncp; -use solana::packet::BlobRecycler; use solana::result; use solana::service::Service; use solana::signature::{Keypair, KeypairUtil, Pubkey}; @@ -41,11 +40,9 @@ fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc>, Pubkey) { spy_crdt.set_leader(leader.id); let spy_crdt_ref = Arc::new(RwLock::new(spy_crdt)); let spy_window = Arc::new(RwLock::new(default_window())); - let recycler = BlobRecycler::default(); let ncp = Ncp::new( &spy_crdt_ref, spy_window, - recycler, None, spy.sockets.gossip, exit.clone(),