From 906df5e20ebd4491c2ff96f6a516a2fcc23cde73 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Mon, 4 Mar 2019 16:33:14 -0800 Subject: [PATCH] Exit signal cleanup: pass in references, make the receiver clone as needed --- bench-tps/src/main.rs | 3 +-- core/src/banking_stage.rs | 6 +----- core/src/fullnode.rs | 8 ++++---- core/src/gossip_service.rs | 20 +++++++++----------- core/src/poh_service.rs | 6 +++--- core/src/replicator.rs | 2 +- core/src/thin_client.rs | 9 ++------- core/src/tpu.rs | 4 ++-- core/src/tvu.rs | 6 +++--- tests/gossip.rs | 10 +++++----- tests/tvu.rs | 10 +++++----- 11 files changed, 36 insertions(+), 48 deletions(-) diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index ce9704dfa..dcdd2bda5 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -41,8 +41,7 @@ fn converge( spy_cluster_info.insert_info(leader.clone()); spy_cluster_info.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_cluster_info)); - let gossip_service = - GossipService::new(&spy_ref, None, None, gossip_socket, exit_signal.clone()); + let gossip_service = GossipService::new(&spy_ref, None, None, gossip_socket, &exit_signal); let mut v: Vec = vec![]; // wait for the network to converge, 30 seconds should be plenty for _ in 0..30 { diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index e06d32004..e5e39f657 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -355,11 +355,7 @@ pub fn create_test_recorder( let (poh_recorder, entry_receiver) = PohRecorder::new(bank.tick_height(), bank.last_blockhash()); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); - let poh_service = PohService::new( - poh_recorder.clone(), - &PohServiceConfig::default(), - exit.clone(), - ); + let poh_service = PohService::new(poh_recorder.clone(), &PohServiceConfig::default(), &exit); (poh_recorder, poh_service, entry_receiver) } diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index cfcea8377..d891d39c3 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -130,7 +130,7 @@ impl Fullnode { let (poh_recorder, entry_receiver) = PohRecorder::new(bank.tick_height(), bank.last_blockhash()); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); - let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, exit.clone()); + let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit); info!("node info: {:?}", node.info); info!("node entrypoint_info: {:?}", entrypoint_info_option); @@ -186,7 +186,7 @@ impl Fullnode { Some(blocktree.clone()), Some(bank_forks.clone()), node.sockets.gossip, - exit.clone(), + &exit, ); // Insert the entrypoint info, should only be None if this node @@ -237,7 +237,7 @@ impl Fullnode { ledger_signal_receiver, &subscriptions, &poh_recorder, - exit.clone(), + &exit, ); let tpu = Tpu::new( id, @@ -248,7 +248,7 @@ impl Fullnode { node.sockets.broadcast, config.sigverify_disabled, &blocktree, - exit.clone(), + &exit, ); let exit_ = exit.clone(); let bank_forks_ = bank_forks.clone(); diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index fb78a3bc0..06fac2f53 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -26,7 +26,7 @@ impl GossipService { blocktree: Option>, bank_forks: Option>>, gossip_socket: UdpSocket, - exit: Arc, + exit: &Arc, ) -> Self { let (request_sender, request_receiver) = channel(); let gossip_socket = Arc::new(gossip_socket); @@ -53,7 +53,10 @@ impl GossipService { exit.clone(), ); let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; - Self { exit, thread_hdls } + Self { + exit: exit.clone(), + thread_hdls, + } } pub fn close(self) -> thread::Result<()> { @@ -83,7 +86,7 @@ pub fn make_listening_node( .gossip .try_clone() .expect("Failed to clone gossip"), - exit.clone(), + &exit, ); (gossip_service, new_node_cluster_info_ref, new_node, id) @@ -141,13 +144,8 @@ pub fn make_spy_node(leader: &NodeInfo) -> (GossipService, Arc>, config: &PohServiceConfig, - poh_exit: Arc, + poh_exit: &Arc, ) -> Self { // PohService is a headless producer, so when it exits it should notify the banking stage. // Since channel are not used to talk between these threads an AtomicBool is used as a @@ -66,7 +66,7 @@ impl PohService { Self { tick_producer, - poh_exit, + poh_exit: poh_exit.clone(), } } @@ -157,7 +157,7 @@ mod tests { let poh_service = PohService::new( poh_recorder.clone(), &PohServiceConfig::Tick(HASHES_PER_TICK as usize), - Arc::new(AtomicBool::new(false)), + &Arc::new(AtomicBool::new(false)), ); poh_recorder.lock().unwrap().set_working_bank(working_bank); diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 9a14f5dad..340c4d5b9 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -150,7 +150,7 @@ impl Replicator { Some(blocktree.clone()), None, node.sockets.gossip, - exit.clone(), + &exit, ); info!("polling for leader"); diff --git a/core/src/thin_client.rs b/core/src/thin_client.rs index 4710f12bf..5b3ae5b2f 100644 --- a/core/src/thin_client.rs +++ b/core/src/thin_client.rs @@ -394,13 +394,8 @@ pub fn poll_gossip_for_leader(leader_gossip: SocketAddr, timeout: Option) - let (node, gossip_socket) = ClusterInfo::spy_node(); let my_addr = gossip_socket.local_addr().unwrap(); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node))); - let gossip_service = GossipService::new( - &cluster_info.clone(), - None, - None, - gossip_socket, - exit.clone(), - ); + let gossip_service = + GossipService::new(&cluster_info.clone(), None, None, gossip_socket, &exit); let leader_entry_point = NodeInfo::new_entry_point(&leader_gossip); cluster_info diff --git a/core/src/tpu.rs b/core/src/tpu.rs index f1526146d..3405695d9 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -82,7 +82,7 @@ impl Tpu { broadcast_socket: UdpSocket, sigverify_disabled: bool, blocktree: &Arc, - exit: Arc, + exit: &Arc, ) -> Self { cluster_info.write().unwrap().set_leader(id); @@ -114,7 +114,7 @@ impl Tpu { ); Self { leader_services, - exit, + exit: exit.clone(), id, } } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 5488b79d6..214e427da 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -67,7 +67,7 @@ impl Tvu { ledger_signal_receiver: Receiver, subscriptions: &Arc, poh_recorder: &Arc>, - exit: Arc, + exit: &Arc, ) -> Self where T: 'static + KeypairUtil + Sync + Send, @@ -147,7 +147,7 @@ impl Tvu { replay_stage, blockstream_service, storage_stage, - exit, + exit: exit.clone(), } } @@ -240,7 +240,7 @@ pub mod tests { l_receiver, &Arc::new(RpcSubscriptions::default()), &poh_recorder, - exit, + &exit, ); tvu.close().expect("close"); poh_service.close().expect("close"); diff --git a/tests/gossip.rs b/tests/gossip.rs index f6471e438..4695e80d2 100644 --- a/tests/gossip.rs +++ b/tests/gossip.rs @@ -16,7 +16,7 @@ use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; -fn test_node(exit: Arc) -> (Arc>, GossipService, UdpSocket) { +fn test_node(exit: &Arc) -> (Arc>, GossipService, UdpSocket) { let keypair = Keypair::new(); let mut tn = Node::new_localhost_with_pubkey(keypair.pubkey()); let cluster_info = ClusterInfo::new_with_keypair(tn.info.clone(), Arc::new(keypair)); @@ -35,7 +35,7 @@ where F: Fn(&Vec<(Arc>, GossipService, UdpSocket)>) -> (), { let exit = Arc::new(AtomicBool::new(false)); - let listen: Vec<_> = (0..num).map(|_| test_node(exit.clone())).collect(); + let listen: Vec<_> = (0..num).map(|_| test_node(&exit)).collect(); topo(&listen); let mut done = true; for i in 0..(num * 32) { @@ -142,11 +142,11 @@ pub fn cluster_info_retransmit() -> result::Result<()> { solana_logger::setup(); let exit = Arc::new(AtomicBool::new(false)); trace!("c1:"); - let (c1, dr1, tn1) = test_node(exit.clone()); + let (c1, dr1, tn1) = test_node(&exit); trace!("c2:"); - let (c2, dr2, tn2) = test_node(exit.clone()); + let (c2, dr2, tn2) = test_node(&exit); trace!("c3:"); - let (c3, dr3, tn3) = test_node(exit.clone()); + let (c3, dr3, tn3) = test_node(&exit); let c1_data = c1.read().unwrap().my_data().clone(); c1.write().unwrap().set_leader(c1_data.id); diff --git a/tests/tvu.rs b/tests/tvu.rs index 6f05ef033..256005333 100644 --- a/tests/tvu.rs +++ b/tests/tvu.rs @@ -32,7 +32,7 @@ use std::time::Duration; fn new_gossip( cluster_info: Arc>, gossip: UdpSocket, - exit: Arc, + exit: &Arc, ) -> GossipService { GossipService::new(&cluster_info, None, None, gossip, exit) } @@ -52,14 +52,14 @@ fn test_replay() { cluster_info_l.set_leader(leader.info.id); let cref_l = Arc::new(RwLock::new(cluster_info_l)); - let dr_l = new_gossip(cref_l, leader.sockets.gossip, exit.clone()); + let dr_l = new_gossip(cref_l, leader.sockets.gossip, &exit); // start cluster_info2 let mut cluster_info2 = ClusterInfo::new(target2.info.clone()); cluster_info2.insert_info(leader.info.clone()); cluster_info2.set_leader(leader.info.id); let cref2 = Arc::new(RwLock::new(cluster_info2)); - let dr_2 = new_gossip(cref2, target2.sockets.gossip, exit.clone()); + let dr_2 = new_gossip(cref2, target2.sockets.gossip, &exit); // setup some blob services to send blobs into the socket // to simulate the source peer and get blobs out of the socket to @@ -102,7 +102,7 @@ fn test_replay() { cluster_info1.insert_info(leader.info.clone()); cluster_info1.set_leader(leader.info.id); let cref1 = Arc::new(RwLock::new(cluster_info1)); - let dr_1 = new_gossip(cref1.clone(), target1.sockets.gossip, exit.clone()); + let dr_1 = new_gossip(cref1.clone(), target1.sockets.gossip, &exit); let blocktree_path = get_tmp_ledger_path!(); @@ -131,7 +131,7 @@ fn test_replay() { ledger_signal_receiver, &Arc::new(RpcSubscriptions::default()), &poh_recorder, - exit.clone(), + &exit, ); let mut alice_ref_balance = starting_balance;