Send messages to EntryNotifierService in Tpu, simpler (#31962)

* Add TpuEntryNotifier to send EntryNotifications from Tpu

* Optionally run TpuEntryNotifier to send out EntrySummarys alongside BroadcastStage messages

* Track entry index in TpuEntryNotifier

* Allow for leader slots that switch forks

* Exit if broadcast send fails
This commit is contained in:
Tyera 2023-06-05 18:19:17 -06:00 committed by GitHub
parent 6371240746
commit 841609959c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 123 additions and 1 deletions

View File

@ -74,6 +74,7 @@ mod tower1_14_11;
mod tower1_7_14;
pub mod tower_storage;
pub mod tpu;
mod tpu_entry_notifier;
pub mod tracer_packet_stats;
pub mod tree_diff;
pub mod tvu;

View File

@ -15,6 +15,7 @@ use {
sigverify::TransactionSigVerifier,
sigverify_stage::SigVerifyStage,
staked_nodes_updater_service::StakedNodesUpdaterService,
tpu_entry_notifier::TpuEntryNotifier,
validator::GeneratorConfig,
},
crossbeam_channel::{unbounded, Receiver},
@ -70,6 +71,7 @@ pub struct Tpu {
broadcast_stage: BroadcastStage,
tpu_quic_t: thread::JoinHandle<()>,
tpu_forwards_quic_t: thread::JoinHandle<()>,
tpu_entry_notifier: Option<TpuEntryNotifier>,
staked_nodes_updater_service: StakedNodesUpdaterService,
tracer_thread_hdl: TracerThread,
}
@ -84,7 +86,7 @@ impl Tpu {
sockets: TpuSockets,
subscriptions: &Arc<RpcSubscriptions>,
transaction_status_sender: Option<TransactionStatusSender>,
_entry_notification_sender: Option<EntryNotifierSender>,
entry_notification_sender: Option<EntryNotifierSender>,
blockstore: &Arc<Blockstore>,
broadcast_type: &BroadcastStageType,
exit: Arc<AtomicBool>,
@ -229,6 +231,20 @@ impl Tpu {
prioritization_fee_cache,
);
let (entry_receiver, tpu_entry_notifier) =
if let Some(entry_notification_sender) = entry_notification_sender {
let (broadcast_entry_sender, broadcast_entry_receiver) = unbounded();
let tpu_entry_notifier = TpuEntryNotifier::new(
entry_receiver,
entry_notification_sender,
broadcast_entry_sender,
exit.clone(),
);
(broadcast_entry_receiver, Some(tpu_entry_notifier))
} else {
(entry_receiver, None)
};
let broadcast_stage = broadcast_type.new_broadcast_stage(
broadcast_sockets,
cluster_info.clone(),
@ -249,6 +265,7 @@ impl Tpu {
broadcast_stage,
tpu_quic_t,
tpu_forwards_quic_t,
tpu_entry_notifier,
staked_nodes_updater_service,
tracer_thread_hdl,
}
@ -269,6 +286,9 @@ impl Tpu {
for result in results {
result?;
}
if let Some(tpu_entry_notifier) = self.tpu_entry_notifier {
tpu_entry_notifier.join()?;
}
let _ = broadcast_result?;
if let Some(tracer_thread_hdl) = self.tracer_thread_hdl {
if let Err(tracer_result) = tracer_thread_hdl.join()? {

View File

@ -0,0 +1,101 @@
use {
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
solana_entry::entry::EntrySummary,
solana_ledger::entry_notifier_service::{EntryNotification, EntryNotifierSender},
solana_poh::poh_recorder::WorkingBankEntry,
std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle},
time::Duration,
},
};
pub(crate) struct TpuEntryNotifier {
thread_hdl: JoinHandle<()>,
}
impl TpuEntryNotifier {
pub(crate) fn new(
entry_receiver: Receiver<WorkingBankEntry>,
entry_notification_sender: EntryNotifierSender,
broadcast_entry_sender: Sender<WorkingBankEntry>,
exit: Arc<AtomicBool>,
) -> Self {
let thread_hdl = Builder::new()
.name("solTpuEntry".to_string())
.spawn(move || {
let mut current_slot = 0;
let mut current_index = 0;
loop {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(RecvTimeoutError::Disconnected) = Self::send_entry_notification(
exit.clone(),
&entry_receiver,
&entry_notification_sender,
&broadcast_entry_sender,
&mut current_slot,
&mut current_index,
) {
break;
}
}
})
.unwrap();
Self { thread_hdl }
}
pub(crate) fn send_entry_notification(
exit: Arc<AtomicBool>,
entry_receiver: &Receiver<WorkingBankEntry>,
entry_notification_sender: &EntryNotifierSender,
broadcast_entry_sender: &Sender<WorkingBankEntry>,
current_slot: &mut u64,
current_index: &mut usize,
) -> Result<(), RecvTimeoutError> {
let (bank, (entry, tick_height)) = entry_receiver.recv_timeout(Duration::from_secs(1))?;
let slot = bank.slot();
let index = if slot != *current_slot {
*current_index = 0;
*current_slot = slot;
0
} else {
*current_index += 1;
*current_index
};
let entry_summary = EntrySummary {
num_hashes: entry.num_hashes,
hash: entry.hash,
num_transactions: entry.transactions.len() as u64,
};
if let Err(err) = entry_notification_sender.send(EntryNotification {
slot,
index,
entry: entry_summary,
}) {
warn!(
"Failed to send slot {slot:?} entry {index:?} from Tpu to EntryNotifierService, error {err:?}",
);
}
if let Err(err) = broadcast_entry_sender.send((bank, (entry, tick_height))) {
warn!(
"Failed to send slot {slot:?} entry {index:?} from Tpu to BroadcastStage, error {err:?}",
);
// If the BroadcastStage channel is closed, the validator has halted. Try to exit
// gracefully.
exit.store(true, Ordering::Relaxed);
}
Ok(())
}
pub(crate) fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}