From a8e1c44663882688550ec3afca93e4a6fca00618 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 30 May 2018 13:38:15 -0700 Subject: [PATCH] names --- src/banking_stage.rs | 29 ++++++++++++----------- src/record_stage.rs | 37 ++++++++++++++++-------------- src/replicate_stage.rs | 17 ++++++++------ src/request_stage.rs | 31 +++++++++++++------------ src/write_stage.rs | 52 +++++++++++++++++++++++------------------- 5 files changed, 92 insertions(+), 74 deletions(-) diff --git a/src/banking_stage.rs b/src/banking_stage.rs index aed137d239..62f480b294 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -11,7 +11,7 @@ use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; -use std::thread::{spawn, JoinHandle}; +use std::thread::{Builder, JoinHandle}; use std::time::Duration; use std::time::Instant; use timing; @@ -30,19 +30,22 @@ impl BankingStage { packet_recycler: packet::PacketRecycler, ) -> Self { let (signal_sender, signal_receiver) = channel(); - let thread_hdl = spawn(move || loop { - let e = Self::process_packets( - bank.clone(), - &verified_receiver, - &signal_sender, - &packet_recycler, - ); - if e.is_err() { - if exit.load(Ordering::Relaxed) { - break; + let thread_hdl = Builder::new() + .name("solana-banking-stage".to_string()) + .spawn(move || loop { + let e = Self::process_packets( + bank.clone(), + &verified_receiver, + &signal_sender, + &packet_recycler, + ); + if e.is_err() { + if exit.load(Ordering::Relaxed) { + break; + } } - } - }); + }) + .unwrap(); BankingStage { thread_hdl, signal_receiver, diff --git a/src/record_stage.rs b/src/record_stage.rs index b4cd3cec91..f8523856b2 100644 --- a/src/record_stage.rs +++ b/src/record_stage.rs @@ -9,7 +9,7 @@ use entry::Entry; use hash::Hash; use recorder::Recorder; use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; -use std::thread::{spawn, JoinHandle}; +use std::thread::{Builder, JoinHandle}; use std::time::{Duration, Instant}; use transaction::Transaction; @@ -35,23 +35,26 @@ impl RecordStage { let (entry_sender, entry_receiver) = channel(); let start_hash = start_hash.clone(); - let thread_hdl = spawn(move || { - let mut recorder = Recorder::new(start_hash); - let duration_data = tick_duration.map(|dur| (Instant::now(), dur)); - loop { - if let Err(_) = Self::process_transactions( - &mut recorder, - duration_data, - &transaction_receiver, - &entry_sender, - ) { - return; + let thread_hdl = Builder::new() + .name("solana-record-stage".to_string()) + .spawn(move || { + let mut recorder = Recorder::new(start_hash); + let duration_data = tick_duration.map(|dur| (Instant::now(), dur)); + loop { + if let Err(_) = Self::process_transactions( + &mut recorder, + duration_data, + &transaction_receiver, + &entry_sender, + ) { + return; + } + if duration_data.is_some() { + recorder.hash(); + } } - if duration_data.is_some() { - recorder.hash(); - } - } - }); + }) + .unwrap(); RecordStage { entry_receiver, diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index a3df0c4a32..994e2285db 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -6,7 +6,7 @@ use packet; use result::Result; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::thread::{spawn, JoinHandle}; +use std::thread::{Builder, JoinHandle}; use std::time::Duration; use streamer; @@ -41,12 +41,15 @@ impl ReplicateStage { window_receiver: streamer::BlobReceiver, blob_recycler: packet::BlobRecycler, ) -> Self { - let thread_hdl = spawn(move || loop { - let e = Self::replicate_requests(&bank, &window_receiver, &blob_recycler); - if e.is_err() && exit.load(Ordering::Relaxed) { - break; - } - }); + let thread_hdl = Builder::new() + .name("solana-replicate-stage".to_string()) + .spawn(move || loop { + let e = Self::replicate_requests(&bank, &window_receiver, &blob_recycler); + if e.is_err() && exit.load(Ordering::Relaxed) { + break; + } + }) + .unwrap(); ReplicateStage { thread_hdl } } } diff --git a/src/request_stage.rs b/src/request_stage.rs index 4c10659eec..d2c1978508 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -11,7 +11,7 @@ use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver}; use std::sync::Arc; -use std::thread::{spawn, JoinHandle}; +use std::thread::{Builder, JoinHandle}; use std::time::Instant; use streamer; use timing; @@ -90,20 +90,23 @@ impl RequestStage { let request_processor = Arc::new(request_processor); let request_processor_ = request_processor.clone(); let (blob_sender, blob_receiver) = channel(); - let thread_hdl = spawn(move || loop { - let e = Self::process_request_packets( - &request_processor_, - &packet_receiver, - &blob_sender, - &packet_recycler, - &blob_recycler, - ); - if e.is_err() { - if exit.load(Ordering::Relaxed) { - break; + let thread_hdl = Builder::new() + .name("solana-request-stage".to_string()) + .spawn(move || loop { + let e = Self::process_request_packets( + &request_processor_, + &packet_receiver, + &blob_sender, + &packet_recycler, + &blob_recycler, + ); + if e.is_err() { + if exit.load(Ordering::Relaxed) { + break; + } } - } - }); + }) + .unwrap(); RequestStage { thread_hdl, blob_receiver, diff --git a/src/write_stage.rs b/src/write_stage.rs index fd25384677..60a55243c6 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -8,7 +8,7 @@ use std::io::Write; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, Mutex}; -use std::thread::{spawn, JoinHandle}; +use std::thread::{Builder, JoinHandle}; use streamer; pub struct WriteStage { @@ -26,19 +26,22 @@ impl WriteStage { entry_receiver: Receiver, ) -> Self { let (blob_sender, blob_receiver) = channel(); - let thread_hdl = spawn(move || loop { - let entry_writer = EntryWriter::new(&bank); - let _ = entry_writer.write_and_send_entries( - &blob_sender, - &blob_recycler, - &writer, - &entry_receiver, - ); - if exit.load(Ordering::Relaxed) { - info!("broadcat_service exiting"); - break; - } - }); + let thread_hdl = Builder::new() + .name("solana-writer".to_string()) + .spawn(move || loop { + let entry_writer = EntryWriter::new(&bank); + let _ = entry_writer.write_and_send_entries( + &blob_sender, + &blob_recycler, + &writer, + &entry_receiver, + ); + if exit.load(Ordering::Relaxed) { + info!("broadcat_service exiting"); + break; + } + }) + .unwrap(); WriteStage { thread_hdl, @@ -52,16 +55,19 @@ impl WriteStage { entry_receiver: Receiver, ) -> Self { let (_blob_sender, blob_receiver) = channel(); - let thread_hdl = spawn(move || { - let entry_writer = EntryWriter::new(&bank); - loop { - let _ = entry_writer.drain_entries(&entry_receiver); - if exit.load(Ordering::Relaxed) { - info!("drain_service exiting"); - break; + let thread_hdl = Builder::new() + .name("solana-drain".to_string()) + .spawn(move || { + let entry_writer = EntryWriter::new(&bank); + loop { + let _ = entry_writer.drain_entries(&entry_receiver); + if exit.load(Ordering::Relaxed) { + info!("drain_service exiting"); + break; + } } - } - }); + }) + .unwrap(); WriteStage { thread_hdl,