From de6af95061c803e17fe3b917b49bca4e3de0d42b Mon Sep 17 00:00:00 2001
From: Pankaj Garg
Date: Mon, 22 Apr 2019 19:49:32 -0700
Subject: [PATCH] Process forwarded packets only when the node is about to be
 the leader (#3935)

* Tests and metrics
---
 core/src/fetch_stage.rs  | 80 +++++++++++++++++++++++++++++++----
 core/src/poh_recorder.rs | 91 ++++++++++++++++++++++++++++++++++++++++
 core/src/tpu.rs          |  1 +
 3 files changed, 165 insertions(+), 7 deletions(-)

diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs
index fab6152165..ef9158314f 100644
--- a/core/src/fetch_stage.rs
+++ b/core/src/fetch_stage.rs
@@ -1,12 +1,16 @@
 //! The `fetch_stage` batches input from a UDP socket and sends it to a channel.
+use crate::poh_recorder::PohRecorder;
+use crate::result::{Error, Result};
 use crate::service::Service;
 use crate::streamer::{self, PacketReceiver, PacketSender};
+use solana_metrics::counter::Counter;
+use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
 use std::net::UdpSocket;
 use std::sync::atomic::AtomicBool;
-use std::sync::mpsc::channel;
-use std::sync::Arc;
-use std::thread::{self, JoinHandle};
+use std::sync::mpsc::{channel, RecvTimeoutError};
+use std::sync::{Arc, Mutex};
+use std::thread::{self, Builder, JoinHandle};
 
 pub struct FetchStage {
     thread_hdls: Vec<JoinHandle<()>>,
 }
@@ -18,10 +22,11 @@ impl FetchStage {
         sockets: Vec<UdpSocket>,
         tpu_via_blobs_sockets: Vec<UdpSocket>,
         exit: &Arc<AtomicBool>,
+        poh_recorder: &Arc<Mutex<PohRecorder>>,
     ) -> (Self, PacketReceiver) {
         let (sender, receiver) = channel();
         (
-            Self::new_with_sender(sockets, tpu_via_blobs_sockets, exit, &sender),
+            Self::new_with_sender(sockets, tpu_via_blobs_sockets, exit, &sender, &poh_recorder),
             receiver,
         )
     }
@@ -30,10 +35,48 @@ impl FetchStage {
         tpu_via_blobs_sockets: Vec<UdpSocket>,
         exit: &Arc<AtomicBool>,
         sender: &PacketSender,
+        poh_recorder: &Arc<Mutex<PohRecorder>>,
     ) -> Self {
         let tx_sockets = sockets.into_iter().map(Arc::new).collect();
         let tpu_via_blobs_sockets = tpu_via_blobs_sockets.into_iter().map(Arc::new).collect();
-        Self::new_multi_socket(tx_sockets, tpu_via_blobs_sockets, exit, &sender)
+        Self::new_multi_socket(
+            tx_sockets,
+            tpu_via_blobs_sockets,
+            exit,
+            &sender,
+            &poh_recorder,
+        )
+    }
+
+    fn handle_forwarded_packets(
+        recvr: &PacketReceiver,
+        sendr: &PacketSender,
+        poh_recorder: &Arc<Mutex<PohRecorder>>,
+    ) -> Result<()> {
+        let msgs = recvr.recv()?;
+        let mut len = msgs.packets.len();
+        let mut batch = vec![msgs];
+        while let Ok(more) = recvr.try_recv() {
+            len += more.packets.len();
+            batch.push(more);
+        }
+
+        if poh_recorder
+            .lock()
+            .unwrap()
+            .would_be_leader(1, DEFAULT_TICKS_PER_SLOT)
+        {
+            inc_new_counter_info!("fetch_stage-honor_forwards", len);
+            for packets in batch {
+                if sendr.send(packets).is_err() {
+                    return Err(Error::SendError);
+                }
+            }
+        } else {
+            inc_new_counter_info!("fetch_stage-discard_forwards", len);
+        }
+
+        Ok(())
     }
 
     fn new_multi_socket(
@@ -41,16 +84,39 @@ impl FetchStage {
         tpu_via_blobs_sockets: Vec<Arc<UdpSocket>>,
         exit: &Arc<AtomicBool>,
         sender: &PacketSender,
+        poh_recorder: &Arc<Mutex<PohRecorder>>,
     ) -> Self {
         let tpu_threads = sockets
             .into_iter()
             .map(|socket| streamer::receiver(socket, &exit, sender.clone()));
 
+        let (forward_sender, forward_receiver) = channel();
         let tpu_via_blobs_threads = tpu_via_blobs_sockets
             .into_iter()
-            .map(|socket| streamer::blob_packet_receiver(socket, &exit, sender.clone()));
+            .map(|socket| streamer::blob_packet_receiver(socket, &exit, forward_sender.clone()));
 
-        let thread_hdls: Vec<_> = tpu_threads.chain(tpu_via_blobs_threads).collect();
+        let sender = sender.clone();
+        let poh_recorder = poh_recorder.clone();
+
+        let fwd_thread_hdl = Builder::new()
+            .name("solana-fetch-stage-fwd-rcvr".to_string())
+            .spawn(move || loop {
+                if let Err(e) =
+                    Self::handle_forwarded_packets(&forward_receiver, &sender, &poh_recorder)
+                {
+                    match e {
+                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
+                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
+                        Error::RecvError(_) => break,
+                        Error::SendError => break,
+                        _ => error!("{:?}", e),
+                    }
+                }
+            })
+            .unwrap();
+
+        let mut thread_hdls: Vec<_> = tpu_threads.chain(tpu_via_blobs_threads).collect();
+        thread_hdls.push(fwd_thread_hdl);
 
         Self { thread_hdls }
     }
 }
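Reviewer note, not part of the patch: the forwarding gate above reduces to a small pattern: block for the first batch, drain everything else already queued, then take the PoH lock once and forward or drop the whole drained set on a single predicate. A minimal self-contained sketch of that pattern, with the leader check stubbed out as a closure; all names here are illustrative, not the crate's API.

    use std::sync::mpsc::{channel, Receiver, Sender};

    // Stand-in for a packet batch; the real code uses streamer's packet type.
    struct Batch(Vec<u8>);

    // Drain all pending batches, then forward or discard them as one unit.
    fn gate_forwards(
        recvr: &Receiver<Batch>,
        sendr: &Sender<Batch>,
        would_be_leader: impl Fn() -> bool,
    ) -> Result<(), &'static str> {
        // Block for the first batch, then drain whatever else is queued.
        let first = recvr.recv().map_err(|_| "disconnected")?;
        let mut pending = vec![first];
        while let Ok(more) = recvr.try_recv() {
            pending.push(more);
        }

        // One predicate decides the fate of the entire drained set.
        if would_be_leader() {
            for batch in pending {
                sendr.send(batch).map_err(|_| "send failed")?;
            }
        } // else: the forwards are dropped on the floor

        Ok(())
    }

    fn main() {
        let (fwd_in, fwd_out) = channel();
        let (tpu_in, tpu_out) = channel();
        fwd_in.send(Batch(vec![1, 2, 3])).unwrap();
        gate_forwards(&fwd_out, &tpu_in, || true).unwrap();
        assert!(tpu_out.try_recv().is_ok());
    }

Draining with try_recv before checking the predicate keeps the critical section to one Mutex<PohRecorder> lock per wakeup rather than one per packet batch.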
.name("solana-fetch-stage-fwd-rcvr".to_string()) + .spawn(move || loop { + if let Err(e) = + Self::handle_forwarded_packets(&forward_receiver, &sender, &poh_recorder) + { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::RecvError(_) => break, + Error::SendError => break, + _ => error!("{:?}", e), + } + } + }) + .unwrap(); + + let mut thread_hdls: Vec<_> = tpu_threads.chain(tpu_via_blobs_threads).collect(); + thread_hdls.push(fwd_thread_hdl); Self { thread_hdls } } } diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index 8589fc396c..d359079b78 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -79,6 +79,19 @@ impl PohRecorder { } } + pub fn would_be_leader(&self, within_next_n_slots: u64, ticks_per_slot: u64) -> bool { + let close_to_leader_tick = self.start_leader_at_tick.map_or(false, |leader_tick| { + let leader_ideal_start_tick = + leader_tick.saturating_sub(self.max_last_leader_grace_ticks); + + self.tick_height() <= self.last_leader_tick.unwrap_or(0) + && self.tick_height() + >= leader_ideal_start_tick.saturating_sub(within_next_n_slots * ticks_per_slot) + }); + + self.working_bank.is_some() || close_to_leader_tick + } + pub fn hash(&mut self) { // TODO: amortize the cost of this lock by doing the loop in here for // some min amount of hashes @@ -1155,4 +1168,82 @@ mod tests { } Blocktree::destroy(&ledger_path).unwrap(); } + + #[test] + fn test_would_be_leader_soon() { + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let prev_hash = bank.last_blockhash(); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + 0, + prev_hash, + 0, + None, + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + ); + + // Test that with no leader slot, we don't reach the leader tick + assert_eq!( + poh_recorder.would_be_leader(2, bank.ticks_per_slot()), + false + ); + + for _ in 0..bank.ticks_per_slot() { + poh_recorder.tick(); + } + + // Test that with no leader slot, we don't reach the leader tick after sending some ticks + assert_eq!( + poh_recorder.would_be_leader(2, bank.ticks_per_slot()), + false + ); + + poh_recorder.reset( + poh_recorder.tick_height(), + bank.last_blockhash(), + 0, + None, + bank.ticks_per_slot(), + ); + + assert_eq!( + poh_recorder.would_be_leader(2, bank.ticks_per_slot()), + false + ); + + // We reset with leader slot after 3 slots + poh_recorder.reset( + poh_recorder.tick_height(), + bank.last_blockhash(), + 0, + Some(bank.slot() + 3), + bank.ticks_per_slot(), + ); + + // Test that the node won't be leader in next 2 slots + assert_eq!( + poh_recorder.would_be_leader(2, bank.ticks_per_slot()), + false + ); + + // Test that the node will be leader in next 3 slots + assert_eq!(poh_recorder.would_be_leader(3, bank.ticks_per_slot()), true); + + assert_eq!( + poh_recorder.would_be_leader(2, bank.ticks_per_slot()), + false + ); + + // If we set the working bank, the node should be leader within next 2 slots + poh_recorder.set_bank(&bank); + assert_eq!(poh_recorder.would_be_leader(2, bank.ticks_per_slot()), true); + } + } } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index d2cbb706b8..5edfd4483f 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -51,6 
diff --git a/core/src/tpu.rs b/core/src/tpu.rs
index d2cbb706b8..5edfd4483f 100644
--- a/core/src/tpu.rs
+++ b/core/src/tpu.rs
@@ -51,6 +51,7 @@ impl Tpu {
             tpu_via_blobs_sockets,
             &exit,
             &packet_sender,
+            &poh_recorder,
         );
 
         let (verified_sender, verified_receiver) = channel();
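Reviewer note, not part of the patch: the tpu.rs hunk is pure plumbing; the caller's existing Arc<Mutex<PohRecorder>> handle is threaded into FetchStage::new. The fetch stage only reads leader state, but the Mutex is still needed because the PoH service mutates the recorder concurrently (ticks, resets, set_bank). A minimal sketch of that sharing pattern, with a hypothetical stand-in Recorder type:

    use std::sync::{Arc, Mutex};
    use std::thread;

    // Stand-in for PohRecorder: one thread advances ticks, another polls.
    struct Recorder {
        tick_height: u64,
    }

    fn main() {
        let recorder = Arc::new(Mutex::new(Recorder { tick_height: 0 }));

        // Writer: plays the role of the PoH service driving ticks.
        let writer = Arc::clone(&recorder);
        let ticker = thread::spawn(move || {
            for _ in 0..100 {
                writer.lock().unwrap().tick_height += 1;
            }
        });

        // Reader: plays the role of the fetch stage's forward-gating check.
        let observed = recorder.lock().unwrap().tick_height;
        println!("observed tick height: {}", observed);

        ticker.join().unwrap();
    }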