Don't forward buffered packet to the same node (#3712)
- instead, process the packets
This commit is contained in:
parent
b001685e7b
commit
44ebfa736a
|
@ -30,7 +30,7 @@ use std::time::Duration;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use sys_info;
|
use sys_info;
|
||||||
|
|
||||||
pub type UnprocessedPackets = Vec<(SharedPackets, usize)>; // `usize` is the index of the first unprocessed packet in `SharedPackets`
|
pub type UnprocessedPackets = Vec<(SharedPackets, usize, Vec<u8>)>; // `usize` is the index of the first unprocessed packet in `SharedPackets`
|
||||||
|
|
||||||
// number of threads is 1 until mt bank is ready
|
// number of threads is 1 until mt bank is ready
|
||||||
pub const NUM_THREADS: u32 = 10;
|
pub const NUM_THREADS: u32 = 10;
|
||||||
|
@ -97,11 +97,11 @@ impl BankingStage {
|
||||||
fn forward_unprocessed_packets(
|
fn forward_unprocessed_packets(
|
||||||
socket: &std::net::UdpSocket,
|
socket: &std::net::UdpSocket,
|
||||||
tpu_via_blobs: &std::net::SocketAddr,
|
tpu_via_blobs: &std::net::SocketAddr,
|
||||||
unprocessed_packets: &[(SharedPackets, usize)],
|
unprocessed_packets: &[(SharedPackets, usize, Vec<u8>)],
|
||||||
) -> std::io::Result<()> {
|
) -> std::io::Result<()> {
|
||||||
let locked_packets: Vec<_> = unprocessed_packets
|
let locked_packets: Vec<_> = unprocessed_packets
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(p, start_index)| (p.read().unwrap(), start_index))
|
.map(|(p, start_index, _)| (p.read().unwrap(), start_index))
|
||||||
.collect();
|
.collect();
|
||||||
let packets: Vec<&Packet> = locked_packets
|
let packets: Vec<&Packet> = locked_packets
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -116,37 +116,77 @@ impl BankingStage {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn forward_buffered_packets(
|
fn process_buffered_packets(
|
||||||
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
|
buffered_packets: &[(SharedPackets, usize, Vec<u8>)],
|
||||||
|
) -> Result<UnprocessedPackets> {
|
||||||
|
let mut unprocessed_packets = vec![];
|
||||||
|
let mut bank_shutdown = false;
|
||||||
|
for (msgs, offset, vers) in buffered_packets {
|
||||||
|
if bank_shutdown {
|
||||||
|
unprocessed_packets.push((msgs.to_owned(), *offset, vers.to_owned()));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let bank = poh_recorder.lock().unwrap().bank();
|
||||||
|
if bank.is_none() {
|
||||||
|
unprocessed_packets.push((msgs.to_owned(), *offset, vers.to_owned()));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let bank = bank.unwrap();
|
||||||
|
|
||||||
|
let (processed, verified_txs, verified_indexes) =
|
||||||
|
Self::process_received_packets(&bank, &poh_recorder, &msgs, &vers, *offset)?;
|
||||||
|
|
||||||
|
if processed < verified_txs.len() {
|
||||||
|
bank_shutdown = true;
|
||||||
|
// Collect any unprocessed transactions in this batch for forwarding
|
||||||
|
unprocessed_packets.push((
|
||||||
|
msgs.to_owned(),
|
||||||
|
verified_indexes[processed],
|
||||||
|
vers.to_owned(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(unprocessed_packets)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_buffered_packets(
|
||||||
socket: &std::net::UdpSocket,
|
socket: &std::net::UdpSocket,
|
||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
buffered_packets: &[(SharedPackets, usize)],
|
buffered_packets: &[(SharedPackets, usize, Vec<u8>)],
|
||||||
) -> bool {
|
) -> Result<UnprocessedPackets> {
|
||||||
if buffered_packets.is_empty() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
let rcluster_info = cluster_info.read().unwrap();
|
let rcluster_info = cluster_info.read().unwrap();
|
||||||
|
|
||||||
// If there's a bank, and leader is available, forward the buffered packets
|
// If there's a bank, and leader is available, this node "is" the leader
|
||||||
// or, if the current node is not the leader, forward the buffered packets
|
// process the buffered packets
|
||||||
let forward = match poh_recorder.lock().unwrap().bank() {
|
if poh_recorder.lock().unwrap().bank().is_some() {
|
||||||
Some(_) => rcluster_info.leader_data().is_some(),
|
if rcluster_info.leader_data().is_some() {
|
||||||
None => rcluster_info
|
return Self::process_buffered_packets(poh_recorder, buffered_packets);
|
||||||
.leader_data()
|
}
|
||||||
.map(|x| x.id != rcluster_info.id())
|
|
||||||
.unwrap_or(false),
|
|
||||||
};
|
|
||||||
|
|
||||||
if forward {
|
return Ok(buffered_packets.to_vec());
|
||||||
|
}
|
||||||
|
|
||||||
|
// If leader is not known, return the buffered packets as is
|
||||||
|
// else process the packets
|
||||||
|
rcluster_info
|
||||||
|
.leader_data()
|
||||||
|
.map_or(Ok(buffered_packets.to_vec()), |x| {
|
||||||
|
if x.id == rcluster_info.id() {
|
||||||
|
// If the current node is the leader, process the buffered packets
|
||||||
|
Self::process_buffered_packets(poh_recorder, buffered_packets)
|
||||||
|
} else {
|
||||||
|
// If the current node is not the leader, forward the buffered packets
|
||||||
let _ = Self::forward_unprocessed_packets(
|
let _ = Self::forward_unprocessed_packets(
|
||||||
&socket,
|
&socket,
|
||||||
&rcluster_info.leader_data().unwrap().tpu_via_blobs,
|
&rcluster_info.leader_data().unwrap().tpu_via_blobs,
|
||||||
&buffered_packets,
|
&buffered_packets,
|
||||||
);
|
);
|
||||||
|
Ok(vec![])
|
||||||
}
|
}
|
||||||
|
})
|
||||||
forward
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn should_buffer_packets(
|
fn should_buffer_packets(
|
||||||
|
@ -179,13 +219,15 @@ impl BankingStage {
|
||||||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let mut buffered_packets = vec![];
|
let mut buffered_packets = vec![];
|
||||||
loop {
|
loop {
|
||||||
if Self::forward_buffered_packets(
|
if !buffered_packets.is_empty() {
|
||||||
|
Self::handle_buffered_packets(
|
||||||
&socket,
|
&socket,
|
||||||
poh_recorder,
|
poh_recorder,
|
||||||
cluster_info,
|
cluster_info,
|
||||||
&buffered_packets,
|
&buffered_packets,
|
||||||
) {
|
)
|
||||||
buffered_packets.clear();
|
.map(|packets| buffered_packets = packets)
|
||||||
|
.unwrap_or_else(|_| buffered_packets.clear());
|
||||||
}
|
}
|
||||||
|
|
||||||
match Self::process_packets(&verified_receiver, &poh_recorder, recv_start) {
|
match Self::process_packets(&verified_receiver, &poh_recorder, recv_start) {
|
||||||
|
@ -363,6 +405,52 @@ impl BankingStage {
|
||||||
Ok(chunk_start)
|
Ok(chunk_start)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn process_received_packets(
|
||||||
|
bank: &Arc<Bank>,
|
||||||
|
poh: &Arc<Mutex<PohRecorder>>,
|
||||||
|
msgs: &Arc<RwLock<Packets>>,
|
||||||
|
vers: &[u8],
|
||||||
|
offset: usize,
|
||||||
|
) -> Result<(usize, Vec<Transaction>, Vec<usize>)> {
|
||||||
|
debug!("banking-stage-tx bank {}", bank.slot());
|
||||||
|
let transactions = Self::deserialize_transactions(&Packets::new(
|
||||||
|
msgs.read().unwrap().packets[offset..].to_owned(),
|
||||||
|
));
|
||||||
|
|
||||||
|
let vers = vers[offset..].to_owned();
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
"bank: {} transactions received {}",
|
||||||
|
bank.slot(),
|
||||||
|
transactions.len()
|
||||||
|
);
|
||||||
|
let (verified_transactions, verified_indexes): (Vec<_>, Vec<_>) = transactions
|
||||||
|
.into_iter()
|
||||||
|
.zip(vers)
|
||||||
|
.zip(0..)
|
||||||
|
.filter_map(|((tx, ver), index)| match tx {
|
||||||
|
None => None,
|
||||||
|
Some(tx) => {
|
||||||
|
if ver != 0 {
|
||||||
|
Some((tx, index))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unzip();
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
"bank: {} verified transactions {}",
|
||||||
|
bank.slot(),
|
||||||
|
verified_transactions.len()
|
||||||
|
);
|
||||||
|
|
||||||
|
let processed = Self::process_transactions(&bank, &verified_transactions, poh)?;
|
||||||
|
|
||||||
|
Ok((processed, verified_transactions, verified_indexes))
|
||||||
|
}
|
||||||
|
|
||||||
/// Process the incoming packets
|
/// Process the incoming packets
|
||||||
pub fn process_packets(
|
pub fn process_packets(
|
||||||
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
|
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
|
||||||
|
@ -390,53 +478,24 @@ impl BankingStage {
|
||||||
let mut bank_shutdown = false;
|
let mut bank_shutdown = false;
|
||||||
for (msgs, vers) in mms {
|
for (msgs, vers) in mms {
|
||||||
if bank_shutdown {
|
if bank_shutdown {
|
||||||
unprocessed_packets.push((msgs, 0));
|
unprocessed_packets.push((msgs, 0, vers));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let bank = poh.lock().unwrap().bank();
|
let bank = poh.lock().unwrap().bank();
|
||||||
if bank.is_none() {
|
if bank.is_none() {
|
||||||
unprocessed_packets.push((msgs, 0));
|
unprocessed_packets.push((msgs, 0, vers));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let bank = bank.unwrap();
|
let bank = bank.unwrap();
|
||||||
debug!("banking-stage-tx bank {}", bank.slot());
|
|
||||||
|
|
||||||
let transactions = Self::deserialize_transactions(&msgs.read().unwrap());
|
let (processed, verified_txs, verified_indexes) =
|
||||||
|
Self::process_received_packets(&bank, &poh, &msgs, &vers, 0)?;
|
||||||
|
|
||||||
debug!(
|
if processed < verified_txs.len() {
|
||||||
"bank: {} transactions received {}",
|
|
||||||
bank.slot(),
|
|
||||||
transactions.len()
|
|
||||||
);
|
|
||||||
let (verified_transactions, verified_transaction_index): (Vec<_>, Vec<_>) =
|
|
||||||
transactions
|
|
||||||
.into_iter()
|
|
||||||
.zip(vers)
|
|
||||||
.zip(0..)
|
|
||||||
.filter_map(|((tx, ver), index)| match tx {
|
|
||||||
None => None,
|
|
||||||
Some(tx) => {
|
|
||||||
if ver != 0 {
|
|
||||||
Some((tx, index))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.unzip();
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
"bank: {} verified transactions {}",
|
|
||||||
bank.slot(),
|
|
||||||
verified_transactions.len()
|
|
||||||
);
|
|
||||||
|
|
||||||
let processed = Self::process_transactions(&bank, &verified_transactions, poh)?;
|
|
||||||
if processed < verified_transactions.len() {
|
|
||||||
bank_shutdown = true;
|
bank_shutdown = true;
|
||||||
// Collect any unprocessed transactions in this batch for forwarding
|
// Collect any unprocessed transactions in this batch for forwarding
|
||||||
unprocessed_packets.push((msgs, verified_transaction_index[processed]));
|
unprocessed_packets.push((msgs, verified_indexes[processed], vers));
|
||||||
}
|
}
|
||||||
new_tx_count += processed;
|
new_tx_count += processed;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue