Performance tweaks (#4340)

* Use Rc to prevent cloning of Packets buffers

* Fix min => max when computing the number of banking_stage threads.

Coalesce packet buffers into larger batches, since a larger batch moves
through sigverify and banking faster.

Break batches apart on the way from sigverify into banking_stage: sigverify
likes to accumulate large batches, but a single banking_stage thread would
then be stuck with one big batch while its peers sit idle. Creating more,
smaller chunks of work for banking_stage maximizes parallelism.
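
The Rc change is the core of the patch: buffered batches are held behind Rc, so
re-queueing a batch clones a pointer and bumps a reference count instead of
copying the underlying packet buffer. Rc (rather than Arc) suffices because each
banking_stage thread keeps its own unprocessed-packet buffer. A minimal,
self-contained sketch of the pattern, where this Packets struct and the sizes
are illustrative stand-ins rather than the real types:

    use std::rc::Rc;

    // Stand-in for the real Packets struct, which owns a large packet buffer.
    struct Packets {
        packets: Vec<[u8; 512]>,
    }

    // Mirrors the new PacketsAndOffsets alias: a shared batch plus the indexes
    // of the packets in it that still need processing.
    type PacketsAndOffsets = (Rc<Packets>, Vec<usize>);

    fn main() {
        let batch = Rc::new(Packets {
            packets: vec![[0u8; 512]; 1024],
        });

        let mut unprocessed: Vec<PacketsAndOffsets> = Vec::new();

        // Re-buffering clones only the Rc; the ~512 KiB buffer is never copied.
        unprocessed.push((Rc::clone(&batch), vec![0, 5, 9]));
        unprocessed.push((Rc::clone(&batch), vec![42]));

        assert_eq!(Rc::strong_count(&batch), 3);
    }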
sakridge, 2019-05-20 09:15:00 -07:00 (committed by GitHub)
parent 034eda4546, commit 55cee5742f
5 changed files with 161 additions and 54 deletions

View File

@@ -20,13 +20,13 @@ fn producer(addr: &SocketAddr, exit: Arc<AtomicBool>) -> JoinHandle<()> {
         w.meta.size = PACKET_DATA_SIZE;
         w.meta.set_addr(&addr);
     }
-    let msgs_ = msgs.clone();
+    let msgs = Arc::new(msgs);
     spawn(move || loop {
         if exit.load(Ordering::Relaxed) {
             return;
         }
         let mut num = 0;
-        for p in &msgs_.packets {
+        for p in &msgs.packets {
             let a = p.meta.addr();
             assert!(p.meta.size < BLOB_SIZE);
             send.send_to(&p.data[..p.meta.size], &a).unwrap();

View File

@@ -15,6 +15,7 @@ use solana::genesis_utils::create_genesis_block;
 use solana::packet::to_packets_chunked;
 use solana::poh_recorder::WorkingBankEntries;
 use solana::service::Service;
+use solana::test_tx::test_tx;
 use solana_runtime::bank::Bank;
 use solana_sdk::hash::hash;
 use solana_sdk::pubkey::Pubkey;
@@ -24,6 +25,7 @@ use solana_sdk::timing::{
     duration_as_ms, timestamp, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES,
 };
 use std::iter;
+use std::rc::Rc;
 use std::sync::atomic::Ordering;
 use std::sync::mpsc::{channel, Receiver};
 use std::sync::{Arc, RwLock};
@@ -48,6 +50,40 @@ fn check_txs(receiver: &Arc<Receiver<WorkingBankEntries>>, ref_tx_count: usize)
     assert_eq!(total, ref_tx_count);
 }
 
+#[bench]
+fn bench_consume_buffered(bencher: &mut Bencher) {
+    let (genesis_block, _mint_keypair) = create_genesis_block(100_000);
+    let bank = Arc::new(Bank::new(&genesis_block));
+    let ledger_path = get_tmp_ledger_path!();
+    {
+        let blocktree = Arc::new(
+            Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
+        );
+        let (exit, poh_recorder, poh_service, _signal_receiver) =
+            create_test_recorder(&bank, &blocktree);
+
+        let tx = test_tx();
+        let len = 4096;
+        let chunk_size = 1024;
+        let batches = to_packets_chunked(&vec![tx; len], chunk_size);
+        let mut packets = vec![];
+        for batch in batches {
+            let batch_len = batch.packets.len();
+            packets.push((Rc::new(batch), vec![0usize; batch_len]));
+        }
+        // This tests the performance of buffering packets.
+        // If the packet buffers are copied, performance will be poor.
+        bencher.iter(move || {
+            let packets_len = packets.len();
+            let res = BankingStage::consume_buffered_packets(&poh_recorder, packets.as_slice());
+        });
+
+        exit.store(true, Ordering::Relaxed);
+        poh_service.join().unwrap();
+    }
+    let _unused = Blocktree::destroy(&ledger_path);
+}
+
 #[bench]
 fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
     solana_logger::setup();
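
The new bench uses the nightly test harness like the others in this file;
an invocation along the lines of cargo +nightly bench bench_consume_buffered
should select it, though the exact package layout is an assumption since file
paths are not shown on this page.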

View File

@@ -28,6 +28,7 @@ use solana_sdk::timing::{
 use solana_sdk::transaction::{self, Transaction, TransactionError};
 use std::cmp;
 use std::net::UdpSocket;
+use std::rc::Rc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{Receiver, RecvTimeoutError};
 use std::sync::{Arc, Mutex, RwLock};
@@ -36,7 +37,9 @@ use std::time::Duration;
 use std::time::Instant;
 use sys_info;
 
-pub type UnprocessedPackets = Vec<(Packets, Vec<usize>)>;
+// Rc prevents clone/copy of Packets vector buffer
+type PacketsAndOffsets = (Rc<Packets>, Vec<usize>);
+pub type UnprocessedPackets = Vec<PacketsAndOffsets>;
 
 // number of threads is 1 until mt bank is ready
 pub const NUM_THREADS: u32 = 10;
@@ -67,7 +70,7 @@ impl BankingStage {
             poh_recorder,
             verified_receiver,
             verified_vote_receiver,
-            cmp::min(2, Self::num_threads()),
+            cmp::max(2, Self::num_threads()),
         )
     }
@@ -109,6 +112,7 @@
                             &cluster_info,
                             &mut recv_start,
                             enable_forwarding,
+                            i,
                         );
                         exit.store(true, Ordering::Relaxed);
                     })
@@ -118,7 +122,7 @@
         Self { bank_thread_hdls }
     }
 
-    fn filter_valid_packets_for_forwarding(all_packets: &[(Packets, Vec<usize>)]) -> Vec<&Packet> {
+    fn filter_valid_packets_for_forwarding(all_packets: &[PacketsAndOffsets]) -> Vec<&Packet> {
         all_packets
             .iter()
             .flat_map(|(p, valid_indexes)| valid_indexes.iter().map(move |x| &p.packets[*x]))
@@ -128,7 +132,7 @@
     fn forward_buffered_packets(
         socket: &std::net::UdpSocket,
         tpu_via_blobs: &std::net::SocketAddr,
-        unprocessed_packets: &[(Packets, Vec<usize>)],
+        unprocessed_packets: &[PacketsAndOffsets],
     ) -> std::io::Result<()> {
         let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets);
         inc_new_counter_info!("banking_stage-forwarded_packets", packets.len());
@@ -141,19 +145,26 @@
         Ok(())
     }
 
-    fn consume_buffered_packets(
+    pub fn consume_buffered_packets(
         poh_recorder: &Arc<Mutex<PohRecorder>>,
-        buffered_packets: &[(Packets, Vec<usize>)],
+        buffered_packets: &[PacketsAndOffsets],
     ) -> Result<UnprocessedPackets> {
         let mut unprocessed_packets = vec![];
         let mut rebuffered_packets = 0;
         let mut new_tx_count = 0;
+        let buffered_len = buffered_packets.len();
         let mut buffered_packets_iter = buffered_packets.iter();
 
+        let proc_start = Instant::now();
         while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() {
             let bank = poh_recorder.lock().unwrap().bank();
             if bank.is_none() {
                 rebuffered_packets += unprocessed_indexes.len();
-                unprocessed_packets.push((msgs.to_owned(), unprocessed_indexes.to_owned()));
+                Self::push_unprocessed(
+                    &mut unprocessed_packets,
+                    msgs.to_owned(),
+                    unprocessed_indexes.to_owned(),
+                );
                 continue;
             }
             let bank = bank.unwrap();
@@ -169,23 +180,39 @@
             new_tx_count += processed;
 
             // Collect any unprocessed transactions in this batch for forwarding
-            if !new_unprocessed_indexes.is_empty() {
-                rebuffered_packets += new_unprocessed_indexes.len();
-                unprocessed_packets.push((msgs.to_owned(), new_unprocessed_indexes));
-            }
+            rebuffered_packets += new_unprocessed_indexes.len();
+            Self::push_unprocessed(
+                &mut unprocessed_packets,
+                msgs.to_owned(),
+                new_unprocessed_indexes,
+            );
 
             if processed < verified_txs_len {
                 // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
                 while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() {
                     let unprocessed_indexes =
                         Self::filter_unprocessed_packets(&bank, &msgs, &unprocessed_indexes);
-                    if !unprocessed_indexes.is_empty() {
-                        unprocessed_packets.push((msgs.to_owned(), unprocessed_indexes));
-                    }
+                    Self::push_unprocessed(
+                        &mut unprocessed_packets,
+                        msgs.to_owned(),
+                        unprocessed_indexes,
+                    );
                 }
             }
         }
 
+        let total_time_s = timing::duration_as_s(&proc_start.elapsed());
+        let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
+        debug!(
+            "@{:?} done processing buffered batches: {} time: {:?}ms tx count: {} tx/s: {}",
+            timing::timestamp(),
+            buffered_len,
+            total_time_ms,
+            new_tx_count,
+            (new_tx_count as f32) / (total_time_s)
+        );
+
         inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets);
         inc_new_counter_info!("banking_stage-consumed_buffered_packets", new_tx_count);
         inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count);
@@ -225,7 +252,7 @@
         socket: &std::net::UdpSocket,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
-        buffered_packets: &[(Packets, Vec<usize>)],
+        buffered_packets: &[PacketsAndOffsets],
         enable_forwarding: bool,
     ) -> Result<UnprocessedPackets> {
         let rcluster_info = cluster_info.read().unwrap();
@@ -277,6 +304,7 @@
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         recv_start: &mut Instant,
         enable_forwarding: bool,
+        id: u32,
     ) {
         let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
         let mut buffered_packets = vec![];
@@ -303,8 +331,13 @@
                 Duration::from_millis(100)
             };
 
-            match Self::process_packets(&verified_receiver, &poh_recorder, recv_start, recv_timeout)
-            {
+            match Self::process_packets(
+                &verified_receiver,
+                &poh_recorder,
+                recv_start,
+                recv_timeout,
+                id,
+            ) {
                 Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
                 Ok(unprocessed_packets) => {
                     if unprocessed_packets.is_empty() {
@@ -645,12 +678,20 @@
         filtered_unprocessed_tx_indexes
     }
 
+    fn generate_packet_indexes(vers: Vec<u8>) -> Vec<usize> {
+        vers.iter()
+            .enumerate()
+            .filter_map(|(index, ver)| if *ver != 0 { Some(index) } else { None })
+            .collect()
+    }
+
     /// Process the incoming packets
     pub fn process_packets(
         verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
        poh: &Arc<Mutex<PohRecorder>>,
        recv_start: &mut Instant,
        recv_timeout: Duration,
+        id: u32,
     ) -> Result<UnprocessedPackets> {
         let mms = verified_receiver
             .lock()
@@ -658,29 +699,30 @@
             .recv_timeout(recv_timeout)?;
         let mms_len = mms.len();
-        let count = mms.iter().map(|x| x.1.len()).sum();
+        let count: usize = mms.iter().map(|x| x.1.len()).sum();
         debug!(
-            "@{:?} process start stalled for: {:?}ms txs: {}",
+            "@{:?} process start stalled for: {:?}ms txs: {} id: {}",
             timing::timestamp(),
             timing::duration_as_ms(&recv_start.elapsed()),
             count,
+            id,
         );
         inc_new_counter_debug!("banking_stage-transactions_received", count);
         let proc_start = Instant::now();
         let mut new_tx_count = 0;
 
+        let mms: Vec<_> = mms
+            .into_iter()
+            .map(|(packets, vers)| (Rc::new(packets), vers))
+            .collect();
+
         let mut mms_iter = mms.into_iter();
         let mut unprocessed_packets = vec![];
         while let Some((msgs, vers)) = mms_iter.next() {
-            let packet_indexes: Vec<usize> = vers
-                .iter()
-                .enumerate()
-                .filter_map(|(index, ver)| if *ver != 0 { Some(index) } else { None })
-                .collect();
+            let packet_indexes = Self::generate_packet_indexes(vers);
             let bank = poh.lock().unwrap().bank();
             if bank.is_none() {
-                if !packet_indexes.is_empty() {
-                    unprocessed_packets.push((msgs, packet_indexes));
-                }
+                Self::push_unprocessed(&mut unprocessed_packets, msgs, packet_indexes);
                 continue;
             }
             let bank = bank.unwrap();
@@ -691,23 +733,15 @@
             new_tx_count += processed;
 
             // Collect any unprocessed transactions in this batch for forwarding
-            if !unprocessed_indexes.is_empty() {
-                unprocessed_packets.push((msgs, unprocessed_indexes));
-            }
+            Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes);
 
             if processed < verified_txs_len {
                 // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
                 while let Some((msgs, vers)) = mms_iter.next() {
-                    let packet_indexes: Vec<usize> = vers
-                        .iter()
-                        .enumerate()
-                        .filter_map(|(index, ver)| if *ver != 0 { Some(index) } else { None })
-                        .collect();
+                    let packet_indexes = Self::generate_packet_indexes(vers);
                     let unprocessed_indexes =
                         Self::filter_unprocessed_packets(&bank, &msgs, &packet_indexes);
-                    if !unprocessed_indexes.is_empty() {
-                        unprocessed_packets.push((msgs, unprocessed_indexes));
-                    }
+                    Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes);
                 }
             }
         }
@@ -719,12 +753,14 @@
         let total_time_s = timing::duration_as_s(&proc_start.elapsed());
         let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
         debug!(
-            "@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {}",
+            "@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {} total count: {} id: {}",
            timing::timestamp(),
            mms_len,
            total_time_ms,
            new_tx_count,
-            (new_tx_count as f32) / (total_time_s)
+            (new_tx_count as f32) / (total_time_s),
+            count,
+            id,
         );
         inc_new_counter_debug!("banking_stage-process_packets", count);
         inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count);
@@ -733,6 +769,16 @@
         Ok(unprocessed_packets)
     }
 
+    fn push_unprocessed(
+        unprocessed_packets: &mut UnprocessedPackets,
+        packets: Rc<Packets>,
+        packet_indexes: Vec<usize>,
+    ) {
+        if !packet_indexes.is_empty() {
+            unprocessed_packets.push((packets, packet_indexes));
+        }
+    }
 }
 
 impl Service for BankingStage {
@@ -924,8 +970,14 @@ mod tests {
             // glad they all fit
             assert_eq!(packets.len(), 1);
 
+            let packets = packets
+                .into_iter()
+                .map(|packets| (packets, vec![0u8, 1u8, 1u8]))
+                .collect();
             verified_sender // no_ver, anf, tx
-                .send(vec![(packets[0].clone(), vec![0u8, 1u8, 1u8])])
+                .send(packets)
                 .unwrap();
 
             drop(verified_sender);
@@ -990,9 +1042,11 @@ mod tests {
             );
             let packets = to_packets(&[tx]);
-            verified_sender
-                .send(vec![(packets[0].clone(), vec![1u8])])
-                .unwrap();
+            let packets = packets
+                .into_iter()
+                .map(|packets| (packets, vec![1u8]))
+                .collect();
+            verified_sender.send(packets).unwrap();
 
             // Process a second batch that spends one of those lamports.
             let tx = system_transaction::create_user_account(
@@ -1003,9 +1057,11 @@ mod tests {
                 0,
             );
             let packets = to_packets(&[tx]);
-            verified_sender
-                .send(vec![(packets[0].clone(), vec![1u8])])
-                .unwrap();
+            let packets = packets
+                .into_iter()
+                .map(|packets| (packets, vec![1u8]))
+                .collect();
+            verified_sender.send(packets).unwrap();
 
             let (vote_sender, vote_receiver) = channel();
             let ledger_path = get_tmp_ledger_path!();
@@ -1493,7 +1549,7 @@ mod tests {
                 let valid_indexes = (0..32)
                     .filter_map(|x| if x % 2 != 0 { Some(x as usize) } else { None })
                     .collect_vec();
-                (packets, valid_indexes)
+                (Rc::new(packets), valid_indexes)
             })
             .collect_vec();

View File

@@ -18,6 +18,7 @@ use std::mem::size_of;
 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
 use std::ops::{Deref, DerefMut};
 use std::sync::{Arc, RwLock};
+use std::time::Instant;
 
 pub type SharedBlob = Arc<RwLock<Blob>>;
 pub type SharedBlobs = Vec<SharedBlob>;
@@ -213,6 +214,9 @@ pub enum BlobError {
 impl Packets {
     pub fn recv_from(&mut self, socket: &UdpSocket) -> Result<usize> {
+        const MAX_PACKETS_PER_VEC: usize = 1024;
+        const MAX_MILLIS_PER_BATCH: u128 = 2;
+
         let mut i = 0;
         //DOCUMENTED SIDE-EFFECT
         //Performance out of the IO without poll
@@ -222,11 +226,16 @@
         // * set it back to blocking before returning
         socket.set_nonblocking(false)?;
         trace!("receiving on {}", socket.local_addr().unwrap());
+        let start = Instant::now();
         loop {
            self.packets.resize(i + NUM_RCVMMSGS, Packet::default());
            match recv_mmsg(socket, &mut self.packets[i..]) {
                Err(_) if i > 0 => {
-                    break;
+                    if i >= MAX_PACKETS_PER_VEC
+                        || start.elapsed().as_millis() > MAX_MILLIS_PER_BATCH
+                    {
+                        break;
+                    }
                }
                Err(e) => {
                    trace!("recv_from err {:?}", e);
@@ -238,7 +247,9 @@
                    }
                    trace!("got {} packets", npkts);
                    i += npkts;
-                    if npkts != NUM_RCVMMSGS || i >= 1024 {
+                    if i >= MAX_PACKETS_PER_VEC
+                        || start.elapsed().as_millis() > MAX_MILLIS_PER_BATCH
+                    {
                        break;
                    }
                }
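
For intuition, the batching rule above (stop at a size cap or when a small time
budget runs out) can be written generically over a channel. This is a hedged
sketch, not the patch's API: the coalesce function is invented here, and its
two thresholds play the roles of MAX_PACKETS_PER_VEC and MAX_MILLIS_PER_BATCH.

    use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
    use std::time::{Duration, Instant};

    // Pull items into one batch until it is large enough or the budget is spent.
    fn coalesce<T>(rx: &Receiver<T>, max_len: usize, max_wait: Duration) -> Vec<T> {
        let start = Instant::now();
        let mut batch = Vec::new();
        while batch.len() < max_len {
            // checked_sub avoids a panic if the budget expires between checks.
            let remaining = match max_wait.checked_sub(start.elapsed()) {
                Some(d) => d,
                None => break,
            };
            match rx.recv_timeout(remaining) {
                Ok(item) => batch.push(item),
                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
            }
        }
        batch
    }

    fn main() {
        let (tx, rx) = channel();
        for i in 0..10 {
            tx.send(i).unwrap();
        }
        // Everything already queued is coalesced into a single batch.
        let batch = coalesce(&rx, 1024, Duration::from_millis(2));
        assert_eq!(batch.len(), 10);
    }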

View File

@@ -75,8 +75,12 @@
         let verified_batch = Self::verify_batch(batch, sigverify_disabled);
         inc_new_counter_info!("sigverify_stage-verified_packets_send", len);
 
-        if sendr.send(verified_batch).is_err() {
-            return Err(Error::SendError);
+        // Batch may be very large. Break it up so banking_stage can have maximum
+        // parallelism.
+        for item in verified_batch {
+            if sendr.send(vec![item]).is_err() {
+                return Err(Error::SendError);
+            }
         }
 
         let total_time_ms = timing::duration_as_ms(&now.elapsed());
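
Taken in isolation, this change turns one send of an N-element verified batch
into N sends of single-element batches, so several banking_stage threads can
each claim a unit of work concurrently instead of one thread owning the whole
batch. A toy illustration over a plain mpsc channel, where the payload is a
placeholder rather than the real VerifiedPackets type:

    use std::sync::mpsc::channel;

    fn main() {
        let (sendr, recvr) = channel();

        // One large batch accumulated upstream...
        let verified_batch =
            vec![("batch_a", vec![1u8]), ("batch_b", vec![1u8]), ("batch_c", vec![1u8])];

        // ...sent element by element, producing three independent units of work.
        for item in verified_batch {
            sendr.send(vec![item]).unwrap();
        }

        assert_eq!(recvr.try_iter().count(), 3);
    }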