From 96bfe9233496d668b5643c0d27b69cace629961c Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Mon, 4 Mar 2019 19:02:03 -0800 Subject: [PATCH] Clean up fullnode/tpu/tvu/fetch_stage exit signal --- core/src/fetch_stage.rs | 9 ++------- core/src/fullnode.rs | 35 ++++++++--------------------------- core/src/tpu.rs | 28 ++-------------------------- core/src/tvu.rs | 26 +++++--------------------- tests/tvu.rs | 4 ++-- 5 files changed, 19 insertions(+), 83 deletions(-) diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 51b0fa1341..53c71722de 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -3,13 +3,12 @@ use crate::service::Service; use crate::streamer::{self, PacketReceiver, PacketSender}; use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::Arc; use std::thread::{self, JoinHandle}; pub struct FetchStage { - exit: Arc, thread_hdls: Vec>, } @@ -37,11 +36,7 @@ impl FetchStage { .map(|socket| streamer::receiver(socket, exit.clone(), sender.clone(), "fetch-stage")) .collect(); - Self { exit, thread_hdls } - } - - pub fn close(&self) { - self.exit.store(true, Ordering::Relaxed); + Self { thread_hdls } } } diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index 314b8cf197..0317cbc2e1 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -36,28 +36,6 @@ use std::thread::JoinHandle; use std::thread::{spawn, Result}; use std::time::Duration; -struct NodeServices { - tpu: Tpu, - tvu: Tvu, -} - -impl NodeServices { - fn new(tpu: Tpu, tvu: Tvu) -> Self { - NodeServices { tpu, tvu } - } - - fn join(self) -> Result<()> { - self.tpu.join()?; - self.tvu.join()?; - Ok(()) - } - - fn exit(&self) { - self.tpu.exit(); - self.tvu.exit(); - } -} - pub struct FullnodeConfig { pub sigverify_disabled: bool, pub voting_disabled: bool, @@ -92,9 +70,10 @@ pub struct Fullnode { rpc_pubsub_service: Option, rpc_working_bank_handle: JoinHandle<()>, gossip_service: GossipService, - node_services: NodeServices, - poh_service: PohService, poh_recorder: Arc>, + poh_service: PohService, + tpu: Tpu, + tvu: Tvu, } impl Fullnode { @@ -274,7 +253,8 @@ impl Fullnode { rpc_service: Some(rpc_service), rpc_pubsub_service: Some(rpc_pubsub_service), rpc_working_bank_handle, - node_services: NodeServices::new(tpu, tvu), + tpu, + tvu, exit, poh_service, poh_recorder, @@ -293,7 +273,6 @@ impl Fullnode { // which is the sole initiator of rotations. self.poh_recorder.lock().unwrap().clear_bank(); self.poh_service.exit(); - self.node_services.exit(); } pub fn close(self) -> Result<()> { @@ -340,7 +319,9 @@ impl Service for Fullnode { self.rpc_working_bank_handle.join()?; self.gossip_service.join()?; - self.node_services.join()?; + self.tpu.join()?; + self.tvu.join()?; + Ok(()) } } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 3405695d94..a4e8d30848 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -12,7 +12,7 @@ use crate::service::Service; use crate::sigverify_stage::SigVerifyStage; use solana_sdk::pubkey::Pubkey; use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::AtomicBool; use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, Mutex, RwLock}; use std::thread; @@ -42,11 +42,7 @@ impl LeaderServices { } } - pub fn exit(&self) { - self.fetch_stage.close(); - } - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { let mut results = vec![]; results.push(self.fetch_stage.join()); results.push(self.sigverify_stage.join()); @@ -59,16 +55,10 @@ impl LeaderServices { let _ = broadcast_result?; Ok(()) } - - pub fn close(self) -> thread::Result<()> { - self.exit(); - self.join() - } } pub struct Tpu { leader_services: LeaderServices, - exit: Arc, pub id: Pubkey, } @@ -114,23 +104,9 @@ impl Tpu { ); Self { leader_services, - exit: exit.clone(), id, } } - - pub fn exit(&self) { - self.exit.store(true, Ordering::Relaxed); - } - - pub fn is_exited(&self) -> bool { - self.exit.load(Ordering::Relaxed) - } - - pub fn close(self) -> thread::Result<()> { - self.exit(); - self.join() - } } impl Service for Tpu { diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 214e427daf..4832876348 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -26,7 +26,7 @@ use crate::service::Service; use crate::storage_stage::{StorageStage, StorageState}; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::AtomicBool; use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, Mutex, RwLock}; use std::thread; @@ -37,7 +37,6 @@ pub struct Tvu { replay_stage: ReplayStage, blockstream_service: Option, storage_stage: StorageStage, - exit: Arc, } pub struct Sockets { @@ -147,25 +146,8 @@ impl Tvu { replay_stage, blockstream_service, storage_stage, - exit: exit.clone(), } } - - pub fn is_exited(&self) -> bool { - self.exit.load(Ordering::Relaxed) - } - - pub fn exit(&self) { - // Call exit to make sure replay stage is unblocked from a channel it may be blocked on. - // Then replay stage will set the self.exit variable and cause the rest of the - // pipeline to exit - self.replay_stage.exit(); - } - - pub fn close(self) -> thread::Result<()> { - self.exit(); - self.join() - } } impl Service for Tvu { @@ -192,6 +174,7 @@ pub mod tests { use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT; use solana_runtime::bank::Bank; use solana_sdk::genesis_block::GenesisBlock; + use std::sync::atomic::Ordering; #[test] fn test_tvu_exit() { @@ -242,7 +225,8 @@ pub mod tests { &poh_recorder, &exit, ); - tvu.close().expect("close"); - poh_service.close().expect("close"); + exit.store(true, Ordering::Relaxed); + tvu.join().unwrap(); + poh_service.close().unwrap(); } } diff --git a/tests/tvu.rs b/tests/tvu.rs index 2560053330..9f091dd059 100644 --- a/tests/tvu.rs +++ b/tests/tvu.rs @@ -184,9 +184,9 @@ fn test_replay() { let bob_balance = bank.get_balance(&bob_keypair.pubkey()); assert_eq!(bob_balance, starting_balance - alice_ref_balance); - poh_service.close().expect("close"); - tvu.close().expect("close"); exit.store(true, Ordering::Relaxed); + poh_service.close().expect("close"); + tvu.join().expect("join"); dr_l.join().expect("join"); dr_2.join().expect("join"); dr_1.join().expect("join");