Forward and hold packets (#15634)

sakridge 2021-03-03 10:23:05 -08:00 committed by GitHub
parent 6acb06f8d8
commit 830be855dc
5 changed files with 207 additions and 58 deletions

View File

@@ -73,7 +73,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
let mut packets = VecDeque::new();
for batch in batches {
let batch_len = batch.packets.len();
packets.push_back((batch, vec![0usize; batch_len]));
packets.push_back((batch, vec![0usize; batch_len], false));
}
let (s, _r) = unbounded();
// This tests the performance of buffering packets.

View File

@@ -3,11 +3,13 @@
//! can do its processing in parallel with signature verification on the GPU.
use crate::{
cluster_info::ClusterInfo,
packet_hasher::PacketHasher,
poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry},
poh_service::{self, PohService},
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools;
use lru::LruCache;
use retain_mut::RetainMut;
use solana_ledger::{
blockstore::Blockstore,
@@ -50,6 +52,7 @@ use std::{
collections::{HashMap, VecDeque},
env,
net::UdpSocket,
ops::DerefMut,
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
sync::mpsc::Receiver,
sync::{Arc, Mutex},
@@ -58,11 +61,15 @@ use std::{
time::Instant,
};
type PacketsAndOffsets = (Packets, Vec<usize>);
/// (packets, valid_indexes, forwarded)
/// Set of packets with a list of which are valid and if this batch has been forwarded.
type PacketsAndOffsets = (Packets, Vec<usize>, bool);
pub type UnprocessedPackets = VecDeque<PacketsAndOffsets>;
/// Transaction forwarding
pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2;
pub const HOLD_TRANSACTIONS_SLOT_OFFSET: u64 = 20;
// Fixed thread size seems to be fastest on GCP setup
pub const NUM_THREADS: u32 = 4;
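
The widened tuple is the core data-structure change: every buffered batch now carries a flag recording whether it has already been forwarded, and HOLD_TRANSACTIONS_SLOT_OFFSET defines the roughly 20-slot window in which forwarded batches are also held. A minimal sketch of the intended bookkeeping, with Packets stubbed as a plain Vec<Vec<u8>> rather than the crate's real type:

use std::collections::VecDeque;

// Stand-in for solana_perf::packet::Packets, just for this sketch.
type Packets = Vec<Vec<u8>>;
/// (packets, valid_indexes, forwarded)
type PacketsAndOffsets = (Packets, Vec<usize>, bool);
type UnprocessedPackets = VecDeque<PacketsAndOffsets>;

fn main() {
    let mut buffered: UnprocessedPackets = VecDeque::new();
    // A freshly received batch starts out not yet forwarded.
    buffered.push_back((vec![vec![0u8; 8], vec![1u8; 8]], vec![0, 1], false));
    // Under ForwardAndHold the batch is kept but flagged, so a later
    // Forward pass will skip it instead of sending it twice.
    for (_packets, _valid, forwarded) in buffered.iter_mut() {
        *forwarded = true;
    }
    assert!(buffered.iter().all(|(_, _, forwarded)| *forwarded));
}
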
@@ -71,6 +78,8 @@ const TOTAL_BUFFERED_PACKETS: usize = 500_000;
const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
const DEFAULT_LRU_SIZE: usize = 200_000;
#[derive(Debug, Default)]
pub struct BankingStageStats {
last_report: AtomicU64,
@@ -154,6 +163,7 @@ pub struct BankingStage {
pub enum BufferedPacketsDecision {
Consume,
Forward,
ForwardAndHold,
Hold,
}
@@ -193,6 +203,10 @@ impl BankingStage {
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
// Once an entry has been recorded, its blockhash is registered with the bank.
let my_pubkey = cluster_info.id();
let duplicates = Arc::new(Mutex::new((
LruCache::new(DEFAULT_LRU_SIZE),
PacketHasher::default(),
)));
// Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
.map(|i| {
@@ -208,6 +222,7 @@ impl BankingStage {
let mut recv_start = Instant::now();
let transaction_status_sender = transaction_status_sender.clone();
let gossip_vote_sender = gossip_vote_sender.clone();
let duplicates = duplicates.clone();
Builder::new()
.name("solana-banking-stage-tx".to_string())
.spawn(move || {
@@ -223,6 +238,7 @@ impl BankingStage {
batch_limit,
transaction_status_sender,
gossip_vote_sender,
&duplicates,
);
})
.unwrap()
@@ -235,14 +251,17 @@ impl BankingStage {
all_packets: impl Iterator<Item = &'a PacketsAndOffsets>,
) -> Vec<&'a Packet> {
all_packets
.flat_map(|(p, valid_indexes)| valid_indexes.iter().map(move |x| &p.packets[*x]))
.filter(|(_p, _indexes, forwarded)| !forwarded)
.flat_map(|(p, valid_indexes, _forwarded)| {
valid_indexes.iter().map(move |x| &p.packets[*x])
})
.collect()
}
fn forward_buffered_packets(
socket: &std::net::UdpSocket,
tpu_forwards: &std::net::SocketAddr,
unprocessed_packets: &VecDeque<PacketsAndOffsets>,
unprocessed_packets: &UnprocessedPackets,
) -> std::io::Result<()> {
let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets.iter());
inc_new_counter_info!("banking_stage-forwarded_packets", packets.len());
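
filter_valid_packets_for_forwarding now skips batches whose forwarded flag is set before flattening the valid indexes. The same filter-then-flatten shape, as a self-contained sketch over the stubbed tuple type (names here are illustrative, not the crate's):

// Sketch of the forwarding filter: drop already-forwarded batches, then
// flatten only the valid indexes of what remains.
fn valid_for_forwarding<'a>(
    all: impl Iterator<Item = &'a (Vec<Vec<u8>>, Vec<usize>, bool)>,
) -> Vec<&'a Vec<u8>> {
    all.filter(|(_batch, _valid, forwarded)| !forwarded)
        .flat_map(|(batch, valid, _)| valid.iter().map(move |&i| &batch[i]))
        .collect()
}

fn main() {
    let batches = vec![
        (vec![vec![1u8], vec![2u8]], vec![0, 1], true), // already forwarded: skipped
        (vec![vec![3u8], vec![4u8]], vec![1], false),   // only index 1 is valid
    ];
    assert_eq!(valid_for_forwarding(batches.iter()), vec![&vec![4u8]]);
}
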
@@ -281,7 +300,7 @@ impl BankingStage {
let buffered_len = buffered_packets.len();
let mut proc_start = Measure::start("consume_buffered_process");
let mut reached_end_of_slot = None;
buffered_packets.retain_mut(|(msgs, ref mut original_unprocessed_indexes)| {
buffered_packets.retain_mut(|(msgs, ref mut original_unprocessed_indexes, _forwarded)| {
if let Some((next_leader, bank)) = &reached_end_of_slot {
// We've hit the end of this slot, no need to perform more processing,
// just filter the remaining packets for the invalid (e.g. too old) ones
@@ -363,6 +382,7 @@ impl BankingStage {
leader_pubkey: Option<Pubkey>,
bank_is_available: bool,
would_be_leader: bool,
would_be_leader_shortly: bool,
) -> BufferedPacketsDecision {
leader_pubkey.map_or(
// If leader is not known, return the buffered packets as is
@@ -372,9 +392,13 @@ impl BankingStage {
if bank_is_available {
// If the bank is available, this node is the leader
BufferedPacketsDecision::Consume
} else if would_be_leader {
} else if would_be_leader_shortly {
// If the node will be the leader soon, hold the packets for now
BufferedPacketsDecision::Hold
} else if would_be_leader {
// Node will be leader within ~20 slots, hold the transactions in
// case it is the only node which produces an accepted slot.
BufferedPacketsDecision::ForwardAndHold
} else if x != *my_pubkey {
// If the current node is not the leader, forward the buffered packets
BufferedPacketsDecision::Forward
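
The four-way ladder is easiest to read as a whole. Below is a self-contained sketch of the decision, with pubkeys stubbed as &str and the final arm (leader is this node but no bank yet) reconstructed from the tests further down:

#[derive(Debug, PartialEq)]
enum BufferedPacketsDecision {
    Consume,
    Forward,
    ForwardAndHold,
    Hold,
}

fn consume_or_forward(
    my_pubkey: &str,
    leader_pubkey: Option<&str>,
    bank_is_available: bool,
    would_be_leader: bool,         // leader within HOLD_TRANSACTIONS_SLOT_OFFSET (20) slots
    would_be_leader_shortly: bool, // leader within FORWARD_..._AT_SLOT_OFFSET - 1 (1) slot
) -> BufferedPacketsDecision {
    match leader_pubkey {
        // Leader unknown: keep the buffered packets as-is.
        None => BufferedPacketsDecision::Hold,
        Some(leader) => {
            if bank_is_available {
                // A bank is available, so this node is the leader right now.
                BufferedPacketsDecision::Consume
            } else if would_be_leader_shortly {
                // About to lead: no point bouncing packets to someone else.
                BufferedPacketsDecision::Hold
            } else if would_be_leader {
                // Leading within ~20 slots: forward, but keep a copy in case
                // this node is the only one to produce an accepted slot.
                BufferedPacketsDecision::ForwardAndHold
            } else if leader != my_pubkey {
                // Someone else leads next: hand the packets to them.
                BufferedPacketsDecision::Forward
            } else {
                BufferedPacketsDecision::Hold
            }
        }
    }
}

fn main() {
    // Mirrors the new test case below: not leader now, leads within the
    // hold window but not shortly, so forward and keep a copy.
    assert_eq!(
        consume_or_forward("me", Some("them"), false, true, false),
        BufferedPacketsDecision::ForwardAndHold
    );
}
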
@@ -398,11 +422,12 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
) -> BufferedPacketsDecision {
let (leader_at_slot_offset, poh_has_bank, would_be_leader) = {
let (leader_at_slot_offset, poh_has_bank, would_be_leader, would_be_leader_shortly) = {
let poh = poh_recorder.lock().unwrap();
(
poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET),
poh.has_bank(),
poh.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET * DEFAULT_TICKS_PER_SLOT),
poh.would_be_leader(
(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) * DEFAULT_TICKS_PER_SLOT,
),
@@ -414,6 +439,7 @@ impl BankingStage {
leader_at_slot_offset,
poh_has_bank,
would_be_leader,
would_be_leader_shortly,
);
match decision {
@@ -429,35 +455,66 @@
);
}
BufferedPacketsDecision::Forward => {
if enable_forwarding {
let next_leader = poh_recorder
.lock()
.unwrap()
.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET);
next_leader.map_or((), |leader_pubkey| {
let leader_addr = {
cluster_info
.lookup_contact_info(&leader_pubkey, |leader| leader.tpu_forwards)
};
leader_addr.map_or((), |leader_addr| {
let _ = Self::forward_buffered_packets(
&socket,
&leader_addr,
&buffered_packets,
);
buffered_packets.clear();
})
})
} else {
buffered_packets.clear();
}
Self::handle_forwarding(
enable_forwarding,
cluster_info,
buffered_packets,
poh_recorder,
socket,
false,
);
}
BufferedPacketsDecision::ForwardAndHold => {
Self::handle_forwarding(
enable_forwarding,
cluster_info,
buffered_packets,
poh_recorder,
socket,
true,
);
}
_ => (),
}
decision
}
fn handle_forwarding(
enable_forwarding: bool,
cluster_info: &ClusterInfo,
buffered_packets: &mut UnprocessedPackets,
poh_recorder: &Arc<Mutex<PohRecorder>>,
socket: &UdpSocket,
hold: bool,
) {
if enable_forwarding {
let next_leader = poh_recorder
.lock()
.unwrap()
.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET);
next_leader.map_or((), |leader_pubkey| {
let leader_addr = {
cluster_info.lookup_contact_info(&leader_pubkey, |leader| leader.tpu_forwards)
};
leader_addr.map_or((), |leader_addr| {
let _ =
Self::forward_buffered_packets(&socket, &leader_addr, &buffered_packets);
if hold {
buffered_packets.retain(|b| b.1.is_empty());
for b in buffered_packets.iter_mut() {
b.2 = true;
}
} else {
buffered_packets.clear();
}
})
})
} else {
buffered_packets.clear();
}
}
#[allow(clippy::too_many_arguments)]
pub fn process_loop(
my_pubkey: Pubkey,
@@ -470,6 +527,7 @@ impl BankingStage {
batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
) {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packets = VecDeque::with_capacity(batch_limit);
@@ -487,7 +545,9 @@ impl BankingStage {
&gossip_vote_sender,
&banking_stage_stats,
);
if decision == BufferedPacketsDecision::Hold {
if decision == BufferedPacketsDecision::Hold
|| decision == BufferedPacketsDecision::ForwardAndHold
{
// If we are waiting on a new bank,
// check the receiver for more transactions/for exiting
break;
@@ -516,6 +576,7 @@ impl BankingStage {
&gossip_vote_sender,
&mut buffered_packets,
&banking_stage_stats,
duplicates,
) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
@@ -1038,6 +1099,7 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender,
buffered_packets: &mut UnprocessedPackets,
banking_stage_stats: &BankingStageStats,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
let mms = verified_receiver.recv_timeout(recv_timeout)?;
@@ -1070,6 +1132,7 @@ impl BankingStage {
&mut dropped_batches_count,
&mut newly_buffered_packets_count,
batch_limit,
duplicates,
);
continue;
}
@@ -1094,6 +1157,7 @@ impl BankingStage {
&mut dropped_batches_count,
&mut newly_buffered_packets_count,
batch_limit,
duplicates,
);
if processed < verified_txs_len {
@@ -1116,6 +1180,7 @@ impl BankingStage {
&mut dropped_batches_count,
&mut newly_buffered_packets_count,
batch_limit,
duplicates,
);
}
}
@@ -1156,18 +1221,33 @@ impl BankingStage {
fn push_unprocessed(
unprocessed_packets: &mut UnprocessedPackets,
packets: Packets,
packet_indexes: Vec<usize>,
mut packet_indexes: Vec<usize>,
dropped_batches_count: &mut usize,
newly_buffered_packets_count: &mut usize,
batch_limit: usize,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
) {
{
let mut duplicates = duplicates.lock().unwrap();
let (cache, hasher) = duplicates.deref_mut();
packet_indexes.retain(|i| {
let packet_hash = hasher.hash_packet(&packets.packets[*i]);
match cache.get_mut(&packet_hash) {
Some(_hash) => false,
None => {
cache.put(packet_hash, ());
true
}
}
});
}
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
if unprocessed_packets.len() >= batch_limit {
*dropped_batches_count += 1;
unprocessed_packets.pop_front();
}
*newly_buffered_packets_count += packet_indexes.len();
unprocessed_packets.push_back((packets, packet_indexes));
unprocessed_packets.push_back((packets, packet_indexes, false));
}
}
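
push_unprocessed also gains duplicate rejection: each packet is hashed and checked against the shared LRU before buffering. A minimal sketch of the pattern, assuming the lru crate's then-current LruCache::new(usize) API; hash_packet here is a hypothetical stand-in for the commit's PacketHasher, which hashes with randomized seeds:

use lru::LruCache;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical stand-in for PacketHasher::hash_packet; DefaultHasher is
// used only to keep the sketch self-contained.
fn hash_packet(data: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    data.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // DEFAULT_LRU_SIZE in the diff is 200_000; a tiny cache works for the demo.
    let mut cache: LruCache<u64, ()> = LruCache::new(8);
    let packets: Vec<Vec<u8>> = vec![vec![1, 2, 3], vec![4, 5], vec![1, 2, 3]];
    let mut packet_indexes: Vec<usize> = (0..packets.len()).collect();
    packet_indexes.retain(|&i| {
        let key = hash_packet(&packets[i]);
        // get_mut (rather than a plain lookup) refreshes the entry's recency.
        if cache.get_mut(&key).is_some() {
            false // seen recently: drop as a duplicate
        } else {
            cache.put(key, ());
            true
        }
    });
    // The third packet repeats the first and is filtered out.
    assert_eq!(packet_indexes, vec![0, 1]);
}
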
@@ -1831,36 +1911,77 @@ mod tests {
let my_pubkey1 = solana_sdk::pubkey::new_rand();
assert_eq!(
BankingStage::consume_or_forward_packets(&my_pubkey, None, true, false,),
BankingStage::consume_or_forward_packets(&my_pubkey, None, true, false, false),
BufferedPacketsDecision::Hold
);
assert_eq!(
BankingStage::consume_or_forward_packets(&my_pubkey, None, false, false),
BankingStage::consume_or_forward_packets(&my_pubkey, None, false, false, false),
BufferedPacketsDecision::Hold
);
assert_eq!(
BankingStage::consume_or_forward_packets(&my_pubkey1, None, false, false),
BankingStage::consume_or_forward_packets(&my_pubkey1, None, false, false, false),
BufferedPacketsDecision::Hold
);
assert_eq!(
BankingStage::consume_or_forward_packets(&my_pubkey, Some(my_pubkey1), false, false,),
BankingStage::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
false,
false,
false
),
BufferedPacketsDecision::Forward
);
assert_eq!(
BankingStage::consume_or_forward_packets(&my_pubkey, Some(my_pubkey1), false, true,),
BankingStage::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
false,
true,
true
),
BufferedPacketsDecision::Hold
);
assert_eq!(
BankingStage::consume_or_forward_packets(&my_pubkey, Some(my_pubkey1), true, false,),
BankingStage::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
false,
true,
false
),
BufferedPacketsDecision::ForwardAndHold
);
assert_eq!(
BankingStage::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
true,
false,
false
),
BufferedPacketsDecision::Consume
);
assert_eq!(
BankingStage::consume_or_forward_packets(&my_pubkey1, Some(my_pubkey1), false, false,),
BankingStage::consume_or_forward_packets(
&my_pubkey1,
Some(my_pubkey1),
false,
false,
false
),
BufferedPacketsDecision::Hold
);
assert_eq!(
BankingStage::consume_or_forward_packets(&my_pubkey1, Some(my_pubkey1), true, false,),
BankingStage::consume_or_forward_packets(
&my_pubkey1,
Some(my_pubkey1),
true,
false,
false
),
BufferedPacketsDecision::Consume
);
}
@@ -2024,7 +2145,7 @@ mod tests {
fn test_filter_valid_packets() {
solana_logger::setup();
let all_packets = (0..16)
let mut all_packets = (0..16)
.map(|packets_id| {
let packets = Packets::new(
(0..32)
@@ -2038,7 +2159,7 @@ mod tests {
let valid_indexes = (0..32)
.filter_map(|x| if x % 2 != 0 { Some(x as usize) } else { None })
.collect_vec();
(packets, valid_indexes)
(packets, valid_indexes, false)
})
.collect_vec();
@@ -2055,6 +2176,10 @@ mod tests {
assert_eq!(p.meta.port, (packets_id << 8 | packet_id) as u16);
})
.collect_vec();
all_packets[0].2 = true;
let result = BankingStage::filter_valid_packets_for_forwarding(all_packets.iter());
assert_eq!(result.len(), 240);
}
#[test]
@@ -2280,6 +2405,7 @@ mod tests {
let mut buffered_packets: UnprocessedPackets = vec![(
all_packets,
(0..num_conflicting_transactions).into_iter().collect(),
false,
)]
.into_iter()
.collect();
@@ -2336,7 +2462,7 @@ mod tests {
let mut buffered_packets: UnprocessedPackets = packets_vec
.clone()
.into_iter()
.map(|single_packets| (single_packets, vec![0]))
.map(|single_packets| (single_packets, vec![0], false))
.collect();
let (continue_sender, continue_receiver) = unbounded();
@@ -2374,9 +2500,10 @@ mod tests {
buffered_packets.len(),
packets_vec[interrupted_iteration + 1..].iter().count()
);
for ((remaining_unprocessed_packet, _), original_packet) in buffered_packets
.iter()
.zip(&packets_vec[interrupted_iteration + 1..])
for ((remaining_unprocessed_packet, _, _forwarded), original_packet) in
buffered_packets
.iter()
.zip(&packets_vec[interrupted_iteration + 1..])
{
assert_eq!(
remaining_unprocessed_packet.packets[0],
@@ -2404,10 +2531,11 @@ mod tests {
#[test]
fn test_push_unprocessed_batch_limit() {
solana_logger::setup();
// Create `Packets` with 1 unprocessed element
let single_element_packets = Packets::new(vec![Packet::default()]);
let mut unprocessed_packets: UnprocessedPackets =
vec![(single_element_packets.clone(), vec![0])]
vec![(single_element_packets.clone(), vec![0], false)]
.into_iter()
.collect();
// Set the limit to 2
@@ -2416,6 +2544,10 @@ mod tests {
let new_packets = single_element_packets;
let packet_indexes = vec![];
let duplicates = Arc::new(Mutex::new((
LruCache::new(DEFAULT_LRU_SIZE),
PacketHasher::default(),
)));
let mut dropped_batches_count = 0;
let mut newly_buffered_packets_count = 0;
// Because the set of unprocessed `packet_indexes` is empty, the
@@ -2427,6 +2559,7 @@ mod tests {
&mut dropped_batches_count,
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
);
assert_eq!(unprocessed_packets.len(), 1);
assert_eq!(dropped_batches_count, 0);
@@ -2442,6 +2575,7 @@ mod tests {
&mut dropped_batches_count,
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(dropped_batches_count, 0);
@@ -2458,10 +2592,26 @@ mod tests {
BankingStage::push_unprocessed(
&mut unprocessed_packets,
new_packets.clone(),
packet_indexes,
packet_indexes.clone(),
&mut dropped_batches_count,
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
assert_eq!(dropped_batches_count, 1);
assert_eq!(newly_buffered_packets_count, 2);
// Check duplicates are dropped
BankingStage::push_unprocessed(
&mut unprocessed_packets,
new_packets.clone(),
packet_indexes,
&mut dropped_batches_count,
&mut newly_buffered_packets_count,
3,
&duplicates,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);

View File

@@ -1,6 +1,6 @@
//! The `fetch_stage` batches input from a UDP socket and sends it to a channel.
use crate::banking_stage::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET;
use crate::banking_stage::HOLD_TRANSACTIONS_SLOT_OFFSET;
use crate::poh_recorder::PohRecorder;
use crate::result::{Error, Result};
use solana_measure::thread_mem_usage;
@@ -81,11 +81,11 @@ impl FetchStage {
}
}
if poh_recorder.lock().unwrap().would_be_leader(
FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET
.saturating_add(1)
.saturating_mul(DEFAULT_TICKS_PER_SLOT),
) {
if poh_recorder
.lock()
.unwrap()
.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET.saturating_mul(DEFAULT_TICKS_PER_SLOT))
{
inc_new_counter_debug!("fetch_stage-honor_forwards", len);
for packets in batch {
if sendr.send(packets).is_err() {
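
For scale, assuming solana-sdk's DEFAULT_TICKS_PER_SLOT of 64, this widens the fetch stage's honor-forwards window from 3 slots to 20:

const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2;
const HOLD_TRANSACTIONS_SLOT_OFFSET: u64 = 20;
const DEFAULT_TICKS_PER_SLOT: u64 = 64; // assumption: solana-sdk default

fn main() {
    // Old window: (2 + 1) slots = 192 ticks.
    let old = FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET
        .saturating_add(1)
        .saturating_mul(DEFAULT_TICKS_PER_SLOT);
    // New window: 20 slots = 1280 ticks of upcoming leadership during
    // which forwarded packets are still accepted.
    let new = HOLD_TRANSACTIONS_SLOT_OFFSET.saturating_mul(DEFAULT_TICKS_PER_SLOT);
    assert_eq!((old, new), (192, 1280));
}
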

View File

@@ -7,9 +7,8 @@ use std::net::SocketAddr;
pub const NUM_PACKETS: usize = 1024 * 8;
pub const PACKETS_PER_BATCH: usize = 256;
pub const PACKETS_PER_BATCH: usize = 128;
pub const NUM_RCVMMSGS: usize = 128;
pub const PACKETS_BATCH_SIZE: usize = PACKETS_PER_BATCH * PACKET_DATA_SIZE;
#[derive(Debug, Default, Clone)]
pub struct Packets {

View File

@@ -2,7 +2,7 @@
use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
pub use solana_perf::packet::{
limited_deserialize, to_packets_chunked, Packets, PacketsRecycler, NUM_PACKETS,
PACKETS_BATCH_SIZE, PACKETS_PER_BATCH,
PACKETS_PER_BATCH,
};
use solana_metrics::inc_new_counter_debug;