diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 5a32c1d69..63741192c 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -7,7 +7,7 @@ use crate::entry; use crate::entry::{hash_transactions, Entry}; use crate::leader_schedule_cache::LeaderScheduleCache; use crate::packet; -use crate::packet::PACKETS_PER_BLOB; +use crate::packet::PACKETS_PER_BATCH; use crate::packet::{Packet, Packets}; use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries}; use crate::poh_service::PohService; @@ -87,7 +87,7 @@ impl BankingStage { verified_vote_receiver: CrossbeamReceiver, num_threads: u32, ) -> Self { - let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BLOB); + let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH); // Single thread to generate entries from many banks. // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its blockhash is registered with the bank. diff --git a/core/src/packet.rs b/core/src/packet.rs index 09cebaadc..adbacb70e 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -32,7 +32,9 @@ pub const BLOB_SIZE: usize = (2 * 1024 - 128); // wikipedia says there should be pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - (BLOB_HEADER_SIZE * 2); pub const BLOB_DATA_ALIGN: usize = 16; // safe for erasure input pointers, gf.c needs 16byte-aligned buffers pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; -pub const PACKETS_PER_BLOB: usize = 8; // reasonable estimate for payment packets per blob based on ~200b transaction size + +pub const PACKETS_PER_BATCH: usize = 256; +pub const PACKETS_BATCH_SIZE: usize = (PACKETS_PER_BATCH * PACKET_DATA_SIZE); #[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] #[repr(C)] @@ -291,9 +293,9 @@ impl Packets { trace!("got {} packets", npkts); i += npkts; total_size += size; - // Try to batch into blob-sized buffers + // Try to batch into big enough buffers // will cause less re-shuffling later on. - if start.elapsed().as_millis() > 1 || total_size >= (BLOB_DATA_SIZE - 512) { + if start.elapsed().as_millis() > 1 || total_size >= PACKETS_BATCH_SIZE { break; } } diff --git a/core/src/streamer.rs b/core/src/streamer.rs index 9674671d5..ef773df5b 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -1,7 +1,7 @@ //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! -use crate::packet::{Blob, Packets, PacketsRecycler, SharedBlobs, PACKETS_PER_BLOB}; +use crate::packet::{Blob, Packets, PacketsRecycler, SharedBlobs, PACKETS_PER_BATCH}; use crate::result::{Error, Result}; use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; @@ -24,7 +24,7 @@ fn recv_loop( name: &'static str, ) -> Result<()> { loop { - let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BLOB, name); + let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name); loop { // Check for exit signal, even if socket is busy // (for instance the leader trasaction socket)