Increase tpu coalescing and add parameter (#15536)

Coalescing received TPU packets for longer should create larger entries on average.
This commit is contained in:
sakridge 2021-02-26 09:15:45 -08:00 committed by GitHub
parent 5a9896706c
commit 05409e51ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 46 additions and 6 deletions

View File

@ -91,6 +91,7 @@ fn main() -> Result<()> {
s_reader,
recycler.clone(),
"bench-streamer-test",
1,
));
}

View File

@ -26,6 +26,7 @@ impl FetchStage {
tpu_forwards_sockets: Vec<UdpSocket>,
exit: &Arc<AtomicBool>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
coalesce_ms: u64,
) -> (Self, PacketReceiver) {
let (sender, receiver) = channel();
(
@ -36,6 +37,7 @@ impl FetchStage {
&sender,
&poh_recorder,
None,
coalesce_ms,
),
receiver,
)
@ -47,6 +49,7 @@ impl FetchStage {
sender: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
allocated_packet_limit: Option<u32>,
coalesce_ms: u64,
) -> Self {
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect();
@ -57,6 +60,7 @@ impl FetchStage {
&sender,
&poh_recorder,
allocated_packet_limit,
coalesce_ms,
)
}
@ -102,6 +106,7 @@ impl FetchStage {
sender: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
limit: Option<u32>,
coalesce_ms: u64,
) -> Self {
let recycler: PacketsRecycler =
Recycler::warmed(1000, 1024, limit, "fetch_stage_recycler_shrink");
@ -113,6 +118,7 @@ impl FetchStage {
sender.clone(),
recycler.clone(),
"fetch_stage",
coalesce_ms,
)
});
@ -124,6 +130,7 @@ impl FetchStage {
forward_sender.clone(),
recycler.clone(),
"fetch_forward_stage",
coalesce_ms,
)
});

View File

@ -49,6 +49,7 @@ impl GossipService {
request_sender,
Recycler::new_without_limit("gossip-receiver-recycler-shrink-stats"),
"gossip_receiver",
1,
);
let (response_sender, response_receiver) = channel();
let t_responder = streamer::responder("gossip", gossip_socket, response_receiver);

View File

@ -32,6 +32,7 @@ impl ServeRepairService {
request_sender,
Recycler::new_without_limit("serve-repair-receiver-recycler-shrink-stats"),
"serve_repair_receiver",
1,
);
let (response_sender, response_receiver) = channel();
let t_responder =

View File

@ -149,6 +149,7 @@ impl ShredFetchStage {
packet_sender.clone(),
recycler.clone(),
"packet_modifier",
1,
)
})
.collect();

View File

@ -29,6 +29,8 @@ use std::{
thread,
};
/// Default number of milliseconds the TPU receiver waits while coalescing packets.
///
/// Used as the default for `ValidatorConfig::tpu_coalesce_ms` and as the fallback
/// when the `--tpu-coalesce-ms` command-line argument is not supplied.
pub const DEFAULT_TPU_COALESCE_MS: u64 = 5;
pub struct Tpu {
fetch_stage: FetchStage,
sigverify_stage: SigVerifyStage,
@ -59,6 +61,7 @@ impl Tpu {
replay_vote_receiver: ReplayVoteReceiver,
replay_vote_sender: ReplayVoteSender,
bank_notification_sender: Option<BankNotificationSender>,
tpu_coalesce_ms: u64,
) -> Self {
let (packet_sender, packet_receiver) = channel();
let fetch_stage = FetchStage::new_with_sender(
@ -70,6 +73,7 @@ impl Tpu {
// At 1024 packets per `Packet`, each packet about MTU size ~1k, this is roughly
// 20GB
Some(20_000),
tpu_coalesce_ms,
);
let (verified_sender, verified_receiver) = unbounded();

View File

@ -28,7 +28,7 @@ use crate::{
serve_repair_service::ServeRepairService,
sigverify,
snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService},
tpu::Tpu,
tpu::{Tpu, DEFAULT_TPU_COALESCE_MS},
transaction_status_service::TransactionStatusService,
tvu::{Sockets, Tvu, TvuConfig},
};
@ -126,6 +126,7 @@ pub struct ValidatorConfig {
pub warp_slot: Option<Slot>,
pub accounts_db_test_hash_calculation: bool,
pub accounts_db_use_index_hash_calculation: bool,
pub tpu_coalesce_ms: u64,
}
impl Default for ValidatorConfig {
@ -177,6 +178,7 @@ impl Default for ValidatorConfig {
warp_slot: None,
accounts_db_test_hash_calculation: false,
accounts_db_use_index_hash_calculation: true,
tpu_coalesce_ms: DEFAULT_TPU_COALESCE_MS,
}
}
}
@ -681,6 +683,7 @@ impl Validator {
replay_vote_receiver,
replay_vote_sender,
bank_notification_sender,
config.tpu_coalesce_ms,
);
datapoint_info!("validator-new", ("id", id.to_string(), String));

View File

@ -9,7 +9,7 @@ use solana_metrics::inc_new_counter_debug;
pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE};
use std::{io::Result, net::UdpSocket, time::Instant};
pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: usize) -> Result<usize> {
pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: u64) -> Result<usize> {
let mut i = 0;
//DOCUMENTED SIDE-EFFECT
//Performance out of the IO without poll
@ -27,7 +27,7 @@ pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: usize) -> R
);
match recv_mmsg(socket, &mut obj.packets[i..]) {
Err(_) if i > 0 => {
if start.elapsed().as_millis() > 1 {
if start.elapsed().as_millis() as u64 > max_wait_ms {
break;
}
}
@ -43,7 +43,7 @@ pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: usize) -> R
i += npkts;
// Try to batch into big enough buffers
// will cause less re-shuffling later on.
if start.elapsed().as_millis() > max_wait_ms as u128 || i >= PACKETS_PER_BATCH {
if start.elapsed().as_millis() as u64 > max_wait_ms || i >= PACKETS_PER_BATCH {
break;
}
}

View File

@ -36,6 +36,7 @@ fn recv_loop(
channel: &PacketSender,
recycler: &PacketsRecycler,
name: &'static str,
coalesce_ms: u64,
) -> Result<()> {
let mut recv_count = 0;
let mut call_count = 0;
@ -52,7 +53,7 @@ fn recv_loop(
if exit.load(Ordering::Relaxed) {
return Ok(());
}
if let Ok(len) = packet::recv_from(&mut msgs, sock, 1) {
if let Ok(len) = packet::recv_from(&mut msgs, sock, coalesce_ms) {
if len == NUM_RCVMMSGS {
num_max_received += 1;
}
@ -86,6 +87,7 @@ pub fn receiver(
packet_sender: PacketSender,
recycler: PacketsRecycler,
name: &'static str,
coalesce_ms: u64,
) -> JoinHandle<()> {
let res = sock.set_read_timeout(Some(Duration::new(1, 0)));
if res.is_err() {
@ -96,7 +98,14 @@ pub fn receiver(
.name("solana-receiver".to_string())
.spawn(move || {
thread_mem_usage::datapoint(name);
let _ = recv_loop(&sock, exit, &packet_sender, &recycler.clone(), name);
let _ = recv_loop(
&sock,
exit,
&packet_sender,
&recycler.clone(),
name,
coalesce_ms,
);
})
.unwrap()
}
@ -207,6 +216,7 @@ mod test {
s_reader,
Recycler::new_without_limit(""),
"test",
1,
);
let t_responder = {
let (s_responder, r_responder) = channel();

View File

@ -24,6 +24,7 @@ use solana_core::{
poh_service,
rpc::JsonRpcConfig,
rpc_pubsub_service::PubSubConfig,
tpu::DEFAULT_TPU_COALESCE_MS,
validator::{is_snapshot_config_invalid, Validator, ValidatorConfig},
};
use solana_download_utils::{download_genesis_if_missing, download_snapshot};
@ -1243,6 +1244,14 @@ pub fn main() {
.takes_value(true)
.help("Number of slots between compacting ledger"),
)
.arg(
Arg::with_name("tpu_coalesce_ms")
.long("tpu-coalesce-ms")
.value_name("MILLISECS")
.takes_value(true)
.validator(is_parsable::<u64>)
.help("Milliseconds to wait in the TPU receiver for packet coalescing."),
)
.arg(
Arg::with_name("rocksdb_max_compaction_jitter")
.long("rocksdb-max-compaction-jitter-slots")
@ -1509,6 +1518,8 @@ pub fn main() {
let rocksdb_compaction_interval = value_t!(matches, "rocksdb_compaction_interval", u64).ok();
let rocksdb_max_compaction_jitter =
value_t!(matches, "rocksdb_max_compaction_jitter", u64).ok();
let tpu_coalesce_ms =
value_t!(matches, "tpu_coalesce_ms", u64).unwrap_or(DEFAULT_TPU_COALESCE_MS);
let wal_recovery_mode = matches
.value_of("wal_recovery_mode")
.map(BlockstoreRecoveryMode::from);
@ -1663,6 +1674,7 @@ pub fn main() {
accounts_db_caching_enabled: !matches.is_present("no_accounts_db_caching"),
accounts_db_test_hash_calculation: matches.is_present("accounts_db_test_hash_calculation"),
accounts_db_use_index_hash_calculation: !matches.is_present("no_accounts_db_index_hashing"),
tpu_coalesce_ms,
..ValidatorConfig::default()
};