From 05409e51cec03ca282db87d7ec7c2e63a1266a97 Mon Sep 17 00:00:00 2001 From: sakridge Date: Fri, 26 Feb 2021 09:15:45 -0800 Subject: [PATCH] Increase tpu coalescing and add parameter (#15536) Should create larger entries on average --- bench-streamer/src/main.rs | 1 + core/src/fetch_stage.rs | 7 +++++++ core/src/gossip_service.rs | 1 + core/src/serve_repair_service.rs | 1 + core/src/shred_fetch_stage.rs | 1 + core/src/tpu.rs | 4 ++++ core/src/validator.rs | 5 ++++- streamer/src/packet.rs | 6 +++--- streamer/src/streamer.rs | 14 ++++++++++++-- validator/src/main.rs | 12 ++++++++++++ 10 files changed, 46 insertions(+), 6 deletions(-) diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 6a668a6aa..80fe81521 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -91,6 +91,7 @@ fn main() -> Result<()> { s_reader, recycler.clone(), "bench-streamer-test", + 1, )); } diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index f2a660e61..37bb41a83 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -26,6 +26,7 @@ impl FetchStage { tpu_forwards_sockets: Vec, exit: &Arc, poh_recorder: &Arc>, + coalesce_ms: u64, ) -> (Self, PacketReceiver) { let (sender, receiver) = channel(); ( @@ -36,6 +37,7 @@ impl FetchStage { &sender, &poh_recorder, None, + coalesce_ms, ), receiver, ) @@ -47,6 +49,7 @@ impl FetchStage { sender: &PacketSender, poh_recorder: &Arc>, allocated_packet_limit: Option, + coalesce_ms: u64, ) -> Self { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect(); @@ -57,6 +60,7 @@ impl FetchStage { &sender, &poh_recorder, allocated_packet_limit, + coalesce_ms, ) } @@ -102,6 +106,7 @@ impl FetchStage { sender: &PacketSender, poh_recorder: &Arc>, limit: Option, + coalesce_ms: u64, ) -> Self { let recycler: PacketsRecycler = Recycler::warmed(1000, 1024, limit, "fetch_stage_recycler_shrink"); @@ -113,6 +118,7 @@ impl 
FetchStage { sender.clone(), recycler.clone(), "fetch_stage", + coalesce_ms, ) }); @@ -124,6 +130,7 @@ impl FetchStage { forward_sender.clone(), recycler.clone(), "fetch_forward_stage", + coalesce_ms, ) }); diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index a4e633b49..3bf478662 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -49,6 +49,7 @@ impl GossipService { request_sender, Recycler::new_without_limit("gossip-receiver-recycler-shrink-stats"), "gossip_receiver", + 1, ); let (response_sender, response_receiver) = channel(); let t_responder = streamer::responder("gossip", gossip_socket, response_receiver); diff --git a/core/src/serve_repair_service.rs b/core/src/serve_repair_service.rs index 785ed06b0..2f8bb7819 100644 --- a/core/src/serve_repair_service.rs +++ b/core/src/serve_repair_service.rs @@ -32,6 +32,7 @@ impl ServeRepairService { request_sender, Recycler::new_without_limit("serve-repair-receiver-recycler-shrink-stats"), "serve_repair_receiver", + 1, ); let (response_sender, response_receiver) = channel(); let t_responder = diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 01b1f177b..2cacdbff0 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -149,6 +149,7 @@ impl ShredFetchStage { packet_sender.clone(), recycler.clone(), "packet_modifier", + 1, ) }) .collect(); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index e71b9f8d9..4b463cd36 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -29,6 +29,8 @@ use std::{ thread, }; +pub const DEFAULT_TPU_COALESCE_MS: u64 = 5; + pub struct Tpu { fetch_stage: FetchStage, sigverify_stage: SigVerifyStage, @@ -59,6 +61,7 @@ impl Tpu { replay_vote_receiver: ReplayVoteReceiver, replay_vote_sender: ReplayVoteSender, bank_notification_sender: Option, + tpu_coalesce_ms: u64, ) -> Self { let (packet_sender, packet_receiver) = channel(); let fetch_stage = FetchStage::new_with_sender( @@ -70,6 +73,7 @@ impl Tpu { // 
At 1024 packets per `Packet`, each packet about MTU size ~1k, this is roughly // 20GB Some(20_000), + tpu_coalesce_ms, ); let (verified_sender, verified_receiver) = unbounded(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 0c569b766..b38428de0 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -28,7 +28,7 @@ use crate::{ serve_repair_service::ServeRepairService, sigverify, snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService}, - tpu::Tpu, + tpu::{Tpu, DEFAULT_TPU_COALESCE_MS}, transaction_status_service::TransactionStatusService, tvu::{Sockets, Tvu, TvuConfig}, }; @@ -126,6 +126,7 @@ pub struct ValidatorConfig { pub warp_slot: Option, pub accounts_db_test_hash_calculation: bool, pub accounts_db_use_index_hash_calculation: bool, + pub tpu_coalesce_ms: u64, } impl Default for ValidatorConfig { @@ -177,6 +178,7 @@ impl Default for ValidatorConfig { warp_slot: None, accounts_db_test_hash_calculation: false, accounts_db_use_index_hash_calculation: true, + tpu_coalesce_ms: DEFAULT_TPU_COALESCE_MS, } } } @@ -681,6 +683,7 @@ impl Validator { replay_vote_receiver, replay_vote_sender, bank_notification_sender, + config.tpu_coalesce_ms, ); datapoint_info!("validator-new", ("id", id.to_string(), String)); diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs index 83202aa0e..2e69cac41 100644 --- a/streamer/src/packet.rs +++ b/streamer/src/packet.rs @@ -9,7 +9,7 @@ use solana_metrics::inc_new_counter_debug; pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}; use std::{io::Result, net::UdpSocket, time::Instant}; -pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: usize) -> Result { +pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: u64) -> Result { let mut i = 0; //DOCUMENTED SIDE-EFFECT //Performance out of the IO without poll @@ -27,7 +27,7 @@ pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: usize) -> R ); match recv_mmsg(socket, &mut 
obj.packets[i..]) { Err(_) if i > 0 => { - if start.elapsed().as_millis() > 1 { + if start.elapsed().as_millis() as u64 > max_wait_ms { break; } } @@ -43,7 +43,7 @@ pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: usize) -> R i += npkts; // Try to batch into big enough buffers // will cause less re-shuffling later on. - if start.elapsed().as_millis() > max_wait_ms as u128 || i >= PACKETS_PER_BATCH { + if start.elapsed().as_millis() as u64 > max_wait_ms || i >= PACKETS_PER_BATCH { break; } } diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index a5d005383..946075d2f 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -36,6 +36,7 @@ fn recv_loop( channel: &PacketSender, recycler: &PacketsRecycler, name: &'static str, + coalesce_ms: u64, ) -> Result<()> { let mut recv_count = 0; let mut call_count = 0; @@ -52,7 +53,7 @@ fn recv_loop( if exit.load(Ordering::Relaxed) { return Ok(()); } - if let Ok(len) = packet::recv_from(&mut msgs, sock, 1) { + if let Ok(len) = packet::recv_from(&mut msgs, sock, coalesce_ms) { if len == NUM_RCVMMSGS { num_max_received += 1; } @@ -86,6 +87,7 @@ pub fn receiver( packet_sender: PacketSender, recycler: PacketsRecycler, name: &'static str, + coalesce_ms: u64, ) -> JoinHandle<()> { let res = sock.set_read_timeout(Some(Duration::new(1, 0))); if res.is_err() { @@ -96,7 +98,14 @@ pub fn receiver( .name("solana-receiver".to_string()) .spawn(move || { thread_mem_usage::datapoint(name); - let _ = recv_loop(&sock, exit, &packet_sender, &recycler.clone(), name); + let _ = recv_loop( + &sock, + exit, + &packet_sender, + &recycler.clone(), + name, + coalesce_ms, + ); }) .unwrap() } @@ -207,6 +216,7 @@ mod test { s_reader, Recycler::new_without_limit(""), "test", + 1, ); let t_responder = { let (s_responder, r_responder) = channel(); diff --git a/validator/src/main.rs b/validator/src/main.rs index 982f43657..aa388bde0 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -24,6 +24,7 @@ 
use solana_core::{ poh_service, rpc::JsonRpcConfig, rpc_pubsub_service::PubSubConfig, + tpu::DEFAULT_TPU_COALESCE_MS, validator::{is_snapshot_config_invalid, Validator, ValidatorConfig}, }; use solana_download_utils::{download_genesis_if_missing, download_snapshot}; @@ -1243,6 +1244,14 @@ pub fn main() { .takes_value(true) .help("Number of slots between compacting ledger"), ) + .arg( + Arg::with_name("tpu_coalesce_ms") + .long("tpu-coalesce-ms") + .value_name("MILLISECS") + .takes_value(true) + .validator(is_parsable::<u64>) + .help("Milliseconds to wait in the TPU receiver for packet coalescing."), + ) .arg( Arg::with_name("rocksdb_max_compaction_jitter") .long("rocksdb-max-compaction-jitter-slots") @@ -1509,6 +1518,8 @@ pub fn main() { let rocksdb_compaction_interval = value_t!(matches, "rocksdb_compaction_interval", u64).ok(); let rocksdb_max_compaction_jitter = value_t!(matches, "rocksdb_max_compaction_jitter", u64).ok(); + let tpu_coalesce_ms = + value_t!(matches, "tpu_coalesce_ms", u64).unwrap_or(DEFAULT_TPU_COALESCE_MS); let wal_recovery_mode = matches .value_of("wal_recovery_mode") .map(BlockstoreRecoveryMode::from); @@ -1663,6 +1674,7 @@ pub fn main() { accounts_db_caching_enabled: !matches.is_present("no_accounts_db_caching"), accounts_db_test_hash_calculation: matches.is_present("accounts_db_test_hash_calculation"), accounts_db_use_index_hash_calculation: !matches.is_present("no_accounts_db_index_hashing"), + tpu_coalesce_ms, ..ValidatorConfig::default() };