Process forwarded packets only when the node is about to be the leader (#3935)
* Tests and metrics
parent 43f7cd8149
commit de6af95061
fetch_stage.rs

@@ -1,12 +1,16 @@
 //! The `fetch_stage` batches input from a UDP socket and sends it to a channel.
 
+use crate::poh_recorder::PohRecorder;
+use crate::result::{Error, Result};
 use crate::service::Service;
 use crate::streamer::{self, PacketReceiver, PacketSender};
+use solana_metrics::counter::Counter;
+use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
 use std::net::UdpSocket;
 use std::sync::atomic::AtomicBool;
-use std::sync::mpsc::channel;
-use std::sync::Arc;
-use std::thread::{self, JoinHandle};
+use std::sync::mpsc::{channel, RecvTimeoutError};
+use std::sync::{Arc, Mutex};
+use std::thread::{self, Builder, JoinHandle};
 
 pub struct FetchStage {
     thread_hdls: Vec<JoinHandle<()>>,

@@ -18,10 +22,11 @@ impl FetchStage {
         sockets: Vec<UdpSocket>,
         tpu_via_blobs_sockets: Vec<UdpSocket>,
         exit: &Arc<AtomicBool>,
+        poh_recorder: &Arc<Mutex<PohRecorder>>,
     ) -> (Self, PacketReceiver) {
         let (sender, receiver) = channel();
         (
-            Self::new_with_sender(sockets, tpu_via_blobs_sockets, exit, &sender),
+            Self::new_with_sender(sockets, tpu_via_blobs_sockets, exit, &sender, &poh_recorder),
             receiver,
         )
     }

@@ -30,10 +35,48 @@ impl FetchStage {
         tpu_via_blobs_sockets: Vec<UdpSocket>,
         exit: &Arc<AtomicBool>,
         sender: &PacketSender,
+        poh_recorder: &Arc<Mutex<PohRecorder>>,
     ) -> Self {
         let tx_sockets = sockets.into_iter().map(Arc::new).collect();
         let tpu_via_blobs_sockets = tpu_via_blobs_sockets.into_iter().map(Arc::new).collect();
-        Self::new_multi_socket(tx_sockets, tpu_via_blobs_sockets, exit, &sender)
+        Self::new_multi_socket(
+            tx_sockets,
+            tpu_via_blobs_sockets,
+            exit,
+            &sender,
+            &poh_recorder,
+        )
     }
 
+    fn handle_forwarded_packets(
+        recvr: &PacketReceiver,
+        sendr: &PacketSender,
+        poh_recorder: &Arc<Mutex<PohRecorder>>,
+    ) -> Result<()> {
+        let msgs = recvr.recv()?;
+        let mut len = msgs.packets.len();
+        let mut batch = vec![msgs];
+        while let Ok(more) = recvr.try_recv() {
+            len += more.packets.len();
+            batch.push(more);
+        }
+
+        if poh_recorder
+            .lock()
+            .unwrap()
+            .would_be_leader(1, DEFAULT_TICKS_PER_SLOT)
+        {
+            inc_new_counter_info!("fetch_stage-honor_forwards", len);
+            for packets in batch {
+                if sendr.send(packets).is_err() {
+                    return Err(Error::SendError);
+                }
+            }
+        } else {
+            inc_new_counter_info!("fetch_stage-discard_forwards", len);
+        }
+
+        Ok(())
+    }
+
     fn new_multi_socket(

@@ -41,16 +84,39 @@ impl FetchStage {
         tpu_via_blobs_sockets: Vec<Arc<UdpSocket>>,
         exit: &Arc<AtomicBool>,
         sender: &PacketSender,
+        poh_recorder: &Arc<Mutex<PohRecorder>>,
     ) -> Self {
         let tpu_threads = sockets
             .into_iter()
             .map(|socket| streamer::receiver(socket, &exit, sender.clone()));
 
+        let (forward_sender, forward_receiver) = channel();
         let tpu_via_blobs_threads = tpu_via_blobs_sockets
             .into_iter()
-            .map(|socket| streamer::blob_packet_receiver(socket, &exit, sender.clone()));
+            .map(|socket| streamer::blob_packet_receiver(socket, &exit, forward_sender.clone()));
 
-        let thread_hdls: Vec<_> = tpu_threads.chain(tpu_via_blobs_threads).collect();
+        let sender = sender.clone();
+        let poh_recorder = poh_recorder.clone();
+
+        let fwd_thread_hdl = Builder::new()
+            .name("solana-fetch-stage-fwd-rcvr".to_string())
+            .spawn(move || loop {
+                if let Err(e) =
+                    Self::handle_forwarded_packets(&forward_receiver, &sender, &poh_recorder)
+                {
+                    match e {
+                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
+                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
+                        Error::RecvError(_) => break,
+                        Error::SendError => break,
+                        _ => error!("{:?}", e),
+                    }
+                }
+            })
+            .unwrap();
+
+        let mut thread_hdls: Vec<_> = tpu_threads.chain(tpu_via_blobs_threads).collect();
+        thread_hdls.push(fwd_thread_hdl);
         Self { thread_hdls }
     }
 }
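
For readers skimming the diff, here is a minimal, self-contained sketch of the pattern that handle_forwarded_packets introduces: drain everything queued on the forward channel into one batch, then either pass the batch along or drop it depending on a "will I be leader soon?" check. The Vec<u8> payloads and the bool-returning closure are illustrative stand-ins, not the real Packets and PohRecorder types.

use std::sync::mpsc::{channel, Receiver, Sender};

fn handle_forwarded<F: Fn() -> bool>(
    recvr: &Receiver<Vec<u8>>,
    sendr: &Sender<Vec<u8>>,
    would_be_leader_soon: F,
) -> Result<(), &'static str> {
    // Block for the first batch, then opportunistically drain whatever else is queued.
    let first = recvr.recv().map_err(|_| "recv failed")?;
    let mut batch = vec![first];
    while let Ok(more) = recvr.try_recv() {
        batch.push(more);
    }

    if would_be_leader_soon() {
        // Honor the forwarded packets: hand them to the regular pipeline.
        for packets in batch {
            sendr.send(packets).map_err(|_| "send failed")?;
        }
    }
    // Otherwise the whole batch is dropped (the real code only bumps a counter).
    Ok(())
}

fn main() {
    let (fwd_tx, fwd_rx) = channel();
    let (out_tx, out_rx) = channel::<Vec<u8>>();
    fwd_tx.send(vec![1u8, 2, 3]).unwrap();
    fwd_tx.send(vec![4u8]).unwrap();

    // Pretend we are about to be leader: both queued batches are forwarded.
    handle_forwarded(&fwd_rx, &out_tx, || true).unwrap();
    assert_eq!(out_rx.try_iter().count(), 2);
}
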
poh_recorder.rs

@@ -79,6 +79,19 @@ impl PohRecorder {
         }
     }
 
+    pub fn would_be_leader(&self, within_next_n_slots: u64, ticks_per_slot: u64) -> bool {
+        let close_to_leader_tick = self.start_leader_at_tick.map_or(false, |leader_tick| {
+            let leader_ideal_start_tick =
+                leader_tick.saturating_sub(self.max_last_leader_grace_ticks);
+
+            self.tick_height() <= self.last_leader_tick.unwrap_or(0)
+                && self.tick_height()
+                    >= leader_ideal_start_tick.saturating_sub(within_next_n_slots * ticks_per_slot)
+        });
+
+        self.working_bank.is_some() || close_to_leader_tick
+    }
+
     pub fn hash(&mut self) {
         // TODO: amortize the cost of this lock by doing the loop in here for
         // some min amount of hashes
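
A standalone restatement of the window check may help when reading the test below; the concrete numbers (8 ticks per slot, a leader slot at ticks 64..=71, 4 grace ticks) are illustrative assumptions, not values taken from the real PohRecorder fields or a ledger.

// Re-statement of would_be_leader's window arithmetic with explicit arguments.
fn would_be_leader(
    tick_height: u64,
    start_leader_at_tick: Option<u64>,
    last_leader_tick: Option<u64>,
    grace_ticks: u64,
    within_next_n_slots: u64,
    ticks_per_slot: u64,
    has_working_bank: bool,
) -> bool {
    let close_to_leader_tick = start_leader_at_tick.map_or(false, |leader_tick| {
        let leader_ideal_start_tick = leader_tick.saturating_sub(grace_ticks);
        tick_height <= last_leader_tick.unwrap_or(0)
            && tick_height
                >= leader_ideal_start_tick.saturating_sub(within_next_n_slots * ticks_per_slot)
    });
    has_working_bank || close_to_leader_tick
}

fn main() {
    // Assumed scenario: ticks_per_slot = 8, leader slot starts at tick 64 and ends at tick 71,
    // grace period of 4 ticks, so the ideal start is tick 60. Looking ahead 2 slots (16 ticks),
    // the window opens at tick 60 - 16 = 44.
    assert!(!would_be_leader(43, Some(64), Some(71), 4, 2, 8, false)); // one tick too early
    assert!(would_be_leader(44, Some(64), Some(71), 4, 2, 8, false)); // inside the window
    assert!(!would_be_leader(72, Some(64), Some(71), 4, 2, 8, false)); // past our last leader tick
    assert!(would_be_leader(72, Some(64), Some(71), 4, 2, 8, true)); // a working bank means "leader now"
}
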
poh_recorder.rs (tests)

@@ -1155,4 +1168,82 @@ mod tests {
         }
         Blocktree::destroy(&ledger_path).unwrap();
     }
+
+    #[test]
+    fn test_would_be_leader_soon() {
+        let ledger_path = get_tmp_ledger_path!();
+        {
+            let blocktree =
+                Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
+            let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
+            let bank = Arc::new(Bank::new(&genesis_block));
+            let prev_hash = bank.last_blockhash();
+            let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
+                0,
+                prev_hash,
+                0,
+                None,
+                bank.ticks_per_slot(),
+                &Pubkey::default(),
+                &Arc::new(blocktree),
+                &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
+            );
+
+            // Test that with no leader slot, we don't reach the leader tick
+            assert_eq!(
+                poh_recorder.would_be_leader(2, bank.ticks_per_slot()),
+                false
+            );
+
+            for _ in 0..bank.ticks_per_slot() {
+                poh_recorder.tick();
+            }
+
+            // Test that with no leader slot, we don't reach the leader tick after sending some ticks
+            assert_eq!(
+                poh_recorder.would_be_leader(2, bank.ticks_per_slot()),
+                false
+            );
+
+            poh_recorder.reset(
+                poh_recorder.tick_height(),
+                bank.last_blockhash(),
+                0,
+                None,
+                bank.ticks_per_slot(),
+            );
+
+            assert_eq!(
+                poh_recorder.would_be_leader(2, bank.ticks_per_slot()),
+                false
+            );
+
+            // We reset with leader slot after 3 slots
+            poh_recorder.reset(
+                poh_recorder.tick_height(),
+                bank.last_blockhash(),
+                0,
+                Some(bank.slot() + 3),
+                bank.ticks_per_slot(),
+            );
+
+            // Test that the node won't be leader in next 2 slots
+            assert_eq!(
+                poh_recorder.would_be_leader(2, bank.ticks_per_slot()),
+                false
+            );
+
+            // Test that the node will be leader in next 3 slots
+            assert_eq!(poh_recorder.would_be_leader(3, bank.ticks_per_slot()), true);
+
+            assert_eq!(
+                poh_recorder.would_be_leader(2, bank.ticks_per_slot()),
+                false
+            );
+
+            // If we set the working bank, the node should be leader within next 2 slots
+            poh_recorder.set_bank(&bank);
+            assert_eq!(poh_recorder.would_be_leader(2, bank.ticks_per_slot()), true);
+        }
+    }
 }
tpu.rs

@@ -51,6 +51,7 @@ impl Tpu {
             tpu_via_blobs_sockets,
             &exit,
             &packet_sender,
+            &poh_recorder,
         );
         let (verified_sender, verified_receiver) = channel();
 