improving quic plugin blocking client
This commit is contained in:
parent
c25c19aff0
commit
753d2644d2
|
@ -65,9 +65,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "agave-geyser-plugin-interface"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5189afece0a0b7b49419cf1cf6b71bdd6ed77d6f4a1b5ce0a0528c78d9ac822f"
|
||||
checksum = "1dfc81497159bc6abcc6e0af5cfeec43d7ac219a946ac90d58a7b01cab2bffa2"
|
||||
dependencies = [
|
||||
"log",
|
||||
"solana-sdk",
|
||||
|
@ -2937,7 +2937,10 @@ dependencies = [
|
|||
"bincode",
|
||||
"itertools 0.10.5",
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"log",
|
||||
"mio 0.8.11",
|
||||
"nix",
|
||||
"prometheus",
|
||||
"quic-geyser-common",
|
||||
"quic-geyser-server",
|
||||
|
@ -2957,11 +2960,9 @@ dependencies = [
|
|||
"boring",
|
||||
"itertools 0.10.5",
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"log",
|
||||
"mio 0.8.11",
|
||||
"mio_channel",
|
||||
"nix",
|
||||
"prometheus",
|
||||
"quic-geyser-common",
|
||||
"quic-geyser-quiche-utils",
|
||||
|
@ -3692,9 +3693,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-account-decoder"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5c4e77c6e0b4e1557e738239cfbc18e92412ad393707bfa3f0861a7dd39cbc43"
|
||||
checksum = "15dacdeeb5368f079dabd980e674a026bb63c7a16cdeefce3f0a388db1fb8413"
|
||||
dependencies = [
|
||||
"Inflector",
|
||||
"base64 0.22.1",
|
||||
|
@ -3717,9 +3718,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-compute-budget"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9c04e01f296142c78d36be3f53da362b71a605d2f7a3c3d98f1c943403ce5581"
|
||||
checksum = "52f59a50c002a9073361968bb7cc2baacd3ed41debb3af6845ad0ff4a9f66159"
|
||||
dependencies = [
|
||||
"rustc_version",
|
||||
"solana-sdk",
|
||||
|
@ -3727,9 +3728,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-config-program"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "71169087810ede13cf2bd58f59293b45efbff7ea272a8b726dfa6c0c355d63da"
|
||||
checksum = "ca01faa9447969f633559eb85817e9f719341179805faf5f99271a977f04bde8"
|
||||
dependencies = [
|
||||
"bincode",
|
||||
"chrono",
|
||||
|
@ -3741,9 +3742,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-curve25519"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c94998b7dd9eb9e356e2fbe57955dcaaeca8f22f62de50bff9c3c73bcc700f57"
|
||||
checksum = "a416f5db2943034d1a0992e124f62b9d3ee701c16da999b5e3828bee2c119760"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
"bytemuck_derive",
|
||||
|
@ -3754,9 +3755,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-inline-spl"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "501e25b91ebb2c1ab7360111be8c0ff8c70e17609a1c2e58c9762ffd29d88f28"
|
||||
checksum = "542bb8ff2e80dd7c7e7eb558a875f736afd23682b03d587ee51ebe88a656d6ee"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
"rustc_version",
|
||||
|
@ -3765,9 +3766,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-logger"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dcc155d62e0639b392216d33ce0382e5cd53a855cfaf3a988c0a72fdfc721b2d"
|
||||
checksum = "d0444c6b92204a6d3ba922ae2c02cc39833405803a50aad2f7c56f66b6bc9eeb"
|
||||
dependencies = [
|
||||
"env_logger",
|
||||
"lazy_static",
|
||||
|
@ -3776,9 +3777,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-measure"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1d8d2d32b3ea5dd96af0c5201bb2b139eee3fb8a7a42a614e0d4a09715a36527"
|
||||
checksum = "6d209e45a3beb408d302e9bda3146c0ba5e60fdc4f05c0d8ee7ae4f270d9a551"
|
||||
dependencies = [
|
||||
"log",
|
||||
"solana-sdk",
|
||||
|
@ -3786,9 +3787,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-metrics"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6e174b0ee130287ede3caf28de3f6cd2f527d113ed7ccb1ccbbae716d1a3dda8"
|
||||
checksum = "50c3fafc2c2844cdcba3405bc4cc6d15e88099f87e0436e2057ec581d596b538"
|
||||
dependencies = [
|
||||
"crossbeam-channel",
|
||||
"gethostname",
|
||||
|
@ -3801,9 +3802,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-program"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "549e1d73169e3b55ca832d498de82590df9a19a486318841c7ccddf755abdc00"
|
||||
checksum = "6f9daefe400235569b5ae3a59d68901ed9c8cdc774ff6fa618989ca996909464"
|
||||
dependencies = [
|
||||
"ark-bn254",
|
||||
"ark-ec",
|
||||
|
@ -3847,9 +3848,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-program-runtime"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d8650cd041818feffda703dd174db3d17b25f2fdb658017ede33e30ff37f2eff"
|
||||
checksum = "51401b829c540d23b1cd8607065f9ad7cf47324eef5ae707091691ca043d9973"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"bincode",
|
||||
|
@ -3876,9 +3877,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-rpc-client"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "350c5b759e03eced58de2ac44c2bba5e56f6a07c1f28a50db56bfdd5a0899718"
|
||||
checksum = "794eda91017f943ae3e8a4327647840b9ef5b3b2ee47747c77a19910881f6b1c"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.22.1",
|
||||
|
@ -3903,9 +3904,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-rpc-client-api"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "59b44d991c6c9235adc436f545dad56a90b3bfa78dc831e17823685203f5dd8d"
|
||||
checksum = "07c89966d2cba3075545088653c0ff47903ce7a8c402d3058929e5008459ce49"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64 0.22.1",
|
||||
|
@ -3927,9 +3928,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-sdk"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eb4f2423204dc2ce50d0d17a7594bab1f247351699d7ad6b3cf562c76b92139a"
|
||||
checksum = "1812f69de2881c4ecef49d32ccabe605af7078ea2ecb285c70029caee3101916"
|
||||
dependencies = [
|
||||
"bincode",
|
||||
"bitflags 2.6.0",
|
||||
|
@ -3976,9 +3977,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-sdk-macro"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "879caa7c8d816376bc9d11a76d7897c243d865d51cb03653e2e1dd5f5245d457"
|
||||
checksum = "e1c0c428030f6a74ebe1b8c2e0307db0a8ac77a5569822f590617f0fcef43c66"
|
||||
dependencies = [
|
||||
"bs58",
|
||||
"proc-macro2",
|
||||
|
@ -3995,9 +3996,9 @@ checksum = "468aa43b7edb1f9b7b7b686d5c3aeb6630dc1708e86e31343499dd5c4d775183"
|
|||
|
||||
[[package]]
|
||||
name = "solana-transaction-status"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "04f45de90e145dcce7b99922e1d35de88612211b7d616e3c57baa64e7c7af9b6"
|
||||
checksum = "a575b13d2169d92f4a3ad4a0145b6a2e7dea7ab657eaffcd7d2b5710394e0bc9"
|
||||
dependencies = [
|
||||
"Inflector",
|
||||
"base64 0.22.1",
|
||||
|
@ -4022,9 +4023,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-type-overrides"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b381201d14e9677e11ae61fae614da4765e9a59f078b9c46c3e6c3dc1cd903f9"
|
||||
checksum = "e94b4007992cdc31b726e4c322998baadb8af7562b775373f6bb1e4595e5971a"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"rand 0.8.5",
|
||||
|
@ -4032,9 +4033,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-version"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "815b3a6d5411f06ab5b9cfc4c9a3b983537c7f0b5c3b560e71305f493ddc6999"
|
||||
checksum = "516efbf38be854c39254cce3217bbe1e73640281827cf220cb88dd71f4a814c2"
|
||||
dependencies = [
|
||||
"log",
|
||||
"rustc_version",
|
||||
|
@ -4046,9 +4047,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-vote"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ac76c326bb4e8a73df482a9afa88bbaafa0a78a307835ffa1e5c2fe09bf839f5"
|
||||
checksum = "c2dc59dce70c415a02f38723839e634bbbc2e51ee215bd9bfdf458fd148b2f04"
|
||||
dependencies = [
|
||||
"itertools 0.12.1",
|
||||
"log",
|
||||
|
@ -4061,9 +4062,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-vote-program"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ce6857a073f5a3563919d0301600e47d7b62259f3fae76bf08c16703e105b77c"
|
||||
checksum = "7f6ca9ffc24d510cf9a6b6ea568f5d835e97a0cc164ee9f2ef2c6e4b1bb4df0f"
|
||||
dependencies = [
|
||||
"bincode",
|
||||
"log",
|
||||
|
@ -4081,9 +4082,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "solana-zk-token-sdk"
|
||||
version = "2.0.18"
|
||||
version = "2.0.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "818713988d1257168a84cf2c3cda06df7b092da7e84192a7fb88f9c86756366d"
|
||||
checksum = "5ef00dbdad3e384055aef00226e0eed830e4a54e2c634ec3538a15cce0357405"
|
||||
dependencies = [
|
||||
"aes-gcm-siv",
|
||||
"base64 0.22.1",
|
||||
|
|
16
Cargo.toml
16
Cargo.toml
|
@ -21,14 +21,14 @@ license = "AGPL"
|
|||
edition = "2021"
|
||||
|
||||
[workspace.dependencies]
|
||||
solana-sdk = "~2.0.18"
|
||||
solana-program = "~2.0.18"
|
||||
solana-transaction-status = "~2.0.18"
|
||||
solana-logger = "~2.0.18"
|
||||
solana-rpc-client = "~2.0.18"
|
||||
solana-rpc-client-api = "~2.0.18"
|
||||
solana-account-decoder = "~2.0.18"
|
||||
agave-geyser-plugin-interface = "=2.0.18"
|
||||
solana-sdk = "~2.0.19"
|
||||
solana-program = "~2.0.19"
|
||||
solana-transaction-status = "~2.0.19"
|
||||
solana-logger = "~2.0.19"
|
||||
solana-rpc-client = "~2.0.19"
|
||||
solana-rpc-client-api = "~2.0.19"
|
||||
solana-account-decoder = "~2.0.19"
|
||||
agave-geyser-plugin-interface = "=2.0.19"
|
||||
|
||||
itertools = "0.10.5"
|
||||
serde = "1.0.201"
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
use crate::configure_client::configure_client;
|
||||
use crate::quiche_client_loop::client_loop;
|
||||
use quic_geyser_common::filters::Filter;
|
||||
use quic_geyser_common::message::Message;
|
||||
|
@ -18,13 +17,7 @@ impl Client {
|
|||
server_address: String,
|
||||
connection_parameters: ConnectionParameters,
|
||||
) -> anyhow::Result<(Client, std::sync::mpsc::Receiver<Message>)> {
|
||||
let config = configure_client(
|
||||
connection_parameters.max_number_of_streams,
|
||||
connection_parameters.recieve_window_size,
|
||||
connection_parameters.timeout_in_seconds,
|
||||
connection_parameters.max_ack_delay,
|
||||
connection_parameters.ack_exponent,
|
||||
)?;
|
||||
log::info!("client configured : {connection_parameters:?}");
|
||||
let server_address: SocketAddr = parse_host_port(&server_address)?;
|
||||
let socket_addr: SocketAddr =
|
||||
parse_host_port("[::]:0").expect("Socket address should be returned");
|
||||
|
@ -35,7 +28,7 @@ impl Client {
|
|||
let is_connected_client = is_connected.clone();
|
||||
let _client_loop_jh = std::thread::spawn(move || {
|
||||
if let Err(e) = client_loop(
|
||||
config,
|
||||
connection_parameters,
|
||||
socket_addr,
|
||||
server_address,
|
||||
rx_sent_queue,
|
||||
|
@ -168,7 +161,8 @@ mod tests {
|
|||
timeout_in_seconds: 10,
|
||||
max_ack_delay: 25,
|
||||
ack_exponent: 3,
|
||||
enable_gso: false,
|
||||
enable_gso: true,
|
||||
enable_pacing: true,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
|
|
@ -1,30 +1,41 @@
|
|||
use quic_geyser_common::defaults::{ALPN_GEYSER_PROTOCOL_ID, MAX_DATAGRAM_SIZE};
|
||||
use quic_geyser_common::{
|
||||
defaults::{ALPN_GEYSER_PROTOCOL_ID, MAX_DATAGRAM_SIZE},
|
||||
types::connections_parameters::ConnectionParameters,
|
||||
};
|
||||
|
||||
pub fn configure_client(
|
||||
maximum_concurrent_streams: u64,
|
||||
recieve_window_size: u64,
|
||||
timeout_in_seconds: u64,
|
||||
maximum_ack_delay: u64,
|
||||
ack_exponent: u64,
|
||||
connection_parameters: &ConnectionParameters,
|
||||
) -> anyhow::Result<quiche::Config> {
|
||||
let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
|
||||
config
|
||||
.set_application_protos(&[ALPN_GEYSER_PROTOCOL_ID])
|
||||
.unwrap();
|
||||
|
||||
config.set_max_idle_timeout(timeout_in_seconds * 1000);
|
||||
config.set_max_idle_timeout(connection_parameters.timeout_in_seconds * 1000);
|
||||
config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
|
||||
config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
|
||||
config.set_initial_max_data(recieve_window_size);
|
||||
config.set_initial_max_stream_data_bidi_local(recieve_window_size);
|
||||
config.set_initial_max_stream_data_bidi_remote(recieve_window_size);
|
||||
config.set_initial_max_stream_data_uni(recieve_window_size);
|
||||
config.set_initial_max_streams_bidi(maximum_concurrent_streams);
|
||||
config.set_initial_max_streams_uni(maximum_concurrent_streams);
|
||||
config.set_initial_max_data(connection_parameters.recieve_window_size);
|
||||
config.set_initial_max_stream_data_bidi_local(connection_parameters.recieve_window_size);
|
||||
config.set_initial_max_stream_data_bidi_remote(connection_parameters.recieve_window_size);
|
||||
config.set_initial_max_stream_data_uni(connection_parameters.recieve_window_size);
|
||||
config.set_initial_max_streams_bidi(connection_parameters.max_number_of_streams);
|
||||
config.set_initial_max_streams_uni(connection_parameters.max_number_of_streams);
|
||||
|
||||
config.set_disable_active_migration(true);
|
||||
config.set_cc_algorithm(quiche::CongestionControlAlgorithm::BBR2);
|
||||
config.set_max_ack_delay(maximum_ack_delay);
|
||||
config.set_ack_delay_exponent(ack_exponent);
|
||||
config.enable_pacing(false);
|
||||
config.set_cc_algorithm(quiche::CongestionControlAlgorithm::CUBIC);
|
||||
config.set_max_connection_window(48 * 1024 * 1024);
|
||||
config.set_max_stream_window(16 * 1024 * 1024);
|
||||
|
||||
config.enable_early_data();
|
||||
config.grease(true);
|
||||
config.enable_hystart(true);
|
||||
config.discover_pmtu(true);
|
||||
|
||||
config.set_active_connection_id_limit(16);
|
||||
config.set_max_ack_delay(connection_parameters.max_ack_delay);
|
||||
config.set_ack_delay_exponent(connection_parameters.ack_exponent);
|
||||
config.set_initial_congestion_window_packets(1024);
|
||||
|
||||
config.enable_pacing(true);
|
||||
Ok(config)
|
||||
}
|
||||
|
|
|
@ -5,19 +5,27 @@ use std::{
|
|||
};
|
||||
|
||||
use log::{debug, error, info, trace};
|
||||
use quic_geyser_common::{defaults::MAX_DATAGRAM_SIZE, message::Message};
|
||||
use quic_geyser_common::{
|
||||
defaults::MAX_DATAGRAM_SIZE, message::Message,
|
||||
types::connections_parameters::ConnectionParameters,
|
||||
};
|
||||
|
||||
use quic_geyser_quiche_utils::{
|
||||
quiche_reciever::{recv_message, ReadStreams},
|
||||
quiche_sender::{handle_writable, send_message},
|
||||
quiche_utils::{generate_cid_and_reset_token, get_next_unidi, StreamBufferMap},
|
||||
quiche_utils::{
|
||||
detect_gso, generate_cid_and_reset_token, get_next_unidi, send_with_pacing,
|
||||
set_txtime_sockopt, StreamBufferMap,
|
||||
},
|
||||
};
|
||||
|
||||
use anyhow::bail;
|
||||
use ring::rand::{SecureRandom, SystemRandom};
|
||||
|
||||
use crate::configure_client::configure_client;
|
||||
|
||||
pub fn client_loop(
|
||||
mut config: quiche::Config,
|
||||
connection_parameters: ConnectionParameters,
|
||||
socket_addr: SocketAddr,
|
||||
server_address: SocketAddr,
|
||||
mut message_send_queue: mio_channel::Receiver<Message>,
|
||||
|
@ -25,13 +33,51 @@ pub fn client_loop(
|
|||
is_connected: Arc<AtomicBool>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut socket = mio::net::UdpSocket::bind(socket_addr)?;
|
||||
|
||||
let mut config = configure_client(&connection_parameters)?;
|
||||
|
||||
let enable_pacing = if connection_parameters.enable_pacing {
|
||||
set_txtime_sockopt(&socket).is_ok()
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
let enable_gso = if connection_parameters.enable_gso {
|
||||
detect_gso(&socket, MAX_DATAGRAM_SIZE)
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
let (message_binary_channel_sx, message_binary_channel_rx) =
|
||||
std::sync::mpsc::channel::<Vec<u8>>();
|
||||
let _message_deserializing_task = std::thread::spawn(move || loop {
|
||||
match message_binary_channel_rx.recv() {
|
||||
Ok(message_binary) => match bincode::deserialize::<Message>(&message_binary) {
|
||||
Ok(message) => {
|
||||
if let Err(e) = message_recv_queue.send(message) {
|
||||
log::error!("Error sending message on the channel : {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Error deserializing message : {e:?}");
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
log::error!("recv failed: {:?}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut poll = mio::Poll::new()?;
|
||||
let mut events = mio::Events::with_capacity(1024);
|
||||
let mut buf = [0; 65535];
|
||||
let mut out = [0; MAX_DATAGRAM_SIZE];
|
||||
let mut read_streams = ReadStreams::new();
|
||||
const READ_BUFFER_SIZE: usize = 1024 * 1024; // 1 MB
|
||||
let send_stream_id = get_next_unidi(3, false, u64::MAX);
|
||||
const READ_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16 MBs
|
||||
// client always sends on same stream
|
||||
let send_stream_id = get_next_unidi(4, false, u64::MAX);
|
||||
let mut has_connected = false;
|
||||
// Generate a random source connection ID for the connection.
|
||||
let rng = SystemRandom::new();
|
||||
|
@ -50,7 +96,13 @@ pub fn client_loop(
|
|||
let scid = quiche::ConnectionId::from_ref(&scid);
|
||||
let local_addr = socket.local_addr()?;
|
||||
|
||||
let mut conn = quiche::connect(None, &scid, local_addr, server_address, &mut config)?;
|
||||
let mut conn = quiche::connect(
|
||||
Some("quiche_plugin_server"),
|
||||
&scid,
|
||||
local_addr,
|
||||
server_address,
|
||||
&mut config,
|
||||
)?;
|
||||
|
||||
// sending initial connection request
|
||||
{
|
||||
|
@ -61,29 +113,39 @@ pub fn client_loop(
|
|||
}
|
||||
}
|
||||
|
||||
poll.registry()
|
||||
.register(
|
||||
&mut message_send_queue,
|
||||
mio::Token(1),
|
||||
mio::Interest::READABLE,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mut instance = Instant::now();
|
||||
let mut connection_recently_established = false;
|
||||
let ping_message = Message::Ping.to_binary_stream();
|
||||
let mut loss_rate = 0.0;
|
||||
let mut max_send_burst = MAX_DATAGRAM_SIZE * 10;
|
||||
let mut continue_write = true;
|
||||
let max_datagram_size = MAX_DATAGRAM_SIZE;
|
||||
loop {
|
||||
poll.poll(&mut events, conn.timeout()).unwrap();
|
||||
let timeout = match continue_write {
|
||||
true => Some(std::time::Duration::from_secs(0)),
|
||||
false => conn.timeout(),
|
||||
};
|
||||
|
||||
if conn.is_established() && !conn.is_closed() {
|
||||
poll.poll(&mut events, timeout).unwrap();
|
||||
|
||||
if connection_recently_established || events.iter().any(|e| e.token() == mio::Token(1)) {
|
||||
if connection_recently_established {
|
||||
connection_recently_established = false;
|
||||
}
|
||||
match message_send_queue.try_recv() {
|
||||
Ok(message) => {
|
||||
let binary_message = message.to_binary_stream();
|
||||
log::debug!("send message : {message:?}");
|
||||
let _ = send_message(
|
||||
log::info!("send message : {message:?}");
|
||||
if let Err(e) = send_message(
|
||||
&mut conn,
|
||||
&mut stream_sender_map,
|
||||
send_stream_id,
|
||||
binary_message,
|
||||
);
|
||||
) {
|
||||
log::error!(
|
||||
"Error sending filters : {e}, probably because filter is too long"
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
match e {
|
||||
|
@ -154,7 +216,7 @@ pub fn client_loop(
|
|||
&mut conn,
|
||||
&mut stream_sender_map,
|
||||
send_stream_id,
|
||||
Message::Ping.to_binary_stream(),
|
||||
ping_message.clone(),
|
||||
) {
|
||||
log::error!("Error sending ping message : {e}");
|
||||
}
|
||||
|
@ -162,8 +224,17 @@ pub fn client_loop(
|
|||
}
|
||||
|
||||
if !has_connected && conn.is_established() {
|
||||
log::info!("connection established");
|
||||
has_connected = true;
|
||||
connection_recently_established = true;
|
||||
is_connected.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
poll.registry()
|
||||
.register(
|
||||
&mut message_send_queue,
|
||||
mio::Token(1),
|
||||
mio::Interest::READABLE,
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
// See whether source Connection IDs have been retired.
|
||||
while let Some(retired_scid) = conn.retired_scid_next() {
|
||||
|
@ -187,7 +258,7 @@ pub fn client_loop(
|
|||
Ok(Some(messages)) => {
|
||||
log::debug!("got messages: {}", messages.len());
|
||||
for message in messages {
|
||||
if let Err(e) = message_recv_queue.send(message) {
|
||||
if let Err(e) = message_binary_channel_sx.send(message) {
|
||||
log::error!("Error sending message on the channel : {e}");
|
||||
break;
|
||||
}
|
||||
|
@ -210,35 +281,65 @@ pub fn client_loop(
|
|||
}
|
||||
}
|
||||
|
||||
// Generate outgoing QUIC packets and send them on the UDP socket, until
|
||||
// quiche reports that there are no more packets to be sent.
|
||||
loop {
|
||||
let (write, send_info) = match conn.send(&mut out) {
|
||||
Ok(v) => v,
|
||||
// Reduce max_send_burst by 25% if loss is increasing more than 0.1%.
|
||||
let calculated_loss_rate = conn.stats().lost as f64 / conn.stats().sent as f64;
|
||||
if calculated_loss_rate > loss_rate + 0.001 {
|
||||
max_send_burst = max_send_burst / 4 * 3;
|
||||
// Minimun bound of 10xMSS.
|
||||
max_send_burst = max_send_burst.max(max_datagram_size * 10);
|
||||
loss_rate = calculated_loss_rate;
|
||||
}
|
||||
|
||||
let max_send_burst =
|
||||
conn.send_quantum().min(max_send_burst) / max_datagram_size * max_datagram_size;
|
||||
let mut total_write = 0;
|
||||
let mut dst_info = None;
|
||||
|
||||
while total_write < max_send_burst {
|
||||
let (write, mut send_info) = match conn.send(&mut buf[total_write..max_send_burst]) {
|
||||
Ok(v) => v,
|
||||
Err(quiche::Error::Done) => {
|
||||
debug!("done writing");
|
||||
trace!("{} done writing", conn.trace_id());
|
||||
break;
|
||||
}
|
||||
|
||||
Err(e) => {
|
||||
error!("send failed: {:?}", e);
|
||||
|
||||
log::error!("{} send failed: {:?}", conn.trace_id(), e);
|
||||
conn.close(false, 0x1, b"fail").ok();
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = socket.send_to(&out[..write], send_info.to) {
|
||||
if e.kind() == std::io::ErrorKind::WouldBlock {
|
||||
debug!("send() would block");
|
||||
break;
|
||||
}
|
||||
total_write += write;
|
||||
|
||||
panic!("send() failed: {:?}", e);
|
||||
// Use the first packet time to send, not the last.
|
||||
send_info.to = server_address;
|
||||
let _ = dst_info.get_or_insert(send_info);
|
||||
|
||||
if write < max_datagram_size {
|
||||
continue_write = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
debug!("written {}", write);
|
||||
if total_write == 0 || dst_info.is_none() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let send_result = if enable_pacing {
|
||||
send_with_pacing(
|
||||
&socket,
|
||||
&buf[..total_write],
|
||||
&dst_info.unwrap(),
|
||||
enable_gso,
|
||||
max_datagram_size as u16,
|
||||
)
|
||||
} else {
|
||||
socket.send(&buf[..total_write])
|
||||
};
|
||||
|
||||
if let Err(e) = send_result {
|
||||
log::error!("sending failed with error : {e:?}");
|
||||
break;
|
||||
}
|
||||
|
||||
if conn.is_closed() {
|
||||
|
@ -270,11 +371,9 @@ mod tests {
|
|||
filters::Filter,
|
||||
message::Message,
|
||||
net::parse_host_port,
|
||||
types::block_meta::SlotMeta,
|
||||
types::{block_meta::SlotMeta, connections_parameters::ConnectionParameters},
|
||||
};
|
||||
|
||||
use crate::configure_client::configure_client;
|
||||
|
||||
use super::client_loop;
|
||||
|
||||
#[test]
|
||||
|
@ -376,8 +475,10 @@ mod tests {
|
|||
let (sx_recv_queue, client_rx_queue) = mpsc::channel();
|
||||
|
||||
let _client_loop_jh = std::thread::spawn(move || {
|
||||
let client_config =
|
||||
configure_client(maximum_concurrent_streams, 20_000_000, 1, 25, 3).unwrap();
|
||||
let client_config = ConnectionParameters {
|
||||
max_number_of_streams: maximum_concurrent_streams,
|
||||
..Default::default()
|
||||
};
|
||||
let socket_addr: SocketAddr = parse_host_port("[::]:0").unwrap();
|
||||
let is_connected = Arc::new(AtomicBool::new(false));
|
||||
if let Err(e) = client_loop(
|
||||
|
|
|
@ -348,6 +348,7 @@ mod tests {
|
|||
max_ack_delay: 25,
|
||||
ack_exponent: 3,
|
||||
enable_gso: false,
|
||||
enable_pacing: false,
|
||||
},
|
||||
)
|
||||
.await
|
||||
|
|
|
@ -36,6 +36,18 @@ impl Message {
|
|||
Some((message, size + 8))
|
||||
}
|
||||
|
||||
// used by the network
|
||||
pub fn from_binary_stream_binary(stream: &[u8]) -> Option<(Vec<u8>, usize)> {
|
||||
if stream.len() < 8 {
|
||||
return None;
|
||||
}
|
||||
let size = u64::from_le_bytes(stream[0..8].try_into().unwrap()) as usize;
|
||||
if stream.len() < size + 8 {
|
||||
return None;
|
||||
}
|
||||
Some((stream[8..size + 8].to_vec(), size + 8))
|
||||
}
|
||||
|
||||
pub fn to_binary_stream(&self) -> Vec<u8> {
|
||||
let binary = bincode::serialize(self).unwrap();
|
||||
let size = binary.len().to_le_bytes();
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
use std::io::BufRead;
|
||||
|
||||
pub struct StreamBuffer<const BUFFER_LEN: usize> {
|
||||
buffer: Box<circular_buffer::CircularBuffer<BUFFER_LEN, u8>>,
|
||||
}
|
||||
|
@ -25,11 +27,11 @@ impl<const BUFFER_LEN: usize> StreamBuffer<BUFFER_LEN> {
|
|||
}
|
||||
|
||||
pub fn consume(&mut self, nb_bytes: usize) -> bool {
|
||||
if self.buffer.len() < nb_bytes {
|
||||
let len = self.buffer.len();
|
||||
if len < nb_bytes {
|
||||
return false;
|
||||
}
|
||||
let d = self.buffer.drain(..nb_bytes);
|
||||
assert_eq!(d.len(), nb_bytes);
|
||||
self.buffer.consume(nb_bytes);
|
||||
true
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ use crate::compression::CompressionType;
|
|||
use super::{account::Account, block_meta::BlockMeta, transaction::Transaction};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
|
||||
#[repr(C)]
|
||||
pub struct Block {
|
||||
pub meta: BlockMeta,
|
||||
pub transactions: Vec<u8>, // compressed transaction::Transaction
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::defaults::{
|
||||
DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_ENABLE_GSO, DEFAULT_MAX_ACK_DELAY,
|
||||
DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS,
|
||||
DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_ENABLE_GSO, DEFAULT_ENABLE_PACING,
|
||||
DEFAULT_MAX_ACK_DELAY, DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS,
|
||||
};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
|
||||
|
@ -14,6 +14,7 @@ pub struct ConnectionParameters {
|
|||
pub max_ack_delay: u64,
|
||||
pub ack_exponent: u64,
|
||||
pub enable_gso: bool,
|
||||
pub enable_pacing: bool,
|
||||
}
|
||||
|
||||
impl Default for ConnectionParameters {
|
||||
|
@ -25,6 +26,7 @@ impl Default for ConnectionParameters {
|
|||
max_ack_delay: DEFAULT_MAX_ACK_DELAY,
|
||||
ack_exponent: DEFAULT_ACK_EXPONENT,
|
||||
enable_gso: DEFAULT_ENABLE_GSO,
|
||||
enable_pacing: DEFAULT_ENABLE_PACING,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -130,6 +130,7 @@ fn blocking(args: Args, client_stats: ClientStats, break_thread: Arc<AtomicBool>
|
|||
filters.push(Filter::AccountsAll);
|
||||
}
|
||||
|
||||
sleep(Duration::from_millis(100));
|
||||
println!("Subscribing");
|
||||
client.subscribe(filters).unwrap();
|
||||
println!("Subscribed");
|
||||
|
|
|
@ -33,7 +33,13 @@ impl GeyserPlugin for QuicGeyserPlugin {
|
|||
|
||||
fn on_load(&mut self, config_file: &str, _is_reload: bool) -> PluginResult<()> {
|
||||
log::info!("loading quic_geyser plugin");
|
||||
let config = Config::load_from_file(config_file)?;
|
||||
let config = match Config::load_from_file(config_file) {
|
||||
Ok(config) => config,
|
||||
Err(e) => {
|
||||
log::error!("Error loading config file: {}", e);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
let compression_type = config.quic_plugin.compression_parameters.compression_type;
|
||||
let enable_block_builder = config.quic_plugin.enable_block_builder;
|
||||
let build_blocks_with_accounts = config.quic_plugin.build_blocks_with_accounts;
|
||||
|
|
|
@ -25,6 +25,7 @@ pub fn main() -> anyhow::Result<()> {
|
|||
max_ack_delay: args.max_ack_delay,
|
||||
ack_exponent: args.ack_exponent,
|
||||
enable_gso: true,
|
||||
enable_pacing: true,
|
||||
},
|
||||
)?;
|
||||
|
||||
|
|
|
@ -18,6 +18,10 @@ quiche = { workspace = true }
|
|||
prometheus = { workspace = true }
|
||||
lazy_static = { workspace = true }
|
||||
|
||||
mio = { workspace = true }
|
||||
nix = { version = "0.27", features = ["net", "socket", "uio"] }
|
||||
libc = "0.2"
|
||||
|
||||
[dev-dependencies]
|
||||
rand = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
|
|
|
@ -11,7 +11,7 @@ pub fn recv_message(
|
|||
connection: &mut quiche::Connection,
|
||||
read_streams: &mut ReadStreams,
|
||||
stream_id: u64,
|
||||
) -> anyhow::Result<Option<Vec<Message>>> {
|
||||
) -> anyhow::Result<Option<Vec<Vec<u8>>>> {
|
||||
let mut buf = [0; MAX_DATAGRAM_SIZE];
|
||||
if let Some(total_buf) = read_streams.get_mut(&stream_id) {
|
||||
loop {
|
||||
|
@ -23,12 +23,14 @@ pub fn recv_message(
|
|||
Err(e) => match &e {
|
||||
quiche::Error::Done => {
|
||||
let mut messages = vec![];
|
||||
if let Some((message, size)) =
|
||||
Message::from_binary_stream(total_buf.as_slices().0)
|
||||
|
||||
while let Some((message, size)) =
|
||||
Message::from_binary_stream_binary(total_buf.as_slices().0)
|
||||
{
|
||||
total_buf.consume(size);
|
||||
messages.push(message);
|
||||
}
|
||||
|
||||
return Ok(if messages.is_empty() {
|
||||
None
|
||||
} else {
|
||||
|
@ -52,8 +54,8 @@ pub fn recv_message(
|
|||
Err(e) => match &e {
|
||||
quiche::Error::Done => {
|
||||
let mut messages = vec![];
|
||||
if let Some((message, size)) =
|
||||
Message::from_binary_stream(total_buf.as_slices().0)
|
||||
while let Some((message, size)) =
|
||||
Message::from_binary_stream_binary(total_buf.as_slices().0)
|
||||
{
|
||||
total_buf.consume(size);
|
||||
messages.push(message);
|
||||
|
|
|
@ -162,6 +162,97 @@ pub fn generate_cid_and_reset_token<T: SecureRandom>(
|
|||
(scid, reset_token)
|
||||
}
|
||||
|
||||
pub fn detect_gso(socket: &mio::net::UdpSocket, segment_size: usize) -> bool {
|
||||
use nix::sys::socket::setsockopt;
|
||||
use nix::sys::socket::sockopt::UdpGsoSegment;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
|
||||
// mio::net::UdpSocket doesn't implement AsFd (yet?).
|
||||
let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(socket.as_raw_fd()) };
|
||||
|
||||
setsockopt(&fd, UdpGsoSegment, &(segment_size as i32)).is_ok()
|
||||
}
|
||||
|
||||
/// Set SO_TXTIME socket option.
|
||||
///
|
||||
/// This socket option is set to send to kernel the outgoing UDP
|
||||
/// packet transmission time in the sendmsg syscall.
|
||||
///
|
||||
/// Note that this socket option is set only on linux platforms.
|
||||
#[cfg(target_os = "linux")]
|
||||
pub fn set_txtime_sockopt(sock: &mio::net::UdpSocket) -> std::io::Result<()> {
|
||||
use nix::sys::socket::setsockopt;
|
||||
use nix::sys::socket::sockopt::TxTime;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
|
||||
let config = nix::libc::sock_txtime {
|
||||
clockid: libc::CLOCK_MONOTONIC,
|
||||
flags: 0,
|
||||
};
|
||||
|
||||
// mio::net::UdpSocket doesn't implement AsFd (yet?).
|
||||
let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(sock.as_raw_fd()) };
|
||||
|
||||
setsockopt(&fd, TxTime, &config)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
pub fn set_txtime_sockopt(_: &mio::net::UdpSocket) -> io::Result<()> {
|
||||
use std::io::Error;
|
||||
use std::io::ErrorKind;
|
||||
|
||||
Err(Error::new(
|
||||
ErrorKind::Other,
|
||||
"Not supported on this platform",
|
||||
))
|
||||
}
|
||||
|
||||
const NANOS_PER_SEC: u64 = 1_000_000_000;
|
||||
|
||||
const INSTANT_ZERO: std::time::Instant = unsafe { std::mem::transmute(std::time::UNIX_EPOCH) };
|
||||
|
||||
pub fn std_time_to_u64(time: &std::time::Instant) -> u64 {
|
||||
let raw_time = time.duration_since(INSTANT_ZERO);
|
||||
let sec = raw_time.as_secs();
|
||||
let nsec = raw_time.subsec_nanos();
|
||||
sec * NANOS_PER_SEC + nsec as u64
|
||||
}
|
||||
|
||||
pub fn send_with_pacing(
|
||||
socket: &mio::net::UdpSocket,
|
||||
buf: &[u8],
|
||||
send_info: &quiche::SendInfo,
|
||||
enable_gso: bool,
|
||||
segment_size: u16,
|
||||
) -> std::io::Result<usize> {
|
||||
use nix::sys::socket::sendmsg;
|
||||
use nix::sys::socket::ControlMessage;
|
||||
use nix::sys::socket::MsgFlags;
|
||||
use nix::sys::socket::SockaddrStorage;
|
||||
use std::io::IoSlice;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
|
||||
let iov = [IoSlice::new(buf)];
|
||||
let dst = SockaddrStorage::from(send_info.to);
|
||||
let sockfd = socket.as_raw_fd();
|
||||
|
||||
// Pacing option.
|
||||
let send_time = std_time_to_u64(&send_info.at);
|
||||
let cmsg_txtime = ControlMessage::TxTime(&send_time);
|
||||
|
||||
let mut cmgs = vec![cmsg_txtime];
|
||||
if enable_gso {
|
||||
cmgs.push(ControlMessage::UdpGsoSegments(&segment_size));
|
||||
}
|
||||
|
||||
match sendmsg(sockfd, &iov, &cmgs, MsgFlags::empty(), Some(&dst)) {
|
||||
Ok(v) => Ok(v),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
// 16 MB per buffer
|
||||
pub const SEND_BUFFER_LEN: usize = 32 * 1024 * 1024;
|
||||
pub type StreamBufferMap<const BUFFER_LEN: usize> = BTreeMap<u64, StreamBuffer<BUFFER_LEN>>;
|
||||
|
|
|
@ -21,10 +21,8 @@ boring = { workspace = true }
|
|||
mio = { workspace = true }
|
||||
mio_channel = { workspace = true }
|
||||
|
||||
libc = "0.2"
|
||||
nix = { version = "0.27", features = ["net", "socket", "uio"] }
|
||||
|
||||
quic-geyser-common = { workspace = true }
|
||||
|
||||
prometheus = { workspace = true }
|
||||
lazy_static = { workspace = true }
|
||||
|
||||
|
|
|
@ -46,15 +46,19 @@ use quic_geyser_quiche_utils::quiche_reciever::recv_message;
|
|||
use quic_geyser_quiche_utils::quiche_reciever::ReadStreams;
|
||||
use quic_geyser_quiche_utils::quiche_sender::handle_writable;
|
||||
use quic_geyser_quiche_utils::quiche_sender::send_message;
|
||||
use quic_geyser_quiche_utils::quiche_utils::detect_gso;
|
||||
use quic_geyser_quiche_utils::quiche_utils::generate_cid_and_reset_token;
|
||||
use quic_geyser_quiche_utils::quiche_utils::get_next_unidi;
|
||||
use quic_geyser_quiche_utils::quiche_utils::handle_path_events;
|
||||
use quic_geyser_quiche_utils::quiche_utils::mint_token;
|
||||
use quic_geyser_quiche_utils::quiche_utils::send_with_pacing;
|
||||
use quic_geyser_quiche_utils::quiche_utils::set_txtime_sockopt;
|
||||
use quic_geyser_quiche_utils::quiche_utils::validate_token;
|
||||
use quic_geyser_quiche_utils::quiche_utils::StreamBufferMap;
|
||||
use quic_geyser_quiche_utils::quiche_utils::SEND_BUFFER_LEN;
|
||||
use quiche::ConnectionId;
|
||||
use ring::rand::*;
|
||||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
use std::net;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
|
@ -556,6 +560,8 @@ pub fn server_loop(
|
|||
match message {
|
||||
Ok(Some(messages)) => {
|
||||
for message in messages {
|
||||
let message = bincode::deserialize::<Message>(&message)
|
||||
.expect("Should be a message");
|
||||
match message {
|
||||
Message::Filters(mut f) => {
|
||||
client.filters.append(&mut f);
|
||||
|
@ -604,7 +610,7 @@ pub fn server_loop(
|
|||
}
|
||||
}
|
||||
|
||||
handle_path_events(client);
|
||||
handle_path_events(&mut client.conn);
|
||||
|
||||
// See whether source Connection IDs have been retired.
|
||||
while let Some(retired_scid) = client.conn.retired_scid_next() {
|
||||
|
@ -770,215 +776,3 @@ pub fn server_loop(
|
|||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate a stateless retry token.
|
||||
///
|
||||
/// The token includes the static string `"quiche"` followed by the IP address
|
||||
/// of the client and by the original destination connection ID generated by the
|
||||
/// client.
|
||||
///
|
||||
/// Note that this function is only an example and doesn't do any cryptographic
|
||||
/// authenticate of the token. *It should not be used in production system*.
|
||||
fn mint_token(hdr: &quiche::Header, src: &net::SocketAddr) -> Vec<u8> {
|
||||
let mut token = Vec::new();
|
||||
|
||||
token.extend_from_slice(b"quiche");
|
||||
|
||||
let addr = match src.ip() {
|
||||
std::net::IpAddr::V4(a) => a.octets().to_vec(),
|
||||
std::net::IpAddr::V6(a) => a.octets().to_vec(),
|
||||
};
|
||||
|
||||
token.extend_from_slice(&addr);
|
||||
token.extend_from_slice(&hdr.dcid);
|
||||
|
||||
token
|
||||
}
|
||||
|
||||
/// Validates a stateless retry token.
|
||||
///
|
||||
/// This checks that the ticket includes the `"quiche"` static string, and that
|
||||
/// the client IP address matches the address stored in the ticket.
|
||||
///
|
||||
/// Note that this function is only an example and doesn't do any cryptographic
|
||||
/// authenticate of the token. *It should not be used in production system*.
|
||||
fn validate_token<'a>(src: &net::SocketAddr, token: &'a [u8]) -> Option<quiche::ConnectionId<'a>> {
|
||||
if token.len() < 6 {
|
||||
return None;
|
||||
}
|
||||
|
||||
if &token[..6] != b"quiche" {
|
||||
return None;
|
||||
}
|
||||
|
||||
let token = &token[6..];
|
||||
|
||||
let addr = match src.ip() {
|
||||
std::net::IpAddr::V4(a) => a.octets().to_vec(),
|
||||
std::net::IpAddr::V6(a) => a.octets().to_vec(),
|
||||
};
|
||||
|
||||
if token.len() < addr.len() || &token[..addr.len()] != addr.as_slice() {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(quiche::ConnectionId::from_ref(&token[addr.len()..]))
|
||||
}
|
||||
|
||||
fn handle_path_events(client: &mut Client) {
|
||||
while let Some(qe) = client.conn.path_event_next() {
|
||||
match qe {
|
||||
quiche::PathEvent::New(local_addr, peer_addr) => {
|
||||
log::info!(
|
||||
"{} Seen new path ({}, {})",
|
||||
client.conn.trace_id(),
|
||||
local_addr,
|
||||
peer_addr
|
||||
);
|
||||
|
||||
// Directly probe the new path.
|
||||
client
|
||||
.conn
|
||||
.probe_path(local_addr, peer_addr)
|
||||
.expect("cannot probe");
|
||||
}
|
||||
|
||||
quiche::PathEvent::Validated(local_addr, peer_addr) => {
|
||||
log::info!(
|
||||
"{} Path ({}, {}) is now validated",
|
||||
client.conn.trace_id(),
|
||||
local_addr,
|
||||
peer_addr
|
||||
);
|
||||
}
|
||||
|
||||
quiche::PathEvent::FailedValidation(local_addr, peer_addr) => {
|
||||
log::info!(
|
||||
"{} Path ({}, {}) failed validation",
|
||||
client.conn.trace_id(),
|
||||
local_addr,
|
||||
peer_addr
|
||||
);
|
||||
}
|
||||
|
||||
quiche::PathEvent::Closed(local_addr, peer_addr) => {
|
||||
log::info!(
|
||||
"{} Path ({}, {}) is now closed and unusable",
|
||||
client.conn.trace_id(),
|
||||
local_addr,
|
||||
peer_addr
|
||||
);
|
||||
}
|
||||
|
||||
quiche::PathEvent::ReusedSourceConnectionId(cid_seq, old, new) => {
|
||||
log::info!(
|
||||
"{} Peer reused cid seq {} (initially {:?}) on {:?}",
|
||||
client.conn.trace_id(),
|
||||
cid_seq,
|
||||
old,
|
||||
new
|
||||
);
|
||||
}
|
||||
|
||||
quiche::PathEvent::PeerMigrated(local_addr, peer_addr) => {
|
||||
log::info!(
|
||||
"{} Connection migrated to ({}, {})",
|
||||
client.conn.trace_id(),
|
||||
local_addr,
|
||||
peer_addr
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn detect_gso(socket: &mio::net::UdpSocket, segment_size: usize) -> bool {
|
||||
use nix::sys::socket::setsockopt;
|
||||
use nix::sys::socket::sockopt::UdpGsoSegment;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
|
||||
// mio::net::UdpSocket doesn't implement AsFd (yet?).
|
||||
let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(socket.as_raw_fd()) };
|
||||
|
||||
setsockopt(&fd, UdpGsoSegment, &(segment_size as i32)).is_ok()
|
||||
}
|
||||
|
||||
/// Set SO_TXTIME socket option.
|
||||
///
|
||||
/// This socket option is set to send to kernel the outgoing UDP
|
||||
/// packet transmission time in the sendmsg syscall.
|
||||
///
|
||||
/// Note that this socket option is set only on linux platforms.
|
||||
#[cfg(target_os = "linux")]
|
||||
fn set_txtime_sockopt(sock: &mio::net::UdpSocket) -> io::Result<()> {
|
||||
use nix::sys::socket::setsockopt;
|
||||
use nix::sys::socket::sockopt::TxTime;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
|
||||
let config = nix::libc::sock_txtime {
|
||||
clockid: libc::CLOCK_MONOTONIC,
|
||||
flags: 0,
|
||||
};
|
||||
|
||||
// mio::net::UdpSocket doesn't implement AsFd (yet?).
|
||||
let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(sock.as_raw_fd()) };
|
||||
|
||||
setsockopt(&fd, TxTime, &config)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
fn set_txtime_sockopt(_: &mio::net::UdpSocket) -> io::Result<()> {
|
||||
use std::io::Error;
|
||||
use std::io::ErrorKind;
|
||||
|
||||
Err(Error::new(
|
||||
ErrorKind::Other,
|
||||
"Not supported on this platform",
|
||||
))
|
||||
}
|
||||
|
||||
const NANOS_PER_SEC: u64 = 1_000_000_000;
|
||||
|
||||
const INSTANT_ZERO: std::time::Instant = unsafe { std::mem::transmute(std::time::UNIX_EPOCH) };
|
||||
|
||||
fn std_time_to_u64(time: &std::time::Instant) -> u64 {
|
||||
let raw_time = time.duration_since(INSTANT_ZERO);
|
||||
let sec = raw_time.as_secs();
|
||||
let nsec = raw_time.subsec_nanos();
|
||||
sec * NANOS_PER_SEC + nsec as u64
|
||||
}
|
||||
|
||||
fn send_with_pacing(
|
||||
socket: &mio::net::UdpSocket,
|
||||
buf: &[u8],
|
||||
send_info: &quiche::SendInfo,
|
||||
enable_gso: bool,
|
||||
segment_size: u16,
|
||||
) -> std::io::Result<usize> {
|
||||
use nix::sys::socket::sendmsg;
|
||||
use nix::sys::socket::ControlMessage;
|
||||
use nix::sys::socket::MsgFlags;
|
||||
use nix::sys::socket::SockaddrStorage;
|
||||
use std::io::IoSlice;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
|
||||
let iov = [IoSlice::new(buf)];
|
||||
let dst = SockaddrStorage::from(send_info.to);
|
||||
let sockfd = socket.as_raw_fd();
|
||||
|
||||
// Pacing option.
|
||||
let send_time = std_time_to_u64(&send_info.at);
|
||||
let cmsg_txtime = ControlMessage::TxTime(&send_time);
|
||||
|
||||
let mut cmgs = vec![cmsg_txtime];
|
||||
if enable_gso {
|
||||
cmgs.push(ControlMessage::UdpGsoSegments(&segment_size));
|
||||
}
|
||||
|
||||
match sendmsg(sockfd, &iov, &cmgs, MsgFlags::empty(), Some(&dst)) {
|
||||
Ok(v) => Ok(v),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue