From 841609959c7213048149ca9ed55d7a7d07af06a3 Mon Sep 17 00:00:00 2001 From: Tyera Date: Mon, 5 Jun 2023 18:19:17 -0600 Subject: [PATCH] Send messages to EntryNotifierService in Tpu, simpler (#31962) * Add TpuEntryNotifier to send EntryNotifications from Tpu * Optionally run TpuEntryNotifier to send out EntrySummarys alongside BroadcastStage messages * Track entry index in TpuEntryNotifier * Allow for leader slots that switch forks * Exit if broadcast send fails --- core/src/lib.rs | 1 + core/src/tpu.rs | 22 ++++++- core/src/tpu_entry_notifier.rs | 101 +++++++++++++++++++++++++++++++++ 3 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 core/src/tpu_entry_notifier.rs diff --git a/core/src/lib.rs b/core/src/lib.rs index 7f48a9cbe..674773223 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -74,6 +74,7 @@ mod tower1_14_11; mod tower1_7_14; pub mod tower_storage; pub mod tpu; +mod tpu_entry_notifier; pub mod tracer_packet_stats; pub mod tree_diff; pub mod tvu; diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 75bd7832d..515c2c5e7 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -15,6 +15,7 @@ use { sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, staked_nodes_updater_service::StakedNodesUpdaterService, + tpu_entry_notifier::TpuEntryNotifier, validator::GeneratorConfig, }, crossbeam_channel::{unbounded, Receiver}, @@ -70,6 +71,7 @@ pub struct Tpu { broadcast_stage: BroadcastStage, tpu_quic_t: thread::JoinHandle<()>, tpu_forwards_quic_t: thread::JoinHandle<()>, + tpu_entry_notifier: Option, staked_nodes_updater_service: StakedNodesUpdaterService, tracer_thread_hdl: TracerThread, } @@ -84,7 +86,7 @@ impl Tpu { sockets: TpuSockets, subscriptions: &Arc, transaction_status_sender: Option, - _entry_notification_sender: Option, + entry_notification_sender: Option, blockstore: &Arc, broadcast_type: &BroadcastStageType, exit: Arc, @@ -229,6 +231,20 @@ impl Tpu { prioritization_fee_cache, ); + let (entry_receiver, tpu_entry_notifier) = + if let Some(entry_notification_sender) = entry_notification_sender { + let (broadcast_entry_sender, broadcast_entry_receiver) = unbounded(); + let tpu_entry_notifier = TpuEntryNotifier::new( + entry_receiver, + entry_notification_sender, + broadcast_entry_sender, + exit.clone(), + ); + (broadcast_entry_receiver, Some(tpu_entry_notifier)) + } else { + (entry_receiver, None) + }; + let broadcast_stage = broadcast_type.new_broadcast_stage( broadcast_sockets, cluster_info.clone(), @@ -249,6 +265,7 @@ impl Tpu { broadcast_stage, tpu_quic_t, tpu_forwards_quic_t, + tpu_entry_notifier, staked_nodes_updater_service, tracer_thread_hdl, } @@ -269,6 +286,9 @@ impl Tpu { for result in results { result?; } + if let Some(tpu_entry_notifier) = self.tpu_entry_notifier { + tpu_entry_notifier.join()?; + } let _ = broadcast_result?; if let Some(tracer_thread_hdl) = self.tracer_thread_hdl { if let Err(tracer_result) = tracer_thread_hdl.join()? { diff --git a/core/src/tpu_entry_notifier.rs b/core/src/tpu_entry_notifier.rs new file mode 100644 index 000000000..730a3b14f --- /dev/null +++ b/core/src/tpu_entry_notifier.rs @@ -0,0 +1,101 @@ +use { + crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, + solana_entry::entry::EntrySummary, + solana_ledger::entry_notifier_service::{EntryNotification, EntryNotifierSender}, + solana_poh::poh_recorder::WorkingBankEntry, + std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, + }, +}; + +pub(crate) struct TpuEntryNotifier { + thread_hdl: JoinHandle<()>, +} + +impl TpuEntryNotifier { + pub(crate) fn new( + entry_receiver: Receiver, + entry_notification_sender: EntryNotifierSender, + broadcast_entry_sender: Sender, + exit: Arc, + ) -> Self { + let thread_hdl = Builder::new() + .name("solTpuEntry".to_string()) + .spawn(move || { + let mut current_slot = 0; + let mut current_index = 0; + loop { + if exit.load(Ordering::Relaxed) { + break; + } + + if let Err(RecvTimeoutError::Disconnected) = Self::send_entry_notification( + exit.clone(), + &entry_receiver, + &entry_notification_sender, + &broadcast_entry_sender, + &mut current_slot, + &mut current_index, + ) { + break; + } + } + }) + .unwrap(); + Self { thread_hdl } + } + + pub(crate) fn send_entry_notification( + exit: Arc, + entry_receiver: &Receiver, + entry_notification_sender: &EntryNotifierSender, + broadcast_entry_sender: &Sender, + current_slot: &mut u64, + current_index: &mut usize, + ) -> Result<(), RecvTimeoutError> { + let (bank, (entry, tick_height)) = entry_receiver.recv_timeout(Duration::from_secs(1))?; + let slot = bank.slot(); + let index = if slot != *current_slot { + *current_index = 0; + *current_slot = slot; + 0 + } else { + *current_index += 1; + *current_index + }; + + let entry_summary = EntrySummary { + num_hashes: entry.num_hashes, + hash: entry.hash, + num_transactions: entry.transactions.len() as u64, + }; + if let Err(err) = entry_notification_sender.send(EntryNotification { + slot, + index, + entry: entry_summary, + }) { + warn!( + "Failed to send slot {slot:?} entry {index:?} from Tpu to EntryNotifierService, error {err:?}", + ); + } + + if let Err(err) = broadcast_entry_sender.send((bank, (entry, tick_height))) { + warn!( + "Failed to send slot {slot:?} entry {index:?} from Tpu to BroadcastStage, error {err:?}", + ); + // If the BroadcastStage channel is closed, the validator has halted. Try to exit + // gracefully. + exit.store(true, Ordering::Relaxed); + } + Ok(()) + } + + pub(crate) fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +}