Remove code duplication

This commit is contained in:
Michael Vines 2019-02-07 21:13:39 -08:00
parent 6e7c5f205b
commit dbaf8e66ab
1 changed files with 46 additions and 83 deletions

View File

@ -69,12 +69,11 @@ impl ForwarderServices {
}
pub struct Tpu {
tpu_mode: TpuMode,
tpu_mode: Option<TpuMode>,
exit: Arc<AtomicBool>,
}
impl Tpu {
#[allow(clippy::new_ret_no_self)]
#[allow(clippy::too_many_arguments)]
pub fn new(
bank: &Arc<Bank>,
@ -91,59 +90,42 @@ impl Tpu {
to_validator_sender: &TpuRotationSender,
blob_sender: &BlobSender,
) -> Self {
let exit = Arc::new(AtomicBool::new(false));
let tpu_mode = if is_leader {
let (packet_sender, packet_receiver) = channel();
let fetch_stage = FetchStage::new_with_sender(
transactions_sockets,
exit.clone(),
&packet_sender.clone(),
);
let cluster_info_vote_listener =
ClusterInfoVoteListener::new(exit.clone(), cluster_info.clone(), packet_sender);
let (sigverify_stage, verified_receiver) =
SigVerifyStage::new(packet_receiver, sigverify_disabled);
let (banking_stage, entry_receiver) = BankingStage::new(
&bank,
verified_receiver,
tick_duration,
last_entry_id,
max_tick_height,
leader_id,
&to_validator_sender,
);
let broadcast_service = BroadcastService::new(
bank.clone(),
broadcast_socket,
cluster_info,
blob_index,
bank.leader_scheduler.clone(),
entry_receiver,
max_tick_height,
exit.clone(),
blob_sender,
);
let svcs = LeaderServices::new(
fetch_stage,
sigverify_stage,
banking_stage,
cluster_info_vote_listener,
broadcast_service,
);
TpuMode::Leader(svcs)
} else {
let tpu_forwarder = TpuForwarder::new(transactions_sockets, cluster_info);
let svcs = ForwarderServices::new(tpu_forwarder);
TpuMode::Forwarder(svcs)
let mut tpu = Self {
tpu_mode: None,
exit: Arc::new(AtomicBool::new(false)),
};
Self {
tpu_mode,
exit: exit.clone(),
if is_leader {
tpu.switch_to_leader(
bank,
tick_duration,
transactions_sockets,
broadcast_socket,
cluster_info,
sigverify_disabled,
max_tick_height,
blob_index,
last_entry_id,
leader_id,
to_validator_sender,
blob_sender,
);
} else {
tpu.switch_to_forwarder(transactions_sockets, cluster_info);
}
tpu
}
fn tpu_mode_close(&self) {
match &self.tpu_mode {
Some(TpuMode::Leader(svcs)) => {
svcs.fetch_stage.close();
}
Some(TpuMode::Forwarder(svcs)) => {
svcs.tpu_forwarder.close();
}
None => (),
}
}
@ -152,16 +134,9 @@ impl Tpu {
transactions_sockets: Vec<UdpSocket>,
cluster_info: Arc<RwLock<ClusterInfo>>,
) {
match &self.tpu_mode {
TpuMode::Leader(svcs) => {
svcs.fetch_stage.close();
}
TpuMode::Forwarder(svcs) => {
svcs.tpu_forwarder.close();
}
}
self.tpu_mode_close();
let tpu_forwarder = TpuForwarder::new(transactions_sockets, cluster_info);
self.tpu_mode = TpuMode::Forwarder(ForwarderServices::new(tpu_forwarder));
self.tpu_mode = Some(TpuMode::Forwarder(ForwarderServices::new(tpu_forwarder)));
}
#[allow(clippy::too_many_arguments)]
@ -180,14 +155,8 @@ impl Tpu {
to_validator_sender: &TpuRotationSender,
blob_sender: &BlobSender,
) {
match &self.tpu_mode {
TpuMode::Leader(svcs) => {
svcs.fetch_stage.close();
}
TpuMode::Forwarder(svcs) => {
svcs.tpu_forwarder.close();
}
}
self.tpu_mode_close();
self.exit = Arc::new(AtomicBool::new(false));
let (packet_sender, packet_receiver) = channel();
let fetch_stage = FetchStage::new_with_sender(
@ -230,13 +199,13 @@ impl Tpu {
cluster_info_vote_listener,
broadcast_service,
);
self.tpu_mode = TpuMode::Leader(svcs);
self.tpu_mode = Some(TpuMode::Leader(svcs));
}
pub fn is_leader(&self) -> bool {
match self.tpu_mode {
TpuMode::Forwarder(_) => false,
TpuMode::Leader(_) => true,
Some(TpuMode::Leader(_)) => true,
_ => false,
}
}
@ -249,14 +218,7 @@ impl Tpu {
}
pub fn close(self) -> thread::Result<Option<TpuReturnType>> {
match &self.tpu_mode {
TpuMode::Leader(svcs) => {
svcs.fetch_stage.close();
}
TpuMode::Forwarder(svcs) => {
svcs.tpu_forwarder.close();
}
}
self.tpu_mode_close();
self.join()
}
}
@ -266,7 +228,7 @@ impl Service for Tpu {
fn join(self) -> thread::Result<(Option<TpuReturnType>)> {
match self.tpu_mode {
TpuMode::Leader(svcs) => {
Some(TpuMode::Leader(svcs)) => {
svcs.broadcast_service.join()?;
svcs.fetch_stage.join()?;
svcs.sigverify_stage.join()?;
@ -278,10 +240,11 @@ impl Service for Tpu {
_ => Ok(None),
}
}
TpuMode::Forwarder(svcs) => {
Some(TpuMode::Forwarder(svcs)) => {
svcs.tpu_forwarder.join()?;
Ok(None)
}
None => Ok(None),
}
}
}