Exit signal cleanup: pass in references, make the receiver clone as needed

This commit is contained in:
Michael Vines 2019-03-04 16:33:14 -08:00
parent 794e961328
commit 906df5e20e
11 changed files with 36 additions and 48 deletions

View File

@ -41,8 +41,7 @@ fn converge(
spy_cluster_info.insert_info(leader.clone()); spy_cluster_info.insert_info(leader.clone());
spy_cluster_info.set_leader(leader.id); spy_cluster_info.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_cluster_info)); let spy_ref = Arc::new(RwLock::new(spy_cluster_info));
let gossip_service = let gossip_service = GossipService::new(&spy_ref, None, None, gossip_socket, &exit_signal);
GossipService::new(&spy_ref, None, None, gossip_socket, exit_signal.clone());
let mut v: Vec<NodeInfo> = vec![]; let mut v: Vec<NodeInfo> = vec![];
// wait for the network to converge, 30 seconds should be plenty // wait for the network to converge, 30 seconds should be plenty
for _ in 0..30 { for _ in 0..30 {

View File

@ -355,11 +355,7 @@ pub fn create_test_recorder(
let (poh_recorder, entry_receiver) = let (poh_recorder, entry_receiver) =
PohRecorder::new(bank.tick_height(), bank.last_blockhash()); PohRecorder::new(bank.tick_height(), bank.last_blockhash());
let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new( let poh_service = PohService::new(poh_recorder.clone(), &PohServiceConfig::default(), &exit);
poh_recorder.clone(),
&PohServiceConfig::default(),
exit.clone(),
);
(poh_recorder, poh_service, entry_receiver) (poh_recorder, poh_service, entry_receiver)
} }

View File

@ -130,7 +130,7 @@ impl Fullnode {
let (poh_recorder, entry_receiver) = let (poh_recorder, entry_receiver) =
PohRecorder::new(bank.tick_height(), bank.last_blockhash()); PohRecorder::new(bank.tick_height(), bank.last_blockhash());
let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, exit.clone()); let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit);
info!("node info: {:?}", node.info); info!("node info: {:?}", node.info);
info!("node entrypoint_info: {:?}", entrypoint_info_option); info!("node entrypoint_info: {:?}", entrypoint_info_option);
@ -186,7 +186,7 @@ impl Fullnode {
Some(blocktree.clone()), Some(blocktree.clone()),
Some(bank_forks.clone()), Some(bank_forks.clone()),
node.sockets.gossip, node.sockets.gossip,
exit.clone(), &exit,
); );
// Insert the entrypoint info, should only be None if this node // Insert the entrypoint info, should only be None if this node
@ -237,7 +237,7 @@ impl Fullnode {
ledger_signal_receiver, ledger_signal_receiver,
&subscriptions, &subscriptions,
&poh_recorder, &poh_recorder,
exit.clone(), &exit,
); );
let tpu = Tpu::new( let tpu = Tpu::new(
id, id,
@ -248,7 +248,7 @@ impl Fullnode {
node.sockets.broadcast, node.sockets.broadcast,
config.sigverify_disabled, config.sigverify_disabled,
&blocktree, &blocktree,
exit.clone(), &exit,
); );
let exit_ = exit.clone(); let exit_ = exit.clone();
let bank_forks_ = bank_forks.clone(); let bank_forks_ = bank_forks.clone();

View File

@ -26,7 +26,7 @@ impl GossipService {
blocktree: Option<Arc<Blocktree>>, blocktree: Option<Arc<Blocktree>>,
bank_forks: Option<Arc<RwLock<BankForks>>>, bank_forks: Option<Arc<RwLock<BankForks>>>,
gossip_socket: UdpSocket, gossip_socket: UdpSocket,
exit: Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
let (request_sender, request_receiver) = channel(); let (request_sender, request_receiver) = channel();
let gossip_socket = Arc::new(gossip_socket); let gossip_socket = Arc::new(gossip_socket);
@ -53,7 +53,10 @@ impl GossipService {
exit.clone(), exit.clone(),
); );
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
Self { exit, thread_hdls } Self {
exit: exit.clone(),
thread_hdls,
}
} }
pub fn close(self) -> thread::Result<()> { pub fn close(self) -> thread::Result<()> {
@ -83,7 +86,7 @@ pub fn make_listening_node(
.gossip .gossip
.try_clone() .try_clone()
.expect("Failed to clone gossip"), .expect("Failed to clone gossip"),
exit.clone(), &exit,
); );
(gossip_service, new_node_cluster_info_ref, new_node, id) (gossip_service, new_node_cluster_info_ref, new_node, id)
@ -141,13 +144,8 @@ pub fn make_spy_node(leader: &NodeInfo) -> (GossipService, Arc<RwLock<ClusterInf
spy_cluster_info.insert_info(leader.clone()); spy_cluster_info.insert_info(leader.clone());
spy_cluster_info.set_leader(leader.id); spy_cluster_info.set_leader(leader.id);
let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info)); let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info));
let gossip_service = GossipService::new( let gossip_service =
&spy_cluster_info_ref, GossipService::new(&spy_cluster_info_ref, None, None, spy.sockets.gossip, &exit);
None,
None,
spy.sockets.gossip,
exit.clone(),
);
(gossip_service, spy_cluster_info_ref, id) (gossip_service, spy_cluster_info_ref, id)
} }
@ -178,7 +176,7 @@ mod tests {
let tn = Node::new_localhost(); let tn = Node::new_localhost();
let cluster_info = ClusterInfo::new(tn.info.clone()); let cluster_info = ClusterInfo::new(tn.info.clone());
let c = Arc::new(RwLock::new(cluster_info)); let c = Arc::new(RwLock::new(cluster_info));
let d = GossipService::new(&c, None, None, tn.sockets.gossip, exit.clone()); let d = GossipService::new(&c, None, None, tn.sockets.gossip, &exit);
d.close().expect("thread join"); d.close().expect("thread join");
} }
} }

View File

@ -47,7 +47,7 @@ impl PohService {
pub fn new( pub fn new(
poh_recorder: Arc<Mutex<PohRecorder>>, poh_recorder: Arc<Mutex<PohRecorder>>,
config: &PohServiceConfig, config: &PohServiceConfig,
poh_exit: Arc<AtomicBool>, poh_exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
// PohService is a headless producer, so when it exits it should notify the banking stage. // PohService is a headless producer, so when it exits it should notify the banking stage.
// Since channel are not used to talk between these threads an AtomicBool is used as a // Since channel are not used to talk between these threads an AtomicBool is used as a
@ -66,7 +66,7 @@ impl PohService {
Self { Self {
tick_producer, tick_producer,
poh_exit, poh_exit: poh_exit.clone(),
} }
} }
@ -157,7 +157,7 @@ mod tests {
let poh_service = PohService::new( let poh_service = PohService::new(
poh_recorder.clone(), poh_recorder.clone(),
&PohServiceConfig::Tick(HASHES_PER_TICK as usize), &PohServiceConfig::Tick(HASHES_PER_TICK as usize),
Arc::new(AtomicBool::new(false)), &Arc::new(AtomicBool::new(false)),
); );
poh_recorder.lock().unwrap().set_working_bank(working_bank); poh_recorder.lock().unwrap().set_working_bank(working_bank);

View File

@ -150,7 +150,7 @@ impl Replicator {
Some(blocktree.clone()), Some(blocktree.clone()),
None, None,
node.sockets.gossip, node.sockets.gossip,
exit.clone(), &exit,
); );
info!("polling for leader"); info!("polling for leader");

View File

@ -394,13 +394,8 @@ pub fn poll_gossip_for_leader(leader_gossip: SocketAddr, timeout: Option<u64>) -
let (node, gossip_socket) = ClusterInfo::spy_node(); let (node, gossip_socket) = ClusterInfo::spy_node();
let my_addr = gossip_socket.local_addr().unwrap(); let my_addr = gossip_socket.local_addr().unwrap();
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node))); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node)));
let gossip_service = GossipService::new( let gossip_service =
&cluster_info.clone(), GossipService::new(&cluster_info.clone(), None, None, gossip_socket, &exit);
None,
None,
gossip_socket,
exit.clone(),
);
let leader_entry_point = NodeInfo::new_entry_point(&leader_gossip); let leader_entry_point = NodeInfo::new_entry_point(&leader_gossip);
cluster_info cluster_info

View File

@ -82,7 +82,7 @@ impl Tpu {
broadcast_socket: UdpSocket, broadcast_socket: UdpSocket,
sigverify_disabled: bool, sigverify_disabled: bool,
blocktree: &Arc<Blocktree>, blocktree: &Arc<Blocktree>,
exit: Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
cluster_info.write().unwrap().set_leader(id); cluster_info.write().unwrap().set_leader(id);
@ -114,7 +114,7 @@ impl Tpu {
); );
Self { Self {
leader_services, leader_services,
exit, exit: exit.clone(),
id, id,
} }
} }

View File

@ -67,7 +67,7 @@ impl Tvu {
ledger_signal_receiver: Receiver<bool>, ledger_signal_receiver: Receiver<bool>,
subscriptions: &Arc<RpcSubscriptions>, subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
exit: Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> Self ) -> Self
where where
T: 'static + KeypairUtil + Sync + Send, T: 'static + KeypairUtil + Sync + Send,
@ -147,7 +147,7 @@ impl Tvu {
replay_stage, replay_stage,
blockstream_service, blockstream_service,
storage_stage, storage_stage,
exit, exit: exit.clone(),
} }
} }
@ -240,7 +240,7 @@ pub mod tests {
l_receiver, l_receiver,
&Arc::new(RpcSubscriptions::default()), &Arc::new(RpcSubscriptions::default()),
&poh_recorder, &poh_recorder,
exit, &exit,
); );
tvu.close().expect("close"); tvu.close().expect("close");
poh_service.close().expect("close"); poh_service.close().expect("close");

View File

@ -16,7 +16,7 @@ use std::sync::{Arc, RwLock};
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<ClusterInfo>>, GossipService, UdpSocket) { fn test_node(exit: &Arc<AtomicBool>) -> (Arc<RwLock<ClusterInfo>>, GossipService, UdpSocket) {
let keypair = Keypair::new(); let keypair = Keypair::new();
let mut tn = Node::new_localhost_with_pubkey(keypair.pubkey()); let mut tn = Node::new_localhost_with_pubkey(keypair.pubkey());
let cluster_info = ClusterInfo::new_with_keypair(tn.info.clone(), Arc::new(keypair)); let cluster_info = ClusterInfo::new_with_keypair(tn.info.clone(), Arc::new(keypair));
@ -35,7 +35,7 @@ where
F: Fn(&Vec<(Arc<RwLock<ClusterInfo>>, GossipService, UdpSocket)>) -> (), F: Fn(&Vec<(Arc<RwLock<ClusterInfo>>, GossipService, UdpSocket)>) -> (),
{ {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let listen: Vec<_> = (0..num).map(|_| test_node(exit.clone())).collect(); let listen: Vec<_> = (0..num).map(|_| test_node(&exit)).collect();
topo(&listen); topo(&listen);
let mut done = true; let mut done = true;
for i in 0..(num * 32) { for i in 0..(num * 32) {
@ -142,11 +142,11 @@ pub fn cluster_info_retransmit() -> result::Result<()> {
solana_logger::setup(); solana_logger::setup();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
trace!("c1:"); trace!("c1:");
let (c1, dr1, tn1) = test_node(exit.clone()); let (c1, dr1, tn1) = test_node(&exit);
trace!("c2:"); trace!("c2:");
let (c2, dr2, tn2) = test_node(exit.clone()); let (c2, dr2, tn2) = test_node(&exit);
trace!("c3:"); trace!("c3:");
let (c3, dr3, tn3) = test_node(exit.clone()); let (c3, dr3, tn3) = test_node(&exit);
let c1_data = c1.read().unwrap().my_data().clone(); let c1_data = c1.read().unwrap().my_data().clone();
c1.write().unwrap().set_leader(c1_data.id); c1.write().unwrap().set_leader(c1_data.id);

View File

@ -32,7 +32,7 @@ use std::time::Duration;
fn new_gossip( fn new_gossip(
cluster_info: Arc<RwLock<ClusterInfo>>, cluster_info: Arc<RwLock<ClusterInfo>>,
gossip: UdpSocket, gossip: UdpSocket,
exit: Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> GossipService { ) -> GossipService {
GossipService::new(&cluster_info, None, None, gossip, exit) GossipService::new(&cluster_info, None, None, gossip, exit)
} }
@ -52,14 +52,14 @@ fn test_replay() {
cluster_info_l.set_leader(leader.info.id); cluster_info_l.set_leader(leader.info.id);
let cref_l = Arc::new(RwLock::new(cluster_info_l)); let cref_l = Arc::new(RwLock::new(cluster_info_l));
let dr_l = new_gossip(cref_l, leader.sockets.gossip, exit.clone()); let dr_l = new_gossip(cref_l, leader.sockets.gossip, &exit);
// start cluster_info2 // start cluster_info2
let mut cluster_info2 = ClusterInfo::new(target2.info.clone()); let mut cluster_info2 = ClusterInfo::new(target2.info.clone());
cluster_info2.insert_info(leader.info.clone()); cluster_info2.insert_info(leader.info.clone());
cluster_info2.set_leader(leader.info.id); cluster_info2.set_leader(leader.info.id);
let cref2 = Arc::new(RwLock::new(cluster_info2)); let cref2 = Arc::new(RwLock::new(cluster_info2));
let dr_2 = new_gossip(cref2, target2.sockets.gossip, exit.clone()); let dr_2 = new_gossip(cref2, target2.sockets.gossip, &exit);
// setup some blob services to send blobs into the socket // setup some blob services to send blobs into the socket
// to simulate the source peer and get blobs out of the socket to // to simulate the source peer and get blobs out of the socket to
@ -102,7 +102,7 @@ fn test_replay() {
cluster_info1.insert_info(leader.info.clone()); cluster_info1.insert_info(leader.info.clone());
cluster_info1.set_leader(leader.info.id); cluster_info1.set_leader(leader.info.id);
let cref1 = Arc::new(RwLock::new(cluster_info1)); let cref1 = Arc::new(RwLock::new(cluster_info1));
let dr_1 = new_gossip(cref1.clone(), target1.sockets.gossip, exit.clone()); let dr_1 = new_gossip(cref1.clone(), target1.sockets.gossip, &exit);
let blocktree_path = get_tmp_ledger_path!(); let blocktree_path = get_tmp_ledger_path!();
@ -131,7 +131,7 @@ fn test_replay() {
ledger_signal_receiver, ledger_signal_receiver,
&Arc::new(RpcSubscriptions::default()), &Arc::new(RpcSubscriptions::default()),
&poh_recorder, &poh_recorder,
exit.clone(), &exit,
); );
let mut alice_ref_balance = starting_balance; let mut alice_ref_balance = starting_balance;