From a8fdb8a5a7695db8cbcbed068d927f97241e4d19 Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Tue, 11 Sep 2018 14:13:10 -0700 Subject: [PATCH] use a single BlobRecycler per fullnode --- src/bin/bench-tps.rs | 28 ++++++++++++---------------- src/fullnode.rs | 7 +++++-- src/ncp.rs | 12 ++++++++++-- src/rpu.rs | 2 +- src/thin_client.rs | 10 +++++++++- src/tpu.rs | 2 +- src/tvu.rs | 19 ++++++++++--------- tests/data_replicator.rs | 11 +++++++++-- tests/multinode.rs | 11 ++++++++++- 9 files changed, 67 insertions(+), 35 deletions(-) diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index 7b90d3013..81ea26aac 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -17,6 +17,7 @@ use solana::hash::Hash; use solana::logger; use solana::metrics; use solana::ncp::Ncp; +use solana::packet::BlobRecycler; use solana::service::Service; use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil}; use solana::thin_client::{poll_gossip_for_leader, ThinClient}; @@ -611,12 +612,10 @@ fn main() { .collect(); // generate and send transactions for the specified duration - let now = Instant::now(); - let mut last_stat = Instant::now(); - let stat_interval = Duration::new(90, 0); + let start = Instant::now(); let mut reclaim_tokens_back_to_source_account = false; let mut i = keypair0_balance; - while now.elapsed() < duration { + while start.elapsed() < duration { let balance = client.poll_get_balance(&id.pubkey()).unwrap_or(-1); metrics_submit_token_balance(balance); @@ -648,16 +647,6 @@ fn main() { if should_switch_directions(num_tokens_per_account, i) { reclaim_tokens_back_to_source_account = !reclaim_tokens_back_to_source_account; } - - if last_stat.elapsed() >= stat_interval { - last_stat = Instant::now(); - compute_and_report_stats( - &maxes, - sample_period, - &now.elapsed(), - total_tx_sent_count.load(Ordering::Relaxed), - ); - } } // Stop the sampling threads so it will collect the stats @@ -684,7 +673,7 @@ fn main() { compute_and_report_stats( &maxes, sample_period, - &now.elapsed(), + &start.elapsed(), total_tx_sent_count.load(Ordering::Relaxed), ); @@ -707,7 +696,14 @@ fn converge( spy_crdt.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_crdt)); let window = Arc::new(RwLock::new(default_window())); - let ncp = Ncp::new(&spy_ref, window, None, gossip_socket, exit_signal.clone()); + let ncp = Ncp::new( + &spy_ref, + window, + BlobRecycler::default(), + None, + gossip_socket, + exit_signal.clone(), + ); let mut v: Vec = vec![]; // wait for the network to converge, 30 seconds should be plenty for _ in 0..30 { diff --git a/src/fullnode.rs b/src/fullnode.rs index eaeb7f352..efc3f3f0f 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -176,12 +176,14 @@ impl Fullnode { } let bank = Arc::new(bank); + let blob_recycler = BlobRecycler::default(); let mut thread_hdls = vec![]; let rpu = Rpu::new( &bank, node.sockets.requests, node.sockets.respond, + &blob_recycler, exit.clone(), ); thread_hdls.extend(rpu.thread_hdls()); @@ -199,7 +201,6 @@ impl Fullnode { ); thread_hdls.extend(rpc_service.thread_hdls()); - let blob_recycler = BlobRecycler::default(); let window = window::new_window_from_entries(ledger_tail, entry_height, &node.info, &blob_recycler); let shared_window = Arc::new(RwLock::new(window)); @@ -209,6 +210,7 @@ impl Fullnode { let ncp = Ncp::new( &crdt, shared_window.clone(), + blob_recycler.clone(), ledger_path, node.sockets.gossip, exit.clone(), @@ -226,6 +228,7 @@ impl Fullnode { entry_height, crdt, shared_window, + blob_recycler.clone(), node.sockets.replicate, node.sockets.repair, node.sockets.retransmit, @@ -247,7 +250,7 @@ impl Fullnode { &crdt, tick_duration, node.sockets.transaction, - &blob_recycler, + blob_recycler.clone(), exit.clone(), ledger_path, sigverify_disabled, diff --git a/src/ncp.rs b/src/ncp.rs index 11f346884..7f6b793db 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -20,11 +20,11 @@ impl Ncp { pub fn new( crdt: &Arc>, window: SharedWindow, + blob_recycler: BlobRecycler, ledger_path: Option<&str>, gossip_socket: UdpSocket, exit: Arc, ) -> Self { - let blob_recycler = BlobRecycler::default(); let (request_sender, request_receiver) = channel(); let gossip_socket = Arc::new(gossip_socket); trace!( @@ -82,6 +82,7 @@ impl Service for Ncp { mod tests { use crdt::{Crdt, Node}; use ncp::Ncp; + use packet::BlobRecycler; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; @@ -94,7 +95,14 @@ mod tests { let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new"); let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); - let d = Ncp::new(&c, w, None, tn.sockets.gossip, exit.clone()); + let d = Ncp::new( + &c, + w, + BlobRecycler::default(), + None, + tn.sockets.gossip, + exit.clone(), + ); d.close().expect("thread join"); } } diff --git a/src/rpu.rs b/src/rpu.rs index 1cd7ca000..e67db168d 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -44,6 +44,7 @@ impl Rpu { bank: &Arc, requests_socket: UdpSocket, respond_socket: UdpSocket, + blob_recycler: &BlobRecycler, exit: Arc, ) -> Self { let packet_recycler = PacketRecycler::default(); @@ -55,7 +56,6 @@ impl Rpu { packet_sender, ); - let blob_recycler = BlobRecycler::default(); let request_processor = RequestProcessor::new(bank.clone()); let (request_stage, blob_receiver) = RequestStage::new( request_processor, diff --git a/src/thin_client.rs b/src/thin_client.rs index 5732fc3f7..806c61a7a 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -9,6 +9,7 @@ use crdt::{Crdt, CrdtError, NodeInfo}; use hash::Hash; use log::Level; use ncp::Ncp; +use packet::BlobRecycler; use request::{Request, Response}; use result::{Error, Result}; use signature::{Keypair, Pubkey, Signature}; @@ -377,7 +378,14 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option) -> R let my_addr = gossip_socket.local_addr().unwrap(); let crdt = Arc::new(RwLock::new(Crdt::new(node).expect("Crdt::new"))); let window = Arc::new(RwLock::new(vec![])); - let ncp = Ncp::new(&crdt.clone(), window, None, gossip_socket, exit.clone()); + let ncp = Ncp::new( + &crdt.clone(), + window, + BlobRecycler::default(), + None, + gossip_socket, + exit.clone(), + ); let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp); crdt.write().unwrap().insert(&leader_entry_point); diff --git a/src/tpu.rs b/src/tpu.rs index d3404ee75..157094a55 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -57,7 +57,7 @@ impl Tpu { crdt: &Arc>, tick_duration: Option, transactions_sockets: Vec, - blob_recycler: &BlobRecycler, + blob_recycler: BlobRecycler, exit: Arc, ledger_path: &str, sigverify_disabled: bool, diff --git a/src/tvu.rs b/src/tvu.rs index 002bc0d81..31e2b7f4b 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -75,6 +75,7 @@ impl Tvu { entry_height: u64, crdt: Arc>, window: SharedWindow, + blob_recycler: BlobRecycler, replicate_socket: UdpSocket, repair_socket: UdpSocket, retransmit_socket: UdpSocket, @@ -82,7 +83,6 @@ impl Tvu { exit: Arc, ) -> Self { let repair_socket = Arc::new(repair_socket); - let blob_recycler = BlobRecycler::default(); let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket( vec![Arc::new(replicate_socket), repair_socket.clone()], exit.clone(), @@ -168,10 +168,11 @@ pub mod tests { crdt: Arc>, gossip: UdpSocket, exit: Arc, - ) -> (Ncp, SharedWindow) { + ) -> (Ncp, SharedWindow, BlobRecycler) { let window = Arc::new(RwLock::new(window::default_window())); - let ncp = Ncp::new(&crdt, window.clone(), None, gossip, exit); - (ncp, window) + let recycler = BlobRecycler::default(); + let ncp = Ncp::new(&crdt, window.clone(), recycler.clone(), None, gossip, exit); + (ncp, window, recycler) } /// Test that message sent from leader to target1 and replicated to target2 @@ -202,13 +203,12 @@ pub mod tests { // setup some blob services to send blobs into the socket // to simulate the source peer and get blobs out of the socket to // simulate target peer - let recv_recycler = BlobRecycler::default(); - let resp_recycler = BlobRecycler::default(); + let recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = streamer::blob_receiver( Arc::new(target2.sockets.replicate), exit.clone(), - recv_recycler.clone(), + recycler.clone(), s_reader, ); @@ -217,7 +217,7 @@ pub mod tests { let t_responder = streamer::responder( "test_replicate", Arc::new(leader.sockets.requests), - resp_recycler.clone(), + recycler.clone(), r_responder, ); @@ -239,6 +239,7 @@ pub mod tests { 0, cref1, dr_1.1, + dr_1.2, target1.sockets.replicate, target1.sockets.repair, target1.sockets.retransmit, @@ -273,7 +274,7 @@ pub mod tests { alice_ref_balance -= transfer_amount; for entry in vec![entry0, entry1] { - let b = resp_recycler.allocate(); + let b = recycler.allocate(); { let mut w = b.write().unwrap(); w.set_index(blob_id).unwrap(); diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index 408ce08bc..7455a0851 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -7,7 +7,7 @@ use rayon::iter::*; use solana::crdt::{Crdt, Node}; use solana::logger; use solana::ncp::Ncp; -use solana::packet::Blob; +use solana::packet::{Blob, BlobRecycler}; use solana::result; use solana::service::Service; use std::net::UdpSocket; @@ -21,7 +21,14 @@ fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new"); let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); - let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit); + let d = Ncp::new( + &c.clone(), + w, + BlobRecycler::default(), + None, + tn.sockets.gossip, + exit, + ); (c, d, tn.sockets.replicate) } diff --git a/tests/multinode.rs b/tests/multinode.rs index e4bf1de67..bc2abd339 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -13,6 +13,7 @@ use solana::ledger::LedgerWriter; use solana::logger; use solana::mint::Mint; use solana::ncp::Ncp; +use solana::packet::BlobRecycler; use solana::result; use solana::service::Service; use solana::signature::{Keypair, KeypairUtil, Pubkey}; @@ -42,7 +43,15 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { spy_crdt.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_window = Arc::new(RwLock::new(default_window())); - let ncp = Ncp::new(&spy_ref, spy_window, None, spy.sockets.gossip, exit.clone()); + let recycler = BlobRecycler::default(); + let ncp = Ncp::new( + &spy_ref, + spy_window, + recycler, + None, + spy.sockets.gossip, + exit.clone(), + ); //wait for the network to converge let mut converged = false; let mut rv = vec![];