From 2782922f7abcc1ca7eff79a2cb1e93804b82665d Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 1 Mar 2019 20:43:30 -0700 Subject: [PATCH] Rename BroadcastService back to BroadcastStage --- ...roadcast_service.rs => broadcast_stage.rs} | 34 +++++++++---------- src/lib.rs | 2 +- src/tpu.rs | 16 ++++----- 3 files changed, 26 insertions(+), 26 deletions(-) rename src/{broadcast_service.rs => broadcast_stage.rs} (93%) diff --git a/src/broadcast_service.rs b/src/broadcast_stage.rs similarity index 93% rename from src/broadcast_service.rs rename to src/broadcast_stage.rs index c70c225e7..44a50962d 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_stage.rs @@ -1,4 +1,4 @@ -//! The `broadcast_service` broadcasts data from a leader node to validators +//! A stage to broadcast data from a leader node to validators //! use crate::blocktree::Blocktree; use crate::cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo, DATA_PLANE_FANOUT}; @@ -24,7 +24,7 @@ use std::thread::{self, Builder, JoinHandle}; use std::time::{Duration, Instant}; #[derive(Debug, PartialEq, Eq, Clone)] -pub enum BroadcastServiceReturnType { +pub enum BroadcastStageReturnType { LeaderRotation, ChannelDisconnected, ExitSignal, @@ -138,7 +138,7 @@ impl Broadcast { } } -// Implement a destructor for the BroadcastService3 thread to signal it exited +// Implement a destructor for the BroadcastStage thread to signal it exited // even on panics struct Finalizer { exit_sender: Arc, @@ -156,11 +156,11 @@ impl Drop for Finalizer { } } -pub struct BroadcastService { - thread_hdl: JoinHandle, +pub struct BroadcastStage { + thread_hdl: JoinHandle, } -impl BroadcastService { +impl BroadcastStage { #[allow(clippy::too_many_arguments)] fn run( slot_height: u64, @@ -171,7 +171,7 @@ impl BroadcastService { receiver: &Receiver>, exit_signal: &Arc, blocktree: &Arc, - ) -> BroadcastServiceReturnType { + ) -> BroadcastStageReturnType { let me = cluster_info.read().unwrap().my_data().clone(); let mut broadcast = Broadcast { @@ -185,7 +185,7 @@ impl BroadcastService { loop { if exit_signal.load(Ordering::Relaxed) { - return BroadcastServiceReturnType::ExitSignal; + return BroadcastStageReturnType::ExitSignal; } let mut broadcast_table = cluster_info .read() @@ -204,7 +204,7 @@ impl BroadcastService { ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => { - return BroadcastServiceReturnType::ChannelDisconnected; + return BroadcastStageReturnType::ChannelDisconnected; } Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), Error::ClusterInfoError(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? @@ -267,10 +267,10 @@ impl BroadcastService { } } -impl Service for BroadcastService { - type JoinReturnType = BroadcastServiceReturnType; +impl Service for BroadcastStage { + type JoinReturnType = BroadcastStageReturnType; - fn join(self) -> thread::Result { + fn join(self) -> thread::Result { self.thread_hdl.join() } } @@ -291,9 +291,9 @@ mod test { use std::thread::sleep; use std::time::Duration; - struct MockBroadcastService { + struct MockBroadcastStage { blocktree: Arc, - broadcast_service: BroadcastService, + broadcast_service: BroadcastStage, } fn setup_dummy_broadcast_service( @@ -302,7 +302,7 @@ mod test { ledger_path: &str, entry_receiver: Receiver>, blob_index: u64, - ) -> MockBroadcastService { + ) -> MockBroadcastStage { // Make the database ledger let blocktree = Arc::new(Blocktree::open(ledger_path).unwrap()); @@ -322,7 +322,7 @@ mod test { let bank = Arc::new(Bank::default()); // Start up the broadcast stage - let broadcast_service = BroadcastService::new( + let broadcast_service = BroadcastStage::new( slot_height, &bank, leader_info.sockets.broadcast, @@ -333,7 +333,7 @@ mod test { &blocktree, ); - MockBroadcastService { + MockBroadcastStage { blocktree, broadcast_service, } diff --git a/src/lib.rs b/src/lib.rs index de6ed8440..d28bce4a9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,7 @@ pub mod bank_forks; pub mod banking_stage; pub mod blob_fetch_stage; -pub mod broadcast_service; +pub mod broadcast_stage; #[cfg(feature = "chacha")] pub mod chacha; #[cfg(all(feature = "chacha", feature = "cuda"))] diff --git a/src/tpu.rs b/src/tpu.rs index c5448daf7..ba514b34f 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -3,7 +3,7 @@ use crate::banking_stage::{BankingStage, UnprocessedPackets}; use crate::blocktree::Blocktree; -use crate::broadcast_service::BroadcastService; +use crate::broadcast_stage::BroadcastStage; use crate::cluster_info::ClusterInfo; use crate::cluster_info_vote_listener::ClusterInfoVoteListener; use crate::fetch_stage::FetchStage; @@ -29,7 +29,7 @@ pub struct LeaderServices { sigverify_stage: SigVerifyStage, banking_stage: BankingStage, cluster_info_vote_listener: ClusterInfoVoteListener, - broadcast_service: BroadcastService, + broadcast_stage: BroadcastStage, } impl LeaderServices { @@ -38,14 +38,14 @@ impl LeaderServices { sigverify_stage: SigVerifyStage, banking_stage: BankingStage, cluster_info_vote_listener: ClusterInfoVoteListener, - broadcast_service: BroadcastService, + broadcast_stage: BroadcastStage, ) -> Self { LeaderServices { fetch_stage, sigverify_stage, banking_stage, cluster_info_vote_listener, - broadcast_service, + broadcast_stage, } } @@ -59,7 +59,7 @@ impl LeaderServices { results.push(self.sigverify_stage.join()); results.push(self.cluster_info_vote_listener.join()); results.push(self.banking_stage.join()); - let broadcast_result = self.broadcast_service.join(); + let broadcast_result = self.broadcast_stage.join(); for result in results { result?; } @@ -217,7 +217,7 @@ impl Tpu { let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver, sigverify_disabled); - // TODO: Fix BankingStage/BroadcastService to operate on `slot` directly instead of + // TODO: Fix BankingStage/BroadcastStage to operate on `slot` directly instead of // `max_tick_height` let max_tick_height = (slot + 1) * bank.ticks_per_slot() - 1; let blob_index = blocktree @@ -234,7 +234,7 @@ impl Tpu { self.id, ); - let broadcast_service = BroadcastService::new( + let broadcast_stage = BroadcastStage::new( slot, bank, broadcast_socket, @@ -250,7 +250,7 @@ impl Tpu { sigverify_stage, banking_stage, cluster_info_vote_listener, - broadcast_service, + broadcast_stage, ); self.tpu_mode = Some(TpuMode::Leader(svcs)); }