From 0ed9f6260238d1366579343e8bcc1b77e4b8cab5 Mon Sep 17 00:00:00 2001 From: ryleung-solana <91908731+ryleung-solana@users.noreply.github.com> Date: Thu, 16 Mar 2023 21:50:57 +0800 Subject: [PATCH] Quic server batching (#30330) --- Cargo.lock | 31 +++- Cargo.toml | 1 + client/src/connection_cache.rs | 3 +- core/src/tpu.rs | 5 +- programs/sbf/Cargo.lock | 31 +++- quic-client/tests/quic_client.rs | 41 ++++- sdk/src/lib.rs | 1 + sdk/src/net.rs | 1 + streamer/Cargo.toml | 2 + streamer/src/nonblocking/quic.rs | 282 +++++++++++++++++++++++++------ streamer/src/quic.rs | 33 +++- 11 files changed, 362 insertions(+), 69 deletions(-) create mode 100644 sdk/src/net.rs diff --git a/Cargo.lock b/Cargo.lock index a7e2b867e7..99cf3120b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -293,6 +293,17 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-channel" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-compression" version = "0.3.14" @@ -719,9 +730,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.2.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" [[package]] name = "bytesize" @@ -943,6 +954,15 @@ dependencies = [ "unreachable", ] +[[package]] +name = "concurrent-queue" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c278839b831783b70278b14df4d45e1beb1aad306c07bb796637de9a0e323e8e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.15.0" @@ -1110,12 +1130,11 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.8" +version = "0.8.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38" +checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f" dependencies = [ "cfg-if 1.0.0", - "lazy_static", ] [[package]] @@ -6761,6 +6780,8 @@ dependencies = [ name = "solana-streamer" version = "1.16.0" dependencies = [ + "async-channel", + "bytes", "crossbeam-channel", "futures-util", "histogram", diff --git a/Cargo.toml b/Cargo.toml index 46378fb908..c16b11d518 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,6 +130,7 @@ array-bytes = "=1.4.1" arrayref = "0.3.6" assert_cmd = "2.0" assert_matches = "1.5.0" +async-channel = "1.8.0" async-mutex = "1.4.0" async-trait = "0.1.57" atty = "0.2.11" diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index db6f844b5d..045ef57ca5 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -204,7 +204,7 @@ mod tests { super::*, crate::connection_cache::ConnectionCache, crossbeam_channel::unbounded, - solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair}, + solana_sdk::{net::DEFAULT_TPU_COALESCE_MS, quic::QUIC_PORT_OFFSET, signature::Keypair}, solana_streamer::{ nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, quic::StreamStats, streamer::StakedNodes, @@ -263,6 +263,7 @@ mod tests { 10, 
response_recv_stats, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, + DEFAULT_TPU_COALESCE_MS, ) .unwrap(); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 4d8daedf36..a86c0b470a 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -1,6 +1,7 @@ //! The `tpu` module implements the Transaction Processing Unit, a //! multi-stage transaction processing pipeline in software. +pub use solana_sdk::net::DEFAULT_TPU_COALESCE_MS; use { crate::{ banking_stage::BankingStage, @@ -43,8 +44,6 @@ use { }, }; -pub const DEFAULT_TPU_COALESCE_MS: u64 = 5; - // allow multiple connections for NAT and any open/close overlap pub const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; @@ -177,6 +176,7 @@ impl Tpu { MAX_UNSTAKED_CONNECTIONS, stats.clone(), DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, + tpu_coalesce_ms, ) .unwrap(); @@ -196,6 +196,7 @@ impl Tpu { 0, // Prevent unstaked nodes from forwarding transactions stats, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, + tpu_coalesce_ms, ) .unwrap(); diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 54353b386e..4c40da4712 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -279,6 +279,17 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-channel" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-compression" version = "0.3.14" @@ -678,9 +689,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.2.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" [[package]] name = "bzip2" @@ -842,6 +853,15 @@ dependencies = [ "unreachable", ] +[[package]] +name = "concurrent-queue" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c278839b831783b70278b14df4d45e1beb1aad306c07bb796637de9a0e323e8e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.15.0" @@ -977,12 +997,11 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.5" +version = "0.8.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" +checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f" dependencies = [ "cfg-if 1.0.0", - "lazy_static", ] [[package]] @@ -6036,6 +6055,8 @@ dependencies = [ name = "solana-streamer" version = "1.16.0" dependencies = [ + "async-channel", + "bytes", "crossbeam-channel", "futures-util", "histogram", diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index f48e8b369b..968e5d9214 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -8,7 +8,7 @@ mod tests { solana_quic_client::nonblocking::quic_client::{ QuicClientCertificate, QuicLazyInitializedEndpoint, }, - solana_sdk::{packet::PACKET_DATA_SIZE, signature::Keypair}, + solana_sdk::{net::DEFAULT_TPU_COALESCE_MS, packet::PACKET_DATA_SIZE, signature::Keypair}, solana_streamer::{ nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, 
quic::StreamStats,
            streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate,
@@ -21,6 +21,7 @@ mod tests {
             },
             time::{Duration, Instant},
         },
+        tokio::time::sleep,
     };
 
     fn check_packets(
@@ -86,6 +87,7 @@ mod tests {
             10,
             stats,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
+            DEFAULT_TPU_COALESCE_MS,
         )
         .unwrap();
 
@@ -111,6 +113,38 @@ mod tests {
         t.join().unwrap();
     }
 
+    // A version of check_packets that avoids blocking in an
+    // async environment. TODO: we really need a way of guaranteeing
+    // that we don't block in async code/tests; blocking can lead to
+    // subtle bugs that don't manifest immediately and only show up
+    // later, when a separate (often itself valid) change is made
+    async fn nonblocking_check_packets(
+        receiver: Receiver<PacketBatch>,
+        num_bytes: usize,
+        num_expected_packets: usize,
+    ) {
+        let mut all_packets = vec![];
+        let now = Instant::now();
+        let mut total_packets: usize = 0;
+        while now.elapsed().as_secs() < 10 {
+            if let Ok(packets) = receiver.try_recv() {
+                total_packets = total_packets.saturating_add(packets.len());
+                all_packets.push(packets)
+            } else {
+                sleep(Duration::from_secs(1)).await;
+            }
+            if total_packets >= num_expected_packets {
+                break;
+            }
+        }
+        for batch in all_packets {
+            for p in &batch {
+                assert_eq!(p.meta().size, num_bytes);
+            }
+        }
+        assert_eq!(total_packets, num_expected_packets);
+    }
+
     #[tokio::test]
     async fn test_nonblocking_quic_client_multiple_writes() {
         use {
@@ -133,6 +167,7 @@ mod tests {
             10,
             stats,
             1000,
+            DEFAULT_TPU_COALESCE_MS,
         )
         .unwrap();
 
@@ -152,7 +187,7 @@ mod tests {
         let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets];
         assert!(client.send_data_batch(&packets).await.is_ok());
 
-        check_packets(receiver, num_bytes, num_expected_packets);
+        nonblocking_check_packets(receiver, num_bytes, num_expected_packets).await;
         exit.store(true, Ordering::Relaxed);
         t.await.unwrap();
     }
@@ -189,6 +224,7 @@ mod tests {
             10,
             request_recv_stats,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
+            DEFAULT_TPU_COALESCE_MS,
         )
         .unwrap();
 
@@ -218,6 +254,7 @@ mod tests {
             10,
             response_recv_stats,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
+            DEFAULT_TPU_COALESCE_MS,
         )
         .unwrap();
 
diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs
index d60881cac7..8459bab298 100644
--- a/sdk/src/lib.rs
+++ b/sdk/src/lib.rs
@@ -77,6 +77,7 @@ pub mod hash;
 pub mod inflation;
 pub mod log;
 pub mod native_loader;
+pub mod net;
 pub mod nonce_account;
 pub mod offchain_message;
 pub mod packet;
diff --git a/sdk/src/net.rs b/sdk/src/net.rs
new file mode 100644
index 0000000000..1b67b4ec35
--- /dev/null
+++ b/sdk/src/net.rs
@@ -0,0 +1 @@
+pub const DEFAULT_TPU_COALESCE_MS: u64 = 5;
diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml
index 59d3d7bed1..0ef27bb405 100644
--- a/streamer/Cargo.toml
+++ b/streamer/Cargo.toml
@@ -10,6 +10,8 @@ license = { workspace = true }
 edition = { workspace = true }
 
 [dependencies]
+async-channel = { workspace = true }
+bytes = { workspace = true }
 crossbeam-channel = { workspace = true }
 futures-util = { workspace = true }
 histogram = { workspace = true }
diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs
index 3a53850012..bbb055274d 100644
--- a/streamer/src/nonblocking/quic.rs
+++ b/streamer/src/nonblocking/quic.rs
@@ -4,15 +4,19 @@ use {
         streamer::StakedNodes,
         tls_certificates::get_pubkey_from_tls_certificate,
     },
+    async_channel::{
+        unbounded as async_unbounded, Receiver as AsyncReceiver, Sender as AsyncSender,
+    },
+    bytes::Bytes,
     crossbeam_channel::Sender,
     indexmap::map::{Entry, IndexMap},
     percentage::Percentage,
     quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt},
    quinn_proto::VarIntBoundsExceeded,
     rand::{thread_rng, Rng},
-    solana_perf::packet::PacketBatch,
+    solana_perf::packet::{PacketBatch, PACKETS_PER_BATCH},
     solana_sdk::{
-        packet::{Packet, PACKET_DATA_SIZE},
+        packet::{Meta, PACKET_DATA_SIZE},
         pubkey::Pubkey,
         quic::{
             QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_STAKED_CONCURRENT_STREAMS,
@@ -54,6 +58,30 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre
 const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
 const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";
 
+// A sequence of bytes that is part of a packet,
+// along with where in the packet it belongs
+struct PacketChunk {
+    pub bytes: Bytes,
+    // The offset of these bytes in the QUIC stream,
+    // and thus the beginning of the slice of the
+    // Packet data array into which the bytes will be copied
+    pub offset: usize,
+    // The end offset of these bytes in the QUIC stream,
+    // and thus the end of the slice of the Packet data array
+    // into which the bytes will be copied
+    pub end_of_chunk: usize,
+}
+
+// A struct to accumulate the bytes making up
+// a packet, along with their offsets, and the
+// packet metadata. We use this accumulator to avoid
+// copying the Bytes multiple times (once when building
+// up the Packet and again when copying the Packet into a PacketBatch)
+struct PacketAccumulator {
+    pub meta: Meta,
+    pub chunks: Vec<PacketChunk>,
+}
+
 #[allow(clippy::too_many_arguments)]
 pub fn spawn_server(
     sock: UdpSocket,
@@ -67,6 +95,7 @@ pub fn spawn_server(
     max_unstaked_connections: usize,
     stats: Arc<StreamStats>,
     wait_for_chunk_timeout_ms: u64,
+    coalesce_ms: u64,
 ) -> Result<(Endpoint, JoinHandle<()>), QuicServerError> {
     info!("Start quic server on {:?}", sock);
     let (config, _cert) = configure_server(keypair, gossip_host)?;
@@ -86,10 +115,12 @@ pub fn spawn_server(
         max_unstaked_connections,
         stats,
         wait_for_chunk_timeout_ms,
+        coalesce_ms,
     ));
     Ok((endpoint, handle))
 }
 
+#[allow(clippy::too_many_arguments)]
 pub async fn run_server(
     incoming: Endpoint,
     packet_sender: Sender<PacketBatch>,
@@ -100,6 +131,7 @@ pub async fn run_server(
     max_unstaked_connections: usize,
     stats: Arc<StreamStats>,
     wait_for_chunk_timeout_ms: u64,
+    coalesce_ms: u64,
 ) {
     debug!("spawn quic server");
     let mut last_datapoint = Instant::now();
@@ -108,6 +140,14 @@ pub async fn run_server(
     ));
     let staked_connection_table: Arc<Mutex<ConnectionTable>> =
         Arc::new(Mutex::new(ConnectionTable::new(ConnectionPeerType::Staked)));
+    let (sender, receiver) = async_unbounded();
+    tokio::spawn(packet_batch_sender(
+        packet_sender,
+        receiver,
+        exit.clone(),
+        stats.clone(),
+        coalesce_ms,
+    ));
     while !exit.load(Ordering::Relaxed) {
         const WAIT_FOR_CONNECTION_TIMEOUT_MS: u64 = 1000;
         const WAIT_BETWEEN_NEW_CONNECTIONS_US: u64 = 1000;
@@ -128,7 +168,7 @@ pub async fn run_server(
                     connection,
                     unstaked_connection_table.clone(),
                     staked_connection_table.clone(),
-                    packet_sender.clone(),
+                    sender.clone(),
                     max_connections_per_peer,
                     staked_nodes.clone(),
                     max_staked_connections,
@@ -229,7 +269,13 @@ enum ConnectionHandlerError {
 }
 
 struct NewConnectionHandlerParams {
-    packet_sender: Sender<PacketBatch>,
+    // In principle, this code should work as-is if we replaced the async
+    // channel with a crossbeam channel, since the server never does a
+    // blocking send (the channel is unbounded) or a blocking recv (we
+    // always use try_recv), but in practice it is simply too easy to
+    // accidentally block in async code when using a crossbeam channel,
+    // so for the sake of maintainability we stick with an async channel
+    packet_sender: AsyncSender<PacketAccumulator>,
    remote_pubkey: Option<Pubkey>,
     stake: u64,
     total_stake: u64,
@@ -241,7 +287,7 @@ struct NewConnectionHandlerParams {
 
 impl NewConnectionHandlerParams {
     fn new_unstaked(
-        packet_sender: Sender<PacketBatch>,
+        packet_sender: AsyncSender<PacketAccumulator>,
         max_connections_per_peer: usize,
         stats: Arc<StreamStats>,
     ) -> NewConnectionHandlerParams {
@@ -415,7 +461,7 @@ async fn setup_connection(
     connecting: Connecting,
     unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
     staked_connection_table: Arc<Mutex<ConnectionTable>>,
-    packet_sender: Sender<PacketBatch>,
+    packet_sender: AsyncSender<PacketAccumulator>,
     max_connections_per_peer: usize,
     staked_nodes: Arc<RwLock<StakedNodes>>,
     max_staked_connections: usize,
@@ -519,10 +565,76 @@ async fn setup_connection(
     }
 }
 
+async fn packet_batch_sender(
+    packet_sender: Sender<PacketBatch>,
+    packet_receiver: AsyncReceiver<PacketAccumulator>,
+    exit: Arc<AtomicBool>,
+    stats: Arc<StreamStats>,
+    coalesce_ms: u64,
+) {
+    trace!("enter packet_batch_sender");
+    let coalesce_ms = coalesce_ms as u128;
+    let mut batch_start_time = Instant::now();
+    loop {
+        let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH);
+
+        stats
+            .total_packets_allocated
+            .fetch_add(PACKETS_PER_BATCH, Ordering::Relaxed);
+        loop {
+            if exit.load(Ordering::Relaxed) {
+                return;
+            }
+            let elapsed = batch_start_time.elapsed();
+            if packet_batch.len() >= PACKETS_PER_BATCH
+                || (!packet_batch.is_empty() && elapsed.as_millis() >= coalesce_ms)
+            {
+                let len = packet_batch.len();
+                if let Err(e) = packet_sender.send(packet_batch) {
+                    stats
+                        .total_packet_batch_send_err
+                        .fetch_add(1, Ordering::Relaxed);
+                    trace!("Send error: {}", e);
+                } else {
+                    stats
+                        .total_packet_batches_sent
+                        .fetch_add(1, Ordering::Relaxed);
+
+                    stats
+                        .total_packets_sent_to_consumer
+                        .fetch_add(len, Ordering::Relaxed);
+
+                    trace!("Sent {} packet batch", len);
+                }
+                break;
+            }
+
+            let timeout_res = timeout(Duration::from_micros(250), packet_receiver.recv()).await;
+
+            if let Ok(Ok(packet_accumulator)) = timeout_res {
+                unsafe {
+                    packet_batch.set_len(packet_batch.len() + 1);
+                }
+
+                let i = packet_batch.len() - 1;
+                *packet_batch[i].meta_mut() = packet_accumulator.meta;
+                for chunk in packet_accumulator.chunks {
+                    packet_batch[i].buffer_mut()[chunk.offset..chunk.end_of_chunk]
+                        .copy_from_slice(&chunk.bytes);
+                }
+
+                if packet_batch.len() == 1 {
+                    batch_start_time = Instant::now();
+                }
+            }
+        }
+    }
+}
+
 #[allow(clippy::too_many_arguments)]
 async fn handle_connection(
     connection: Connection,
-    packet_sender: Sender<PacketBatch>,
+    packet_sender: AsyncSender<PacketAccumulator>,
     remote_addr: SocketAddr,
     remote_pubkey: Option<Pubkey>,
     last_update: Arc<AtomicU64>,
@@ -573,14 +685,16 @@ async fn handle_connection(
                 .await
             {
                 if handle_chunk(
-                    &chunk,
+                    chunk,
                     &mut maybe_batch,
                     &remote_addr,
                     &packet_sender,
                     stats.clone(),
                     stake,
                     peer_type,
-                ) {
+                )
+                .await
+                {
                     last_update.store(timing::timestamp(), Ordering::Relaxed);
                     break;
                 }
@@ -625,11 +739,11 @@ async fn handle_connection(
 }
 
 // Return true if the server should drop the stream
-fn handle_chunk(
-    chunk: &Result<Option<quinn::Chunk>, quinn::ReadError>,
-    maybe_batch: &mut Option<PacketBatch>,
+async fn handle_chunk(
+    chunk: Result<Option<quinn::Chunk>, quinn::ReadError>,
+    packet_accum: &mut Option<PacketAccumulator>,
     remote_addr: &SocketAddr,
-    packet_sender: &Sender<PacketBatch>,
+    packet_sender: &AsyncSender<PacketAccumulator>,
     stats: Arc<StreamStats>,
     stake: u64,
     peer_type: ConnectionPeerType,
@@ -657,56 +771,63 @@ fn handle_chunk(
             }
 
             // chunk looks valid
-            if maybe_batch.is_none() {
-                let mut batch = PacketBatch::with_capacity(1);
-                let mut packet = Packet::default();
-                packet.meta_mut().set_socket_addr(remote_addr);
-                packet.meta_mut().sender_stake = stake;
-                batch.push(packet);
-                *maybe_batch = Some(batch);
-                stats
-                    .total_packets_allocated
-                    .fetch_add(1, Ordering::Relaxed);
+            if packet_accum.is_none() {
+                let mut meta = Meta::default();
+                meta.set_socket_addr(remote_addr);
+                meta.sender_stake = stake;
+                *packet_accum = Some(PacketAccumulator {
+                    meta,
+                    chunks: Vec::new(),
+                });
             }
 
-            if let Some(batch) = maybe_batch.as_mut() {
+            if let Some(accum) = packet_accum.as_mut() {
+                let offset = chunk.offset;
                 let end_of_chunk = match (chunk.offset as usize).checked_add(chunk.bytes.len()) {
                     Some(end) => end,
                     None => return true,
                 };
-                batch[0].buffer_mut()[chunk.offset as usize..end_of_chunk]
-                    .copy_from_slice(&chunk.bytes);
-                batch[0].meta_mut().size = std::cmp::max(batch[0].meta().size, end_of_chunk);
-                stats.total_chunks_received.fetch_add(1, Ordering::Relaxed);
-                match peer_type {
-                    ConnectionPeerType::Staked => {
-                        stats
-                            .total_staked_chunks_received
-                            .fetch_add(1, Ordering::Relaxed);
-                    }
-                    ConnectionPeerType::Unstaked => {
-                        stats
-                            .total_unstaked_chunks_received
-                            .fetch_add(1, Ordering::Relaxed);
-                    }
+                accum.chunks.push(PacketChunk {
+                    bytes: chunk.bytes,
+                    offset: offset as usize,
+                    end_of_chunk,
+                });
+
+                accum.meta.size = std::cmp::max(accum.meta.size, end_of_chunk);
+            }
+
+            match peer_type {
+                ConnectionPeerType::Staked => {
+                    stats
+                        .total_staked_chunks_received
+                        .fetch_add(1, Ordering::Relaxed);
+                }
+                ConnectionPeerType::Unstaked => {
+                    stats
+                        .total_unstaked_chunks_received
+                        .fetch_add(1, Ordering::Relaxed);
                 }
             }
         } else {
-            trace!("chunk is none");
             // done receiving chunks
-            if let Some(batch) = maybe_batch.take() {
-                let len = batch[0].meta().size;
-                if let Err(e) = packet_sender.send(batch) {
+            trace!("chunk is none");
+            if let Some(accum) = packet_accum.take() {
+                let len = accum
+                    .chunks
+                    .iter()
+                    .map(|packet_bytes| packet_bytes.bytes.len())
+                    .sum::<usize>();
+                if let Err(err) = packet_sender.send(accum).await {
                     stats
-                        .total_packet_batch_send_err
+                        .total_handle_chunk_to_packet_batcher_send_err
                         .fetch_add(1, Ordering::Relaxed);
-                    info!("send error: {}", e);
+                    trace!("packet batch send error {:?}", err);
                 } else {
                     stats
-                        .total_packet_batches_sent
+                        .total_packets_sent_for_batching
                         .fetch_add(1, Ordering::Relaxed);
-                    trace!("sent {} byte packet", len);
+                    trace!("sent {} byte packet for batching", len);
                 }
             } else {
                 stats
@@ -951,9 +1072,11 @@ pub mod test {
             quic::{MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
             tls_certificates::new_self_signed_tls_certificate,
         },
+        async_channel::unbounded as async_unbounded,
         crossbeam_channel::{unbounded, Receiver},
         quinn::{ClientConfig, IdleTimeout, TransportConfig, VarInt},
         solana_sdk::{
+            net::DEFAULT_TPU_COALESCE_MS,
             quic::{QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS},
             signature::Keypair,
             signer::Signer,
@@ -1039,6 +1162,7 @@ pub mod test {
             MAX_UNSTAKED_CONNECTIONS,
             stats.clone(),
             2000,
+            DEFAULT_TPU_COALESCE_MS,
         )
         .unwrap();
         (t, exit, receiver, server_address, stats)
@@ -1075,9 +1199,11 @@ pub mod test {
         }
         let mut received = 0;
         loop {
-            if let Ok(_x) = receiver.recv_timeout(Duration::from_millis(500)) {
+            if let Ok(_x) = receiver.try_recv() {
                 received += 1;
                 info!("got {}", received);
+            } else {
+                sleep(Duration::from_millis(500)).await;
             }
             if received >= total {
                 break;
@@ -1128,9 +1254,11 @@ pub mod test {
         let now = Instant::now();
         let mut total_packets = 0;
         while now.elapsed().as_secs() < 10 {
-            if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) {
+            if let Ok(packets) = receiver.try_recv() {
                 total_packets += packets.len();
                 all_packets.push(packets)
+            } else {
+                sleep(Duration::from_secs(1)).await;
             }
             if total_packets == num_expected_packets {
                 break;
@@ -1164,9 +1292,13 @@ pub mod test {
         let now = Instant::now();
         let mut total_packets = 0;
        while now.elapsed().as_secs() < 5 {
-            if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) {
+            // We're running in an async environment; we (almost) never
+            // want to block
+            if let Ok(packets) = receiver.try_recv() {
                 total_packets += packets.len();
                 all_packets.push(packets)
+            } else {
+                sleep(Duration::from_secs(1)).await;
             }
             if total_packets >= num_expected_packets {
                 break;
@@ -1209,6 +1341,54 @@ pub mod test {
         t.await.unwrap();
     }
 
+    #[tokio::test]
+    async fn test_packet_batcher() {
+        solana_logger::setup();
+        let (pkt_batch_sender, pkt_batch_receiver) = unbounded();
+        let (pkt_sender, pkt_receiver) = async_unbounded();
+        let exit = Arc::new(AtomicBool::new(false));
+        let stats = Arc::new(StreamStats::default());
+
+        let handle = tokio::spawn(packet_batch_sender(
+            pkt_batch_sender,
+            pkt_receiver,
+            exit.clone(),
+            stats,
+            DEFAULT_TPU_COALESCE_MS,
+        ));
+
+        let num_packets = 1000;
+
+        for _i in 0..num_packets {
+            let mut meta = Meta::default();
+            let bytes = Bytes::from("Hello world");
+            let offset = 0;
+            let size = bytes.len();
+            meta.size = size;
+            let packet_accum = PacketAccumulator {
+                meta,
+                chunks: vec![PacketChunk {
+                    bytes,
+                    offset,
+                    end_of_chunk: size,
+                }],
+            };
+            pkt_sender.send(packet_accum).await.unwrap();
+        }
+        let mut i = 0;
+        let start = Instant::now();
+        while i < num_packets && start.elapsed().as_secs() < 2 {
+            if let Ok(batch) = pkt_batch_receiver.try_recv() {
+                i += batch.len();
+            } else {
+                sleep(Duration::from_millis(1)).await;
+            }
+        }
+        assert_eq!(i, num_packets);
+        exit.store(true, Ordering::Relaxed);
+        handle.await.unwrap();
+    }
+
     #[tokio::test]
     async fn test_quic_stream_timeout() {
         solana_logger::setup();
@@ -1406,6 +1586,7 @@ pub mod test {
             0, // Do not allow any connection from unstaked clients/nodes
             stats,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
+            DEFAULT_TPU_COALESCE_MS,
         )
         .unwrap();
 
@@ -1437,6 +1618,7 @@ pub mod test {
             MAX_UNSTAKED_CONNECTIONS,
             stats.clone(),
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
+            DEFAULT_TPU_COALESCE_MS,
         )
         .unwrap();
 
diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs
index 8ddf26a296..771cc4a1ac 100644
--- a/streamer/src/quic.rs
+++ b/streamer/src/quic.rs
@@ -22,12 +22,11 @@ use {
         thread,
         time::SystemTime,
     },
-    tokio::runtime::{Builder, Runtime},
+    tokio::runtime::Runtime,
 };
 
 pub const MAX_STAKED_CONNECTIONS: usize = 2000;
 pub const MAX_UNSTAKED_CONNECTIONS: usize = 500;
-const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 1;
 
 struct SkipClientVerification;
 
@@ -98,8 +97,7 @@ pub(crate) fn configure_server(
 }
 
 fn rt() -> Runtime {
-    Builder::new_multi_thread()
-        .worker_threads(NUM_QUIC_STREAMER_WORKER_THREADS)
+    tokio::runtime::Builder::new_multi_thread()
         .thread_name("quic-server")
         .enable_all()
         .build()
@@ -128,8 +126,11 @@ pub struct StreamStats {
     pub(crate) total_staked_chunks_received: AtomicUsize,
     pub(crate) total_unstaked_chunks_received: AtomicUsize,
     pub(crate) total_packet_batch_send_err: AtomicUsize,
+    pub(crate) total_handle_chunk_to_packet_batcher_send_err: AtomicUsize,
     pub(crate) total_packet_batches_sent: AtomicUsize,
     pub(crate) total_packet_batches_none: AtomicUsize,
+    pub(crate) total_packets_sent_for_batching: AtomicUsize,
+    pub(crate) total_packets_sent_to_consumer: AtomicUsize,
     pub(crate) total_stream_read_errors: AtomicUsize,
     pub(crate) total_stream_read_timeouts: AtomicUsize,
     pub(crate) num_evictions: AtomicUsize,
@@ -251,6 +252,18 @@ impl StreamStats {
                 self.total_packets_allocated.swap(0, Ordering::Relaxed),
                 i64
             ),
+            (
+                "packets_sent_for_batching",
+                self.total_packets_sent_for_batching
+                    .swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "packets_sent_to_consumer",
+                self.total_packets_sent_to_consumer
+                    .swap(0, Ordering::Relaxed),
+                i64
+            ),
             (
                 "chunks_received",
                 self.total_chunks_received.swap(0, Ordering::Relaxed),
                 i64
@@ -272,6 +285,12 @@ impl StreamStats {
                 self.total_packet_batch_send_err.swap(0, Ordering::Relaxed),
                 i64
             ),
+            (
+                "handle_chunk_to_packet_batcher_send_error",
+                self.total_handle_chunk_to_packet_batcher_send_err
+                    .swap(0, Ordering::Relaxed),
+                i64
+            ),
             (
                 "packet_batches_sent",
                 self.total_packet_batches_sent.swap(0, Ordering::Relaxed),
                 i64
@@ -309,6 +328,7 @@ pub fn spawn_server(
     max_unstaked_connections: usize,
     stats: Arc<StreamStats>,
     wait_for_chunk_timeout_ms: u64,
+    coalesce_ms: u64,
 ) -> Result<(Endpoint, thread::JoinHandle<()>), QuicServerError> {
     let runtime = rt();
     let (endpoint, task) = {
@@ -325,6 +345,7 @@ pub fn spawn_server(
             max_unstaked_connections,
             stats,
             wait_for_chunk_timeout_ms,
+            coalesce_ms,
         )
     }?;
     let handle = thread::Builder::new()
@@ -344,6 +365,7 @@ mod test {
         super::*,
         crate::nonblocking::quic::{test::*, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS},
         crossbeam_channel::unbounded,
+        solana_sdk::net::DEFAULT_TPU_COALESCE_MS,
         std::net::SocketAddr,
     };
 
@@ -373,6 +395,7 @@ mod test {
             MAX_UNSTAKED_CONNECTIONS,
             stats,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
+            DEFAULT_TPU_COALESCE_MS,
         )
         .unwrap();
         (t, exit, receiver, server_address)
@@ -429,6 +452,7 @@ mod test {
             MAX_UNSTAKED_CONNECTIONS,
             stats,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
+            DEFAULT_TPU_COALESCE_MS,
         )
         .unwrap();
 
@@ -472,6 +496,7 @@ mod test {
             0, // Do not allow any connection from unstaked clients/nodes
             stats,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
+            DEFAULT_TPU_COALESCE_MS,
         )
         .unwrap();
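
How the pieces of this patch fit together: handle_chunk no longer assembles single-packet PacketBatches; it pushes PacketAccumulators into an unbounded async channel, and the packet_batch_sender task spawned by run_server drains that channel, coalescing up to PACKETS_PER_BATCH packets (or coalesce_ms worth of arrivals) into each PacketBatch before handing it to the crossbeam consumer. The following is a minimal, self-contained sketch of that coalescing loop, not the patch's actual code: Vec<u8> stands in for PacketAccumulator, Vec<Vec<u8>> for PacketBatch, and MAX_BATCH and coalescing_sender are hypothetical names; the StreamStats counters and the exit flag of the real implementation are reduced to a plain return.

    use {
        async_channel::Receiver as AsyncReceiver,
        crossbeam_channel::Sender,
        std::time::{Duration, Instant},
        tokio::time::timeout,
    };

    const MAX_BATCH: usize = 64; // stand-in for PACKETS_PER_BATCH

    async fn coalescing_sender(
        batch_sender: Sender<Vec<Vec<u8>>>,
        item_receiver: AsyncReceiver<Vec<u8>>,
        coalesce_ms: u128,
    ) {
        let mut batch_start = Instant::now();
        loop {
            let mut batch = Vec::with_capacity(MAX_BATCH);
            loop {
                // Flush when the batch is full, or when it is non-empty and
                // the coalesce window has expired; empty batches never flush.
                if batch.len() >= MAX_BATCH
                    || (!batch.is_empty() && batch_start.elapsed().as_millis() >= coalesce_ms)
                {
                    if batch_sender.send(batch).is_err() {
                        return; // consumer hung up; the real code only counts this
                    }
                    break; // start accumulating a fresh batch
                }
                // Poll with a short timeout so the flush check above still
                // runs while the channel is idle.
                if let Ok(Ok(item)) =
                    timeout(Duration::from_micros(250), item_receiver.recv()).await
                {
                    if batch.is_empty() {
                        // The window opens when the first item arrives,
                        // not when the previous batch was flushed.
                        batch_start = Instant::now();
                    }
                    batch.push(item);
                }
            }
        }
    }

The design choice this illustrates: under load the loop fills a batch and flushes immediately, while under light traffic a lone packet waits at most roughly coalesce_ms before being forwarded, which is the latency/throughput trade the coalesce_ms knob controls.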
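The copy-avoidance that PacketChunk and PacketAccumulator buy can be sketched in isolation: hold each stream chunk's Bytes (reference-counted, so storing it is cheap) together with the offsets that place it, and perform the one and only copy when the packet is written into its slot in the batch. Chunk, Accumulator, push, and assemble_into below are hypothetical simplified names, not the patch's types.

    use bytes::Bytes;

    // Each chunk keeps its Bytes handle plus the stream offsets that say
    // which slice of the final packet buffer it fills.
    struct Chunk {
        bytes: Bytes,
        offset: usize,
        end_of_chunk: usize,
    }

    struct Accumulator {
        // Running packet size: the largest end offset seen so far,
        // mirroring how the patch maintains Meta::size.
        size: usize,
        chunks: Vec<Chunk>,
    }

    impl Accumulator {
        fn new() -> Self {
            Accumulator {
                size: 0,
                chunks: Vec::new(),
            }
        }

        // Record a chunk without copying its payload; only the handle and
        // the placement offsets are stored. (The real code guards this
        // addition with checked_add to reject overflowing offsets.)
        fn push(&mut self, offset: usize, bytes: Bytes) {
            let end_of_chunk = offset + bytes.len();
            self.size = self.size.max(end_of_chunk);
            self.chunks.push(Chunk {
                bytes,
                offset,
                end_of_chunk,
            });
        }

        // The single copy: each chunk is written straight into its slice
        // of the destination buffer (the packet's slot in the batch).
        // Returns the assembled packet's length.
        fn assemble_into(self, buffer: &mut [u8]) -> usize {
            let size = self.size;
            for c in self.chunks {
                buffer[c.offset..c.end_of_chunk].copy_from_slice(&c.bytes);
            }
            size
        }
    }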
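The test changes all follow one pattern worth naming: recv_timeout would park a tokio worker thread, so the tests now poll try_recv and sleep().await between polls, yielding the thread back to the runtime. A sketch of that pattern as a reusable helper, under the assumption of a crossbeam Receiver like the tests use; drain is a hypothetical name, not part of the patch.

    use {
        crossbeam_channel::Receiver,
        std::time::{Duration, Instant},
        tokio::time::sleep,
    };

    // Collect up to `expected` items without ever blocking the runtime:
    // poll the channel, and if nothing is ready, sleep asynchronously so
    // producer tasks can make progress. Gives up after `deadline`.
    async fn drain<T>(receiver: Receiver<T>, expected: usize, deadline: Duration) -> Vec<T> {
        let start = Instant::now();
        let mut items = Vec::with_capacity(expected);
        while items.len() < expected && start.elapsed() < deadline {
            match receiver.try_recv() {
                Ok(item) => items.push(item),
                Err(_) => sleep(Duration::from_millis(10)).await,
            }
        }
        items
    }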