From dbaf8e66ab593c7b2fad63292ffcdb8da6506dcd Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Thu, 7 Feb 2019 21:13:39 -0800 Subject: [PATCH] Remove code duplication --- src/tpu.rs | 129 +++++++++++++++++++---------------------------------- 1 file changed, 46 insertions(+), 83 deletions(-) diff --git a/src/tpu.rs b/src/tpu.rs index f501f7b2b4..9b61b53254 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -69,12 +69,11 @@ impl ForwarderServices { } pub struct Tpu { - tpu_mode: TpuMode, + tpu_mode: Option, exit: Arc, } impl Tpu { - #[allow(clippy::new_ret_no_self)] #[allow(clippy::too_many_arguments)] pub fn new( bank: &Arc, @@ -91,59 +90,42 @@ impl Tpu { to_validator_sender: &TpuRotationSender, blob_sender: &BlobSender, ) -> Self { - let exit = Arc::new(AtomicBool::new(false)); - let tpu_mode = if is_leader { - let (packet_sender, packet_receiver) = channel(); - let fetch_stage = FetchStage::new_with_sender( - transactions_sockets, - exit.clone(), - &packet_sender.clone(), - ); - let cluster_info_vote_listener = - ClusterInfoVoteListener::new(exit.clone(), cluster_info.clone(), packet_sender); - - let (sigverify_stage, verified_receiver) = - SigVerifyStage::new(packet_receiver, sigverify_disabled); - - let (banking_stage, entry_receiver) = BankingStage::new( - &bank, - verified_receiver, - tick_duration, - last_entry_id, - max_tick_height, - leader_id, - &to_validator_sender, - ); - - let broadcast_service = BroadcastService::new( - bank.clone(), - broadcast_socket, - cluster_info, - blob_index, - bank.leader_scheduler.clone(), - entry_receiver, - max_tick_height, - exit.clone(), - blob_sender, - ); - - let svcs = LeaderServices::new( - fetch_stage, - sigverify_stage, - banking_stage, - cluster_info_vote_listener, - broadcast_service, - ); - TpuMode::Leader(svcs) - } else { - let tpu_forwarder = TpuForwarder::new(transactions_sockets, cluster_info); - let svcs = ForwarderServices::new(tpu_forwarder); - TpuMode::Forwarder(svcs) + let mut tpu = Self { + tpu_mode: None, + exit: Arc::new(AtomicBool::new(false)), }; - Self { - tpu_mode, - exit: exit.clone(), + if is_leader { + tpu.switch_to_leader( + bank, + tick_duration, + transactions_sockets, + broadcast_socket, + cluster_info, + sigverify_disabled, + max_tick_height, + blob_index, + last_entry_id, + leader_id, + to_validator_sender, + blob_sender, + ); + } else { + tpu.switch_to_forwarder(transactions_sockets, cluster_info); + } + + tpu + } + + fn tpu_mode_close(&self) { + match &self.tpu_mode { + Some(TpuMode::Leader(svcs)) => { + svcs.fetch_stage.close(); + } + Some(TpuMode::Forwarder(svcs)) => { + svcs.tpu_forwarder.close(); + } + None => (), } } @@ -152,16 +134,9 @@ impl Tpu { transactions_sockets: Vec, cluster_info: Arc>, ) { - match &self.tpu_mode { - TpuMode::Leader(svcs) => { - svcs.fetch_stage.close(); - } - TpuMode::Forwarder(svcs) => { - svcs.tpu_forwarder.close(); - } - } + self.tpu_mode_close(); let tpu_forwarder = TpuForwarder::new(transactions_sockets, cluster_info); - self.tpu_mode = TpuMode::Forwarder(ForwarderServices::new(tpu_forwarder)); + self.tpu_mode = Some(TpuMode::Forwarder(ForwarderServices::new(tpu_forwarder))); } #[allow(clippy::too_many_arguments)] @@ -180,14 +155,8 @@ impl Tpu { to_validator_sender: &TpuRotationSender, blob_sender: &BlobSender, ) { - match &self.tpu_mode { - TpuMode::Leader(svcs) => { - svcs.fetch_stage.close(); - } - TpuMode::Forwarder(svcs) => { - svcs.tpu_forwarder.close(); - } - } + self.tpu_mode_close(); + self.exit = Arc::new(AtomicBool::new(false)); let (packet_sender, packet_receiver) = channel(); let fetch_stage = FetchStage::new_with_sender( @@ -230,13 +199,13 @@ impl Tpu { cluster_info_vote_listener, broadcast_service, ); - self.tpu_mode = TpuMode::Leader(svcs); + self.tpu_mode = Some(TpuMode::Leader(svcs)); } pub fn is_leader(&self) -> bool { match self.tpu_mode { - TpuMode::Forwarder(_) => false, - TpuMode::Leader(_) => true, + Some(TpuMode::Leader(_)) => true, + _ => false, } } @@ -249,14 +218,7 @@ impl Tpu { } pub fn close(self) -> thread::Result> { - match &self.tpu_mode { - TpuMode::Leader(svcs) => { - svcs.fetch_stage.close(); - } - TpuMode::Forwarder(svcs) => { - svcs.tpu_forwarder.close(); - } - } + self.tpu_mode_close(); self.join() } } @@ -266,7 +228,7 @@ impl Service for Tpu { fn join(self) -> thread::Result<(Option)> { match self.tpu_mode { - TpuMode::Leader(svcs) => { + Some(TpuMode::Leader(svcs)) => { svcs.broadcast_service.join()?; svcs.fetch_stage.join()?; svcs.sigverify_stage.join()?; @@ -278,10 +240,11 @@ impl Service for Tpu { _ => Ok(None), } } - TpuMode::Forwarder(svcs) => { + Some(TpuMode::Forwarder(svcs)) => { svcs.tpu_forwarder.join()?; Ok(None) } + None => Ok(None), } } }