From ff9a42a3541b735763e94ebd3a554f61c3eaa439 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Fri, 31 Mar 2023 15:42:49 +0000 Subject: [PATCH] uses Duration type instead of untyped ..._ms: u64 (#30971) --- bench-streamer/src/main.rs | 2 +- client/src/connection_cache.rs | 8 +- core/src/ancestor_hashes_service.rs | 4 +- core/src/fetch_stage.rs | 16 +-- core/src/serve_repair_service.rs | 3 +- core/src/shred_fetch_stage.rs | 2 +- core/src/tpu.rs | 17 +-- core/src/validator.rs | 8 +- gossip/src/gossip_service.rs | 2 +- local-cluster/src/validator_configs.rs | 2 +- quic-client/src/nonblocking/quic_client.rs | 29 ++--- quic-client/src/quic_client.rs | 14 +-- quic-client/tests/quic_client.rs | 20 ++-- sdk/src/net.rs | 4 +- sdk/src/quic.rs | 7 +- streamer/src/nonblocking/quic.rs | 127 +++++++++------------ streamer/src/packet.rs | 29 +++-- streamer/src/quic.rs | 32 +++--- streamer/src/streamer.rs | 10 +- validator/src/main.rs | 9 +- 20 files changed, 165 insertions(+), 180 deletions(-) diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 3113ea7b1c..1cf1951049 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -113,7 +113,7 @@ fn main() -> Result<()> { s_reader, recycler.clone(), stats.clone(), - 1, + Duration::from_millis(1), // coalesce true, None, )); diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index ce1a8b022d..680300a39e 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -204,9 +204,9 @@ mod tests { super::*, crate::connection_cache::ConnectionCache, crossbeam_channel::unbounded, - solana_sdk::{net::DEFAULT_TPU_COALESCE_MS, quic::QUIC_PORT_OFFSET, signature::Keypair}, + solana_sdk::{net::DEFAULT_TPU_COALESCE, quic::QUIC_PORT_OFFSET, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, quic::StreamStats, + nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::StreamStats, streamer::StakedNodes, }, std::{ @@ -262,8 +262,8 @@ mod tests { 10, 10, response_recv_stats, - DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, - DEFAULT_TPU_COALESCE_MS, + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + DEFAULT_TPU_COALESCE, ) .unwrap(); diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index c571012bc9..bf7003bea5 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -159,7 +159,7 @@ impl AncestorHashesService { Arc::new(StreamerReceiveStats::new( "ancestor_hashes_response_receiver", )), - 1, + Duration::from_millis(1), // coalesce false, None, ); @@ -1008,7 +1008,7 @@ mod test { Arc::new(StreamerReceiveStats::new( "ancestor_hashes_response_receiver", )), - 1, + Duration::from_millis(1), // coalesce false, None, ); diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index e5e2933fa2..85622f1e7e 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -37,7 +37,7 @@ impl FetchStage { tpu_vote_sockets: Vec, exit: &Arc, poh_recorder: &Arc>, - coalesce_ms: u64, + coalesce: Duration, ) -> (Self, PacketBatchReceiver, PacketBatchReceiver) { let (sender, receiver) = unbounded(); let (vote_sender, vote_receiver) = unbounded(); @@ -53,7 +53,7 @@ impl FetchStage { &forward_sender, forward_receiver, poh_recorder, - coalesce_ms, + coalesce, None, DEFAULT_TPU_ENABLE_UDP, ), @@ -73,7 +73,7 @@ impl FetchStage { forward_sender: &PacketBatchSender, forward_receiver: PacketBatchReceiver, poh_recorder: &Arc>, - coalesce_ms: u64, + coalesce: Duration, in_vote_only_mode: Option>, tpu_enable_udp: bool, ) -> Self { @@ -90,7 +90,7 @@ impl FetchStage { forward_sender, forward_receiver, poh_recorder, - coalesce_ms, + coalesce, in_vote_only_mode, tpu_enable_udp, ) @@ -149,7 +149,7 @@ impl FetchStage { forward_sender: &PacketBatchSender, forward_receiver: PacketBatchReceiver, poh_recorder: &Arc>, - coalesce_ms: u64, + coalesce: Duration, in_vote_only_mode: Option>, tpu_enable_udp: bool, ) -> Self { @@ -167,7 +167,7 @@ impl FetchStage { sender.clone(), recycler.clone(), tpu_stats.clone(), - coalesce_ms, + coalesce, true, in_vote_only_mode.clone(), ) @@ -188,7 +188,7 @@ impl FetchStage { forward_sender.clone(), recycler.clone(), tpu_forward_stats.clone(), - coalesce_ms, + coalesce, true, in_vote_only_mode.clone(), ) @@ -208,7 +208,7 @@ impl FetchStage { vote_sender.clone(), recycler.clone(), tpu_vote_stats.clone(), - coalesce_ms, + coalesce, true, None, ) diff --git a/core/src/serve_repair_service.rs b/core/src/serve_repair_service.rs index 144de5c2a9..44c6326631 100644 --- a/core/src/serve_repair_service.rs +++ b/core/src/serve_repair_service.rs @@ -11,6 +11,7 @@ use { net::UdpSocket, sync::{atomic::AtomicBool, Arc}, thread::{self, JoinHandle}, + time::Duration, }, }; @@ -40,7 +41,7 @@ impl ServeRepairService { request_sender, Recycler::default(), Arc::new(StreamerReceiveStats::new("serve_repair_receiver")), - 1, + Duration::from_millis(1), // coalesce false, None, ); diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index a9965ea231..7fba33d15b 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -134,7 +134,7 @@ impl ShredFetchStage { packet_sender.clone(), recycler.clone(), Arc::new(StreamerReceiveStats::new("packet_modifier")), - 1, + Duration::from_millis(1), // coalesce true, None, ) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index ff792d91f0..02c4705efb 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -1,7 +1,7 @@ //! The `tpu` module implements the Transaction Processing Unit, a //! multi-stage transaction processing pipeline in software. -pub use solana_sdk::net::DEFAULT_TPU_COALESCE_MS; +pub use solana_sdk::net::DEFAULT_TPU_COALESCE; use { crate::{ banking_stage::BankingStage, @@ -33,7 +33,7 @@ use { }, solana_sdk::{pubkey::Pubkey, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, + nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::{spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, streamer::StakedNodes, }, @@ -42,6 +42,7 @@ use { net::UdpSocket, sync::{atomic::AtomicBool, Arc, RwLock}, thread, + time::Duration, }, }; @@ -93,7 +94,7 @@ impl Tpu { replay_vote_receiver: ReplayVoteReceiver, replay_vote_sender: ReplayVoteSender, bank_notification_sender: Option, - tpu_coalesce_ms: u64, + tpu_coalesce: Duration, cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender, connection_cache: &Arc, keypair: &Keypair, @@ -127,7 +128,7 @@ impl Tpu { &forwarded_packet_sender, forwarded_packet_receiver, poh_recorder, - tpu_coalesce_ms, + tpu_coalesce, Some(bank_forks.read().unwrap().get_vote_only_mode_signal()), tpu_enable_udp, ); @@ -177,8 +178,8 @@ impl Tpu { MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, stats.clone(), - DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, - tpu_coalesce_ms, + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + tpu_coalesce, ) .unwrap(); @@ -197,8 +198,8 @@ impl Tpu { MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS), 0, // Prevent unstaked nodes from forwarding transactions stats, - DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, - tpu_coalesce_ms, + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + tpu_coalesce, ) .unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index eb9b85f6ba..cd48735b91 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -24,7 +24,7 @@ use { verify_net_stats_access, SystemMonitorService, SystemMonitorStatsReportConfig, }, tower_storage::TowerStorage, - tpu::{Tpu, TpuSockets, DEFAULT_TPU_COALESCE_MS}, + tpu::{Tpu, TpuSockets, DEFAULT_TPU_COALESCE}, tvu::{Tvu, TvuConfig, TvuSockets}, }, crossbeam_channel::{bounded, unbounded, Receiver}, @@ -225,7 +225,7 @@ pub struct ValidatorConfig { pub warp_slot: Option, pub accounts_db_test_hash_calculation: bool, pub accounts_db_skip_shrink: bool, - pub tpu_coalesce_ms: u64, + pub tpu_coalesce: Duration, pub staked_nodes_overrides: Arc>>, pub validator_exit: Arc>, pub no_wait_for_vote_to_start_leader: bool, @@ -290,7 +290,7 @@ impl Default for ValidatorConfig { warp_slot: None, accounts_db_test_hash_calculation: false, accounts_db_skip_shrink: false, - tpu_coalesce_ms: DEFAULT_TPU_COALESCE_MS, + tpu_coalesce: DEFAULT_TPU_COALESCE, staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())), validator_exit: Arc::new(RwLock::new(Exit::default())), no_wait_for_vote_to_start_leader: true, @@ -1146,7 +1146,7 @@ impl Validator { replay_vote_receiver, replay_vote_sender, bank_notification_sender, - config.tpu_coalesce_ms, + config.tpu_coalesce, cluster_confirmed_slot_sender, &connection_cache, &identity_keypair, diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index bff493d20b..65f4f5e1a4 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -55,7 +55,7 @@ impl GossipService { request_sender, Recycler::default(), Arc::new(StreamerReceiveStats::new("gossip_receiver")), - 1, + Duration::from_millis(1), // coalesce false, None, ); diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 5d5ba06087..96d8e77cfa 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -53,7 +53,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { warp_slot: config.warp_slot, accounts_db_test_hash_calculation: config.accounts_db_test_hash_calculation, accounts_db_skip_shrink: config.accounts_db_skip_shrink, - tpu_coalesce_ms: config.tpu_coalesce_ms, + tpu_coalesce: config.tpu_coalesce, staked_nodes_overrides: config.staked_nodes_overrides.clone(), validator_exit: Arc::new(RwLock::new(Exit::default())), poh_hashes_per_batch: config.poh_hashes_per_batch, diff --git a/quic-client/src/nonblocking/quic_client.rs b/quic-client/src/nonblocking/quic_client.rs index 231b641bbb..4cd6635563 100644 --- a/quic-client/src/nonblocking/quic_client.rs +++ b/quic-client/src/nonblocking/quic_client.rs @@ -9,7 +9,7 @@ use { log::*, quinn::{ ClientConfig, ConnectError, Connection, ConnectionError, Endpoint, EndpointConfig, - IdleTimeout, TokioRuntime, TransportConfig, VarInt, WriteError, + IdleTimeout, TokioRuntime, TransportConfig, WriteError, }, solana_connection_cache::{ client_connection::ClientStats, connection_cache_stats::ConnectionCacheStats, @@ -20,7 +20,7 @@ use { solana_rpc_client_api::client_error::ErrorKind as ClientErrorKind, solana_sdk::{ quic::{ - QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS, + QUIC_CONNECTION_HANDSHAKE_TIMEOUT, QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, }, signature::Keypair, @@ -33,7 +33,6 @@ use { net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::{atomic::Ordering, Arc}, thread, - time::Duration, }, thiserror::Error, tokio::{sync::RwLock, time::timeout}, @@ -129,9 +128,9 @@ impl QuicLazyInitializedEndpoint { let mut config = ClientConfig::new(Arc::new(crypto)); let mut transport_config = TransportConfig::default(); - let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS)); + let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap(); transport_config.max_idle_timeout(Some(timeout)); - transport_config.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS))); + transport_config.keep_alive_interval(Some(QUIC_KEEP_ALIVE)); config.transport_config(Arc::new(transport_config)); endpoint.set_default_client_config(config); @@ -198,11 +197,7 @@ impl QuicNewConnection { let connecting = endpoint.connect(addr, "connect")?; stats.total_connections.fetch_add(1, Ordering::Relaxed); - if let Ok(connecting_result) = timeout( - Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS), - connecting, - ) - .await + if let Ok(connecting_result) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await { if connecting_result.is_err() { stats.connection_errors.fetch_add(1, Ordering::Relaxed); @@ -239,12 +234,7 @@ impl QuicNewConnection { stats.total_connections.fetch_add(1, Ordering::Relaxed); let connection = match connecting.into_0rtt() { Ok((connection, zero_rtt)) => { - if let Ok(zero_rtt) = timeout( - Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS), - zero_rtt, - ) - .await - { + if let Ok(zero_rtt) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, zero_rtt).await { if zero_rtt { stats.zero_rtt_accepts.fetch_add(1, Ordering::Relaxed); } else { @@ -258,11 +248,8 @@ impl QuicNewConnection { Err(connecting) => { stats.connection_errors.fetch_add(1, Ordering::Relaxed); - if let Ok(connecting_result) = timeout( - Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS), - connecting, - ) - .await + if let Ok(connecting_result) = + timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await { connecting_result? } else { diff --git a/quic-client/src/quic_client.rs b/quic-client/src/quic_client.rs index 39726deb1f..1ef85e83f2 100644 --- a/quic-client/src/quic_client.rs +++ b/quic-client/src/quic_client.rs @@ -22,7 +22,7 @@ use { }; pub const MAX_OUTSTANDING_TASK: u64 = 2000; -pub const SEND_DATA_TIMEOUT_MS: u64 = 10000; +const SEND_DATA_TIMEOUT: Duration = Duration::from_secs(10); /// A semaphore used for limiting the number of asynchronous tasks spawn to the /// runtime. Before spawnning a task, use acquire. After the task is done (be it @@ -79,11 +79,7 @@ async fn send_data_async( connection: Arc, buffer: Vec, ) -> TransportResult<()> { - let result = timeout( - Duration::from_millis(SEND_DATA_TIMEOUT_MS), - connection.send_data(&buffer), - ) - .await; + let result = timeout(SEND_DATA_TIMEOUT, connection.send_data(&buffer)).await; ASYNC_TASK_SEMAPHORE.release(); handle_send_result(result, connection) } @@ -92,10 +88,10 @@ async fn send_data_batch_async( connection: Arc, buffers: Vec>, ) -> TransportResult<()> { - let time_out = SEND_DATA_TIMEOUT_MS * buffers.len() as u64; - let result = timeout( - Duration::from_millis(time_out), + u32::try_from(buffers.len()) + .map(|size| SEND_DATA_TIMEOUT.saturating_mul(size)) + .unwrap_or(Duration::MAX), connection.send_data_batch(&buffers), ) .await; diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index 968e5d9214..09dbbdfbf9 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -8,9 +8,9 @@ mod tests { solana_quic_client::nonblocking::quic_client::{ QuicClientCertificate, QuicLazyInitializedEndpoint, }, - solana_sdk::{net::DEFAULT_TPU_COALESCE_MS, packet::PACKET_DATA_SIZE, signature::Keypair}, + solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, quic::StreamStats, + nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::StreamStats, streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate, }, std::{ @@ -86,8 +86,8 @@ mod tests { 10, 10, stats, - DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, - DEFAULT_TPU_COALESCE_MS, + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + DEFAULT_TPU_COALESCE, ) .unwrap(); @@ -166,8 +166,8 @@ mod tests { 10, 10, stats, - 1000, - DEFAULT_TPU_COALESCE_MS, + Duration::from_secs(1), // wait_for_chunk_timeout + DEFAULT_TPU_COALESCE, ) .unwrap(); @@ -223,8 +223,8 @@ mod tests { 10, 10, request_recv_stats, - DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, - DEFAULT_TPU_COALESCE_MS, + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + DEFAULT_TPU_COALESCE, ) .unwrap(); @@ -253,8 +253,8 @@ mod tests { 10, 10, response_recv_stats, - DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, - DEFAULT_TPU_COALESCE_MS, + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + DEFAULT_TPU_COALESCE, ) .unwrap(); diff --git a/sdk/src/net.rs b/sdk/src/net.rs index 1b67b4ec35..c901c6765b 100644 --- a/sdk/src/net.rs +++ b/sdk/src/net.rs @@ -1 +1,3 @@ -pub const DEFAULT_TPU_COALESCE_MS: u64 = 5; +use std::time::Duration; + +pub const DEFAULT_TPU_COALESCE: Duration = Duration::from_millis(5); diff --git a/sdk/src/quic.rs b/sdk/src/quic.rs index b39b7e7001..d7bfc201cb 100644 --- a/sdk/src/quic.rs +++ b/sdk/src/quic.rs @@ -1,4 +1,5 @@ //! Definitions related to Solana over QUIC. +use std::time::Duration; pub const QUIC_PORT_OFFSET: u16 = 6; // Empirically found max number of concurrent streams @@ -12,13 +13,13 @@ pub const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: usize = 100_000; // Set the maximum concurrent stream numbers to avoid excessive streams pub const QUIC_MAX_STAKED_CONCURRENT_STREAMS: usize = 2048; -pub const QUIC_MAX_TIMEOUT_MS: u32 = 2_000; -pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000; +pub const QUIC_MAX_TIMEOUT: Duration = Duration::from_secs(2); +pub const QUIC_KEEP_ALIVE: Duration = Duration::from_secs(1); // Based on commonly-used handshake timeouts for various TCP // applications. Different applications vary, but most seem to // be in the 30-60 second range -pub const QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS: u64 = 60_000; +pub const QUIC_CONNECTION_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(60); /// The receive window for QUIC connection from unstaked nodes is /// set to this ratio times [`solana_sdk::packet::PACKET_DATA_SIZE`] diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 6308108c31..3e56113302 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -19,7 +19,7 @@ use { packet::{Meta, PACKET_DATA_SIZE}, pubkey::Pubkey, quic::{ - QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_STAKED_CONCURRENT_STREAMS, + QUIC_CONNECTION_HANDSHAKE_TIMEOUT, QUIC_MAX_STAKED_CONCURRENT_STREAMS, QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO, @@ -41,8 +41,8 @@ use { }, }; -const WAIT_FOR_STREAM_TIMEOUT_MS: u64 = 100; -pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS: u64 = 10000; +const WAIT_FOR_STREAM_TIMEOUT: Duration = Duration::from_millis(100); +pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(10); pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu"; @@ -94,8 +94,8 @@ pub fn spawn_server( max_staked_connections: usize, max_unstaked_connections: usize, stats: Arc, - wait_for_chunk_timeout_ms: u64, - coalesce_ms: u64, + wait_for_chunk_timeout: Duration, + coalesce: Duration, ) -> Result<(Endpoint, JoinHandle<()>), QuicServerError> { info!("Start quic server on {:?}", sock); let (config, _cert) = configure_server(keypair, gossip_host)?; @@ -114,8 +114,8 @@ pub fn spawn_server( max_staked_connections, max_unstaked_connections, stats, - wait_for_chunk_timeout_ms, - coalesce_ms, + wait_for_chunk_timeout, + coalesce, )); Ok((endpoint, handle)) } @@ -130,9 +130,11 @@ pub async fn run_server( max_staked_connections: usize, max_unstaked_connections: usize, stats: Arc, - wait_for_chunk_timeout_ms: u64, - coalesce_ms: u64, + wait_for_chunk_timeout: Duration, + coalesce: Duration, ) { + const WAIT_FOR_CONNECTION_TIMEOUT: Duration = Duration::from_secs(1); + const WAIT_BETWEEN_NEW_CONNECTIONS: Duration = Duration::from_millis(1); debug!("spawn quic server"); let mut last_datapoint = Instant::now(); let unstaked_connection_table: Arc> = Arc::new(Mutex::new( @@ -146,16 +148,10 @@ pub async fn run_server( receiver, exit.clone(), stats.clone(), - coalesce_ms, + coalesce, )); while !exit.load(Ordering::Relaxed) { - const WAIT_FOR_CONNECTION_TIMEOUT_MS: u64 = 1000; - const WAIT_BETWEEN_NEW_CONNECTIONS_US: u64 = 1000; - let timeout_connection = timeout( - Duration::from_millis(WAIT_FOR_CONNECTION_TIMEOUT_MS), - incoming.accept(), - ) - .await; + let timeout_connection = timeout(WAIT_FOR_CONNECTION_TIMEOUT, incoming.accept()).await; if last_datapoint.elapsed().as_secs() >= 5 { stats.report(); @@ -174,9 +170,9 @@ pub async fn run_server( max_staked_connections, max_unstaked_connections, stats.clone(), - wait_for_chunk_timeout_ms, + wait_for_chunk_timeout, )); - sleep(Duration::from_micros(WAIT_BETWEEN_NEW_CONNECTIONS_US)).await; + sleep(WAIT_BETWEEN_NEW_CONNECTIONS).await; } else { debug!("accept(): Timed out waiting for connection"); } @@ -303,7 +299,7 @@ fn handle_and_cache_new_connection( mut connection_table_l: MutexGuard, connection_table: Arc>, params: &NewConnectionHandlerParams, - wait_for_chunk_timeout_ms: u64, + wait_for_chunk_timeout: Duration, ) -> Result<(), ConnectionHandlerError> { if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams( connection_table_l.peer_type, @@ -356,7 +352,7 @@ fn handle_and_cache_new_connection( params.stats.clone(), params.stake, peer_type, - wait_for_chunk_timeout_ms, + wait_for_chunk_timeout, )); Ok(()) } else { @@ -385,7 +381,7 @@ fn prune_unstaked_connections_and_add_new_connection( connection_table: Arc>, max_connections: usize, params: &NewConnectionHandlerParams, - wait_for_chunk_timeout_ms: u64, + wait_for_chunk_timeout: Duration, ) -> Result<(), ConnectionHandlerError> { let stats = params.stats.clone(); if max_connections > 0 { @@ -395,7 +391,7 @@ fn prune_unstaked_connections_and_add_new_connection( connection_table_l, connection_table, params, - wait_for_chunk_timeout_ms, + wait_for_chunk_timeout, ) } else { connection.close( @@ -461,14 +457,9 @@ async fn setup_connection( max_staked_connections: usize, max_unstaked_connections: usize, stats: Arc, - wait_for_chunk_timeout_ms: u64, + wait_for_chunk_timeout: Duration, ) { - if let Ok(connecting_result) = timeout( - Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS), - connecting, - ) - .await - { + if let Ok(connecting_result) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await { if let Ok(new_connection) = connecting_result { stats.total_new_connections.fetch_add(1, Ordering::Relaxed); @@ -503,7 +494,7 @@ async fn setup_connection( connection_table_l, staked_connection_table.clone(), ¶ms, - wait_for_chunk_timeout_ms, + wait_for_chunk_timeout, ) { stats .connection_added_from_staked_peer @@ -519,7 +510,7 @@ async fn setup_connection( unstaked_connection_table.clone(), max_unstaked_connections, ¶ms, - wait_for_chunk_timeout_ms, + wait_for_chunk_timeout, ) { stats .connection_added_from_staked_peer @@ -539,7 +530,7 @@ async fn setup_connection( unstaked_connection_table.clone(), max_unstaked_connections, ¶ms, - wait_for_chunk_timeout_ms, + wait_for_chunk_timeout, ) { stats .connection_added_from_unstaked_peer @@ -564,10 +555,9 @@ async fn packet_batch_sender( packet_receiver: AsyncReceiver, exit: Arc, stats: Arc, - coalesce_ms: u64, + coalesce: Duration, ) { trace!("enter packet_batch_sender"); - let coalesce_ms = coalesce_ms as u128; let mut batch_start_time = Instant::now(); loop { let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH); @@ -586,7 +576,7 @@ async fn packet_batch_sender( } let elapsed = batch_start_time.elapsed(); if packet_batch.len() >= PACKETS_PER_BATCH - || (!packet_batch.is_empty() && elapsed.as_millis() >= coalesce_ms) + || (!packet_batch.is_empty() && elapsed >= coalesce) { let len = packet_batch.len(); if let Err(e) = packet_sender.send(packet_batch) { @@ -654,7 +644,7 @@ async fn handle_connection( stats: Arc, stake: u64, peer_type: ConnectionPeerType, - wait_for_chunk_timeout_ms: u64, + wait_for_chunk_timeout: Duration, ) { debug!( "quic new connection {} streams: {} connections: {}", @@ -665,11 +655,8 @@ async fn handle_connection( let stable_id = connection.stable_id(); stats.total_connections.fetch_add(1, Ordering::Relaxed); while !stream_exit.load(Ordering::Relaxed) { - if let Ok(stream) = tokio::time::timeout( - Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS), - connection.accept_uni(), - ) - .await + if let Ok(stream) = + tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await { match stream { Ok(mut stream) => { @@ -686,11 +673,12 @@ async fn handle_connection( // which delay exit and cause some test failures when the timeout value is large. // Within this value, the heuristic is to wake up 10 times to check for exit // for the set timeout if there are no data. - let exit_check_interval = (wait_for_chunk_timeout_ms / 10).clamp(10, 1000); + let exit_check_interval = (wait_for_chunk_timeout / 10) + .clamp(Duration::from_millis(10), Duration::from_secs(1)); let mut start = Instant::now(); while !stream_exit.load(Ordering::Relaxed) { if let Ok(chunk) = tokio::time::timeout( - Duration::from_millis(exit_check_interval), + exit_check_interval, stream.read_chunk(PACKET_DATA_SIZE, false), ) .await @@ -710,15 +698,12 @@ async fn handle_connection( break; } start = Instant::now(); - } else { - let elapse = Instant::now() - start; - if elapse.as_millis() as u64 > wait_for_chunk_timeout_ms { - debug!("Timeout in receiving on stream"); - stats - .total_stream_read_timeouts - .fetch_add(1, Ordering::Relaxed); - break; - } + } else if start.elapsed() > wait_for_chunk_timeout { + debug!("Timeout in receiving on stream"); + stats + .total_stream_read_timeouts + .fetch_add(1, Ordering::Relaxed); + break; } } stats.total_streams.fetch_sub(1, Ordering::Relaxed); @@ -1090,10 +1075,10 @@ pub mod test { }, async_channel::unbounded as async_unbounded, crossbeam_channel::{unbounded, Receiver}, - quinn::{ClientConfig, IdleTimeout, TransportConfig, VarInt}, + quinn::{ClientConfig, IdleTimeout, TransportConfig}, solana_sdk::{ - net::DEFAULT_TPU_COALESCE_MS, - quic::{QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS}, + net::DEFAULT_TPU_COALESCE, + quic::{QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT}, signature::Keypair, signer::Signer, }, @@ -1140,9 +1125,9 @@ pub mod test { let mut config = ClientConfig::new(Arc::new(crypto)); let mut transport_config = TransportConfig::default(); - let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS)); + let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap(); transport_config.max_idle_timeout(Some(timeout)); - transport_config.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS))); + transport_config.keep_alive_interval(Some(QUIC_KEEP_ALIVE)); config.transport_config(Arc::new(transport_config)); config @@ -1177,8 +1162,8 @@ pub mod test { MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, stats.clone(), - 2000, - DEFAULT_TPU_COALESCE_MS, + Duration::from_secs(2), + DEFAULT_TPU_COALESCE, ) .unwrap(); (t, exit, receiver, server_address, stats) @@ -1370,7 +1355,7 @@ pub mod test { pkt_receiver, exit.clone(), stats, - DEFAULT_TPU_COALESCE_MS, + DEFAULT_TPU_COALESCE, )); let num_packets = 1000; @@ -1419,8 +1404,8 @@ pub mod test { s1.write_all(&[0u8]).await.unwrap_or_default(); // Wait long enough for the stream to timeout in receiving chunks - let sleep_time = (WAIT_FOR_STREAM_TIMEOUT_MS * 1000).min(3000); - sleep(Duration::from_millis(sleep_time)).await; + let sleep_time = Duration::from_secs(3).min(WAIT_FOR_STREAM_TIMEOUT * 1000); + sleep(sleep_time).await; // Test that the stream was created, but timed out in read assert_eq!(stats.total_streams.load(Ordering::Relaxed), 0); @@ -1477,8 +1462,8 @@ pub mod test { CONNECTION_CLOSE_REASON_DROPPED_ENTRY, ); // Wait long enough for the stream to timeout in receiving chunks - let sleep_time = (WAIT_FOR_STREAM_TIMEOUT_MS * 1000).min(1000); - sleep(Duration::from_millis(sleep_time)).await; + let sleep_time = Duration::from_secs(1).min(WAIT_FOR_STREAM_TIMEOUT * 1000); + sleep(sleep_time).await; assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1); @@ -1490,8 +1475,8 @@ pub mod test { CONNECTION_CLOSE_REASON_DROPPED_ENTRY, ); // Wait long enough for the stream to timeout in receiving chunks - let sleep_time = (WAIT_FOR_STREAM_TIMEOUT_MS * 1000).min(1000); - sleep(Duration::from_millis(sleep_time)).await; + let sleep_time = Duration::from_secs(1).min(WAIT_FOR_STREAM_TIMEOUT * 1000); + sleep(sleep_time).await; assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 2); @@ -1601,8 +1586,8 @@ pub mod test { MAX_STAKED_CONNECTIONS, 0, // Do not allow any connection from unstaked clients/nodes stats, - DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, - DEFAULT_TPU_COALESCE_MS, + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + DEFAULT_TPU_COALESCE, ) .unwrap(); @@ -1633,8 +1618,8 @@ pub mod test { MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, stats.clone(), - DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, - DEFAULT_TPU_COALESCE_MS, + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + DEFAULT_TPU_COALESCE, ) .unwrap(); diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs index f6ce390af0..4e4e19b2ec 100644 --- a/streamer/src/packet.rs +++ b/streamer/src/packet.rs @@ -5,7 +5,11 @@ use { socket::SocketAddrSpace, }, solana_metrics::inc_new_counter_debug, - std::{io::Result, net::UdpSocket, time::Instant}, + std::{ + io::Result, + net::UdpSocket, + time::{Duration, Instant}, + }, }; pub use { solana_perf::packet::{ @@ -14,7 +18,7 @@ pub use { solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}, }; -pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait_ms: u64) -> Result { +pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait: Duration) -> Result { let mut i = 0; //DOCUMENTED SIDE-EFFECT //Performance out of the IO without poll @@ -32,7 +36,7 @@ pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait_ms: u64) ); match recv_mmsg(socket, &mut batch[i..]) { Err(_) if i > 0 => { - if start.elapsed().as_millis() as u64 > max_wait_ms { + if start.elapsed() > max_wait { break; } } @@ -48,7 +52,7 @@ pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait_ms: u64) i += npkts; // Try to batch into big enough buffers // will cause less re-shuffling later on. - if start.elapsed().as_millis() as u64 > max_wait_ms || i >= PACKETS_PER_BATCH { + if start.elapsed() > max_wait || i >= PACKETS_PER_BATCH { break; } } @@ -117,8 +121,12 @@ mod tests { batch .iter_mut() .for_each(|pkt| *pkt.meta_mut() = Meta::default()); - let recvd = recv_from(&mut batch, &recv_socket, 1).unwrap(); - + let recvd = recv_from( + &mut batch, + &recv_socket, + Duration::from_millis(1), // max_wait + ) + .unwrap(); assert_eq!(recvd, batch.len()); for m in batch.iter() { @@ -171,9 +179,12 @@ mod tests { } send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); } - - let recvd = recv_from(&mut batch, &recv_socket, 100).unwrap(); - + let recvd = recv_from( + &mut batch, + &recv_socket, + Duration::from_millis(100), // max_wait + ) + .unwrap(); // Check we only got PACKETS_PER_BATCH packets assert_eq!(recvd, PACKETS_PER_BATCH); assert_eq!(batch.capacity(), PACKETS_PER_BATCH); diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 72f85a4e8f..7ee9cf490f 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -5,12 +5,12 @@ use { }, crossbeam_channel::Sender, pem::Pem, - quinn::{Endpoint, IdleTimeout, ServerConfig, VarInt}, + quinn::{Endpoint, IdleTimeout, ServerConfig}, rustls::{server::ClientCertVerified, Certificate, DistinguishedNames}, solana_perf::packet::PacketBatch, solana_sdk::{ packet::PACKET_DATA_SIZE, - quic::{QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS}, + quic::{QUIC_MAX_TIMEOUT, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS}, signature::Keypair, }, std::{ @@ -20,7 +20,7 @@ use { Arc, RwLock, }, thread, - time::SystemTime, + time::{Duration, SystemTime}, }, tokio::runtime::Runtime, }; @@ -85,7 +85,7 @@ pub(crate) fn configure_server( .saturating_mul(MAX_CONCURRENT_UNI_STREAMS) .into(), ); - let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS)); + let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap(); config.max_idle_timeout(Some(timeout)); // disable bidi & datagrams @@ -361,8 +361,8 @@ pub fn spawn_server( max_staked_connections: usize, max_unstaked_connections: usize, stats: Arc, - wait_for_chunk_timeout_ms: u64, - coalesce_ms: u64, + wait_for_chunk_timeout: Duration, + coalesce: Duration, ) -> Result<(Endpoint, thread::JoinHandle<()>), QuicServerError> { let runtime = rt(); let (endpoint, task) = { @@ -378,8 +378,8 @@ pub fn spawn_server( max_staked_connections, max_unstaked_connections, stats, - wait_for_chunk_timeout_ms, - coalesce_ms, + wait_for_chunk_timeout, + coalesce, ) }?; let handle = thread::Builder::new() @@ -397,9 +397,9 @@ pub fn spawn_server( mod test { use { super::*, - crate::nonblocking::quic::{test::*, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS}, + crate::nonblocking::quic::{test::*, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, crossbeam_channel::unbounded, - solana_sdk::net::DEFAULT_TPU_COALESCE_MS, + solana_sdk::net::DEFAULT_TPU_COALESCE, std::net::SocketAddr, }; @@ -428,8 +428,8 @@ mod test { MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, stats, - DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, - DEFAULT_TPU_COALESCE_MS, + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + DEFAULT_TPU_COALESCE, ) .unwrap(); (t, exit, receiver, server_address) @@ -485,8 +485,8 @@ mod test { MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, stats, - DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, - DEFAULT_TPU_COALESCE_MS, + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + DEFAULT_TPU_COALESCE, ) .unwrap(); @@ -529,8 +529,8 @@ mod test { MAX_STAKED_CONNECTIONS, 0, // Do not allow any connection from unstaked clients/nodes stats, - DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, - DEFAULT_TPU_COALESCE_MS, + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + DEFAULT_TPU_COALESCE, ) .unwrap(); diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index ab5955620f..77f31f5907 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -106,7 +106,7 @@ fn recv_loop( packet_batch_sender: &PacketBatchSender, recycler: &PacketBatchRecycler, stats: &StreamerReceiveStats, - coalesce_ms: u64, + coalesce: Duration, use_pinned_memory: bool, in_vote_only_mode: Option>, ) -> Result<()> { @@ -130,7 +130,7 @@ fn recv_loop( } } - if let Ok(len) = packet::recv_from(&mut packet_batch, socket, coalesce_ms) { + if let Ok(len) = packet::recv_from(&mut packet_batch, socket, coalesce) { if len > 0 { let StreamerReceiveStats { packets_count, @@ -161,7 +161,7 @@ pub fn receiver( packet_batch_sender: PacketBatchSender, recycler: PacketBatchRecycler, stats: Arc, - coalesce_ms: u64, + coalesce: Duration, use_pinned_memory: bool, in_vote_only_mode: Option>, ) -> JoinHandle<()> { @@ -176,7 +176,7 @@ pub fn receiver( &packet_batch_sender, &recycler, &stats, - coalesce_ms, + coalesce, use_pinned_memory, in_vote_only_mode, ); @@ -469,7 +469,7 @@ mod test { s_reader, Recycler::default(), stats.clone(), - 1, + Duration::from_millis(1), // coalesce true, None, ); diff --git a/validator/src/main.rs b/validator/src/main.rs index dbacdf650f..0923b51853 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -13,7 +13,7 @@ use { ledger_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS}, system_monitor_service::SystemMonitorService, tower_storage, - tpu::DEFAULT_TPU_COALESCE_MS, + tpu::DEFAULT_TPU_COALESCE, validator::{ is_snapshot_config_valid, BlockProductionMethod, BlockVerificationMethod, Validator, ValidatorConfig, ValidatorStartProgress, @@ -919,8 +919,9 @@ pub fn main() { let private_rpc = matches.is_present("private_rpc"); let do_port_check = !matches.is_present("no_port_check"); - let tpu_coalesce_ms = - value_t!(matches, "tpu_coalesce_ms", u64).unwrap_or(DEFAULT_TPU_COALESCE_MS); + let tpu_coalesce = value_t!(matches, "tpu_coalesce_ms", u64) + .map(Duration::from_millis) + .unwrap_or(DEFAULT_TPU_COALESCE); let wal_recovery_mode = matches .value_of("wal_recovery_mode") .map(BlockstoreRecoveryMode::from); @@ -1315,7 +1316,7 @@ pub fn main() { accounts_db_test_hash_calculation: matches.is_present("accounts_db_test_hash_calculation"), accounts_db_config, accounts_db_skip_shrink: true, - tpu_coalesce_ms, + tpu_coalesce, no_wait_for_vote_to_start_leader: matches.is_present("no_wait_for_vote_to_start_leader"), accounts_shrink_ratio, runtime_config: RuntimeConfig {