Merge pull request #5 from blockworks-foundation/implementing_pacing

Implement pacing, adding option to change cc algorithm
This commit is contained in:
galactus 2024-06-11 14:12:15 +02:00 committed by GitHub
commit 611fef8b1a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 125 additions and 75 deletions

14
Cargo.lock generated
View File

@ -2165,6 +2165,18 @@ dependencies = [
"mio",
]
[[package]]
name = "nix"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
dependencies = [
"bitflags 2.5.0",
"cfg-if",
"libc",
"memoffset",
]
[[package]]
name = "nom"
version = "7.1.3"
@ -2763,9 +2775,11 @@ dependencies = [
"bincode",
"boring",
"itertools",
"libc",
"log",
"mio",
"mio_channel",
"nix",
"quic-geyser-common",
"quic-geyser-quiche-utils",
"quiche",

View File

@ -278,7 +278,7 @@ mod tests {
};
use itertools::Itertools;
use quic_geyser_server::{configure_server::configure_server, quiche_server_loop::server_loop};
use quic_geyser_server::quiche_server_loop::server_loop;
use solana_sdk::{account::Account, pubkey::Pubkey};
use quic_geyser_common::{
@ -371,9 +371,8 @@ mod tests {
// server loop
let (server_send_queue, rx_sent_queue) = mpsc::channel::<ChannelMessage>();
let _server_loop_jh = std::thread::spawn(move || {
let config = configure_server(QuicParameters::default()).unwrap();
if let Err(e) = server_loop(
config,
QuicParameters::default(),
socket_addr,
rx_sent_queue,
CompressionType::Lz4Fast(8),

View File

@ -5,8 +5,9 @@ use serde::{Deserialize, Serialize};
use crate::{
compression::CompressionType,
defaults::{
DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_MAX_ACK_DELAY,
DEFAULT_MAX_NB_CONNECTIONS, DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS,
DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_ENABLE_PACING,
DEFAULT_MAX_ACK_DELAY, DEFAULT_MAX_NB_CONNECTIONS, DEFAULT_MAX_RECIEVE_WINDOW_SIZE,
DEFAULT_MAX_STREAMS, DEFAULT_USE_CC_BBR,
},
};
@ -62,6 +63,8 @@ pub struct QuicParameters {
pub max_number_of_connections: u64,
pub max_ack_delay: u64,
pub ack_exponent: u64,
pub enable_pacing: bool,
pub use_cc_bbr: bool,
}
impl Default for QuicParameters {
@ -73,6 +76,8 @@ impl Default for QuicParameters {
max_number_of_connections: DEFAULT_MAX_NB_CONNECTIONS,
max_ack_delay: DEFAULT_MAX_ACK_DELAY,
ack_exponent: DEFAULT_ACK_EXPONENT,
enable_pacing: DEFAULT_ENABLE_PACING,
use_cc_bbr: DEFAULT_USE_CC_BBR,
}
}
}

View File

@ -3,7 +3,9 @@ pub const MAX_ALLOWED_PARTIAL_RESPONSES: u64 = DEFAULT_MAX_STREAMS - 1;
pub const DEFAULT_MAX_RECIEVE_WINDOW_SIZE: u64 = 256 * 1024 * 1024; // 256 MBs
pub const DEFAULT_CONNECTION_TIMEOUT: u64 = 10;
pub const DEFAULT_MAX_NB_CONNECTIONS: u64 = 10;
pub const DEFAULT_MAX_ACK_DELAY: u64 = 100;
pub const DEFAULT_MAX_ACK_DELAY: u64 = 200;
pub const DEFAULT_ACK_EXPONENT: u64 = 3;
pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"geyser";
pub const MAX_DATAGRAM_SIZE: usize = 1350; // MAX: 65527
pub const DEFAULT_ENABLE_PACING: bool = true;
pub const DEFAULT_USE_CC_BBR: bool = false;

View File

@ -49,6 +49,7 @@ pub fn main() -> anyhow::Result<()> {
max_number_of_connections: args.max_number_of_connections,
max_ack_delay: args.max_ack_delay,
ack_exponent: args.ack_exponent,
..Default::default()
},
compression_parameters: CompressionParameters {
compression_type: quic_geyser_common::compression::CompressionType::Lz4Fast(

View File

@ -1,4 +1,4 @@
use std::{collections::HashMap, net::SocketAddr};
use std::collections::HashMap;
pub fn validate_token<'a>(
src: &std::net::SocketAddr,
@ -89,20 +89,3 @@ pub struct PartialResponse {
}
pub type PartialResponses = HashMap<u64, PartialResponse>;
// returns true if the socket will block the writing of socket
// return false otherwise
pub fn write_to_socket(socket: &mio::net::UdpSocket, buf: &[u8], to: SocketAddr) -> bool {
match socket.send_to(buf, to) {
Ok(_len) => false,
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock {
log::warn!("writing would block");
true
} else {
log::error!("send() failed: {:?}", e);
false
}
}
}
}

View File

@ -22,6 +22,9 @@ boring = { workspace = true }
mio = { workspace = true }
mio_channel = { workspace = true }
libc = "0.2"
nix = { version = "0.27", features = ["net", "socket", "uio"] }
quic-geyser-common = { workspace = true }
[dev-dependencies]

View File

@ -12,6 +12,8 @@ pub fn configure_server(quic_parameter: QuicParameters) -> anyhow::Result<quiche
let max_number_of_connections = quic_parameter.max_number_of_connections;
let maximum_ack_delay = quic_parameter.max_ack_delay;
let ack_exponent = quic_parameter.ack_exponent;
let enable_pacing = quic_parameter.enable_pacing;
let use_bbr = quic_parameter.use_cc_bbr;
let cert = rcgen::generate_simple_self_signed(vec!["quic_geyser".into()]).unwrap();
@ -43,12 +45,18 @@ pub fn configure_server(quic_parameter: QuicParameters) -> anyhow::Result<quiche
config.set_disable_active_migration(true);
config.set_max_connection_window(128 * 1024 * 1024); // 128 Mbs
config.enable_early_data();
config.set_cc_algorithm(quiche::CongestionControlAlgorithm::BBR2);
if use_bbr {
config.set_cc_algorithm(quiche::CongestionControlAlgorithm::BBR2);
} else {
config.set_cc_algorithm(quiche::CongestionControlAlgorithm::CUBIC);
}
config.set_active_connection_id_limit(max_number_of_connections);
config.set_max_ack_delay(maximum_ack_delay);
config.set_ack_delay_exponent(ack_exponent);
config.set_initial_congestion_window_packets(1024);
config.set_max_stream_window(256 * 1024 * 1024);
config.enable_pacing(false);
config.enable_pacing(enable_pacing);
Ok(config)
}

View File

@ -1,9 +1,7 @@
use std::{fmt::Debug, sync::mpsc};
use crate::configure_server::configure_server;
use quic_geyser_common::{
channel_message::ChannelMessage, config::ConfigQuicPlugin, plugin_error::QuicGeyserError,
};
use std::{fmt::Debug, sync::mpsc};
use super::quiche_server_loop::server_loop;
pub struct QuicServer {
@ -19,15 +17,15 @@ impl Debug for QuicServer {
impl QuicServer {
pub fn new(config: ConfigQuicPlugin) -> anyhow::Result<Self> {
let server_config = configure_server(config.quic_parameters)?;
let socket = config.address;
let compression_type = config.compression_parameters.compression_type;
let quic_parameters = config.quic_parameters;
let (data_channel_sender, data_channel_tx) = mpsc::channel();
let _server_loop_jh = std::thread::spawn(move || {
if let Err(e) = server_loop(
server_config,
quic_parameters,
socket,
data_channel_tx,
compression_type,

View File

@ -1,5 +1,5 @@
use std::{
collections::{BTreeMap, HashMap},
collections::HashMap,
net::SocketAddr,
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize},
@ -17,6 +17,7 @@ use ring::rand::SystemRandom;
use quic_geyser_common::{
channel_message::ChannelMessage,
compression::CompressionType,
config::QuicParameters,
defaults::{MAX_ALLOWED_PARTIAL_RESPONSES, MAX_DATAGRAM_SIZE},
filters::Filter,
message::Message,
@ -26,9 +27,11 @@ use quic_geyser_common::{
use quic_geyser_quiche_utils::{
quiche_reciever::{recv_message, ReadStreams},
quiche_sender::{handle_writable, send_message},
quiche_utils::{get_next_unidi, mint_token, validate_token, write_to_socket, PartialResponses},
quiche_utils::{get_next_unidi, mint_token, validate_token, PartialResponses},
};
use crate::configure_server::configure_server;
struct DispatchingData {
pub sender: Sender<(Vec<u8>, u8)>,
pub filters: Arc<RwLock<Vec<Filter>>>,
@ -37,21 +40,15 @@ struct DispatchingData {
type DispachingConnections = Arc<Mutex<HashMap<ConnectionId<'static>, DispatchingData>>>;
const ACCEPTABLE_PACING_DELAY: Duration = Duration::from_millis(100);
struct Packet {
pub buffer: Vec<u8>,
pub to: SocketAddr,
}
pub fn server_loop(
mut config: quiche::Config,
quic_params: QuicParameters,
socket_addr: SocketAddr,
message_send_queue: mpsc::Receiver<ChannelMessage>,
compression_type: CompressionType,
stop_laggy_client: bool,
) -> anyhow::Result<()> {
let maximum_concurrent_streams_id = u64::MAX;
let mut config = configure_server(quic_params)?;
let mut socket = mio::net::UdpSocket::bind(socket_addr)?;
let mut poll = mio::Poll::new()?;
@ -82,7 +79,11 @@ pub fn server_loop(
// mio::Interest::READABLE,
// )?;
let mut packets_to_send: BTreeMap<Instant, Packet> = BTreeMap::new();
let enable_pacing = if quic_params.enable_pacing {
set_txtime_sockopt(&socket).is_ok()
} else {
false
};
let dispatching_connections: DispachingConnections = Arc::new(Mutex::new(HashMap::<
ConnectionId<'static>,
@ -169,8 +170,8 @@ pub fn server_loop(
)
.unwrap();
if write_to_socket(&socket, &out[..len], from) {
break;
if let Err(e) = socket.send_to(&out[..len], from) {
log::error!("Error sending retry messages : {e:?}");
}
continue 'read;
}
@ -253,38 +254,19 @@ pub fn server_loop(
};
}
let instant = Instant::now();
// send remaining packets
// log::debug!("packets to send : {}", packets_to_send.len());
while let Some((to_send, packet)) = packets_to_send.first_key_value() {
if instant + ACCEPTABLE_PACING_DELAY <= *to_send {
break;
}
if write_to_socket(&socket, &packet.buffer, packet.to) {
break;
}
packets_to_send.pop_first();
}
let instant = Instant::now();
while let Ok((send_info, buffer)) = write_reciver.try_recv() {
if send_info.at > instant + ACCEPTABLE_PACING_DELAY {
packets_to_send.insert(
send_info.at,
Packet {
buffer,
to: send_info.to,
},
);
} else if write_to_socket(&socket, &buffer, send_info.to) {
packets_to_send.insert(
send_info.at,
Packet {
buffer,
to: send_info.to,
},
);
break;
let send_result = if enable_pacing {
send_with_pacing(&socket, &buffer, &send_info)
} else {
socket.send_to(&buffer, send_info.to)
};
match send_result {
Ok(written) => {
log::debug!("written {written:?} to {:?}", send_info.to);
}
Err(e) => {
log::error!("sending failed with error : {e:?}");
}
}
}
}
@ -598,3 +580,58 @@ fn create_dispatching_thread(
}
});
}
fn set_txtime_sockopt(sock: &mio::net::UdpSocket) -> std::io::Result<()> {
use nix::sys::socket::setsockopt;
use nix::sys::socket::sockopt::TxTime;
use std::os::unix::io::AsRawFd;
let config = nix::libc::sock_txtime {
clockid: libc::CLOCK_MONOTONIC,
flags: 0,
};
// mio::net::UdpSocket doesn't implement AsFd (yet?).
let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(sock.as_raw_fd()) };
setsockopt(&fd, TxTime, &config)?;
Ok(())
}
const NANOS_PER_SEC: u64 = 1_000_000_000;
const INSTANT_ZERO: std::time::Instant = unsafe { std::mem::transmute(std::time::UNIX_EPOCH) };
fn std_time_to_u64(time: &std::time::Instant) -> u64 {
let raw_time = time.duration_since(INSTANT_ZERO);
let sec = raw_time.as_secs();
let nsec = raw_time.subsec_nanos();
sec * NANOS_PER_SEC + nsec as u64
}
fn send_with_pacing(
socket: &mio::net::UdpSocket,
buf: &[u8],
send_info: &quiche::SendInfo,
) -> std::io::Result<usize> {
use nix::sys::socket::sendmsg;
use nix::sys::socket::ControlMessage;
use nix::sys::socket::MsgFlags;
use nix::sys::socket::SockaddrStorage;
use std::io::IoSlice;
use std::os::unix::io::AsRawFd;
let iov = [IoSlice::new(buf)];
let dst = SockaddrStorage::from(send_info.to);
let sockfd = socket.as_raw_fd();
// Pacing option.
let send_time = std_time_to_u64(&send_info.at);
let cmsg_txtime = ControlMessage::TxTime(&send_time);
match sendmsg(sockfd, &iov, &[cmsg_txtime], MsgFlags::empty(), Some(&dst)) {
Ok(v) => Ok(v),
Err(e) => Err(e.into()),
}
}