Optimizations on serverside in case of lags
This commit is contained in:
parent
8f9c834467
commit
1db88d168d
|
@ -44,7 +44,7 @@ lz4 = "1.24.0"
|
||||||
mio = "0.8.11"
|
mio = "0.8.11"
|
||||||
mio_channel = "0.1.3"
|
mio_channel = "0.1.3"
|
||||||
|
|
||||||
quiche = { version = "=0.21.0", features = ["boringssl-vendored"] }
|
quiche = "=0.21.0"
|
||||||
boring = "4.6.0"
|
boring = "4.6.0"
|
||||||
ring = "0.17.8"
|
ring = "0.17.8"
|
||||||
|
|
||||||
|
|
|
@ -27,8 +27,9 @@ pub fn configure_client(
|
||||||
config.set_initial_max_streams_bidi(maximum_concurrent_streams);
|
config.set_initial_max_streams_bidi(maximum_concurrent_streams);
|
||||||
config.set_initial_max_streams_uni(maximum_concurrent_streams);
|
config.set_initial_max_streams_uni(maximum_concurrent_streams);
|
||||||
config.set_disable_active_migration(true);
|
config.set_disable_active_migration(true);
|
||||||
config.set_cc_algorithm(quiche::CongestionControlAlgorithm::BBR2);
|
config.set_cc_algorithm(quiche::CongestionControlAlgorithm::CUBIC);
|
||||||
config.set_max_ack_delay(maximum_ack_delay);
|
config.set_max_ack_delay(maximum_ack_delay);
|
||||||
config.set_ack_delay_exponent(ack_exponent);
|
config.set_ack_delay_exponent(ack_exponent);
|
||||||
|
config.enable_pacing(false);
|
||||||
Ok(config)
|
Ok(config)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@ use boring::ssl::SslMethod;
|
||||||
use crate::config::QuicParameters;
|
use crate::config::QuicParameters;
|
||||||
|
|
||||||
pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"geyser";
|
pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"geyser";
|
||||||
pub const MAX_DATAGRAM_SIZE: usize = 65527; // MAX: 65527
|
pub const MAX_DATAGRAM_SIZE: usize = 1350; // MAX: 65527
|
||||||
|
|
||||||
pub fn configure_server(quic_parameter: QuicParameters) -> anyhow::Result<quiche::Config> {
|
pub fn configure_server(quic_parameter: QuicParameters) -> anyhow::Result<quiche::Config> {
|
||||||
let max_concurrent_streams = quic_parameter.max_number_of_streams_per_client;
|
let max_concurrent_streams = quic_parameter.max_number_of_streams_per_client;
|
||||||
|
@ -35,15 +35,15 @@ pub fn configure_server(quic_parameter: QuicParameters) -> anyhow::Result<quiche
|
||||||
config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
|
config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
|
||||||
config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
|
config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
|
||||||
config.set_initial_max_data(recieve_window_size);
|
config.set_initial_max_data(recieve_window_size);
|
||||||
config.set_initial_max_stream_data_bidi_local(2048);
|
config.set_initial_max_stream_data_bidi_local(recieve_window_size);
|
||||||
config.set_initial_max_stream_data_bidi_remote(2048);
|
config.set_initial_max_stream_data_bidi_remote(recieve_window_size);
|
||||||
config.set_initial_max_stream_data_uni(recieve_window_size);
|
config.set_initial_max_stream_data_uni(recieve_window_size);
|
||||||
config.set_initial_max_streams_bidi(max_concurrent_streams);
|
config.set_initial_max_streams_bidi(max_concurrent_streams);
|
||||||
config.set_initial_max_streams_uni(max_concurrent_streams);
|
config.set_initial_max_streams_uni(max_concurrent_streams);
|
||||||
config.set_disable_active_migration(true);
|
config.set_disable_active_migration(true);
|
||||||
config.set_max_connection_window(128 * 1024 * 1024); // 128 Mbs
|
config.set_max_connection_window(128 * 1024 * 1024); // 128 Mbs
|
||||||
config.enable_early_data();
|
config.enable_early_data();
|
||||||
config.set_cc_algorithm(quiche::CongestionControlAlgorithm::BBR2);
|
config.set_cc_algorithm(quiche::CongestionControlAlgorithm::CUBIC);
|
||||||
config.set_active_connection_id_limit(max_number_of_connections);
|
config.set_active_connection_id_limit(max_number_of_connections);
|
||||||
config.set_max_ack_delay(maximum_ack_delay);
|
config.set_max_ack_delay(maximum_ack_delay);
|
||||||
config.set_ack_delay_exponent(ack_exponent);
|
config.set_ack_delay_exponent(ack_exponent);
|
||||||
|
|
|
@ -77,7 +77,7 @@ pub fn client_loop(
|
||||||
|
|
||||||
let mut buf = [0; 65535];
|
let mut buf = [0; 65535];
|
||||||
'client: loop {
|
'client: loop {
|
||||||
poll.poll(&mut events, Some(Duration::from_micros(500)))?;
|
poll.poll(&mut events, Some(Duration::from_micros(100)))?;
|
||||||
|
|
||||||
'read: loop {
|
'read: loop {
|
||||||
match socket.recv_from(&mut buf) {
|
match socket.recv_from(&mut buf) {
|
||||||
|
@ -146,7 +146,6 @@ pub fn create_quiche_client_thread(
|
||||||
let mut read_streams = ReadStreams::new();
|
let mut read_streams = ReadStreams::new();
|
||||||
let mut connected = false;
|
let mut connected = false;
|
||||||
let mut instance = Instant::now();
|
let mut instance = Instant::now();
|
||||||
let ping_binary = bincode::serialize(&Message::Ping).unwrap();
|
|
||||||
|
|
||||||
'client: loop {
|
'client: loop {
|
||||||
poll.poll(&mut events, Some(Duration::from_micros(100)))
|
poll.poll(&mut events, Some(Duration::from_micros(100)))
|
||||||
|
@ -155,6 +154,13 @@ pub fn create_quiche_client_thread(
|
||||||
connection.on_timeout();
|
connection.on_timeout();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sending ping
|
||||||
|
if instance.elapsed() > Duration::from_secs(5) {
|
||||||
|
log::debug!("sending ping to the server");
|
||||||
|
instance = Instant::now();
|
||||||
|
connection.on_timeout();
|
||||||
|
}
|
||||||
|
|
||||||
while let Ok((recv_info, mut buf)) = receiver.try_recv() {
|
while let Ok((recv_info, mut buf)) = receiver.try_recv() {
|
||||||
// Process potentially coalesced packets.
|
// Process potentially coalesced packets.
|
||||||
if let Err(e) = connection.recv(buf.as_mut_slice(), recv_info) {
|
if let Err(e) = connection.recv(buf.as_mut_slice(), recv_info) {
|
||||||
|
@ -188,27 +194,13 @@ pub fn create_quiche_client_thread(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
// do nothing
|
// do nothing / continue reading other streams
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::error!("Error recieving message : {e}")
|
log::error!("Error recieving message : {e}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// sending ping
|
|
||||||
if instance.elapsed() > Duration::from_secs(5) {
|
|
||||||
log::debug!("sending ping to the server");
|
|
||||||
instance = Instant::now();
|
|
||||||
current_stream_id = get_next_unidi(current_stream_id, false, maximum_streams);
|
|
||||||
if let Err(e) = send_message(
|
|
||||||
&mut connection,
|
|
||||||
&mut partial_responses,
|
|
||||||
current_stream_id,
|
|
||||||
&ping_binary,
|
|
||||||
) {
|
|
||||||
log::error!("sending ping failed with error {e:?}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match message_send_queue.try_recv() {
|
match message_send_queue.try_recv() {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use std::collections::BTreeMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use anyhow::bail;
|
use anyhow::bail;
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ pub fn convert_binary_to_message(bytes: Vec<u8>) -> anyhow::Result<Message> {
|
||||||
Ok(bincode::deserialize::<Message>(&bytes)?)
|
Ok(bincode::deserialize::<Message>(&bytes)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type ReadStreams = BTreeMap<u64, Vec<u8>>;
|
pub type ReadStreams = HashMap<u64, Vec<u8>>;
|
||||||
|
|
||||||
pub fn recv_message(
|
pub fn recv_message(
|
||||||
connection: &mut quiche::Connection,
|
connection: &mut quiche::Connection,
|
||||||
|
|
|
@ -39,8 +39,9 @@ struct DispatchingData {
|
||||||
|
|
||||||
type DispachingConnections = Arc<Mutex<HashMap<ConnectionId<'static>, DispatchingData>>>;
|
type DispachingConnections = Arc<Mutex<HashMap<ConnectionId<'static>, DispatchingData>>>;
|
||||||
|
|
||||||
const MAX_MESSAGE_DEPILE_IN_LOOP: usize = 16 * 1024;
|
const MAX_MESSAGE_DEPILE_IN_LOOP: usize = 32;
|
||||||
const ACCEPTABLE_PACING_DELAY: Duration = Duration::from_millis(100);
|
const ACCEPTABLE_PACING_DELAY: Duration = Duration::from_millis(100);
|
||||||
|
const MAX_ALLOWED_PARTIAL_RESPONSES: usize = 128 * 1024;
|
||||||
|
|
||||||
struct Packet {
|
struct Packet {
|
||||||
pub buffer: Vec<u8>,
|
pub buffer: Vec<u8>,
|
||||||
|
@ -350,46 +351,56 @@ fn create_client_task(
|
||||||
while handle_writable(&mut connection, &mut partial_responses, stream_id) {}
|
while handle_writable(&mut connection, &mut partial_responses, stream_id) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _ in 0..MAX_MESSAGE_DEPILE_IN_LOOP {
|
if partial_responses.len() < MAX_ALLOWED_PARTIAL_RESPONSES {
|
||||||
if let Ok((message, priority)) = message_channel.try_recv() {
|
for _ in 0..MAX_MESSAGE_DEPILE_IN_LOOP {
|
||||||
if connection.is_closed() || !connection.is_established() {
|
if let Ok((message, priority)) = message_channel.try_recv() {
|
||||||
continue;
|
if connection.is_closed() || !connection.is_established() {
|
||||||
}
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
let stream_id = next_stream;
|
let stream_id = next_stream;
|
||||||
|
|
||||||
next_stream = get_next_unidi(stream_id, true, maximum_concurrent_streams);
|
next_stream = get_next_unidi(stream_id, true, maximum_concurrent_streams);
|
||||||
|
|
||||||
if let Err(e) = connection.stream_priority(stream_id, priority, true) {
|
if let Err(e) = connection.stream_priority(stream_id, priority, true) {
|
||||||
log::error!(
|
if !closed {
|
||||||
"Unable to set priority for the stream {}, error {}",
|
log::error!(
|
||||||
stream_id,
|
"Unable to set priority for the stream {}, error {}",
|
||||||
e
|
stream_id,
|
||||||
);
|
|
||||||
}
|
|
||||||
if let Err(e) =
|
|
||||||
send_message(&mut connection, &mut partial_responses, stream_id, &message)
|
|
||||||
{
|
|
||||||
log::error!("Error sending message : {e}");
|
|
||||||
if stop_laggy_client && !closed {
|
|
||||||
if let Err(e) = connection.close(true, 1, b"laggy client") {
|
|
||||||
if e != quiche::Error::Done {
|
|
||||||
log::error!("error closing client : {}", e);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log::info!(
|
|
||||||
"Stopping laggy client : {} because of error : {}",
|
|
||||||
connection.trace_id(),
|
|
||||||
e
|
e
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
closed = true;
|
}
|
||||||
|
if let Err(e) = send_message(
|
||||||
|
&mut connection,
|
||||||
|
&mut partial_responses,
|
||||||
|
stream_id,
|
||||||
|
&message,
|
||||||
|
) {
|
||||||
|
if !closed {
|
||||||
|
log::error!("Error sending message : {e}");
|
||||||
|
if stop_laggy_client && !closed {
|
||||||
|
if let Err(e) = connection.close(true, 1, b"laggy client") {
|
||||||
|
if e != quiche::Error::Done {
|
||||||
|
log::error!("error closing client : {}", e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log::info!(
|
||||||
|
"Stopping laggy client : {} because of error : {}",
|
||||||
|
connection.trace_id(),
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
closed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if instance.elapsed() > Duration::from_secs(1) {
|
if instance.elapsed() > Duration::from_secs(2) {
|
||||||
instance = Instant::now();
|
instance = Instant::now();
|
||||||
connection.on_timeout();
|
connection.on_timeout();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use std::{collections::BTreeMap, net::SocketAddr};
|
use std::{collections::HashMap, net::SocketAddr};
|
||||||
|
|
||||||
pub fn validate_token<'a>(
|
pub fn validate_token<'a>(
|
||||||
src: &std::net::SocketAddr,
|
src: &std::net::SocketAddr,
|
||||||
|
@ -88,7 +88,7 @@ pub struct PartialResponse {
|
||||||
pub written: usize,
|
pub written: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type PartialResponses = BTreeMap<u64, PartialResponse>;
|
pub type PartialResponses = HashMap<u64, PartialResponse>;
|
||||||
|
|
||||||
// returns true if the socket will block the writing of socket
|
// returns true if the socket will block the writing of socket
|
||||||
// return false otherwise
|
// return false otherwise
|
||||||
|
|
|
@ -132,6 +132,7 @@ pub fn main() {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sleep(Duration::from_secs(1));
|
||||||
println!("Subscribing");
|
println!("Subscribing");
|
||||||
client
|
client
|
||||||
.subscribe(vec![
|
.subscribe(vec![
|
||||||
|
|
|
@ -7,7 +7,7 @@ authors = ["Godmode Galactus"]
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
solana-rpc-client = "~1.17.28"
|
solana-rpc-client = "~1.17.31"
|
||||||
|
|
||||||
clap = { workspace = true, features = ["derive", "env"] }
|
clap = { workspace = true, features = ["derive", "env"] }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
|
|
Loading…
Reference in New Issue