Merge pull request #9 from blockworks-foundation/testing_removing_of_max_limit_on_channel

Multiple performance improvement over the quic implementation
This commit is contained in:
galactus 2024-09-11 10:43:57 +02:00 committed by GitHub
commit aa48577750
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 364 additions and 449 deletions

7
Cargo.lock generated
View File

@ -3014,7 +3014,6 @@ dependencies = [
"bincode",
"itertools",
"log",
"mio",
"quic-geyser-common",
"quic-geyser-server",
"quiche",
@ -3034,8 +3033,6 @@ dependencies = [
"itertools",
"libc",
"log",
"mio",
"mio_channel",
"nix",
"quic-geyser-common",
"quic-geyser-quiche-utils",
@ -3111,9 +3108,9 @@ dependencies = [
[[package]]
name = "quiche"
version = "0.21.0"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7986f11a8f2c63ade53d9ce49ce46d912c8d4770defbb117e16fafae76d8b5e"
checksum = "28e5a763fecb47867bd3720f69ec87031ff42fda1dc88be2cb5fbb3a558fa5e4"
dependencies = [
"boring",
"cmake",

View File

@ -52,7 +52,7 @@ lz4 = "1.24.0"
mio = "0.8.11"
mio_channel = "0.1.3"
quiche = "=0.21.0"
quiche = "=0.22.0"
boring = "4.6.0"
ring = "0.17.8"

View File

@ -76,7 +76,7 @@ pub fn client_loop(
let mut buf = [0; 65535];
'client: loop {
poll.poll(&mut events, Some(Duration::from_micros(100)))?;
poll.poll(&mut events, Some(Duration::from_millis(10)))?;
'read: loop {
match socket.recv_from(&mut buf) {
@ -151,7 +151,7 @@ pub fn create_quiche_client_thread(
let rng = SystemRandom::new();
'client: loop {
poll.poll(&mut events, Some(Duration::from_millis(100)))
poll.poll(&mut events, Some(Duration::from_millis(10)))
.unwrap();
if events.is_empty() {
connection.on_timeout();
@ -249,7 +249,6 @@ pub fn create_quiche_client_thread(
// no more new messages
}
std::sync::mpsc::TryRecvError::Disconnected => {
log::error!("message_queue disconnected");
let _ = connection.close(true, 0, b"no longer needed");
}
}
@ -405,7 +404,10 @@ mod tests {
let (server_send_queue, rx_sent_queue) = mpsc::channel::<ChannelMessage>();
let _server_loop_jh = std::thread::spawn(move || {
if let Err(e) = server_loop(
QuicParameters::default(),
QuicParameters {
incremental_priority: true,
..Default::default()
},
socket_addr,
rx_sent_queue,
CompressionType::Lz4Fast(8),

View File

@ -51,7 +51,6 @@ pub fn create_client_endpoint(connection_parameters: ConnectionParameters) -> En
transport_config.max_concurrent_uni_streams(VarInt::from(
connection_parameters.max_number_of_streams as u32,
));
transport_config.mtu_discovery_config(None);
transport_config.crypto_buffer_size(64 * 1024);
transport_config
@ -128,10 +127,12 @@ impl Client {
let message = recv_message(recv_stream, timeout).await;
match message {
Ok(message) => {
let _ = sender.send(message);
if let Err(e) = sender.send(message) {
log::error!("Message sent error : {:?}", e)
}
}
Err(e) => {
log::trace!("Error getting message {}", e);
log::trace!("Error getting message {:?}", e);
}
}
});
@ -140,10 +141,11 @@ impl Client {
ConnectionError::ConnectionClosed(_)
| ConnectionError::ApplicationClosed(_)
| ConnectionError::LocallyClosed => {
log::debug!("Got {:?} while listing to the connection", e);
break;
}
_ => {
log::error!("Got {} while listing to the connection", e);
log::error!("Got {:?} while listing to the connection", e);
break;
}
},

View File

@ -7,8 +7,8 @@ use crate::{
defaults::{
DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_ENABLE_GSO,
DEFAULT_ENABLE_PACING, DEFAULT_INCREMENTAL_PRIORITY, DEFAULT_MAX_ACK_DELAY,
DEFAULT_MAX_MESSAGES_IN_QUEUE, DEFAULT_MAX_NB_CONNECTIONS, DEFAULT_MAX_RECIEVE_WINDOW_SIZE,
DEFAULT_MAX_STREAMS, DEFAULT_USE_CC_BBR,
DEFAULT_MAX_NB_CONNECTIONS, DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS,
DEFAULT_USE_CC_BBR,
},
};
@ -69,7 +69,6 @@ pub struct QuicParameters {
pub enable_pacing: bool,
pub use_cc_bbr: bool,
pub incremental_priority: bool,
pub max_messages_in_queue: u64,
pub enable_gso: bool,
}
@ -85,7 +84,6 @@ impl Default for QuicParameters {
enable_pacing: DEFAULT_ENABLE_PACING,
use_cc_bbr: DEFAULT_USE_CC_BBR,
incremental_priority: DEFAULT_INCREMENTAL_PRIORITY,
max_messages_in_queue: DEFAULT_MAX_MESSAGES_IN_QUEUE,
enable_gso: DEFAULT_ENABLE_GSO,
}
}

View File

@ -1,5 +1,4 @@
pub const DEFAULT_MAX_STREAMS: u64 = 128 * 1024;
pub const DEFAULT_MAX_MESSAGES_IN_QUEUE: u64 = 1024 * 1024;
pub const MAX_ALLOWED_PARTIAL_RESPONSES: u64 = DEFAULT_MAX_STREAMS * 3 / 4;
pub const DEFAULT_MAX_RECIEVE_WINDOW_SIZE: u64 = 24 * 1024 * 1024; // 24 MBs
pub const DEFAULT_CONNECTION_TIMEOUT: u64 = 10;
@ -7,9 +6,9 @@ pub const DEFAULT_MAX_NB_CONNECTIONS: u64 = 10;
pub const DEFAULT_MAX_ACK_DELAY: u64 = 25;
pub const DEFAULT_ACK_EXPONENT: u64 = 3;
pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"geyser";
pub const MAX_DATAGRAM_SIZE: usize = 1200;
pub const MAX_DATAGRAM_SIZE: usize = 1350;
pub const MAX_PAYLOAD_BUFFER: usize = 10 * MAX_DATAGRAM_SIZE;
pub const DEFAULT_ENABLE_PACING: bool = true;
pub const DEFAULT_USE_CC_BBR: bool = false;
pub const DEFAULT_INCREMENTAL_PRIORITY: bool = true;
pub const DEFAULT_INCREMENTAL_PRIORITY: bool = false;
pub const DEFAULT_ENABLE_GSO: bool = true;

View File

@ -428,23 +428,23 @@ pub async fn main() {
transaction_notifications_stats.add_value(&transaction_notifications);
block_notifications_stats.add_value(&block_notifications);
println!("------------------------------------------");
println!(
log::info!("------------------------------------------");
log::info!(
" DateTime : {:?}",
instant.duration_since(start_instance).as_secs()
);
println!(" Bytes Transfered : {} Mbs/s", bytes_transfered / 1_000_000);
println!(
log::info!(" Bytes Transfered : {} Mbs/s", bytes_transfered / 1_000_000);
log::info!(
" Accounts transfered size (uncompressed) : {} Mbs",
total_accounts_size / 1_000_000
);
println!(" Accounts Notified : {}", account_notification);
println!(" Slots Notified : {}", slot_notifications);
println!(" Blockmeta notified : {}", blockmeta_notifications);
println!(" Transactions notified : {}", transaction_notifications);
println!(" Blocks notified : {}", block_notifications);
log::info!(" Accounts Notified : {}", account_notification);
log::info!(" Slots Notified : {}", slot_notifications);
log::info!(" Blockmeta notified : {}", blockmeta_notifications);
log::info!(" Transactions notified : {}", transaction_notifications);
log::info!(" Blocks notified : {}", block_notifications);
println!(" Cluster Slots: {}, Account Slot: {}, Slot Notification slot: {}, BlockMeta slot: {}, Block slot: {}", cluster_slot.load(std::sync::atomic::Ordering::Relaxed), account_slot.load(std::sync::atomic::Ordering::Relaxed), slot_slot.load(std::sync::atomic::Ordering::Relaxed), blockmeta_slot.load(std::sync::atomic::Ordering::Relaxed), block_slot.load(std::sync::atomic::Ordering::Relaxed));
log::info!(" Cluster Slots: {}, Account Slot: {}, Slot Notification slot: {}, BlockMeta slot: {}, Block slot: {}", cluster_slot.load(std::sync::atomic::Ordering::Relaxed), account_slot.load(std::sync::atomic::Ordering::Relaxed), slot_slot.load(std::sync::atomic::Ordering::Relaxed), blockmeta_slot.load(std::sync::atomic::Ordering::Relaxed), block_slot.load(std::sync::atomic::Ordering::Relaxed));
if counter % 10 == 0 {
println!("------------------STATS------------------------");

View File

@ -15,7 +15,6 @@ quic-geyser-common = { workspace = true }
bincode = { workspace = true }
ring = {workspace = true}
quiche = { workspace = true }
mio = { workspace = true }
[dev-dependencies]
rand = { workspace = true }

View File

@ -19,9 +19,6 @@ quic-geyser-quiche-utils = { workspace = true }
rcgen = { workspace = true }
boring = { workspace = true }
mio = { workspace = true }
mio_channel = { workspace = true }
libc = "0.2"
nix = { version = "0.27", features = ["net", "socket", "uio"] }

View File

@ -42,12 +42,13 @@ pub fn configure_server(quic_parameter: QuicParameters) -> anyhow::Result<quiche
config.set_initial_max_stream_data_uni(recieve_window_size);
config.set_initial_max_streams_bidi(max_concurrent_streams);
config.set_initial_max_streams_uni(max_concurrent_streams);
config.set_max_connection_window(24 * 1024 * 1024);
config.set_max_connection_window(48 * 1024 * 1024);
config.set_max_stream_window(16 * 1024 * 1024);
config.enable_early_data();
config.grease(true);
config.enable_hystart(true);
config.discover_pmtu(true);
if use_bbr {
config.set_cc_algorithm(quiche::CongestionControlAlgorithm::BBR2);

View File

@ -1,8 +1,9 @@
use std::{
collections::HashMap,
net::SocketAddr,
net::UdpSocket,
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize},
atomic::AtomicBool,
mpsc::{self, Sender},
Arc, Mutex, RwLock,
},
@ -11,7 +12,7 @@ use std::{
use anyhow::bail;
use itertools::Itertools;
use mio::Token;
use nix::sys::socket::sockopt::ReusePort;
use quiche::ConnectionId;
use ring::rand::SystemRandom;
@ -36,10 +37,14 @@ use quic_geyser_quiche_utils::{
use crate::configure_server::configure_server;
enum InternalMessage {
Packet(quiche::RecvInfo, Vec<u8>),
ClientMessage(Vec<u8>, u8),
}
struct DispatchingData {
pub sender: Sender<(Vec<u8>, u8)>,
pub sender: Sender<InternalMessage>,
pub filters: Arc<RwLock<Vec<Filter>>>,
pub messages_in_queue: Arc<AtomicUsize>,
}
type DispachingConnections = Arc<Mutex<HashMap<ConnectionId<'static>, DispatchingData>>>;
@ -52,34 +57,20 @@ pub fn server_loop(
stop_laggy_client: bool,
) -> anyhow::Result<()> {
let maximum_concurrent_streams_id = u64::MAX;
let max_messages_in_queue = quic_params.max_messages_in_queue;
let mut config = configure_server(quic_params)?;
let mut socket = mio::net::UdpSocket::bind(socket_addr)?;
let mut poll = mio::Poll::new()?;
let mut events = mio::Events::with_capacity(1024);
poll.registry()
.register(&mut socket, mio::Token(0), mio::Interest::READABLE)?;
let socket = Arc::new(UdpSocket::bind(socket_addr)?);
let mut buf = [0; 65535];
let mut out = [0; MAX_DATAGRAM_SIZE];
let local_addr = socket.local_addr()?;
let rng = SystemRandom::new();
let conn_id_seed = ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap();
let mut client_messsage_channel_by_id: HashMap<
u64,
mio_channel::Sender<(quiche::RecvInfo, Vec<u8>)>,
> = HashMap::new();
let mut client_messsage_channel_by_id: HashMap<u64, mpsc::Sender<InternalMessage>> =
HashMap::new();
let clients_by_id: Arc<Mutex<HashMap<ConnectionId<'static>, u64>>> =
Arc::new(Mutex::new(HashMap::new()));
let (write_sender, mut write_reciver) = mio_channel::channel::<(quiche::SendInfo, Vec<u8>)>();
poll.registry()
.register(&mut write_reciver, mio::Token(1), mio::Interest::READABLE)?;
let enable_pacing = if quic_params.enable_pacing {
set_txtime_sockopt(&socket).is_ok()
} else {
@ -103,21 +94,15 @@ pub fn server_loop(
message_send_queue,
dispatching_connections.clone(),
compression_type,
max_messages_in_queue,
);
let mut client_id_counter = 0;
loop {
poll.poll(&mut events, Some(Duration::from_millis(10)))?;
let do_read = events.is_empty() || events.iter().any(|x| x.token() == Token(0));
if do_read {
'read: loop {
let (len, from) = match socket.recv_from(&mut buf) {
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock {
log::trace!("recv() would block");
break 'read;
break;
}
bail!("recv() failed: {:?}", e);
}
@ -131,7 +116,7 @@ pub fn server_loop(
Err(e) => {
log::error!("Parsing packet header failed: {:?}", e);
continue 'read;
continue;
}
};
@ -140,15 +125,15 @@ pub fn server_loop(
let conn_id: ConnectionId<'static> = conn_id.to_vec().into();
let mut clients_lk = clients_by_id.lock().unwrap();
if !clients_lk.contains_key(&hdr.dcid) && !clients_lk.contains_key(&conn_id) {
drop(clients_lk);
if hdr.ty != quiche::Type::Initial {
log::error!("Packet is not Initial");
continue 'read;
continue;
}
if !quiche::version_is_supported(hdr.version) {
log::warn!("Doing version negotiation");
let len =
quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out).unwrap();
let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out).unwrap();
let out = &out[..len];
@ -158,7 +143,7 @@ pub fn server_loop(
}
panic!("send() failed: {:?}", e);
}
continue 'read;
continue;
}
let mut scid = [0; quiche::MAX_CONN_ID_LEN];
@ -188,27 +173,26 @@ pub fn server_loop(
if let Err(e) = socket.send_to(&out[..len], from) {
log::error!("Error sending retry messages : {e:?}");
}
continue 'read;
continue;
}
let odcid = validate_token(&from, token);
if odcid.is_none() {
log::error!("Invalid address validation token");
continue 'read;
continue;
}
if scid.len() != hdr.dcid.len() {
log::error!("Invalid destination connection ID");
continue 'read;
continue;
}
let scid = hdr.dcid.clone();
log::info!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid);
let mut conn =
quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config)?;
let mut conn = quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config)?;
let recv_info = quiche::RecvInfo {
to: socket.local_addr().unwrap(),
@ -219,42 +203,40 @@ pub fn server_loop(
Ok(v) => v,
Err(e) => {
log::error!("{} recv failed: {:?}", conn.trace_id(), e);
continue 'read;
continue;
}
};
let (client_sender, client_reciver) = mio_channel::channel();
let (client_message_sx, client_message_rx) = mpsc::channel();
let messages_in_queue = Arc::new(AtomicUsize::new(0));
let current_client_id = client_id_counter;
client_id_counter += 1;
let filters = Arc::new(RwLock::new(Vec::new()));
create_client_task(
socket.clone(),
conn,
current_client_id,
clients_by_id.clone(),
client_reciver,
write_sender.clone(),
client_message_rx,
filters.clone(),
maximum_concurrent_streams_id,
stop_laggy_client,
messages_in_queue.clone(),
quic_params.incremental_priority,
rng.clone(),
enable_pacing,
enable_gso,
);
let mut lk = dispatching_connections.lock().unwrap();
lk.insert(
scid.clone(),
DispatchingData {
sender: client_message_sx,
sender: client_message_sx.clone(),
filters,
messages_in_queue,
},
);
let mut clients_lk = clients_by_id.lock().unwrap();
clients_lk.insert(scid, current_client_id);
client_messsage_channel_by_id.insert(current_client_id, client_sender);
client_messsage_channel_by_id.insert(current_client_id, client_message_sx);
} else {
// get the existing client
let client_id = match clients_lk.get(&hdr.dcid) {
@ -270,7 +252,10 @@ pub fn server_loop(
};
match client_messsage_channel_by_id.get_mut(&client_id) {
Some(channel) => {
if channel.send((recv_info, pkt_buf.to_vec())).is_err() {
if channel
.send(InternalMessage::Packet(recv_info, pkt_buf.to_vec()))
.is_err()
{
// client is closed
clients_lk.remove(&hdr.dcid);
clients_lk.remove(&conn_id);
@ -283,38 +268,23 @@ pub fn server_loop(
}
};
}
}
while let Ok((send_info, buffer)) = write_reciver.try_recv() {
let send_result = if enable_pacing {
send_with_pacing(&socket, &buffer, &send_info, enable_gso)
} else {
socket.send_to(&buffer, send_info.to)
};
match send_result {
Ok(_written) => {}
Err(e) => {
log::error!("sending failed with error : {e:?}");
}
}
}
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn create_client_task(
socket: Arc<UdpSocket>,
connection: quiche::Connection,
client_id: u64,
client_id_by_scid: Arc<Mutex<HashMap<ConnectionId<'static>, u64>>>,
mut receiver: mio_channel::Receiver<(quiche::RecvInfo, Vec<u8>)>,
sender: mio_channel::Sender<(quiche::SendInfo, Vec<u8>)>,
message_channel: mpsc::Receiver<(Vec<u8>, u8)>,
receiver: mpsc::Receiver<InternalMessage>,
filters: Arc<RwLock<Vec<Filter>>>,
maximum_concurrent_streams_id: u64,
stop_laggy_client: bool,
messages_in_queue: Arc<AtomicUsize>,
incremental_priority: bool,
rng: SystemRandom,
enable_pacing: bool,
enable_gso: bool,
) {
std::thread::spawn(move || {
let mut partial_responses = PartialResponses::new();
@ -324,94 +294,95 @@ fn create_client_task(
let mut instance = Instant::now();
let mut closed = false;
let mut out = [0; 65535];
let mut poll = mio::Poll::new().unwrap();
let mut events = mio::Events::with_capacity(1024);
poll.registry()
.register(&mut receiver, mio::Token(0), mio::Interest::READABLE)
.unwrap();
let number_of_loops = Arc::new(AtomicU64::new(0));
let number_of_meesages_from_network = Arc::new(AtomicU64::new(0));
let number_of_meesages_to_network = Arc::new(AtomicU64::new(0));
let number_of_readable_streams = Arc::new(AtomicU64::new(0));
let number_of_writable_streams = Arc::new(AtomicU64::new(0));
let messages_added = Arc::new(AtomicU64::new(0));
let mut datagram_size = MAX_DATAGRAM_SIZE;
let mut logged_is_draining = false;
let quit = Arc::new(AtomicBool::new(false));
let max_burst_size = MAX_DATAGRAM_SIZE * 10;
{
let number_of_loops = number_of_loops.clone();
let number_of_meesages_from_network = number_of_meesages_from_network.clone();
let number_of_meesages_to_network = number_of_meesages_to_network.clone();
let number_of_readable_streams = number_of_readable_streams.clone();
let number_of_writable_streams = number_of_writable_streams.clone();
let messages_added = messages_added.clone();
let messages_in_queue = messages_in_queue.clone();
let quit = quit.clone();
std::thread::spawn(move || {
while !quit.load(std::sync::atomic::Ordering::Relaxed) {
std::thread::sleep(Duration::from_secs(1));
log::debug!("---------------------------------");
log::debug!(
"number of loop : {}",
number_of_loops.swap(0, std::sync::atomic::Ordering::Relaxed)
);
log::debug!(
"number of packets read : {}",
number_of_meesages_from_network
.swap(0, std::sync::atomic::Ordering::Relaxed)
);
log::debug!(
"number of packets write : {}",
number_of_meesages_to_network.swap(0, std::sync::atomic::Ordering::Relaxed)
);
log::debug!(
"number_of_readable_streams : {}",
number_of_readable_streams.swap(0, std::sync::atomic::Ordering::Relaxed)
);
log::debug!(
"number_of_writable_streams : {}",
number_of_writable_streams.swap(0, std::sync::atomic::Ordering::Relaxed)
);
log::debug!(
"messages_added : {}",
messages_added.swap(0, std::sync::atomic::Ordering::Relaxed)
);
log::debug!(
"messages in queue to be sent : {}",
messages_in_queue.load(std::sync::atomic::Ordering::Relaxed)
);
}
});
}
let mut continue_write = true;
loop {
number_of_loops.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
poll.poll(&mut events, Some(Duration::from_millis(1)))
.unwrap();
let mut timeout = if continue_write {
Duration::from_secs(0)
} else {
connection.timeout().unwrap_or(Duration::from_secs(1))
};
if !events.is_empty() {
while let Ok((info, mut buf)) = receiver.try_recv() {
let mut did_read = false;
while let Ok(internal_message) = receiver.recv_timeout(timeout) {
did_read = true;
timeout = Duration::from_secs(0);
match internal_message {
InternalMessage::Packet(info, mut buf) => {
// handle packet from udp socket
let buf = buf.as_mut_slice();
match connection.recv(buf, info) {
Ok(_) => {
number_of_meesages_from_network
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
Ok(_) => {}
Err(e) => {
log::error!("{} recv failed: {:?}", connection.trace_id(), e);
break;
}
};
}
continue;
InternalMessage::ClientMessage(message, priority) => {
// handle message from client
let stream_id = next_stream;
next_stream =
get_next_unidi(stream_id, true, maximum_concurrent_streams_id);
let close = if let Err(e) =
connection.stream_priority(stream_id, priority, incremental_priority)
{
if !closed {
log::error!(
"Unable to set priority for the stream {}, error {}",
stream_id,
e
);
}
true
} else {
match send_message(
&mut connection,
&mut partial_responses,
stream_id,
message,
) {
Ok(_) => {
// do nothing
false
}
Err(e) => {
// done writing / queue is full
log::error!("got error sending message client : {}", e);
true
}
}
};
if close && !closed && stop_laggy_client {
if let Err(e) = connection.close(true, 1, b"laggy client") {
if e != quiche::Error::Done {
log::error!("error closing client : {}", e);
}
} else {
log::info!("Stopping laggy client : {}", connection.trace_id(),);
}
closed = true;
break;
}
}
}
}
if !did_read && !continue_write {
connection.on_timeout();
}
continue_write = false;
if connection.is_in_early_data() || connection.is_established() {
// Process all readable streams.
for stream in connection.readable() {
number_of_readable_streams.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let message = recv_message(&mut connection, &mut read_streams, stream);
match message {
Ok(Some(message)) => match message {
@ -437,13 +408,12 @@ fn create_client_task(
if !connection.is_closed()
&& (connection.is_established() || connection.is_in_early_data())
{
let mut is_writable = true;
datagram_size = connection.max_send_udp_payload_size();
for stream_id in connection.writable() {
if let Err(e) =
handle_writable(&mut connection, &mut partial_responses, stream_id)
{
if e == quiche::Error::Done {
is_writable = false;
break;
}
if !closed {
@ -454,83 +424,11 @@ fn create_client_task(
}
}
}
if is_writable {
loop {
let close = match message_channel.try_recv() {
Ok((message, priority)) => {
messages_in_queue
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
let stream_id = next_stream;
next_stream =
get_next_unidi(stream_id, true, maximum_concurrent_streams_id);
if let Err(e) = connection.stream_priority(
stream_id,
priority,
incremental_priority,
) {
if !closed {
log::error!(
"Unable to set priority for the stream {}, error {}",
stream_id,
e
);
}
true
} else {
messages_added
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
match send_message(
&mut connection,
&mut partial_responses,
stream_id,
message,
) {
Ok(_) => false,
Err(quiche::Error::Done) => {
// done writing / queue is full
break;
}
Err(e) => {
log::error!("error sending message : {e:?}");
true
}
}
}
}
Err(e) => {
match e {
mpsc::TryRecvError::Empty => {
break;
}
mpsc::TryRecvError::Disconnected => {
// too many message the connection is lagging
log::error!("channel disconnected by dispatcher");
true
}
}
}
};
if close && !closed && stop_laggy_client {
if let Err(e) = connection.close(true, 1, b"laggy client") {
if e != quiche::Error::Done {
log::error!("error closing client : {}", e);
}
} else {
log::info!("Stopping laggy client : {}", connection.trace_id(),);
}
closed = true;
break;
}
}
}
}
if instance.elapsed() > Duration::from_secs(1) {
log::debug!("other tasks");
instance = Instant::now();
connection.on_timeout();
handle_path_events(&mut connection);
// See whether source Connection IDs have been retired.
@ -553,17 +451,19 @@ fn create_client_task(
let mut send_message_to = None;
let mut total_length = 0;
let mut done_writing = false;
'writing_loop: while !done_writing {
let max_burst_size = if enable_gso {
std::cmp::min(datagram_size * 10, out.len())
} else {
datagram_size
};
while total_length < max_burst_size {
match connection.send(&mut out[total_length..max_burst_size]) {
Ok((len, send_info)) => {
number_of_meesages_to_network
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
send_message_to.get_or_insert(send_info);
total_length += len;
if len < MAX_DATAGRAM_SIZE {
if len < datagram_size {
break;
}
}
@ -573,7 +473,6 @@ fn create_client_task(
break;
}
Err(quiche::Error::Done) => {
done_writing = true;
break;
}
Err(e) => {
@ -583,22 +482,48 @@ fn create_client_task(
e
);
connection.close(false, 0x1, b"fail").ok();
break 'writing_loop;
}
};
}
if total_length > 0 && send_message_to.is_some() {
sender
.send((send_message_to.unwrap(), out[..total_length].to_vec()))
.unwrap();
total_length = 0;
log::debug!("sending :{total_length:?}");
continue_write = true;
let send_result = if enable_pacing {
send_with_pacing(
&socket,
&out[..total_length],
&send_message_to.unwrap(),
enable_gso,
datagram_size as u16,
)
} else {
break;
socket.send(&out[..total_length])
};
match send_result {
Ok(_written) => {
log::debug!("finished sending");
}
Err(e) => {
log::error!("sending failed with error : {e:?}");
}
}
}
if !logged_is_draining && connection.is_draining() {
log::warn!("connection is draining");
logged_is_draining = true;
}
if connection.is_closed() {
if let Some(e) = connection.peer_error() {
log::error!("peer error : {e:?} ");
}
if let Some(e) = connection.local_error() {
log::error!("local error : {e:?} ");
}
log::info!(
"{} connection closed {:?}",
connection.trace_id(),
@ -615,10 +540,8 @@ fn create_dispatching_thread(
message_send_queue: mpsc::Receiver<ChannelMessage>,
dispatching_connections: DispachingConnections,
compression_type: CompressionType,
max_messages_in_queue: u64,
) {
std::thread::spawn(move || {
let max_messages_in_queue = max_messages_in_queue as usize;
while let Ok(message) = message_send_queue.recv() {
let mut dispatching_connections_lk = dispatching_connections.lock().unwrap();
@ -670,11 +593,10 @@ fn create_dispatching_thread(
bincode::serialize(&message).expect("Message should be serializable in binary");
for id in dispatching_connections.iter() {
let data = dispatching_connections_lk.get(id).unwrap();
let messages_in_queue = data
.messages_in_queue
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if data.sender.send((binary.clone(), priority)).is_err()
|| messages_in_queue > max_messages_in_queue
if data
.sender
.send(InternalMessage::ClientMessage(binary.clone(), priority))
.is_err()
{
// client is closed
dispatching_connections_lk.remove(id);
@ -685,7 +607,7 @@ fn create_dispatching_thread(
});
}
fn set_txtime_sockopt(sock: &mio::net::UdpSocket) -> std::io::Result<()> {
fn set_txtime_sockopt(sock: &UdpSocket) -> std::io::Result<()> {
use nix::sys::socket::setsockopt;
use nix::sys::socket::sockopt::TxTime;
use std::os::unix::io::AsRawFd;
@ -695,11 +617,10 @@ fn set_txtime_sockopt(sock: &mio::net::UdpSocket) -> std::io::Result<()> {
flags: 0,
};
// mio::net::UdpSocket doesn't implement AsFd (yet?).
let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(sock.as_raw_fd()) };
setsockopt(&fd, TxTime, &config)?;
setsockopt(&fd, ReusePort, &true)?;
Ok(())
}
@ -714,13 +635,12 @@ fn std_time_to_u64(time: &std::time::Instant) -> u64 {
sec * NANOS_PER_SEC + nsec as u64
}
const GSO_SEGMENT_SIZE: u16 = MAX_DATAGRAM_SIZE as u16;
fn send_with_pacing(
socket: &mio::net::UdpSocket,
socket: &UdpSocket,
buf: &[u8],
send_info: &quiche::SendInfo,
enable_gso: bool,
segment_size: u16,
) -> std::io::Result<usize> {
use nix::sys::socket::sendmsg;
use nix::sys::socket::ControlMessage;
@ -739,7 +659,7 @@ fn send_with_pacing(
let mut cmgs = vec![cmsg_txtime];
if enable_gso {
cmgs.push(ControlMessage::UdpGsoSegments(&GSO_SEGMENT_SIZE));
cmgs.push(ControlMessage::UdpGsoSegments(&segment_size));
}
match sendmsg(sockfd, &iov, &cmgs, MsgFlags::empty(), Some(&dst)) {
@ -748,7 +668,7 @@ fn send_with_pacing(
}
}
pub fn detect_gso(socket: &mio::net::UdpSocket, segment_size: usize) -> bool {
pub fn detect_gso(socket: &UdpSocket, segment_size: usize) -> bool {
use nix::sys::socket::setsockopt;
use nix::sys::socket::sockopt::UdpGsoSegment;
use std::os::unix::io::AsRawFd;