From 44ebfa736a1c207f111f71a849a11d8e1c0af74c Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Thu, 11 Apr 2019 17:23:45 -0700 Subject: [PATCH] Don't forward buffered packet to the same node (#3712) - instead, process the packets --- core/src/banking_stage.rs | 195 +++++++++++++++++++++++++------------- 1 file changed, 127 insertions(+), 68 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 1859f1e02..1b4add1ff 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -30,7 +30,7 @@ use std::time::Duration; use std::time::Instant; use sys_info; -pub type UnprocessedPackets = Vec<(SharedPackets, usize)>; // `usize` is the index of the first unprocessed packet in `SharedPackets` +pub type UnprocessedPackets = Vec<(SharedPackets, usize, Vec)>; // `usize` is the index of the first unprocessed packet in `SharedPackets` // number of threads is 1 until mt bank is ready pub const NUM_THREADS: u32 = 10; @@ -97,11 +97,11 @@ impl BankingStage { fn forward_unprocessed_packets( socket: &std::net::UdpSocket, tpu_via_blobs: &std::net::SocketAddr, - unprocessed_packets: &[(SharedPackets, usize)], + unprocessed_packets: &[(SharedPackets, usize, Vec)], ) -> std::io::Result<()> { let locked_packets: Vec<_> = unprocessed_packets .iter() - .map(|(p, start_index)| (p.read().unwrap(), start_index)) + .map(|(p, start_index, _)| (p.read().unwrap(), start_index)) .collect(); let packets: Vec<&Packet> = locked_packets .iter() @@ -116,37 +116,77 @@ impl BankingStage { Ok(()) } - fn forward_buffered_packets( + fn process_buffered_packets( + poh_recorder: &Arc>, + buffered_packets: &[(SharedPackets, usize, Vec)], + ) -> Result { + let mut unprocessed_packets = vec![]; + let mut bank_shutdown = false; + for (msgs, offset, vers) in buffered_packets { + if bank_shutdown { + unprocessed_packets.push((msgs.to_owned(), *offset, vers.to_owned())); + continue; + } + + let bank = poh_recorder.lock().unwrap().bank(); + if bank.is_none() { + unprocessed_packets.push((msgs.to_owned(), *offset, vers.to_owned())); + continue; + } + let bank = bank.unwrap(); + + let (processed, verified_txs, verified_indexes) = + Self::process_received_packets(&bank, &poh_recorder, &msgs, &vers, *offset)?; + + if processed < verified_txs.len() { + bank_shutdown = true; + // Collect any unprocessed transactions in this batch for forwarding + unprocessed_packets.push(( + msgs.to_owned(), + verified_indexes[processed], + vers.to_owned(), + )); + } + } + Ok(unprocessed_packets) + } + + fn handle_buffered_packets( socket: &std::net::UdpSocket, poh_recorder: &Arc>, cluster_info: &Arc>, - buffered_packets: &[(SharedPackets, usize)], - ) -> bool { - if buffered_packets.is_empty() { - return false; - } - + buffered_packets: &[(SharedPackets, usize, Vec)], + ) -> Result { let rcluster_info = cluster_info.read().unwrap(); - // If there's a bank, and leader is available, forward the buffered packets - // or, if the current node is not the leader, forward the buffered packets - let forward = match poh_recorder.lock().unwrap().bank() { - Some(_) => rcluster_info.leader_data().is_some(), - None => rcluster_info - .leader_data() - .map(|x| x.id != rcluster_info.id()) - .unwrap_or(false), - }; + // If there's a bank, and leader is available, this node "is" the leader + // process the buffered packets + if poh_recorder.lock().unwrap().bank().is_some() { + if rcluster_info.leader_data().is_some() { + return Self::process_buffered_packets(poh_recorder, buffered_packets); + } - if forward { - let _ = Self::forward_unprocessed_packets( - &socket, - &rcluster_info.leader_data().unwrap().tpu_via_blobs, - &buffered_packets, - ); + return Ok(buffered_packets.to_vec()); } - forward + // If leader is not known, return the buffered packets as is + // else process the packets + rcluster_info + .leader_data() + .map_or(Ok(buffered_packets.to_vec()), |x| { + if x.id == rcluster_info.id() { + // If the current node is the leader, process the buffered packets + Self::process_buffered_packets(poh_recorder, buffered_packets) + } else { + // If the current node is not the leader, forward the buffered packets + let _ = Self::forward_unprocessed_packets( + &socket, + &rcluster_info.leader_data().unwrap().tpu_via_blobs, + &buffered_packets, + ); + Ok(vec![]) + } + }) } fn should_buffer_packets( @@ -179,13 +219,15 @@ impl BankingStage { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packets = vec![]; loop { - if Self::forward_buffered_packets( - &socket, - poh_recorder, - cluster_info, - &buffered_packets, - ) { - buffered_packets.clear(); + if !buffered_packets.is_empty() { + Self::handle_buffered_packets( + &socket, + poh_recorder, + cluster_info, + &buffered_packets, + ) + .map(|packets| buffered_packets = packets) + .unwrap_or_else(|_| buffered_packets.clear()); } match Self::process_packets(&verified_receiver, &poh_recorder, recv_start) { @@ -363,6 +405,52 @@ impl BankingStage { Ok(chunk_start) } + fn process_received_packets( + bank: &Arc, + poh: &Arc>, + msgs: &Arc>, + vers: &[u8], + offset: usize, + ) -> Result<(usize, Vec, Vec)> { + debug!("banking-stage-tx bank {}", bank.slot()); + let transactions = Self::deserialize_transactions(&Packets::new( + msgs.read().unwrap().packets[offset..].to_owned(), + )); + + let vers = vers[offset..].to_owned(); + + debug!( + "bank: {} transactions received {}", + bank.slot(), + transactions.len() + ); + let (verified_transactions, verified_indexes): (Vec<_>, Vec<_>) = transactions + .into_iter() + .zip(vers) + .zip(0..) + .filter_map(|((tx, ver), index)| match tx { + None => None, + Some(tx) => { + if ver != 0 { + Some((tx, index)) + } else { + None + } + } + }) + .unzip(); + + debug!( + "bank: {} verified transactions {}", + bank.slot(), + verified_transactions.len() + ); + + let processed = Self::process_transactions(&bank, &verified_transactions, poh)?; + + Ok((processed, verified_transactions, verified_indexes)) + } + /// Process the incoming packets pub fn process_packets( verified_receiver: &Arc>>, @@ -390,53 +478,24 @@ impl BankingStage { let mut bank_shutdown = false; for (msgs, vers) in mms { if bank_shutdown { - unprocessed_packets.push((msgs, 0)); + unprocessed_packets.push((msgs, 0, vers)); continue; } let bank = poh.lock().unwrap().bank(); if bank.is_none() { - unprocessed_packets.push((msgs, 0)); + unprocessed_packets.push((msgs, 0, vers)); continue; } let bank = bank.unwrap(); - debug!("banking-stage-tx bank {}", bank.slot()); - let transactions = Self::deserialize_transactions(&msgs.read().unwrap()); + let (processed, verified_txs, verified_indexes) = + Self::process_received_packets(&bank, &poh, &msgs, &vers, 0)?; - debug!( - "bank: {} transactions received {}", - bank.slot(), - transactions.len() - ); - let (verified_transactions, verified_transaction_index): (Vec<_>, Vec<_>) = - transactions - .into_iter() - .zip(vers) - .zip(0..) - .filter_map(|((tx, ver), index)| match tx { - None => None, - Some(tx) => { - if ver != 0 { - Some((tx, index)) - } else { - None - } - } - }) - .unzip(); - - debug!( - "bank: {} verified transactions {}", - bank.slot(), - verified_transactions.len() - ); - - let processed = Self::process_transactions(&bank, &verified_transactions, poh)?; - if processed < verified_transactions.len() { + if processed < verified_txs.len() { bank_shutdown = true; // Collect any unprocessed transactions in this batch for forwarding - unprocessed_packets.push((msgs, verified_transaction_index[processed])); + unprocessed_packets.push((msgs, verified_indexes[processed], vers)); } new_tx_count += processed; }