Quic server batching (#30330)

ryleung-solana 2023-03-16 21:50:57 +08:00 committed by GitHub
parent 8be99c1c7d
commit 0ed9f62602
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 362 additions and 69 deletions

Cargo.lock (generated)

@ -293,6 +293,17 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-channel"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-compression"
version = "0.3.14"
@ -719,9 +730,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.2.1"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db"
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
[[package]]
name = "bytesize"
@ -943,6 +954,15 @@ dependencies = [
"unreachable",
]
[[package]]
name = "concurrent-queue"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c278839b831783b70278b14df4d45e1beb1aad306c07bb796637de9a0e323e8e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "console"
version = "0.15.0"
@ -1110,12 +1130,11 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
version = "0.8.8"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38"
checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f"
dependencies = [
"cfg-if 1.0.0",
"lazy_static",
]
[[package]]
@ -6761,6 +6780,8 @@ dependencies = [
name = "solana-streamer"
version = "1.16.0"
dependencies = [
"async-channel",
"bytes",
"crossbeam-channel",
"futures-util",
"histogram",


@ -130,6 +130,7 @@ array-bytes = "=1.4.1"
arrayref = "0.3.6"
assert_cmd = "2.0"
assert_matches = "1.5.0"
async-channel = "1.8.0"
async-mutex = "1.4.0"
async-trait = "0.1.57"
atty = "0.2.11"


@ -204,7 +204,7 @@ mod tests {
super::*,
crate::connection_cache::ConnectionCache,
crossbeam_channel::unbounded,
solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair},
solana_sdk::{net::DEFAULT_TPU_COALESCE_MS, quic::QUIC_PORT_OFFSET, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, quic::StreamStats,
streamer::StakedNodes,
@ -263,6 +263,7 @@ mod tests {
10,
response_recv_stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
DEFAULT_TPU_COALESCE_MS,
)
.unwrap();


@ -1,6 +1,7 @@
//! The `tpu` module implements the Transaction Processing Unit, a
//! multi-stage transaction processing pipeline in software.
pub use solana_sdk::net::DEFAULT_TPU_COALESCE_MS;
use {
crate::{
banking_stage::BankingStage,
@ -43,8 +44,6 @@ use {
},
};
pub const DEFAULT_TPU_COALESCE_MS: u64 = 5;
// allow multiple connections for NAT and any open/close overlap
pub const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8;
@ -177,6 +176,7 @@ impl Tpu {
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
tpu_coalesce_ms,
)
.unwrap();
@ -196,6 +196,7 @@ impl Tpu {
0, // Prevent unstaked nodes from forwarding transactions
stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
tpu_coalesce_ms,
)
.unwrap();


@ -279,6 +279,17 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-channel"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-compression"
version = "0.3.14"
@ -678,9 +689,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.2.1"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db"
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
[[package]]
name = "bzip2"
@ -842,6 +853,15 @@ dependencies = [
"unreachable",
]
[[package]]
name = "concurrent-queue"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c278839b831783b70278b14df4d45e1beb1aad306c07bb796637de9a0e323e8e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "console"
version = "0.15.0"
@ -977,12 +997,11 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f"
dependencies = [
"cfg-if 1.0.0",
"lazy_static",
]
[[package]]
@ -6036,6 +6055,8 @@ dependencies = [
name = "solana-streamer"
version = "1.16.0"
dependencies = [
"async-channel",
"bytes",
"crossbeam-channel",
"futures-util",
"histogram",


@ -8,7 +8,7 @@ mod tests {
solana_quic_client::nonblocking::quic_client::{
QuicClientCertificate, QuicLazyInitializedEndpoint,
},
solana_sdk::{packet::PACKET_DATA_SIZE, signature::Keypair},
solana_sdk::{net::DEFAULT_TPU_COALESCE_MS, packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, quic::StreamStats,
streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate,
@ -21,6 +21,7 @@ mod tests {
},
time::{Duration, Instant},
},
tokio::time::sleep,
};
fn check_packets(
@ -86,6 +87,7 @@ mod tests {
10,
stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
DEFAULT_TPU_COALESCE_MS,
)
.unwrap();
@ -111,6 +113,38 @@ mod tests {
t.join().unwrap();
}
// A version of check_packets that avoids blocking in an
// async environment. TODO: we really need a way of guaranteeing
// that we don't block in async code/tests, since blocking can lead
// to subtle bugs that don't manifest immediately and only show up
// when a separate (often itself valid) change is made
async fn nonblocking_check_packets(
receiver: Receiver<PacketBatch>,
num_bytes: usize,
num_expected_packets: usize,
) {
let mut all_packets = vec![];
let now = Instant::now();
let mut total_packets: usize = 0;
while now.elapsed().as_secs() < 10 {
if let Ok(packets) = receiver.try_recv() {
total_packets = total_packets.saturating_add(packets.len());
all_packets.push(packets)
} else {
sleep(Duration::from_secs(1)).await;
}
if total_packets >= num_expected_packets {
break;
}
}
for batch in all_packets {
for p in &batch {
assert_eq!(p.meta().size, num_bytes);
}
}
assert_eq!(total_packets, num_expected_packets);
}
#[tokio::test]
async fn test_nonblocking_quic_client_multiple_writes() {
use {
@ -133,6 +167,7 @@ mod tests {
10,
stats,
1000,
DEFAULT_TPU_COALESCE_MS,
)
.unwrap();
@ -152,7 +187,7 @@ mod tests {
let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets];
assert!(client.send_data_batch(&packets).await.is_ok());
check_packets(receiver, num_bytes, num_expected_packets);
nonblocking_check_packets(receiver, num_bytes, num_expected_packets).await;
exit.store(true, Ordering::Relaxed);
t.await.unwrap();
}
@ -189,6 +224,7 @@ mod tests {
10,
request_recv_stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
DEFAULT_TPU_COALESCE_MS,
)
.unwrap();
@ -218,6 +254,7 @@ mod tests {
10,
response_recv_stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
DEFAULT_TPU_COALESCE_MS,
)
.unwrap();


@ -77,6 +77,7 @@ pub mod hash;
pub mod inflation;
pub mod log;
pub mod native_loader;
pub mod net;
pub mod nonce_account;
pub mod offchain_message;
pub mod packet;

sdk/src/net.rs (new file)

@ -0,0 +1 @@
pub const DEFAULT_TPU_COALESCE_MS: u64 = 5;
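The constant moves here from core's tpu module so that lower-level crates such as solana-streamer can use the default without depending on solana-core; tpu.rs (above) re-exports it for existing callers. A minimal usage sketch, hypothetical and not part of the commit:

use solana_sdk::net::DEFAULT_TPU_COALESCE_MS;
// The old path keeps working via the re-export in core's tpu module:
// use solana_core::tpu::DEFAULT_TPU_COALESCE_MS;

fn main() {
    // Default window over which the QUIC server coalesces received
    // packets into a batch before handing them to the consumer.
    println!("TPU coalesce window: {DEFAULT_TPU_COALESCE_MS} ms");
}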


@ -10,6 +10,8 @@ license = { workspace = true }
edition = { workspace = true }
[dependencies]
async-channel = { workspace = true }
bytes = { workspace = true }
crossbeam-channel = { workspace = true }
futures-util = { workspace = true }
histogram = { workspace = true }


@ -4,15 +4,19 @@ use {
streamer::StakedNodes,
tls_certificates::get_pubkey_from_tls_certificate,
},
async_channel::{
unbounded as async_unbounded, Receiver as AsyncReceiver, Sender as AsyncSender,
},
bytes::Bytes,
crossbeam_channel::Sender,
indexmap::map::{Entry, IndexMap},
percentage::Percentage,
quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt},
quinn_proto::VarIntBoundsExceeded,
rand::{thread_rng, Rng},
solana_perf::packet::PacketBatch,
solana_perf::packet::{PacketBatch, PACKETS_PER_BATCH},
solana_sdk::{
packet::{Packet, PACKET_DATA_SIZE},
packet::{Meta, PACKET_DATA_SIZE},
pubkey::Pubkey,
quic::{
QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_STAKED_CONCURRENT_STREAMS,
@ -54,6 +58,30 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre
const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";
// A sequence of bytes that is part of a packet,
// along with its position within the packet
struct PacketChunk {
pub bytes: Bytes,
// The offset of these bytes in the Quic stream
// and thus the beginning offset in the slice of the
// Packet data array into which the bytes will be copied
pub offset: usize,
// The end offset of these bytes in the Quic stream
// and thus the end of the slice in the Packet data array
// into which the bytes will be copied
pub end_of_chunk: usize,
}
// A struct to accumulate the bytes making up
// a packet, along with their offsets, and the
// packet metadata. The accumulator lets us copy the
// Bytes only once, directly into the PacketBatch,
// rather than building up a Packet and then copying
// that Packet into a PacketBatch
struct PacketAccumulator {
pub meta: Meta,
pub chunks: Vec<PacketChunk>,
}
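To make the single-copy bookkeeping concrete, a hedged in-module sketch (illustrative sizes and a hypothetical helper name, not part of the commit) of what the accumulator holds for a packet that arrived as two stream chunks:

// A 1200-byte packet delivered as two QUIC stream chunks, [0..800)
// and [800..1200). Bytes is reference-counted, so nothing is copied
// here; packet_batch_sender later copies each chunk exactly once,
// straight into the packet's slot in a PacketBatch.
fn two_chunk_packet(first: Bytes, second: Bytes, meta: Meta) -> PacketAccumulator {
    debug_assert_eq!((first.len(), second.len()), (800, 400));
    PacketAccumulator {
        meta,
        chunks: vec![
            PacketChunk { bytes: first, offset: 0, end_of_chunk: 800 },
            PacketChunk { bytes: second, offset: 800, end_of_chunk: 1200 },
        ],
    }
}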
#[allow(clippy::too_many_arguments)]
pub fn spawn_server(
sock: UdpSocket,
@ -67,6 +95,7 @@ pub fn spawn_server(
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
wait_for_chunk_timeout_ms: u64,
coalesce_ms: u64,
) -> Result<(Endpoint, JoinHandle<()>), QuicServerError> {
info!("Start quic server on {:?}", sock);
let (config, _cert) = configure_server(keypair, gossip_host)?;
@ -86,10 +115,12 @@ pub fn spawn_server(
max_unstaked_connections,
stats,
wait_for_chunk_timeout_ms,
coalesce_ms,
));
Ok((endpoint, handle))
}
#[allow(clippy::too_many_arguments)]
pub async fn run_server(
incoming: Endpoint,
packet_sender: Sender<PacketBatch>,
@ -100,6 +131,7 @@ pub async fn run_server(
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
wait_for_chunk_timeout_ms: u64,
coalesce_ms: u64,
) {
debug!("spawn quic server");
let mut last_datapoint = Instant::now();
@ -108,6 +140,14 @@ pub async fn run_server(
));
let staked_connection_table: Arc<Mutex<ConnectionTable>> =
Arc::new(Mutex::new(ConnectionTable::new(ConnectionPeerType::Staked)));
let (sender, receiver) = async_unbounded();
tokio::spawn(packet_batch_sender(
packet_sender,
receiver,
exit.clone(),
stats.clone(),
coalesce_ms,
));
while !exit.load(Ordering::Relaxed) {
const WAIT_FOR_CONNECTION_TIMEOUT_MS: u64 = 1000;
const WAIT_BETWEEN_NEW_CONNECTIONS_US: u64 = 1000;
@ -128,7 +168,7 @@ pub async fn run_server(
connection,
unstaked_connection_table.clone(),
staked_connection_table.clone(),
packet_sender.clone(),
sender.clone(),
max_connections_per_peer,
staked_nodes.clone(),
max_staked_connections,
@ -229,7 +269,13 @@ enum ConnectionHandlerError {
}
struct NewConnectionHandlerParams {
packet_sender: Sender<PacketBatch>,
// In principle the code should work as-is with a crossbeam channel
// in place of this one: the server never does a blocking send (the
// channel is unbounded) or a blocking recv (we always use try_recv).
// In practice, though, it is simply too easy to accidentally block
// in async code when using a crossbeam channel, so for the sake of
// maintainability we stick with an async channel.
packet_sender: AsyncSender<PacketAccumulator>,
remote_pubkey: Option<Pubkey>,
stake: u64,
total_stake: u64,
@ -241,7 +287,7 @@ struct NewConnectionHandlerParams {
impl NewConnectionHandlerParams {
fn new_unstaked(
packet_sender: Sender<PacketBatch>,
packet_sender: AsyncSender<PacketAccumulator>,
max_connections_per_peer: usize,
stats: Arc<StreamStats>,
) -> NewConnectionHandlerParams {
@ -415,7 +461,7 @@ async fn setup_connection(
connecting: Connecting,
unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
staked_connection_table: Arc<Mutex<ConnectionTable>>,
packet_sender: Sender<PacketBatch>,
packet_sender: AsyncSender<PacketAccumulator>,
max_connections_per_peer: usize,
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
@ -519,10 +565,76 @@ async fn setup_connection(
}
}
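To make the channel-choice rationale in NewConnectionHandlerParams concrete, a minimal sketch of the hazard and the chosen alternative (hypothetical, not part of the commit):

// DON'T: crossbeam's blocking recv() parks the OS thread itself, so on
// a Tokio runtime it can stall every task scheduled on that worker
// thread until a message arrives.
// fn bad(rx: &crossbeam_channel::Receiver<u64>) -> Option<u64> {
//     rx.recv().ok() // blocks the executor thread
// }

// DO: async-channel's recv() returns a future, so awaiting it yields
// the worker thread back to the executor while the channel is empty.
async fn good(rx: &async_channel::Receiver<u64>) -> Option<u64> {
    rx.recv().await.ok()
}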
async fn packet_batch_sender(
packet_sender: Sender<PacketBatch>,
packet_receiver: AsyncReceiver<PacketAccumulator>,
exit: Arc<AtomicBool>,
stats: Arc<StreamStats>,
coalesce_ms: u64,
) {
trace!("enter packet_batch_sender");
let coalesce_ms = coalesce_ms as u128;
let mut batch_start_time = Instant::now();
loop {
let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH);
stats
.total_packets_allocated
.fetch_add(PACKETS_PER_BATCH, Ordering::Relaxed);
loop {
if exit.load(Ordering::Relaxed) {
return;
}
let elapsed = batch_start_time.elapsed();
if packet_batch.len() >= PACKETS_PER_BATCH
|| (!packet_batch.is_empty() && elapsed.as_millis() >= coalesce_ms)
{
let len = packet_batch.len();
if let Err(e) = packet_sender.send(packet_batch) {
stats
.total_packet_batch_send_err
.fetch_add(1, Ordering::Relaxed);
trace!("Send error: {}", e);
} else {
stats
.total_packet_batches_sent
.fetch_add(1, Ordering::Relaxed);
stats
.total_packets_sent_to_consumer
.fetch_add(len, Ordering::Relaxed);
trace!("Sent {} packet batch", len);
}
break;
}
let timeout_res = timeout(Duration::from_micros(250), packet_receiver.recv()).await;
if let Ok(Ok(packet_accumulator)) = timeout_res {
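// Descriptive note on the unsafe block below, assuming
// PacketBatch::with_capacity reserves PACKETS_PER_BATCH slots up
// front: the length check above breaks out of the loop before len
// reaches PACKETS_PER_BATCH, so set_len only exposes already-reserved
// capacity, and the new slot's meta and chunk byte ranges are
// initialized immediately afterward.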
unsafe {
packet_batch.set_len(packet_batch.len() + 1);
}
let i = packet_batch.len() - 1;
*packet_batch[i].meta_mut() = packet_accumulator.meta;
for chunk in packet_accumulator.chunks {
packet_batch[i].buffer_mut()[chunk.offset..chunk.end_of_chunk]
.copy_from_slice(&chunk.bytes);
}
if packet_batch.len() == 1 {
batch_start_time = Instant::now();
}
}
}
}
}
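For a rough sense of the latency/throughput trade this loop makes (assuming solana-perf's PACKETS_PER_BATCH is 64): under load a batch is flushed as soon as it holds 64 packets, while on a quiet stream a lone packet waits at most the coalesce window (5 ms by default) plus one 250 µs receive timeout, roughly 5.25 ms, before being flushed; in exchange, up to 64 packets reach the consumer per crossbeam send instead of one.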
#[allow(clippy::too_many_arguments)]
async fn handle_connection(
connection: Connection,
packet_sender: Sender<PacketBatch>,
packet_sender: AsyncSender<PacketAccumulator>,
remote_addr: SocketAddr,
remote_pubkey: Option<Pubkey>,
last_update: Arc<AtomicU64>,
@ -573,14 +685,16 @@ async fn handle_connection(
.await
{
if handle_chunk(
&chunk,
chunk,
&mut maybe_batch,
&remote_addr,
&packet_sender,
stats.clone(),
stake,
peer_type,
) {
)
.await
{
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
}
@ -625,11 +739,11 @@ async fn handle_connection(
}
// Return true if the server should drop the stream
fn handle_chunk(
chunk: &Result<Option<quinn::Chunk>, quinn::ReadError>,
maybe_batch: &mut Option<PacketBatch>,
async fn handle_chunk(
chunk: Result<Option<quinn::Chunk>, quinn::ReadError>,
packet_accum: &mut Option<PacketAccumulator>,
remote_addr: &SocketAddr,
packet_sender: &Sender<PacketBatch>,
packet_sender: &AsyncSender<PacketAccumulator>,
stats: Arc<StreamStats>,
stake: u64,
peer_type: ConnectionPeerType,
@ -657,56 +771,63 @@ fn handle_chunk(
}
// chunk looks valid
if maybe_batch.is_none() {
let mut batch = PacketBatch::with_capacity(1);
let mut packet = Packet::default();
packet.meta_mut().set_socket_addr(remote_addr);
packet.meta_mut().sender_stake = stake;
batch.push(packet);
*maybe_batch = Some(batch);
stats
.total_packets_allocated
.fetch_add(1, Ordering::Relaxed);
if packet_accum.is_none() {
let mut meta = Meta::default();
meta.set_socket_addr(remote_addr);
meta.sender_stake = stake;
*packet_accum = Some(PacketAccumulator {
meta,
chunks: Vec::new(),
});
}
if let Some(batch) = maybe_batch.as_mut() {
if let Some(accum) = packet_accum.as_mut() {
let offset = chunk.offset;
let end_of_chunk = match (chunk.offset as usize).checked_add(chunk.bytes.len())
{
Some(end) => end,
None => return true,
};
batch[0].buffer_mut()[chunk.offset as usize..end_of_chunk]
.copy_from_slice(&chunk.bytes);
batch[0].meta_mut().size = std::cmp::max(batch[0].meta().size, end_of_chunk);
stats.total_chunks_received.fetch_add(1, Ordering::Relaxed);
match peer_type {
ConnectionPeerType::Staked => {
stats
.total_staked_chunks_received
.fetch_add(1, Ordering::Relaxed);
}
ConnectionPeerType::Unstaked => {
stats
.total_unstaked_chunks_received
.fetch_add(1, Ordering::Relaxed);
}
accum.chunks.push(PacketChunk {
bytes: chunk.bytes,
offset: offset as usize,
end_of_chunk,
});
accum.meta.size = std::cmp::max(accum.meta.size, end_of_chunk);
}
match peer_type {
ConnectionPeerType::Staked => {
stats
.total_staked_chunks_received
.fetch_add(1, Ordering::Relaxed);
}
ConnectionPeerType::Unstaked => {
stats
.total_unstaked_chunks_received
.fetch_add(1, Ordering::Relaxed);
}
}
} else {
trace!("chunk is none");
// done receiving chunks
if let Some(batch) = maybe_batch.take() {
let len = batch[0].meta().size;
if let Err(e) = packet_sender.send(batch) {
trace!("chunk is none");
if let Some(accum) = packet_accum.take() {
let len = accum
.chunks
.iter()
.map(|packet_bytes| packet_bytes.bytes.len())
.sum::<usize>();
if let Err(err) = packet_sender.send(accum).await {
stats
.total_packet_batch_send_err
.total_handle_chunk_to_packet_batcher_send_err
.fetch_add(1, Ordering::Relaxed);
info!("send error: {}", e);
trace!("packet batch send error {:?}", err);
} else {
stats
.total_packet_batches_sent
.total_packets_sent_for_batching
.fetch_add(1, Ordering::Relaxed);
trace!("sent {} byte packet", len);
trace!("sent {} byte packet for batching", len);
}
} else {
stats
@ -951,9 +1072,11 @@ pub mod test {
quic::{MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
tls_certificates::new_self_signed_tls_certificate,
},
async_channel::unbounded as async_unbounded,
crossbeam_channel::{unbounded, Receiver},
quinn::{ClientConfig, IdleTimeout, TransportConfig, VarInt},
solana_sdk::{
net::DEFAULT_TPU_COALESCE_MS,
quic::{QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS},
signature::Keypair,
signer::Signer,
@ -1039,6 +1162,7 @@ pub mod test {
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
2000,
DEFAULT_TPU_COALESCE_MS,
)
.unwrap();
(t, exit, receiver, server_address, stats)
@ -1075,9 +1199,11 @@ pub mod test {
}
let mut received = 0;
loop {
if let Ok(_x) = receiver.recv_timeout(Duration::from_millis(500)) {
if let Ok(_x) = receiver.try_recv() {
received += 1;
info!("got {}", received);
} else {
sleep(Duration::from_millis(500)).await;
}
if received >= total {
break;
@ -1128,9 +1254,11 @@ pub mod test {
let now = Instant::now();
let mut total_packets = 0;
while now.elapsed().as_secs() < 10 {
if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) {
if let Ok(packets) = receiver.try_recv() {
total_packets += packets.len();
all_packets.push(packets)
} else {
sleep(Duration::from_secs(1)).await;
}
if total_packets == num_expected_packets {
break;
@ -1164,9 +1292,13 @@ pub mod test {
let now = Instant::now();
let mut total_packets = 0;
while now.elapsed().as_secs() < 5 {
if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) {
// We're running in an async environment; we (almost) never
// want to block
if let Ok(packets) = receiver.try_recv() {
total_packets += packets.len();
all_packets.push(packets)
} else {
sleep(Duration::from_secs(1)).await;
}
if total_packets >= num_expected_packets {
break;
@ -1209,6 +1341,54 @@ pub mod test {
t.await.unwrap();
}
#[tokio::test]
async fn test_packet_batcher() {
solana_logger::setup();
let (pkt_batch_sender, pkt_batch_receiver) = unbounded();
let (pkt_sender, pkt_receiver) = async_unbounded();
let exit = Arc::new(AtomicBool::new(false));
let stats = Arc::new(StreamStats::default());
let handle = tokio::spawn(packet_batch_sender(
pkt_batch_sender,
pkt_receiver,
exit.clone(),
stats,
DEFAULT_TPU_COALESCE_MS,
));
let num_packets = 1000;
for _i in 0..num_packets {
let mut meta = Meta::default();
let bytes = Bytes::from("Hello world");
let offset = 0;
let size = bytes.len();
meta.size = size;
let packet_accum = PacketAccumulator {
meta,
chunks: vec![PacketChunk {
bytes,
offset,
end_of_chunk: size,
}],
};
pkt_sender.send(packet_accum).await.unwrap();
}
let mut i = 0;
let start = Instant::now();
while i < num_packets && start.elapsed().as_secs() < 2 {
if let Ok(batch) = pkt_batch_receiver.try_recv() {
i += batch.len();
} else {
sleep(Duration::from_millis(1)).await;
}
}
assert_eq!(i, num_packets);
exit.store(true, Ordering::Relaxed);
handle.await.unwrap();
}
#[tokio::test]
async fn test_quic_stream_timeout() {
solana_logger::setup();
@ -1406,6 +1586,7 @@ pub mod test {
0, // Do not allow any connection from unstaked clients/nodes
stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
DEFAULT_TPU_COALESCE_MS,
)
.unwrap();
@ -1437,6 +1618,7 @@ pub mod test {
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
DEFAULT_TPU_COALESCE_MS,
)
.unwrap();


@ -22,12 +22,11 @@ use {
thread,
time::SystemTime,
},
tokio::runtime::{Builder, Runtime},
tokio::runtime::Runtime,
};
pub const MAX_STAKED_CONNECTIONS: usize = 2000;
pub const MAX_UNSTAKED_CONNECTIONS: usize = 500;
const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 1;
struct SkipClientVerification;
@ -98,8 +97,7 @@ pub(crate) fn configure_server(
}
fn rt() -> Runtime {
Builder::new_multi_thread()
.worker_threads(NUM_QUIC_STREAMER_WORKER_THREADS)
tokio::runtime::Builder::new_multi_thread()
.thread_name("quic-server")
.enable_all()
.build()
@ -128,8 +126,11 @@ pub struct StreamStats {
pub(crate) total_staked_chunks_received: AtomicUsize,
pub(crate) total_unstaked_chunks_received: AtomicUsize,
pub(crate) total_packet_batch_send_err: AtomicUsize,
pub(crate) total_handle_chunk_to_packet_batcher_send_err: AtomicUsize,
pub(crate) total_packet_batches_sent: AtomicUsize,
pub(crate) total_packet_batches_none: AtomicUsize,
pub(crate) total_packets_sent_for_batching: AtomicUsize,
pub(crate) total_packets_sent_to_consumer: AtomicUsize,
pub(crate) total_stream_read_errors: AtomicUsize,
pub(crate) total_stream_read_timeouts: AtomicUsize,
pub(crate) num_evictions: AtomicUsize,
@ -251,6 +252,18 @@ impl StreamStats {
self.total_packets_allocated.swap(0, Ordering::Relaxed),
i64
),
(
"packets_sent_for_batching",
self.total_packets_sent_for_batching
.swap(0, Ordering::Relaxed),
i64
),
(
"packets_sent_to_consumer",
self.total_packets_sent_to_consumer
.swap(0, Ordering::Relaxed),
i64
),
(
"chunks_received",
self.total_chunks_received.swap(0, Ordering::Relaxed),
@ -272,6 +285,12 @@ impl StreamStats {
self.total_packet_batch_send_err.swap(0, Ordering::Relaxed),
i64
),
(
"handle_chunk_to_packet_batcher_send_error",
self.total_handle_chunk_to_packet_batcher_send_err
.swap(0, Ordering::Relaxed),
i64
),
(
"packet_batches_sent",
self.total_packet_batches_sent.swap(0, Ordering::Relaxed),
@ -309,6 +328,7 @@ pub fn spawn_server(
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
wait_for_chunk_timeout_ms: u64,
coalesce_ms: u64,
) -> Result<(Endpoint, thread::JoinHandle<()>), QuicServerError> {
let runtime = rt();
let (endpoint, task) = {
@ -325,6 +345,7 @@ pub fn spawn_server(
max_unstaked_connections,
stats,
wait_for_chunk_timeout_ms,
coalesce_ms,
)
}?;
let handle = thread::Builder::new()
@ -344,6 +365,7 @@ mod test {
super::*,
crate::nonblocking::quic::{test::*, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS},
crossbeam_channel::unbounded,
solana_sdk::net::DEFAULT_TPU_COALESCE_MS,
std::net::SocketAddr,
};
@ -373,6 +395,7 @@ mod test {
MAX_UNSTAKED_CONNECTIONS,
stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
DEFAULT_TPU_COALESCE_MS,
)
.unwrap();
(t, exit, receiver, server_address)
@ -429,6 +452,7 @@ mod test {
MAX_UNSTAKED_CONNECTIONS,
stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
DEFAULT_TPU_COALESCE_MS,
)
.unwrap();
@ -472,6 +496,7 @@ mod test {
0, // Do not allow any connection from unstaked clients/nodes
stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
DEFAULT_TPU_COALESCE_MS,
)
.unwrap();