Clean up fullnode/tpu/tvu/fetch_stage exit signal

This commit is contained in:
Michael Vines 2019-03-04 19:02:03 -08:00 committed by Grimes
parent e7cde846cb
commit 96bfe92334
5 changed files with 19 additions and 83 deletions

View File

@ -3,13 +3,12 @@
use crate::service::Service;
use crate::streamer::{self, PacketReceiver, PacketSender};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread::{self, JoinHandle};
pub struct FetchStage {
exit: Arc<AtomicBool>,
thread_hdls: Vec<JoinHandle<()>>,
}
@ -37,11 +36,7 @@ impl FetchStage {
.map(|socket| streamer::receiver(socket, exit.clone(), sender.clone(), "fetch-stage"))
.collect();
Self { exit, thread_hdls }
}
pub fn close(&self) {
self.exit.store(true, Ordering::Relaxed);
Self { thread_hdls }
}
}

View File

@ -36,28 +36,6 @@ use std::thread::JoinHandle;
use std::thread::{spawn, Result};
use std::time::Duration;
struct NodeServices {
tpu: Tpu,
tvu: Tvu,
}
impl NodeServices {
fn new(tpu: Tpu, tvu: Tvu) -> Self {
NodeServices { tpu, tvu }
}
fn join(self) -> Result<()> {
self.tpu.join()?;
self.tvu.join()?;
Ok(())
}
fn exit(&self) {
self.tpu.exit();
self.tvu.exit();
}
}
pub struct FullnodeConfig {
pub sigverify_disabled: bool,
pub voting_disabled: bool,
@ -92,9 +70,10 @@ pub struct Fullnode {
rpc_pubsub_service: Option<PubSubService>,
rpc_working_bank_handle: JoinHandle<()>,
gossip_service: GossipService,
node_services: NodeServices,
poh_service: PohService,
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_service: PohService,
tpu: Tpu,
tvu: Tvu,
}
impl Fullnode {
@ -274,7 +253,8 @@ impl Fullnode {
rpc_service: Some(rpc_service),
rpc_pubsub_service: Some(rpc_pubsub_service),
rpc_working_bank_handle,
node_services: NodeServices::new(tpu, tvu),
tpu,
tvu,
exit,
poh_service,
poh_recorder,
@ -293,7 +273,6 @@ impl Fullnode {
// which is the sole initiator of rotations.
self.poh_recorder.lock().unwrap().clear_bank();
self.poh_service.exit();
self.node_services.exit();
}
pub fn close(self) -> Result<()> {
@ -340,7 +319,9 @@ impl Service for Fullnode {
self.rpc_working_bank_handle.join()?;
self.gossip_service.join()?;
self.node_services.join()?;
self.tpu.join()?;
self.tvu.join()?;
Ok(())
}
}

View File

@ -12,7 +12,7 @@ use crate::service::Service;
use crate::sigverify_stage::SigVerifyStage;
use solana_sdk::pubkey::Pubkey;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
@ -42,11 +42,7 @@ impl LeaderServices {
}
}
pub fn exit(&self) {
self.fetch_stage.close();
}
fn join(self) -> thread::Result<()> {
pub fn join(self) -> thread::Result<()> {
let mut results = vec![];
results.push(self.fetch_stage.join());
results.push(self.sigverify_stage.join());
@ -59,16 +55,10 @@ impl LeaderServices {
let _ = broadcast_result?;
Ok(())
}
pub fn close(self) -> thread::Result<()> {
self.exit();
self.join()
}
}
pub struct Tpu {
leader_services: LeaderServices,
exit: Arc<AtomicBool>,
pub id: Pubkey,
}
@ -114,23 +104,9 @@ impl Tpu {
);
Self {
leader_services,
exit: exit.clone(),
id,
}
}
pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed);
}
pub fn is_exited(&self) -> bool {
self.exit.load(Ordering::Relaxed)
}
pub fn close(self) -> thread::Result<()> {
self.exit();
self.join()
}
}
impl Service for Tpu {

View File

@ -26,7 +26,7 @@ use crate::service::Service;
use crate::storage_stage::{StorageStage, StorageState};
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
@ -37,7 +37,6 @@ pub struct Tvu {
replay_stage: ReplayStage,
blockstream_service: Option<BlockstreamService>,
storage_stage: StorageStage,
exit: Arc<AtomicBool>,
}
pub struct Sockets {
@ -147,25 +146,8 @@ impl Tvu {
replay_stage,
blockstream_service,
storage_stage,
exit: exit.clone(),
}
}
pub fn is_exited(&self) -> bool {
self.exit.load(Ordering::Relaxed)
}
pub fn exit(&self) {
// Call exit to make sure replay stage is unblocked from a channel it may be blocked on.
// Then replay stage will set the self.exit variable and cause the rest of the
// pipeline to exit
self.replay_stage.exit();
}
pub fn close(self) -> thread::Result<()> {
self.exit();
self.join()
}
}
impl Service for Tvu {
@ -192,6 +174,7 @@ pub mod tests {
use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT;
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use std::sync::atomic::Ordering;
#[test]
fn test_tvu_exit() {
@ -242,7 +225,8 @@ pub mod tests {
&poh_recorder,
&exit,
);
tvu.close().expect("close");
poh_service.close().expect("close");
exit.store(true, Ordering::Relaxed);
tvu.join().unwrap();
poh_service.close().unwrap();
}
}

View File

@ -184,9 +184,9 @@ fn test_replay() {
let bob_balance = bank.get_balance(&bob_keypair.pubkey());
assert_eq!(bob_balance, starting_balance - alice_ref_balance);
poh_service.close().expect("close");
tvu.close().expect("close");
exit.store(true, Ordering::Relaxed);
poh_service.close().expect("close");
tvu.join().expect("join");
dr_l.join().expect("join");
dr_2.join().expect("join");
dr_1.join().expect("join");