Cleanup num_threads() and batch_limit numbers (#4852)

This commit is contained in:
sakridge 2019-06-28 10:55:24 +02:00 committed by GitHub
parent af1c70f032
commit 0fa1af5d47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 91 additions and 19 deletions

View File

@ -77,8 +77,12 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
// This tests the performance of buffering packets.
// If the packet buffers are copied, performance will be poor.
bencher.iter(move || {
let _ignored =
BankingStage::consume_buffered_packets(&my_pubkey, &poh_recorder, &mut packets);
let _ignored = BankingStage::consume_buffered_packets(
&my_pubkey,
&poh_recorder,
&mut packets,
10_000,
);
});
exit.store(true, Ordering::Relaxed);

View File

@ -7,6 +7,7 @@ use crate::entry;
use crate::entry::{hash_transactions, Entry};
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::packet;
use crate::packet::PACKETS_PER_BLOB;
use crate::packet::{Packet, Packets};
use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries};
use crate::poh_service::PohService;
@ -27,6 +28,8 @@ use solana_sdk::timing::{
MAX_TRANSACTION_FORWARDING_DELAY,
};
use solana_sdk::transaction::{self, Transaction, TransactionError};
use std::cmp;
use std::env;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::Receiver;
@ -34,7 +37,6 @@ use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use std::time::Instant;
use sys_info;
type PacketsAndOffsets = (Packets, Vec<usize>);
pub type UnprocessedPackets = Vec<PacketsAndOffsets>;
@ -42,8 +44,10 @@ pub type UnprocessedPackets = Vec<PacketsAndOffsets>;
/// Transaction forwarding
pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 4;
// number of threads is 1 until mt bank is ready
pub const NUM_THREADS: u32 = 10;
// Fixed thread size seems to be fastest on GCP setup
pub const NUM_THREADS: u32 = 4;
const TOTAL_BUFFERED_PACKETS: usize = 500_000;
/// Stores the stage's thread handle and output receiver.
pub struct BankingStage {
@ -71,7 +75,7 @@ impl BankingStage {
poh_recorder,
verified_receiver,
verified_vote_receiver,
4,
Self::num_threads(),
)
}
@ -82,6 +86,7 @@ impl BankingStage {
verified_vote_receiver: CrossbeamReceiver<VerifiedPackets>,
num_threads: u32,
) -> Self {
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BLOB);
// Single thread to generate entries from many banks.
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
// Once an entry has been recorded, its blockhash is registered with the bank.
@ -110,6 +115,7 @@ impl BankingStage {
&mut recv_start,
enable_forwarding,
i,
batch_limit,
);
})
.unwrap()
@ -145,19 +151,27 @@ impl BankingStage {
my_pubkey: &Pubkey,
poh_recorder: &Arc<Mutex<PohRecorder>>,
buffered_packets: &mut Vec<PacketsAndOffsets>,
batch_limit: usize,
) -> Result<UnprocessedPackets> {
let mut unprocessed_packets = vec![];
let mut rebuffered_packets = 0;
let mut new_tx_count = 0;
let buffered_len = buffered_packets.len();
let mut buffered_packets_iter = buffered_packets.drain(..);
let mut dropped_batches_count = 0;
let proc_start = Instant::now();
while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() {
let bank = poh_recorder.lock().unwrap().bank();
if bank.is_none() {
rebuffered_packets += unprocessed_indexes.len();
Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes);
Self::push_unprocessed(
&mut unprocessed_packets,
msgs,
unprocessed_indexes,
&mut dropped_batches_count,
batch_limit,
);
continue;
}
let bank = bank.unwrap();
@ -174,7 +188,13 @@ impl BankingStage {
// Collect any unprocessed transactions in this batch for forwarding
rebuffered_packets += new_unprocessed_indexes.len();
Self::push_unprocessed(&mut unprocessed_packets, msgs, new_unprocessed_indexes);
Self::push_unprocessed(
&mut unprocessed_packets,
msgs,
new_unprocessed_indexes,
&mut dropped_batches_count,
batch_limit,
);
if processed < verified_txs_len {
let next_leader = poh_recorder.lock().unwrap().next_slot_leader();
@ -187,7 +207,13 @@ impl BankingStage {
my_pubkey,
next_leader,
);
Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes);
Self::push_unprocessed(
&mut unprocessed_packets,
msgs,
unprocessed_indexes,
&mut dropped_batches_count,
batch_limit,
);
}
}
}
@ -207,6 +233,7 @@ impl BankingStage {
inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets);
inc_new_counter_info!("banking_stage-consumed_buffered_packets", new_tx_count);
inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count);
inc_new_counter_debug!("banking_stage-dropped_batches_count", dropped_batches_count);
Ok(unprocessed_packets)
}
@ -246,6 +273,7 @@ impl BankingStage {
cluster_info: &Arc<RwLock<ClusterInfo>>,
buffered_packets: &mut Vec<PacketsAndOffsets>,
enable_forwarding: bool,
batch_limit: usize,
) -> Result<()> {
let decision = {
let poh = poh_recorder.lock().unwrap();
@ -261,8 +289,12 @@ impl BankingStage {
match decision {
BufferedPacketsDecision::Consume => {
let mut unprocessed =
Self::consume_buffered_packets(my_pubkey, poh_recorder, buffered_packets)?;
let mut unprocessed = Self::consume_buffered_packets(
my_pubkey,
poh_recorder,
buffered_packets,
batch_limit,
)?;
buffered_packets.append(&mut unprocessed);
Ok(())
}
@ -307,6 +339,7 @@ impl BankingStage {
recv_start: &mut Instant,
enable_forwarding: bool,
id: u32,
batch_limit: usize,
) {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packets = vec![];
@ -319,6 +352,7 @@ impl BankingStage {
cluster_info,
&mut buffered_packets,
enable_forwarding,
batch_limit,
)
.unwrap_or_else(|_| buffered_packets.clear());
}
@ -340,6 +374,7 @@ impl BankingStage {
recv_start,
recv_timeout,
id,
batch_limit,
) {
Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout)) => (),
Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
@ -362,7 +397,14 @@ impl BankingStage {
}
pub fn num_threads() -> u32 {
sys_info::cpu_num().unwrap_or(NUM_THREADS)
const MIN_THREADS_VOTES: u32 = 1;
const MIN_THREADS_BANKING: u32 = 1;
cmp::max(
env::var("SOLANA_BANKING_THREADS")
.map(|x| x.parse().unwrap_or(NUM_THREADS))
.unwrap_or(NUM_THREADS),
MIN_THREADS_VOTES + MIN_THREADS_BANKING,
)
}
/// Convert the transactions from a blob of binary data to a vector of transactions
@ -736,6 +778,7 @@ impl BankingStage {
recv_start: &mut Instant,
recv_timeout: Duration,
id: u32,
batch_limit: usize,
) -> Result<UnprocessedPackets> {
let mms = verified_receiver.recv_timeout(recv_timeout)?;
@ -754,11 +797,18 @@ impl BankingStage {
let mut mms_iter = mms.into_iter();
let mut unprocessed_packets = vec![];
let mut dropped_batches_count = 0;
while let Some((msgs, vers)) = mms_iter.next() {
let packet_indexes = Self::generate_packet_indexes(vers);
let bank = poh.lock().unwrap().bank();
if bank.is_none() {
Self::push_unprocessed(&mut unprocessed_packets, msgs, packet_indexes);
Self::push_unprocessed(
&mut unprocessed_packets,
msgs,
packet_indexes,
&mut dropped_batches_count,
batch_limit,
);
continue;
}
let bank = bank.unwrap();
@ -769,7 +819,13 @@ impl BankingStage {
new_tx_count += processed;
// Collect any unprocessed transactions in this batch for forwarding
Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes);
Self::push_unprocessed(
&mut unprocessed_packets,
msgs,
unprocessed_indexes,
&mut dropped_batches_count,
batch_limit,
);
if processed < verified_txs_len {
let next_leader = poh.lock().unwrap().next_slot_leader();
@ -783,7 +839,13 @@ impl BankingStage {
&my_pubkey,
next_leader,
);
Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes);
Self::push_unprocessed(
&mut unprocessed_packets,
msgs,
unprocessed_indexes,
&mut dropped_batches_count,
batch_limit,
);
}
}
}
@ -806,6 +868,7 @@ impl BankingStage {
);
inc_new_counter_debug!("banking_stage-process_packets", count);
inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count);
inc_new_counter_debug!("banking_stage-dropped_batches_count", dropped_batches_count);
*recv_start = Instant::now();
@ -816,10 +879,13 @@ impl BankingStage {
unprocessed_packets: &mut UnprocessedPackets,
packets: Packets,
packet_indexes: Vec<usize>,
dropped_batches_count: &mut usize,
batch_limit: usize,
) {
if !packet_indexes.is_empty() {
if unprocessed_packets.len() > 400 {
if unprocessed_packets.len() >= batch_limit {
unprocessed_packets.remove(0);
*dropped_batches_count += 1;
}
unprocessed_packets.push((packets, packet_indexes));
}

View File

@ -33,6 +33,7 @@ pub const BLOB_SIZE: usize = (64 * 1024 - 128); // wikipedia says there should b
pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - (BLOB_HEADER_SIZE * 2);
pub const BLOB_DATA_ALIGN: usize = 16; // safe for erasure input pointers, gf.c needs 16byte-aligned buffers
pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE;
pub const PACKETS_PER_BLOB: usize = 256; // reasonable estimate for payment packets per blob based on ~200b transaction size
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
#[repr(C)]

View File

@ -1,7 +1,7 @@
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
//!
use crate::packet::{Blob, Packets, PacketsRecycler, SharedBlobs};
use crate::packet::{Blob, Packets, PacketsRecycler, SharedBlobs, PACKETS_PER_BLOB};
use crate::result::{Error, Result};
use solana_sdk::timing::duration_as_ms;
use std::net::UdpSocket;
@ -24,7 +24,7 @@ fn recv_loop(
name: &'static str,
) -> Result<()> {
loop {
let mut msgs = Packets::new_with_recycler(recycler.clone(), 256, name);
let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BLOB, name);
loop {
// Check for exit signal, even if socket is busy
// (for instance the leader transaction socket)
@ -142,7 +142,8 @@ fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender, recycler: &PacketsRecyc
let blobs = Blob::recv_from(sock)?;
for blob in blobs {
let mut packets = Packets::new_with_recycler(recycler.clone(), 256, "recv_blob_packets");
let mut packets =
Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BLOB, "recv_blob_packets");
blob.read().unwrap().load_packets(&mut packets.packets);
s.send(packets)?;
}