From 0fa1af5d47cf6a987454c432ef54c8292a39aca5 Mon Sep 17 00:00:00 2001 From: sakridge Date: Fri, 28 Jun 2019 10:55:24 +0200 Subject: [PATCH] Cleanup num_threads() and batch_limit numbers (#4852) --- core/benches/banking_stage.rs | 8 ++- core/src/banking_stage.rs | 94 +++++++++++++++++++++++++++++------ core/src/packet.rs | 1 + core/src/streamer.rs | 7 +-- 4 files changed, 91 insertions(+), 19 deletions(-) diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index de66e82999..fa78c6a775 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -77,8 +77,12 @@ fn bench_consume_buffered(bencher: &mut Bencher) { // This tests the performance of buffering packets. // If the packet buffers are copied, performance will be poor. bencher.iter(move || { - let _ignored = - BankingStage::consume_buffered_packets(&my_pubkey, &poh_recorder, &mut packets); + let _ignored = BankingStage::consume_buffered_packets( + &my_pubkey, + &poh_recorder, + &mut packets, + 10_000, + ); }); exit.store(true, Ordering::Relaxed); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 9b58b6d407..04b0a3a4f2 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -7,6 +7,7 @@ use crate::entry; use crate::entry::{hash_transactions, Entry}; use crate::leader_schedule_cache::LeaderScheduleCache; use crate::packet; +use crate::packet::PACKETS_PER_BLOB; use crate::packet::{Packet, Packets}; use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries}; use crate::poh_service::PohService; @@ -27,6 +28,8 @@ use solana_sdk::timing::{ MAX_TRANSACTION_FORWARDING_DELAY, }; use solana_sdk::transaction::{self, Transaction, TransactionError}; +use std::cmp; +use std::env; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::Receiver; @@ -34,7 +37,6 @@ use std::sync::{Arc, Mutex, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use std::time::Instant; -use sys_info; type PacketsAndOffsets = (Packets, Vec); pub type UnprocessedPackets = Vec; @@ -42,8 +44,10 @@ pub type UnprocessedPackets = Vec; /// Transaction forwarding pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 4; -// number of threads is 1 until mt bank is ready -pub const NUM_THREADS: u32 = 10; +// Fixed thread size seems to be fastest on GCP setup +pub const NUM_THREADS: u32 = 4; + +const TOTAL_BUFFERED_PACKETS: usize = 500_000; /// Stores the stage's thread handle and output receiver. pub struct BankingStage { @@ -71,7 +75,7 @@ impl BankingStage { poh_recorder, verified_receiver, verified_vote_receiver, - 4, + Self::num_threads(), ) } @@ -82,6 +86,7 @@ impl BankingStage { verified_vote_receiver: CrossbeamReceiver, num_threads: u32, ) -> Self { + let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BLOB); // Single thread to generate entries from many banks. // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its blockhash is registered with the bank. @@ -110,6 +115,7 @@ impl BankingStage { &mut recv_start, enable_forwarding, i, + batch_limit, ); }) .unwrap() @@ -145,19 +151,27 @@ impl BankingStage { my_pubkey: &Pubkey, poh_recorder: &Arc>, buffered_packets: &mut Vec, + batch_limit: usize, ) -> Result { let mut unprocessed_packets = vec![]; let mut rebuffered_packets = 0; let mut new_tx_count = 0; let buffered_len = buffered_packets.len(); let mut buffered_packets_iter = buffered_packets.drain(..); + let mut dropped_batches_count = 0; let proc_start = Instant::now(); while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() { let bank = poh_recorder.lock().unwrap().bank(); if bank.is_none() { rebuffered_packets += unprocessed_indexes.len(); - Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes); + Self::push_unprocessed( + &mut unprocessed_packets, + msgs, + unprocessed_indexes, + &mut dropped_batches_count, + batch_limit, + ); continue; } let bank = bank.unwrap(); @@ -174,7 +188,13 @@ impl BankingStage { // Collect any unprocessed transactions in this batch for forwarding rebuffered_packets += new_unprocessed_indexes.len(); - Self::push_unprocessed(&mut unprocessed_packets, msgs, new_unprocessed_indexes); + Self::push_unprocessed( + &mut unprocessed_packets, + msgs, + new_unprocessed_indexes, + &mut dropped_batches_count, + batch_limit, + ); if processed < verified_txs_len { let next_leader = poh_recorder.lock().unwrap().next_slot_leader(); @@ -187,7 +207,13 @@ impl BankingStage { my_pubkey, next_leader, ); - Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes); + Self::push_unprocessed( + &mut unprocessed_packets, + msgs, + unprocessed_indexes, + &mut dropped_batches_count, + batch_limit, + ); } } } @@ -207,6 +233,7 @@ impl BankingStage { inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets); inc_new_counter_info!("banking_stage-consumed_buffered_packets", new_tx_count); inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count); + inc_new_counter_debug!("banking_stage-dropped_batches_count", dropped_batches_count); Ok(unprocessed_packets) } @@ -246,6 +273,7 @@ impl BankingStage { cluster_info: &Arc>, buffered_packets: &mut Vec, enable_forwarding: bool, + batch_limit: usize, ) -> Result<()> { let decision = { let poh = poh_recorder.lock().unwrap(); @@ -261,8 +289,12 @@ impl BankingStage { match decision { BufferedPacketsDecision::Consume => { - let mut unprocessed = - Self::consume_buffered_packets(my_pubkey, poh_recorder, buffered_packets)?; + let mut unprocessed = Self::consume_buffered_packets( + my_pubkey, + poh_recorder, + buffered_packets, + batch_limit, + )?; buffered_packets.append(&mut unprocessed); Ok(()) } @@ -307,6 +339,7 @@ impl BankingStage { recv_start: &mut Instant, enable_forwarding: bool, id: u32, + batch_limit: usize, ) { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packets = vec![]; @@ -319,6 +352,7 @@ impl BankingStage { cluster_info, &mut buffered_packets, enable_forwarding, + batch_limit, ) .unwrap_or_else(|_| buffered_packets.clear()); } @@ -340,6 +374,7 @@ impl BankingStage { recv_start, recv_timeout, id, + batch_limit, ) { Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout)) => (), Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected)) => break, @@ -362,7 +397,14 @@ impl BankingStage { } pub fn num_threads() -> u32 { - sys_info::cpu_num().unwrap_or(NUM_THREADS) + const MIN_THREADS_VOTES: u32 = 1; + const MIN_THREADS_BANKING: u32 = 1; + cmp::max( + env::var("SOLANA_BANKING_THREADS") + .map(|x| x.parse().unwrap_or(NUM_THREADS)) + .unwrap_or(NUM_THREADS), + MIN_THREADS_VOTES + MIN_THREADS_BANKING, + ) } /// Convert the transactions from a blob of binary data to a vector of transactions @@ -736,6 +778,7 @@ impl BankingStage { recv_start: &mut Instant, recv_timeout: Duration, id: u32, + batch_limit: usize, ) -> Result { let mms = verified_receiver.recv_timeout(recv_timeout)?; @@ -754,11 +797,18 @@ impl BankingStage { let mut mms_iter = mms.into_iter(); let mut unprocessed_packets = vec![]; + let mut dropped_batches_count = 0; while let Some((msgs, vers)) = mms_iter.next() { let packet_indexes = Self::generate_packet_indexes(vers); let bank = poh.lock().unwrap().bank(); if bank.is_none() { - Self::push_unprocessed(&mut unprocessed_packets, msgs, packet_indexes); + Self::push_unprocessed( + &mut unprocessed_packets, + msgs, + packet_indexes, + &mut dropped_batches_count, + batch_limit, + ); continue; } let bank = bank.unwrap(); @@ -769,7 +819,13 @@ impl BankingStage { new_tx_count += processed; // Collect any unprocessed transactions in this batch for forwarding - Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes); + Self::push_unprocessed( + &mut unprocessed_packets, + msgs, + unprocessed_indexes, + &mut dropped_batches_count, + batch_limit, + ); if processed < verified_txs_len { let next_leader = poh.lock().unwrap().next_slot_leader(); @@ -783,7 +839,13 @@ impl BankingStage { &my_pubkey, next_leader, ); - Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes); + Self::push_unprocessed( + &mut unprocessed_packets, + msgs, + unprocessed_indexes, + &mut dropped_batches_count, + batch_limit, + ); } } } @@ -806,6 +868,7 @@ impl BankingStage { ); inc_new_counter_debug!("banking_stage-process_packets", count); inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count); + inc_new_counter_debug!("banking_stage-dropped_batches_count", dropped_batches_count); *recv_start = Instant::now(); @@ -816,10 +879,13 @@ impl BankingStage { unprocessed_packets: &mut UnprocessedPackets, packets: Packets, packet_indexes: Vec, + dropped_batches_count: &mut usize, + batch_limit: usize, ) { if !packet_indexes.is_empty() { - if unprocessed_packets.len() > 400 { + if unprocessed_packets.len() >= batch_limit { unprocessed_packets.remove(0); + *dropped_batches_count += 1; } unprocessed_packets.push((packets, packet_indexes)); } diff --git a/core/src/packet.rs b/core/src/packet.rs index 26998ca74b..e69cdae4bd 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -33,6 +33,7 @@ pub const BLOB_SIZE: usize = (64 * 1024 - 128); // wikipedia says there should b pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - (BLOB_HEADER_SIZE * 2); pub const BLOB_DATA_ALIGN: usize = 16; // safe for erasure input pointers, gf.c needs 16byte-aligned buffers pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; +pub const PACKETS_PER_BLOB: usize = 256; // reasonable estimate for payment packets per blob based on ~200b transaction size #[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] #[repr(C)] diff --git a/core/src/streamer.rs b/core/src/streamer.rs index 9663e2af10..d7b7557f4f 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -1,7 +1,7 @@ //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! -use crate::packet::{Blob, Packets, PacketsRecycler, SharedBlobs}; +use crate::packet::{Blob, Packets, PacketsRecycler, SharedBlobs, PACKETS_PER_BLOB}; use crate::result::{Error, Result}; use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; @@ -24,7 +24,7 @@ fn recv_loop( name: &'static str, ) -> Result<()> { loop { - let mut msgs = Packets::new_with_recycler(recycler.clone(), 256, name); + let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BLOB, name); loop { // Check for exit signal, even if socket is busy // (for instance the leader trasaction socket) @@ -142,7 +142,8 @@ fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender, recycler: &PacketsRecyc let blobs = Blob::recv_from(sock)?; for blob in blobs { - let mut packets = Packets::new_with_recycler(recycler.clone(), 256, "recv_blob_packets"); + let mut packets = + Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BLOB, "recv_blob_packets"); blob.read().unwrap().load_packets(&mut packets.packets); s.send(packets)?; }