From 626a381ddc43fbb9a2727e00b0bc138c0cd46326 Mon Sep 17 00:00:00 2001
From: Michael Vines
Date: Thu, 14 Feb 2019 14:12:19 -0800
Subject: [PATCH] Collect and re-forward packets received while TpuForwarder
 is shutting down

---
 src/fullnode.rs      | 96 ++++++++++++++++++--------------------------
 src/replay_stage.rs  | 12 +++---
 src/tpu.rs           | 81 +++++++++++++------------------------
 src/tpu_forwarder.rs | 89 +++++++++++++++++++++++++++-------------
 tests/multinode.rs   |  8 +++-
 5 files changed, 145 insertions(+), 141 deletions(-)

diff --git a/src/fullnode.rs b/src/fullnode.rs
index 02cfd8907..322ba997f 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -98,7 +98,6 @@ pub struct Fullnode {
     rpc_pubsub_service: Option<PubSubService>,
     gossip_service: GossipService,
     bank: Arc<Bank>,
-    cluster_info: Arc<RwLock<ClusterInfo>>,
     sigverify_disabled: bool,
     tpu_sockets: Vec<UdpSocket>,
     broadcast_socket: UdpSocket,
@@ -228,8 +227,6 @@ impl Fullnode {
             (scheduled_leader, max_tick_height, blob_index)
         };
 
-        cluster_info.write().unwrap().set_leader(scheduled_leader);
-
         let sockets = Sockets {
             repair: node
                 .sockets
@@ -274,34 +271,10 @@ impl Fullnode {
             ledger_signal_sender,
             ledger_signal_receiver,
         );
-        let tpu = Tpu::new(
-            &Arc::new(bank.copy_for_tpu()),
-            PohServiceConfig::default(),
-            node.sockets
-                .tpu
-                .iter()
-                .map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
-                .collect(),
-            node.sockets
-                .broadcast
-                .try_clone()
-                .expect("Failed to clone broadcast socket"),
-            &cluster_info,
-            config.sigverify_disabled,
-            max_tick_height,
-            blob_index,
-            &last_entry_id,
-            id,
-            &rotation_sender,
-            &blocktree,
-            scheduled_leader == id,
-        );
+        let tpu = Tpu::new(id, &cluster_info);
 
-        inc_new_counter_info!("fullnode-new", 1);
-
-        Self {
+        let mut fullnode = Self {
             id,
-            cluster_info,
             bank,
             sigverify_disabled: config.sigverify_disabled,
             gossip_service,
@@ -314,7 +287,16 @@ impl Fullnode {
             rotation_sender,
             rotation_receiver,
             blocktree,
-        }
+        };
+
+        fullnode.rotate(
+            scheduled_leader,
+            max_tick_height,
+            blob_index,
+            &last_entry_id,
+        );
+        inc_new_counter_info!("fullnode-new", 1);
+        fullnode
     }
 
     fn get_next_leader(&self, tick_height: u64) -> (Pubkey, u64) {
@@ -355,30 +337,30 @@ impl Fullnode {
             max_tick_height
         );
 
-        self.cluster_info
-            .write()
-            .unwrap()
-            .set_leader(scheduled_leader);
-
         (scheduled_leader, max_tick_height)
     }
 
-    fn rotate(&mut self, tick_height: u64) -> FullnodeReturnType {
-        trace!("{:?}: rotate at tick_height={}", self.id, tick_height,);
-        let was_leader = self.node_services.tpu.is_leader();
-
-        let (scheduled_leader, max_tick_height) = self.get_next_leader(tick_height);
-        if scheduled_leader == self.id {
-            let transition = if was_leader {
-                debug!("{:?} remaining in leader role", self.id);
-                FullnodeReturnType::LeaderToLeaderRotation
-            } else {
-                debug!("{:?} rotating to leader role", self.id);
-                FullnodeReturnType::ValidatorToLeaderRotation
+    fn rotate(
+        &mut self,
+        next_leader: Pubkey,
+        max_tick_height: u64,
+        blob_index: u64,
+        last_entry_id: &Hash,
+    ) -> FullnodeReturnType {
+        if next_leader == self.id {
+            let transition = match self.node_services.tpu.is_leader() {
+                Some(was_leader) => {
+                    if was_leader {
+                        debug!("{:?} remaining in leader role", self.id);
+                        FullnodeReturnType::LeaderToLeaderRotation
+                    } else {
+                        debug!("{:?} rotating to leader role", self.id);
+                        FullnodeReturnType::ValidatorToLeaderRotation
+                    }
+                }
+                None => FullnodeReturnType::LeaderToLeaderRotation, // value doesn't matter here...
             };
 
-            let last_entry_id = self.bank.last_id();
-
             self.node_services.tpu.switch_to_leader(
                 &Arc::new(self.bank.copy_for_tpu()),
                 PohServiceConfig::default(),
@@ -391,17 +373,16 @@
                     .expect("Failed to clone broadcast socket"),
                 self.sigverify_disabled,
                 max_tick_height,
-                0,
-                &last_entry_id,
-                self.id,
+                blob_index,
+                last_entry_id,
                 &self.rotation_sender,
                 &self.blocktree,
             );
-
             transition
         } else {
             debug!("{:?} rotating to validator role", self.id);
             self.node_services.tpu.switch_to_forwarder(
+                next_leader,
                 self.tpu_sockets
                     .iter()
                     .map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
@@ -430,7 +411,10 @@
             match self.rotation_receiver.recv_timeout(timeout) {
                 Ok(tick_height) => {
-                    let transition = self.rotate(tick_height);
+                    trace!("{:?}: rotate at tick_height={}", self.id, tick_height);
+                    let (next_leader, max_tick_height) = self.get_next_leader(tick_height);
+                    let transition =
+                        self.rotate(next_leader, max_tick_height, 0, &self.bank.last_id());
                     debug!("role transition complete: {:?}", transition);
                     if let Some(ref rotation_notifier) = rotation_notifier {
                         rotation_notifier
@@ -732,7 +716,7 @@ mod tests {
             &fullnode_config,
         );
 
-        assert!(!bootstrap_leader.node_services.tpu.is_leader());
+        assert!(!bootstrap_leader.node_services.tpu.is_leader().unwrap());
 
         // Test that a node knows to transition to a leader based on parsing the ledger
         let validator = Fullnode::new(
@@ -744,7 +728,7 @@
            &fullnode_config,
        );
 
-        assert!(validator.node_services.tpu.is_leader());
+        assert!(validator.node_services.tpu.is_leader().unwrap());
        validator.close().expect("Expected leader node to close");
        bootstrap_leader
            .close()
diff --git a/src/replay_stage.rs b/src/replay_stage.rs
index 6303bae13..c724c5482 100644
--- a/src/replay_stage.rs
+++ b/src/replay_stage.rs
@@ -278,11 +278,13 @@ impl ReplayStage {
             // Check for leader rotation
             let leader_id = Self::get_leader_for_next_tick(&bank);
 
-            // TODO: Remove this soon once we boot the leader from ClusterInfo
-            cluster_info.write().unwrap().set_leader(leader_id);
-
-            if leader_id != last_leader_id && my_id == leader_id {
-                to_leader_sender.send(current_tick_height).unwrap();
+            if leader_id != last_leader_id {
+                if my_id == leader_id {
+                    to_leader_sender.send(current_tick_height).unwrap();
+                } else {
+                    // TODO: Remove this soon once we boot the leader from ClusterInfo
+                    cluster_info.write().unwrap().set_leader(leader_id);
+                }
             }
 
             // Check for any slots that chain to this one
diff --git a/src/tpu.rs b/src/tpu.rs
index 636f60e7a..3fb210d6a 100644
--- a/src/tpu.rs
+++ b/src/tpu.rs
@@ -68,51 +68,18 @@ impl ForwarderServices {
 
 pub struct Tpu {
     tpu_mode: Option<TpuMode>,
     exit: Arc<AtomicBool>,
+    id: Pubkey,
     cluster_info: Arc<RwLock<ClusterInfo>>,
 }
 
 impl Tpu {
-    #[allow(clippy::too_many_arguments)]
-    pub fn new(
-        bank: &Arc<Bank>,
-        tick_duration: PohServiceConfig,
-        transactions_sockets: Vec<UdpSocket>,
-        broadcast_socket: UdpSocket,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
-        sigverify_disabled: bool,
-        max_tick_height: u64,
-        blob_index: u64,
-        last_entry_id: &Hash,
-        leader_id: Pubkey,
-        to_validator_sender: &TpuRotationSender,
-        blocktree: &Arc<Blocktree>,
-        is_leader: bool,
-    ) -> Self {
-        let mut tpu = Self {
+    pub fn new(id: Pubkey, cluster_info: &Arc<RwLock<ClusterInfo>>) -> Self {
+        Self {
             tpu_mode: None,
             exit: Arc::new(AtomicBool::new(false)),
+            id,
             cluster_info: cluster_info.clone(),
-        };
-
-        if is_leader {
-            tpu.switch_to_leader(
-                bank,
-                tick_duration,
-                transactions_sockets,
-                broadcast_socket,
-                sigverify_disabled,
-                max_tick_height,
-                blob_index,
-                last_entry_id,
-                leader_id,
-                to_validator_sender,
-                blocktree,
-            );
-        } else {
-            tpu.switch_to_forwarder(transactions_sockets);
-        }
-
-        tpu
     }
 
     fn mode_close(&self) {
@@ -144,22 +111,30 @@ impl Tpu {
     fn close_and_forward_unprocessed_packets(&mut self) {
         self.mode_close();
 
-        if let Some(TpuMode::Leader(svcs)) = self.tpu_mode.take().as_mut() {
-            let unprocessed_packets = svcs.banking_stage.join_and_collect_unprocessed_packets();
-
-            if !unprocessed_packets.is_empty() {
-                let tpu = self.cluster_info.read().unwrap().leader_data().unwrap().tpu;
-                info!("forwarding unprocessed packets to new leader at {:?}", tpu);
-                Tpu::forward_unprocessed_packets(&tpu, unprocessed_packets).unwrap_or_else(|err| {
-                    warn!("Failed to forward unprocessed transactions: {:?}", err)
-                });
+        let unprocessed_packets = match self.tpu_mode.take().as_mut() {
+            Some(TpuMode::Leader(svcs)) => {
+                svcs.banking_stage.join_and_collect_unprocessed_packets()
             }
+            Some(TpuMode::Forwarder(svcs)) => {
+                svcs.tpu_forwarder.join_and_collect_unprocessed_packets()
+            }
+            None => vec![],
+        };
+
+        if !unprocessed_packets.is_empty() {
+            let tpu = self.cluster_info.read().unwrap().leader_data().unwrap().tpu;
+            info!("forwarding unprocessed packets to new leader at {:?}", tpu);
+            Tpu::forward_unprocessed_packets(&tpu, unprocessed_packets).unwrap_or_else(|err| {
+                warn!("Failed to forward unprocessed transactions: {:?}", err)
+            });
         }
     }
 
-    pub fn switch_to_forwarder(&mut self, transactions_sockets: Vec<UdpSocket>) {
+    pub fn switch_to_forwarder(&mut self, leader_id: Pubkey, transactions_sockets: Vec<UdpSocket>) {
         self.close_and_forward_unprocessed_packets();
 
+        self.cluster_info.write().unwrap().set_leader(leader_id);
+
         let tpu_forwarder = TpuForwarder::new(transactions_sockets, self.cluster_info.clone());
         self.tpu_mode = Some(TpuMode::Forwarder(ForwarderServices::new(tpu_forwarder)));
     }
@@ -175,12 +150,13 @@ impl Tpu {
         max_tick_height: u64,
         blob_index: u64,
         last_entry_id: &Hash,
-        leader_id: Pubkey,
         to_validator_sender: &TpuRotationSender,
         blocktree: &Arc<Blocktree>,
     ) {
         self.close_and_forward_unprocessed_packets();
 
+        self.cluster_info.write().unwrap().set_leader(self.id);
+
         self.exit = Arc::new(AtomicBool::new(false));
         let (packet_sender, packet_receiver) = channel();
         let fetch_stage = FetchStage::new_with_sender(
@@ -203,7 +179,7 @@ impl Tpu {
             tick_duration,
             last_entry_id,
             max_tick_height,
-            leader_id,
+            self.id,
             &to_validator_sender,
         );
 
@@ -229,10 +205,11 @@ impl Tpu {
         self.tpu_mode = Some(TpuMode::Leader(svcs));
     }
 
-    pub fn is_leader(&self) -> bool {
+    pub fn is_leader(&self) -> Option<bool> {
         match self.tpu_mode {
-            Some(TpuMode::Leader(_)) => true,
-            _ => false,
+            Some(TpuMode::Leader(_)) => Some(true),
+            Some(TpuMode::Forwarder(_)) => Some(false),
+            None => None,
         }
     }
 
diff --git a/src/tpu_forwarder.rs b/src/tpu_forwarder.rs
index 4ec286ef9..db41e07c7 100644
--- a/src/tpu_forwarder.rs
+++ b/src/tpu_forwarder.rs
@@ -2,10 +2,10 @@
 //! transaction processing unit responsibility, which
 //! forwards received packets to the current leader
 
+use crate::banking_stage::UnprocessedPackets;
 use crate::cluster_info::ClusterInfo;
 use crate::contact_info::ContactInfo;
 use crate::counter::Counter;
-use crate::result::Result;
 use crate::service::Service;
 use crate::streamer::{self, PacketReceiver};
 use log::Level;
@@ -18,9 +18,11 @@ use std::thread::{self, Builder, JoinHandle};
 
 fn get_forwarding_addr(leader_data: Option<&ContactInfo>, my_id: &Pubkey) -> Option<SocketAddr> {
     let leader_data = leader_data?;
-    if leader_data.id == *my_id || !ContactInfo::is_valid_address(&leader_data.tpu) {
-        // weird cases, but we don't want to broadcast, send to ANY, or
-        // induce an infinite loop, but this shouldn't happen, or shouldn't be true for long...
+    if leader_data.id == *my_id {
+        info!("I may be stuck in a loop"); // Should never try to forward to ourselves
+        return None;
+    }
+    if !ContactInfo::is_valid_address(&leader_data.tpu) {
         return None;
     }
     Some(leader_data.tpu)
@@ -29,36 +31,65 @@ fn get_forwarding_addr(leader_data: Option<&ContactInfo>, my_id: &Pubkey) -> Opt
 
 pub struct TpuForwarder {
     exit: Arc<AtomicBool>,
     thread_hdls: Vec<JoinHandle<()>>,
+    forwarder_thread: Option<JoinHandle<UnprocessedPackets>>,
 }
 
 impl TpuForwarder {
-    fn forward(receiver: &PacketReceiver, cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<()> {
-        let socket = UdpSocket::bind("0.0.0.0:0")?;
-
+    fn forward(
+        receiver: &PacketReceiver,
+        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        exit: &Arc<AtomicBool>,
+    ) -> UnprocessedPackets {
+        let socket = UdpSocket::bind("0.0.0.0:0").expect("Unable to bind");
         let my_id = cluster_info.read().unwrap().id();
+        let mut unprocessed_packets = vec![];
 
         loop {
-            let msgs = receiver.recv()?;
-
-            inc_new_counter_info!(
-                "tpu_forwarder-msgs_received",
-                msgs.read().unwrap().packets.len()
-            );
+            match receiver.recv() {
+                Ok(msgs) => {
+                    inc_new_counter_info!(
+                        "tpu_forwarder-msgs_received",
+                        msgs.read().unwrap().packets.len()
+                    );
 
-            let send_addr = get_forwarding_addr(cluster_info.read().unwrap().leader_data(), &my_id);
-
-            if let Some(send_addr) = send_addr {
-                msgs.write().unwrap().set_addr(&send_addr);
-                msgs.read().unwrap().send_to(&socket)?;
+                    if exit.load(Ordering::Relaxed) {
+                        // Collect all remaining packets once exit is signaled
+                        unprocessed_packets.push((msgs, 0));
+                        continue;
+                    }
+
+                    match get_forwarding_addr(cluster_info.read().unwrap().leader_data(), &my_id) {
+                        Some(send_addr) => {
+                            msgs.write().unwrap().set_addr(&send_addr);
+                            msgs.read().unwrap().send_to(&socket).unwrap_or_else(|err| {
+                                info!("Failed to forward packet to {:?}: {:?}", send_addr, err)
+                            });
+                        }
+                        None => warn!("Packets dropped due to no forwarding address"),
+                    }
+                }
+                Err(err) => {
+                    trace!("Exiting forwarder due to {:?}", err);
+                    break;
+                }
             }
         }
+        unprocessed_packets
+    }
+
+    pub fn join_and_collect_unprocessed_packets(&mut self) -> UnprocessedPackets {
+        let forwarder_thread = self.forwarder_thread.take().unwrap();
+        forwarder_thread.join().unwrap_or_else(|err| {
+            warn!("forwarder_thread join failed: {:?}", err);
+            vec![]
+        })
     }
 
     pub fn new(sockets: Vec<UdpSocket>, cluster_info: Arc<RwLock<ClusterInfo>>) -> Self {
         let exit = Arc::new(AtomicBool::new(false));
         let (sender, receiver) = channel();
 
-        let mut thread_hdls: Vec<_> = sockets
+        let thread_hdls: Vec<_> = sockets
             .into_iter()
             .map(|socket| {
                 streamer::receiver(
@@ -70,16 +101,19 @@ impl TpuForwarder {
             })
             .collect();
 
-        let thread_hdl = Builder::new()
-            .name("solana-tpu_forwarder".to_string())
-            .spawn(move || {
-                let _ignored = Self::forward(&receiver, &cluster_info);
-            })
-            .unwrap();
+        let thread_exit = exit.clone();
+        let forwarder_thread = Some(
+            Builder::new()
.name("solana-tpu_forwarder".to_string()) + .spawn(move || Self::forward(&receiver, &cluster_info, &thread_exit)) + .unwrap(), + ); - thread_hdls.push(thread_hdl); - - TpuForwarder { exit, thread_hdls } + TpuForwarder { + exit, + thread_hdls, + forwarder_thread, + } } pub fn close(&self) { @@ -95,6 +129,7 @@ impl Service for TpuForwarder { for thread_hdl in self.thread_hdls { thread_hdl.join()?; } + self.forwarder_thread.unwrap().join()?; Ok(()) } } diff --git a/tests/multinode.rs b/tests/multinode.rs index 8dc402061..9a8f98dd9 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -1797,6 +1797,13 @@ fn test_fullnode_rotate( let mut fullnode_config = FullnodeConfig::default(); fullnode_config.leader_scheduler_config.ticks_per_slot = ticks_per_slot; fullnode_config.leader_scheduler_config.slots_per_epoch = slots_per_epoch; + + // Note: when debugging failures in this test, disabling voting can help keep the log noise + // down by removing the extra vote transactions + /* + fullnode_config.voting_disabled = true; + */ + let blocktree_config = fullnode_config.ledger_config(); fullnode_config .leader_scheduler_config @@ -2084,7 +2091,6 @@ fn test_one_fullnode_rotate_every_tick_with_transactions() { } #[test] -#[ignore] fn test_two_fullnodes_rotate_every_tick_with_transactions() { test_fullnode_rotate(1, 1, true, true); }