remove copying of forwarded packets (#4425)

automerge
This commit is contained in:
Pankaj Garg 2019-05-24 17:35:09 -07:00 committed by Grimes
parent bfa1c025fd
commit 1f71d05299
2 changed files with 29 additions and 38 deletions

View File

@ -74,11 +74,8 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
// This tests the performance of buffering packets.
// If the packet buffers are copied, performance will be poor.
bencher.iter(move || {
let _ignored = BankingStage::consume_buffered_packets(
&my_pubkey,
&poh_recorder,
packets.as_slice(),
);
let _ignored =
BankingStage::consume_buffered_packets(&my_pubkey, &poh_recorder, &mut packets);
});
exit.store(true, Ordering::Relaxed);

View File

@ -146,24 +146,20 @@ impl BankingStage {
pub fn consume_buffered_packets(
my_pubkey: &Pubkey,
poh_recorder: &Arc<Mutex<PohRecorder>>,
buffered_packets: &[PacketsAndOffsets],
buffered_packets: &mut Vec<PacketsAndOffsets>,
) -> Result<UnprocessedPackets> {
let mut unprocessed_packets = vec![];
let mut rebuffered_packets = 0;
let mut new_tx_count = 0;
let buffered_len = buffered_packets.len();
let mut buffered_packets_iter = buffered_packets.iter();
let mut buffered_packets_iter = buffered_packets.drain(..);
let proc_start = Instant::now();
while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() {
let bank = poh_recorder.lock().unwrap().bank();
if bank.is_none() {
rebuffered_packets += unprocessed_indexes.len();
Self::push_unprocessed(
&mut unprocessed_packets,
msgs.to_owned(),
unprocessed_indexes.to_owned(),
);
Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes);
continue;
}
let bank = bank.unwrap();
@ -180,11 +176,7 @@ impl BankingStage {
// Collect any unprocessed transactions in this batch for forwarding
rebuffered_packets += new_unprocessed_indexes.len();
Self::push_unprocessed(
&mut unprocessed_packets,
msgs.to_owned(),
new_unprocessed_indexes,
);
Self::push_unprocessed(&mut unprocessed_packets, msgs, new_unprocessed_indexes);
if processed < verified_txs_len {
let next_leader = poh_recorder.lock().unwrap().next_slot_leader();
@ -197,11 +189,7 @@ impl BankingStage {
my_pubkey,
next_leader,
);
Self::push_unprocessed(
&mut unprocessed_packets,
msgs.to_owned(),
unprocessed_indexes,
);
Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes);
}
}
}
@ -257,9 +245,9 @@ impl BankingStage {
socket: &std::net::UdpSocket,
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
buffered_packets: &[PacketsAndOffsets],
buffered_packets: &mut Vec<PacketsAndOffsets>,
enable_forwarding: bool,
) -> Result<UnprocessedPackets> {
) -> Result<()> {
let rcluster_info = cluster_info.read().unwrap();
let (decision, next_leader) = {
@ -278,28 +266,35 @@ impl BankingStage {
match decision {
BufferedPacketsDecision::Consume => {
Self::consume_buffered_packets(&rcluster_info.id(), poh_recorder, buffered_packets)
let mut unprocessed = Self::consume_buffered_packets(
&rcluster_info.id(),
poh_recorder,
buffered_packets,
)?;
buffered_packets.append(&mut unprocessed);
Ok(())
}
BufferedPacketsDecision::Forward => {
if enable_forwarding {
next_leader.map_or(Ok(buffered_packets.to_vec()), |leader_pubkey| {
rcluster_info.lookup(&leader_pubkey).map_or(
Ok(buffered_packets.to_vec()),
|leader| {
next_leader.map_or(Ok(()), |leader_pubkey| {
rcluster_info
.lookup(&leader_pubkey)
.map_or(Ok(()), |leader| {
let _ = Self::forward_buffered_packets(
&socket,
&leader.tpu_via_blobs,
&buffered_packets,
);
Ok(vec![])
},
)
buffered_packets.clear();
Ok(())
})
})
} else {
Ok(vec![])
buffered_packets.clear();
Ok(())
}
}
_ => Ok(buffered_packets.to_vec()),
_ => Ok(()),
}
}
@ -319,10 +314,9 @@ impl BankingStage {
&socket,
poh_recorder,
cluster_info,
&buffered_packets,
&mut buffered_packets,
enable_forwarding,
)
.map(|packets| buffered_packets = packets)
.unwrap_or_else(|_| buffered_packets.clear());
}
@ -345,7 +339,7 @@ impl BankingStage {
id,
) {
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
Ok(unprocessed_packets) => {
Ok(mut unprocessed_packets) => {
if unprocessed_packets.is_empty() {
continue;
}
@ -354,7 +348,7 @@ impl BankingStage {
.map(|(_, unprocessed)| unprocessed.len())
.sum();
inc_new_counter_info!("banking_stage-buffered_packets", num);
buffered_packets.extend_from_slice(&unprocessed_packets);
buffered_packets.append(&mut unprocessed_packets);
}
Err(err) => {
debug!("solana-banking-stage-tx: exit due to {:?}", err);