From 4e3d71c2c9306c794121f30fc1566d7d2ef3b134 Mon Sep 17 00:00:00 2001 From: Carl Date: Sat, 16 Feb 2019 18:03:55 -0800 Subject: [PATCH] Batch joins on entire tpumode struct instead of individual services --- src/tpu.rs | 88 +++++++++++++++++++++----------------------- src/tpu_forwarder.rs | 4 +- 2 files changed, 45 insertions(+), 47 deletions(-) diff --git a/src/tpu.rs b/src/tpu.rs index b3f00cead3..d5ffc3c359 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -31,11 +31,11 @@ pub enum TpuMode { } pub struct LeaderServices { - fetch_stage: Option, - sigverify_stage: Option, - banking_stage: Option, - cluster_info_vote_listener: Option, - broadcast_service: Option, + fetch_stage: FetchStage, + sigverify_stage: SigVerifyStage, + banking_stage: BankingStage, + cluster_info_vote_listener: ClusterInfoVoteListener, + broadcast_service: BroadcastService, } impl LeaderServices { @@ -47,25 +47,25 @@ impl LeaderServices { broadcast_service: BroadcastService, ) -> Self { LeaderServices { - fetch_stage: Some(fetch_stage), - sigverify_stage: Some(sigverify_stage), - banking_stage: Some(banking_stage), - cluster_info_vote_listener: Some(cluster_info_vote_listener), - broadcast_service: Some(broadcast_service), + fetch_stage, + sigverify_stage, + banking_stage, + cluster_info_vote_listener, + broadcast_service, } } fn exit(&self) { - self.fetch_stage.as_ref().unwrap().close(); + self.fetch_stage.close(); } - fn join(&mut self) -> thread::Result<()> { + fn join(self) -> thread::Result<()> { let mut results = vec![]; - results.push(self.fetch_stage.take().unwrap().join()); - results.push(self.sigverify_stage.take().unwrap().join()); - results.push(self.cluster_info_vote_listener.take().unwrap().join()); - results.push(self.banking_stage.take().unwrap().join()); - let broadcast_result = self.broadcast_service.take().unwrap().join(); + results.push(self.fetch_stage.join()); + results.push(self.sigverify_stage.join()); + results.push(self.cluster_info_vote_listener.join()); + results.push(self.banking_stage.join()); + let broadcast_result = self.broadcast_service.join(); for result in results { result?; } @@ -73,32 +73,30 @@ impl LeaderServices { Ok(()) } - fn close(&mut self) -> thread::Result<()> { + fn close(self) -> thread::Result<()> { self.exit(); self.join() } } pub struct ForwarderServices { - tpu_forwarder: Option, + tpu_forwarder: TpuForwarder, } impl ForwarderServices { fn new(tpu_forwarder: TpuForwarder) -> Self { - ForwarderServices { - tpu_forwarder: Some(tpu_forwarder), - } + ForwarderServices { tpu_forwarder } } fn exit(&self) { - self.tpu_forwarder.as_ref().unwrap().close(); + self.tpu_forwarder.close(); } - fn join(&mut self) -> thread::Result<()> { - self.tpu_forwarder.take().unwrap().join() + fn join(self) -> thread::Result<()> { + self.tpu_forwarder.join() } - fn close(&mut self) -> thread::Result<()> { + fn close(self) -> thread::Result<()> { self.exit(); self.join() } @@ -134,14 +132,16 @@ impl Tpu { } fn mode_close(&mut self) { - match &mut self.tpu_mode { - Some(TpuMode::Leader(svcs)) => { - let _ = svcs.close(); + let tpu_mode = self.tpu_mode.take(); + if let Some(tpu_mode) = tpu_mode { + match tpu_mode { + TpuMode::Leader(svcs) => { + let _ = svcs.close(); + } + TpuMode::Forwarder(svcs) => { + let _ = svcs.close(); + } } - Some(TpuMode::Forwarder(svcs)) => { - let _ = svcs.close(); - } - None => (), } } @@ -162,17 +162,13 @@ impl Tpu { fn close_and_forward_unprocessed_packets(&mut self) { self.mode_exit(); - let unprocessed_packets = match self.tpu_mode.take().as_mut() { - Some(TpuMode::Leader(svcs)) => svcs - .banking_stage - .as_mut() - .unwrap() - .join_and_collect_unprocessed_packets(), - Some(TpuMode::Forwarder(svcs)) => svcs - .tpu_forwarder - .as_mut() - .unwrap() - .join_and_collect_unprocessed_packets(), + let unprocessed_packets = match self.tpu_mode.as_mut() { + Some(TpuMode::Leader(svcs)) => { + svcs.banking_stage.join_and_collect_unprocessed_packets() + } + Some(TpuMode::Forwarder(svcs)) => { + svcs.tpu_forwarder.join_and_collect_unprocessed_packets() + } None => vec![], }; @@ -290,8 +286,8 @@ impl Service for Tpu { fn join(self) -> thread::Result<()> { match self.tpu_mode { - Some(TpuMode::Leader(mut svcs)) => svcs.join()?, - Some(TpuMode::Forwarder(mut svcs)) => svcs.join()?, + Some(TpuMode::Leader(svcs)) => svcs.join()?, + Some(TpuMode::Forwarder(svcs)) => svcs.join()?, None => (), } Ok(()) diff --git a/src/tpu_forwarder.rs b/src/tpu_forwarder.rs index db41e07c74..4448f88a23 100644 --- a/src/tpu_forwarder.rs +++ b/src/tpu_forwarder.rs @@ -129,7 +129,9 @@ impl Service for TpuForwarder { for thread_hdl in self.thread_hdls { thread_hdl.join()?; } - self.forwarder_thread.unwrap().join()?; + if let Some(forwarder_thread) = self.forwarder_thread { + forwarder_thread.join()?; + } Ok(()) } }