diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs
index 8b23a2448..f39216a4e 100644
--- a/core/benches/banking_stage.rs
+++ b/core/benches/banking_stage.rs
@@ -73,7 +73,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
     let mut packets = VecDeque::new();
     for batch in batches {
         let batch_len = batch.packets.len();
-        packets.push_back((batch, vec![0usize; batch_len]));
+        packets.push_back((batch, vec![0usize; batch_len], false));
     }
     let (s, _r) = unbounded();
     // This tests the performance of buffering packets.
diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index 92bfac1a2..5dd84b46a 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -3,11 +3,13 @@
 //! can do its processing in parallel with signature verification on the GPU.
 use crate::{
     cluster_info::ClusterInfo,
+    packet_hasher::PacketHasher,
     poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry},
     poh_service::{self, PohService},
 };
 use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
 use itertools::Itertools;
+use lru::LruCache;
 use retain_mut::RetainMut;
 use solana_ledger::{
     blockstore::Blockstore,
@@ -50,6 +52,7 @@ use std::{
     collections::{HashMap, VecDeque},
     env,
     net::UdpSocket,
+    ops::DerefMut,
     sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
     sync::mpsc::Receiver,
     sync::{Arc, Mutex},
@@ -58,11 +61,15 @@ use std::{
     time::Instant,
 };
 
-type PacketsAndOffsets = (Packets, Vec<usize>);
+/// (packets, valid_indexes, forwarded)
+/// Set of packets with a list of which are valid and if this batch has been forwarded.
+type PacketsAndOffsets = (Packets, Vec<usize>, bool);
+
 pub type UnprocessedPackets = VecDeque<PacketsAndOffsets>;
 
 /// Transaction forwarding
 pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2;
+pub const HOLD_TRANSACTIONS_SLOT_OFFSET: u64 = 20;
 
 // Fixed thread size seems to be fastest on GCP setup
 pub const NUM_THREADS: u32 = 4;
@@ -71,6 +78,8 @@ const TOTAL_BUFFERED_PACKETS: usize = 500_000;
 
 const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
 
+const DEFAULT_LRU_SIZE: usize = 200_000;
+
 #[derive(Debug, Default)]
 pub struct BankingStageStats {
     last_report: AtomicU64,
@@ -154,6 +163,7 @@ pub struct BankingStage {
 
 pub enum BufferedPacketsDecision {
     Consume,
     Forward,
+    ForwardAndHold,
     Hold,
 }
@@ -193,6 +203,10 @@ impl BankingStage {
         // This thread talks to poh_service and broadcasts the entries once they have been recorded.
         // Once an entry has been recorded, its blockhash is registered with the bank.
         let my_pubkey = cluster_info.id();
+        let duplicates = Arc::new(Mutex::new((
+            LruCache::new(DEFAULT_LRU_SIZE),
+            PacketHasher::default(),
+        )));
         // Many banks that process transactions in parallel.
         let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
             .map(|i| {
@@ -208,6 +222,7 @@ impl BankingStage {
                 let mut recv_start = Instant::now();
                 let transaction_status_sender = transaction_status_sender.clone();
                 let gossip_vote_sender = gossip_vote_sender.clone();
+                let duplicates = duplicates.clone();
                 Builder::new()
                     .name("solana-banking-stage-tx".to_string())
                     .spawn(move || {
@@ -223,6 +238,7 @@ impl BankingStage {
                             batch_limit,
                             transaction_status_sender,
                             gossip_vote_sender,
+                            &duplicates,
                         );
                     })
                     .unwrap()
@@ -235,14 +251,17 @@ impl BankingStage {
         all_packets: impl Iterator<Item = &'a PacketsAndOffsets>,
     ) -> Vec<&'a Packet> {
         all_packets
-            .flat_map(|(p, valid_indexes)| valid_indexes.iter().map(move |x| &p.packets[*x]))
+            .filter(|(_p, _indexes, forwarded)| !forwarded)
+            .flat_map(|(p, valid_indexes, _forwarded)| {
+                valid_indexes.iter().map(move |x| &p.packets[*x])
+            })
             .collect()
     }
 
     fn forward_buffered_packets(
         socket: &std::net::UdpSocket,
         tpu_forwards: &std::net::SocketAddr,
-        unprocessed_packets: &VecDeque<PacketsAndOffsets>,
+        unprocessed_packets: &UnprocessedPackets,
     ) -> std::io::Result<()> {
         let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets.iter());
         inc_new_counter_info!("banking_stage-forwarded_packets", packets.len());
@@ -281,7 +300,7 @@ impl BankingStage {
         let buffered_len = buffered_packets.len();
         let mut proc_start = Measure::start("consume_buffered_process");
         let mut reached_end_of_slot = None;
-        buffered_packets.retain_mut(|(msgs, ref mut original_unprocessed_indexes)| {
+        buffered_packets.retain_mut(|(msgs, ref mut original_unprocessed_indexes, _forwarded)| {
             if let Some((next_leader, bank)) = &reached_end_of_slot {
                 // We've hit the end of this slot, no need to perform more processing,
                 // just filter the remaining packets for the invalid (e.g. too old) ones
@@ -363,6 +382,7 @@ impl BankingStage {
         leader_pubkey: Option<Pubkey>,
         bank_is_available: bool,
         would_be_leader: bool,
+        would_be_leader_shortly: bool,
     ) -> BufferedPacketsDecision {
         leader_pubkey.map_or(
             // If leader is not known, return the buffered packets as is
@@ -372,9 +392,13 @@ impl BankingStage {
                 if bank_is_available {
                     // If the bank is available, this node is the leader
                     BufferedPacketsDecision::Consume
-                } else if would_be_leader {
+                } else if would_be_leader_shortly {
                     // If the node will be the leader soon, hold the packets for now
                     BufferedPacketsDecision::Hold
+                } else if would_be_leader {
+                    // Node will be leader within ~20 slots, hold the transactions in
+                    // case it is the only node which produces an accepted slot.
+                    BufferedPacketsDecision::ForwardAndHold
                 } else if x != *my_pubkey {
                     // If the current node is not the leader, forward the buffered packets
                     BufferedPacketsDecision::Forward
@@ -398,11 +422,12 @@ impl BankingStage {
         gossip_vote_sender: &ReplayVoteSender,
         banking_stage_stats: &BankingStageStats,
     ) -> BufferedPacketsDecision {
-        let (leader_at_slot_offset, poh_has_bank, would_be_leader) = {
+        let (leader_at_slot_offset, poh_has_bank, would_be_leader, would_be_leader_shortly) = {
             let poh = poh_recorder.lock().unwrap();
             (
                 poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET),
                 poh.has_bank(),
+                poh.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET * DEFAULT_TICKS_PER_SLOT),
                 poh.would_be_leader(
                     (FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) * DEFAULT_TICKS_PER_SLOT,
                 ),
@@ -414,6 +439,7 @@ impl BankingStage {
             leader_at_slot_offset,
             poh_has_bank,
             would_be_leader,
+            would_be_leader_shortly,
         );
 
         match decision {
@@ -429,35 +455,66 @@ impl BankingStage {
                 );
             }
             BufferedPacketsDecision::Forward => {
-                if enable_forwarding {
-                    let next_leader = poh_recorder
-                        .lock()
-                        .unwrap()
-                        .leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET);
-                    next_leader.map_or((), |leader_pubkey| {
-                        let leader_addr = {
-                            cluster_info
-                                .lookup_contact_info(&leader_pubkey, |leader| leader.tpu_forwards)
-                        };
-
-                        leader_addr.map_or((), |leader_addr| {
-                            let _ = Self::forward_buffered_packets(
-                                &socket,
-                                &leader_addr,
-                                &buffered_packets,
-                            );
-                            buffered_packets.clear();
-                        })
-                    })
-                } else {
-                    buffered_packets.clear();
-                }
+                Self::handle_forwarding(
+                    enable_forwarding,
+                    cluster_info,
+                    buffered_packets,
+                    poh_recorder,
+                    socket,
+                    false,
+                );
+            }
+            BufferedPacketsDecision::ForwardAndHold => {
+                Self::handle_forwarding(
+                    enable_forwarding,
+                    cluster_info,
+                    buffered_packets,
+                    poh_recorder,
+                    socket,
+                    true,
+                );
             }
             _ => (),
         }
         decision
     }
 
+    fn handle_forwarding(
+        enable_forwarding: bool,
+        cluster_info: &ClusterInfo,
+        buffered_packets: &mut UnprocessedPackets,
+        poh_recorder: &Arc<Mutex<PohRecorder>>,
+        socket: &UdpSocket,
+        hold: bool,
+    ) {
+        if enable_forwarding {
+            let next_leader = poh_recorder
+                .lock()
+                .unwrap()
+                .leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET);
+            next_leader.map_or((), |leader_pubkey| {
+                let leader_addr = {
+                    cluster_info.lookup_contact_info(&leader_pubkey, |leader| leader.tpu_forwards)
+                };
+
+                leader_addr.map_or((), |leader_addr| {
+                    let _ =
+                        Self::forward_buffered_packets(&socket, &leader_addr, &buffered_packets);
+                    if hold {
+                        buffered_packets.retain(|b| b.1.is_empty());
+                        for b in buffered_packets.iter_mut() {
+                            b.2 = true;
+                        }
+                    } else {
+                        buffered_packets.clear();
+                    }
+                })
+            })
+        } else {
+            buffered_packets.clear();
+        }
+    }
+
     #[allow(clippy::too_many_arguments)]
     pub fn process_loop(
         my_pubkey: Pubkey,
@@ -470,6 +527,7 @@ impl BankingStage {
         batch_limit: usize,
         transaction_status_sender: Option<TransactionStatusSender>,
         gossip_vote_sender: ReplayVoteSender,
+        duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
     ) {
         let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
         let mut buffered_packets = VecDeque::with_capacity(batch_limit);
@@ -487,7 +545,9 @@ impl BankingStage {
                     &gossip_vote_sender,
                     &banking_stage_stats,
                 );
-                if decision == BufferedPacketsDecision::Hold {
+                if decision == BufferedPacketsDecision::Hold
+                    || decision == BufferedPacketsDecision::ForwardAndHold
+                {
                     // If we are waiting on a new bank,
                     // check the receiver for more transactions/for exiting
                     break;
@@ -516,6 +576,7 @@ impl BankingStage {
                 &gossip_vote_sender,
                 &mut buffered_packets,
                 &banking_stage_stats,
+                duplicates,
             ) {
                Ok(()) | Err(RecvTimeoutError::Timeout) => (),
                Err(RecvTimeoutError::Disconnected) => break,
@@ -1038,6 +1099,7 @@ impl BankingStage {
        gossip_vote_sender: &ReplayVoteSender,
        buffered_packets: &mut UnprocessedPackets,
        banking_stage_stats: &BankingStageStats,
+        duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
    ) -> Result<(), RecvTimeoutError> {
        let mut recv_time = Measure::start("process_packets_recv");
        let mms = verified_receiver.recv_timeout(recv_timeout)?;
@@ -1070,6 +1132,7 @@ impl BankingStage {
                    &mut dropped_batches_count,
                    &mut newly_buffered_packets_count,
                    batch_limit,
+                    duplicates,
                );
                continue;
            }
@@ -1094,6 +1157,7 @@ impl BankingStage {
                &mut dropped_batches_count,
                &mut newly_buffered_packets_count,
                batch_limit,
+                duplicates,
            );
 
            if processed < verified_txs_len {
@@ -1116,6 +1180,7 @@ impl BankingStage {
                        &mut dropped_batches_count,
                        &mut newly_buffered_packets_count,
                        batch_limit,
+                        duplicates,
                    );
                }
            }
@@ -1156,18 +1221,33 @@ impl BankingStage {
    fn push_unprocessed(
        unprocessed_packets: &mut UnprocessedPackets,
        packets: Packets,
-        packet_indexes: Vec<usize>,
+        mut packet_indexes: Vec<usize>,
        dropped_batches_count: &mut usize,
        newly_buffered_packets_count: &mut usize,
        batch_limit: usize,
+        duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
    ) {
+        {
+            let mut duplicates = duplicates.lock().unwrap();
+            let (cache, hasher) = duplicates.deref_mut();
+            packet_indexes.retain(|i| {
+                let packet_hash = hasher.hash_packet(&packets.packets[*i]);
+                match cache.get_mut(&packet_hash) {
+                    Some(_hash) => false,
+                    None => {
+                        cache.put(packet_hash, ());
+                        true
+                    }
+                }
+            });
+        }
        if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
            if unprocessed_packets.len() >= batch_limit {
                *dropped_batches_count += 1;
                unprocessed_packets.pop_front();
            }
            *newly_buffered_packets_count += packet_indexes.len();
-            unprocessed_packets.push_back((packets, packet_indexes));
+            unprocessed_packets.push_back((packets, packet_indexes, false));
        }
    }
@@ -1831,36 +1911,77 @@ mod tests {
        let my_pubkey1 = solana_sdk::pubkey::new_rand();
 
        assert_eq!(
-            BankingStage::consume_or_forward_packets(&my_pubkey, None, true, false,),
+            BankingStage::consume_or_forward_packets(&my_pubkey, None, true, false, false),
            BufferedPacketsDecision::Hold
        );
        assert_eq!(
-            BankingStage::consume_or_forward_packets(&my_pubkey, None, false, false),
+            BankingStage::consume_or_forward_packets(&my_pubkey, None, false, false, false),
            BufferedPacketsDecision::Hold
        );
        assert_eq!(
-            BankingStage::consume_or_forward_packets(&my_pubkey1, None, false, false),
+            BankingStage::consume_or_forward_packets(&my_pubkey1, None, false, false, false),
            BufferedPacketsDecision::Hold
        );
 
        assert_eq!(
-            BankingStage::consume_or_forward_packets(&my_pubkey, Some(my_pubkey1), false, false,),
+            BankingStage::consume_or_forward_packets(
+                &my_pubkey,
+                Some(my_pubkey1),
+                false,
+                false,
+                false
+            ),
            BufferedPacketsDecision::Forward
        );
+
        assert_eq!(
-            BankingStage::consume_or_forward_packets(&my_pubkey, Some(my_pubkey1), false, true,),
+            BankingStage::consume_or_forward_packets(
+                &my_pubkey,
+                Some(my_pubkey1),
+                false,
+                true,
+                true
+            ),
            BufferedPacketsDecision::Hold
        );
        assert_eq!(
-            BankingStage::consume_or_forward_packets(&my_pubkey, Some(my_pubkey1), true, false,),
+            BankingStage::consume_or_forward_packets(
+                &my_pubkey,
+                Some(my_pubkey1),
+                false,
+                true,
+                false
+            ),
+            BufferedPacketsDecision::ForwardAndHold
+        );
+        assert_eq!(
+            BankingStage::consume_or_forward_packets(
+                &my_pubkey,
+                Some(my_pubkey1),
+                true,
+                false,
+                false
+            ),
            BufferedPacketsDecision::Consume
        );
        assert_eq!(
-            BankingStage::consume_or_forward_packets(&my_pubkey1, Some(my_pubkey1), false, false,),
+            BankingStage::consume_or_forward_packets(
+                &my_pubkey1,
+                Some(my_pubkey1),
+                false,
+                false,
+                false
+            ),
            BufferedPacketsDecision::Hold
        );
        assert_eq!(
-            BankingStage::consume_or_forward_packets(&my_pubkey1, Some(my_pubkey1), true, false,),
+            BankingStage::consume_or_forward_packets(
+                &my_pubkey1,
+                Some(my_pubkey1),
+                true,
+                false,
+                false
+            ),
            BufferedPacketsDecision::Consume
        );
    }
@@ -2024,7 +2145,7 @@ mod tests {
    fn test_filter_valid_packets() {
        solana_logger::setup();
 
-        let all_packets = (0..16)
+        let mut all_packets = (0..16)
            .map(|packets_id| {
                let packets = Packets::new(
                    (0..32)
@@ -2038,7 +2159,7 @@ mod tests {
                let valid_indexes = (0..32)
                    .filter_map(|x| if x % 2 != 0 { Some(x as usize) } else { None })
                    .collect_vec();
-                (packets, valid_indexes)
+                (packets, valid_indexes, false)
            })
            .collect_vec();
@@ -2055,6 +2176,10 @@ mod tests {
                assert_eq!(p.meta.port, (packets_id << 8 | packet_id) as u16);
            })
            .collect_vec();
+
+        all_packets[0].2 = true;
+        let result = BankingStage::filter_valid_packets_for_forwarding(all_packets.iter());
+        assert_eq!(result.len(), 240);
    }
 
    #[test]
@@ -2280,6 +2405,7 @@ mod tests {
        let mut buffered_packets: UnprocessedPackets = vec![(
            all_packets,
            (0..num_conflicting_transactions).into_iter().collect(),
+            false,
        )]
        .into_iter()
        .collect();
@@ -2336,7 +2462,7 @@ mod tests {
        let mut buffered_packets: UnprocessedPackets = packets_vec
            .clone()
            .into_iter()
-            .map(|single_packets| (single_packets, vec![0]))
+            .map(|single_packets| (single_packets, vec![0], false))
            .collect();
 
        let (continue_sender, continue_receiver) = unbounded();
@@ -2374,9 +2500,10 @@ mod tests {
                buffered_packets.len(),
                packets_vec[interrupted_iteration + 1..].iter().count()
            );
-            for ((remaining_unprocessed_packet, _), original_packet) in buffered_packets
-                .iter()
-                .zip(&packets_vec[interrupted_iteration + 1..])
+            for ((remaining_unprocessed_packet, _, _forwarded), original_packet) in
+                buffered_packets
+                    .iter()
+                    .zip(&packets_vec[interrupted_iteration + 1..])
            {
                assert_eq!(
                    remaining_unprocessed_packet.packets[0],
@@ -2404,10 +2531,11 @@ mod tests {
 
    #[test]
    fn test_push_unprocessed_batch_limit() {
+        solana_logger::setup();
        // Create `Packets` with 1 unprocessed element
        let single_element_packets = Packets::new(vec![Packet::default()]);
        let mut unprocessed_packets: UnprocessedPackets =
-            vec![(single_element_packets.clone(), vec![0])]
+            vec![(single_element_packets.clone(), vec![0], false)]
                .into_iter()
                .collect();
        // Set the limit to 2
@@ -2416,6 +2544,10 @@ mod tests {
        let new_packets = single_element_packets;
        let packet_indexes = vec![];
 
+        let duplicates = Arc::new(Mutex::new((
+            LruCache::new(DEFAULT_LRU_SIZE),
+            PacketHasher::default(),
+        )));
        let mut dropped_batches_count = 0;
        let mut newly_buffered_packets_count = 0;
        // Because the set of unprocessed `packet_indexes` is empty, the
@@ -2427,6 +2559,7 @@ mod tests {
            &mut dropped_batches_count,
            &mut newly_buffered_packets_count,
            batch_limit,
+            &duplicates,
        );
        assert_eq!(unprocessed_packets.len(), 1);
        assert_eq!(dropped_batches_count, 0);
@@ -2442,6 +2575,7 @@ mod tests {
            &mut dropped_batches_count,
            &mut newly_buffered_packets_count,
            batch_limit,
+            &duplicates,
        );
        assert_eq!(unprocessed_packets.len(), 2);
        assert_eq!(dropped_batches_count, 0);
@@ -2458,10 +2592,26 @@ mod tests {
        BankingStage::push_unprocessed(
            &mut unprocessed_packets,
            new_packets.clone(),
-            packet_indexes,
+            packet_indexes.clone(),
            &mut dropped_batches_count,
            &mut newly_buffered_packets_count,
            batch_limit,
+            &duplicates,
+        );
+        assert_eq!(unprocessed_packets.len(), 2);
+        assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
+        assert_eq!(dropped_batches_count, 1);
+        assert_eq!(newly_buffered_packets_count, 2);
+
+        // Check duplicates are dropped
+        BankingStage::push_unprocessed(
+            &mut unprocessed_packets,
+            new_packets.clone(),
+            packet_indexes,
+            &mut dropped_batches_count,
+            &mut newly_buffered_packets_count,
+            3,
+            &duplicates,
        );
        assert_eq!(unprocessed_packets.len(), 2);
        assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs
index 37bb41a83..3a5a0943b 100644
--- a/core/src/fetch_stage.rs
+++ b/core/src/fetch_stage.rs
@@ -1,6 +1,6 @@
 //! The `fetch_stage` batches input from a UDP socket and sends it to a channel.
 
-use crate::banking_stage::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET;
+use crate::banking_stage::HOLD_TRANSACTIONS_SLOT_OFFSET;
 use crate::poh_recorder::PohRecorder;
 use crate::result::{Error, Result};
 use solana_measure::thread_mem_usage;
@@ -81,11 +81,11 @@ impl FetchStage {
            }
        }
 
-        if poh_recorder.lock().unwrap().would_be_leader(
-            FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET
-                .saturating_add(1)
-                .saturating_mul(DEFAULT_TICKS_PER_SLOT),
-        ) {
+        if poh_recorder
+            .lock()
+            .unwrap()
+            .would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET.saturating_mul(DEFAULT_TICKS_PER_SLOT))
+        {
            inc_new_counter_debug!("fetch_stage-honor_forwards", len);
            for packets in batch {
                if sendr.send(packets).is_err() {
diff --git a/perf/src/packet.rs b/perf/src/packet.rs
index 292c3b7ce..a6cf3d1d1 100644
--- a/perf/src/packet.rs
+++ b/perf/src/packet.rs
@@ -7,9 +7,8 @@ use std::net::SocketAddr;
 
 pub const NUM_PACKETS: usize = 1024 * 8;
 
-pub const PACKETS_PER_BATCH: usize = 256;
+pub const PACKETS_PER_BATCH: usize = 128;
 pub const NUM_RCVMMSGS: usize = 128;
-pub const PACKETS_BATCH_SIZE: usize = PACKETS_PER_BATCH * PACKET_DATA_SIZE;
 
 #[derive(Debug, Default, Clone)]
 pub struct Packets {
diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs
index 2e69cac41..f8837d3b4 100644
--- a/streamer/src/packet.rs
+++ b/streamer/src/packet.rs
@@ -2,7 +2,7 @@
 use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
 pub use solana_perf::packet::{
     limited_deserialize, to_packets_chunked, Packets, PacketsRecycler, NUM_PACKETS,
-    PACKETS_BATCH_SIZE, PACKETS_PER_BATCH,
+    PACKETS_PER_BATCH,
 };
 use solana_metrics::inc_new_counter_debug;
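
The core of the banking_stage change is the duplicate filter added to push_unprocessed(): a shared (LruCache, PacketHasher) pair drops any packet whose hash was seen recently, before the batch is buffered. Below is a minimal standalone sketch of that filter, assuming the pre-0.8 `lru` crate API the patch imports (LruCache::new takes a plain usize); std's DefaultHasher is a hypothetical stand-in for the repo's PacketHasher, whose implementation is not part of this diff.

// Standalone sketch of the dedup filter in push_unprocessed().
// DefaultHasher stands in for PacketHasher; the real code hashes
// solana Packet contents, not raw byte vectors.
use lru::LruCache;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

fn hash_packet(data: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    hasher.write(data);
    hasher.finish()
}

fn dedup_indexes(cache: &mut LruCache<u64, ()>, packets: &[Vec<u8>], indexes: &mut Vec<usize>) {
    // Keep an index only if its packet hash is new; record new hashes so
    // later duplicates are dropped, mirroring the retain() in the patch.
    indexes.retain(|i| {
        let h = hash_packet(&packets[*i]);
        if cache.get_mut(&h).is_some() {
            false // already seen: drop this packet index
        } else {
            cache.put(h, ());
            true
        }
    });
}

fn main() {
    let mut cache = LruCache::new(8);
    let packets = vec![b"tx-a".to_vec(), b"tx-b".to_vec(), b"tx-a".to_vec()];
    let mut indexes = vec![0, 1, 2];
    dedup_indexes(&mut cache, &packets, &mut indexes);
    assert_eq!(indexes, vec![0, 1]); // the repeated "tx-a" was filtered out
    println!("kept indexes: {:?}", indexes);
}

Using get_mut rather than a non-mutating lookup also refreshes the entry's recency in the LRU, so a packet that keeps arriving stays marked as a duplicate for as long as the 200_000-entry cache retains its hash.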
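
The other behavioral change is the extra arm in consume_or_forward_packets: with a known leader, no bank, and leadership due within HOLD_TRANSACTIONS_SLOT_OFFSET slots but not imminently, packets are now forwarded and held rather than forwarded and dropped. The patch's decision ladder can be sanity-checked against the updated test table with a small pure function; this sketch mirrors the branch order, with a locally redeclared enum and string pubkeys as illustrative stand-ins.

// Pure re-statement of the decision ladder in consume_or_forward_packets().
#[derive(Debug, PartialEq)]
enum BufferedPacketsDecision {
    Consume,
    Forward,
    ForwardAndHold,
    Hold,
}

fn decide(
    my_pubkey: &str,
    leader_pubkey: Option<&str>,
    bank_is_available: bool,
    would_be_leader: bool,         // leader within HOLD_TRANSACTIONS_SLOT_OFFSET (~20 slots)
    would_be_leader_shortly: bool, // leader within (FORWARD_..._OFFSET - 1) slots
) -> BufferedPacketsDecision {
    match leader_pubkey {
        None => BufferedPacketsDecision::Hold, // unknown leader: keep buffering
        Some(x) => {
            if bank_is_available {
                BufferedPacketsDecision::Consume // this node is leader right now
            } else if would_be_leader_shortly {
                BufferedPacketsDecision::Hold // leader very soon: don't forward
            } else if would_be_leader {
                BufferedPacketsDecision::ForwardAndHold // forward, but keep a copy
            } else if x != my_pubkey {
                BufferedPacketsDecision::Forward // someone else leads: hand off
            } else {
                BufferedPacketsDecision::Hold
            }
        }
    }
}

fn main() {
    // Mirrors the new assertion: not leader now, leader within ~20 slots
    // but not shortly -> ForwardAndHold.
    assert_eq!(
        decide("me", Some("other"), false, true, false),
        BufferedPacketsDecision::ForwardAndHold
    );
    println!("decision table matches the patch");
}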