lot of changes related to optimization of quiche

This commit is contained in:
godmodegalactus 2024-05-24 11:16:11 +02:00
parent f8f35095e3
commit f667666536
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
19 changed files with 193 additions and 148 deletions

3
Cargo.lock generated
View File

@ -2473,7 +2473,6 @@ dependencies = [
"log",
"quic-geyser-client",
"quic-geyser-common",
"quic-geyser-plugin",
"serde",
"serde_json",
"solana-rpc-client",
@ -2490,9 +2489,7 @@ dependencies = [
"clap",
"itertools",
"log",
"quic-geyser-client",
"quic-geyser-common",
"quic-geyser-plugin",
"rand 0.8.5",
"serde",
"serde_json",

View File

@ -42,7 +42,6 @@ impl Client {
connection_parameters.max_number_of_streams,
) {
log::error!("client stopped with error {e}");
println!("client stopped with error {e}");
}
is_connected_client.store(false, std::sync::atomic::Ordering::Relaxed);
});
@ -146,7 +145,7 @@ mod tests {
write_version: account.write_version,
},
account.slot_identifier.slot,
false,
vec![8, 6, 3, 1],
),
)
.unwrap();

View File

@ -13,7 +13,7 @@ pub struct AccountData {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelMessage {
Account(AccountData, Slot, bool),
Account(AccountData, Slot, Vec<u8>),
Slot(u64, u64, CommitmentLevel),
BlockMeta(BlockMeta),
Transaction(Box<Transaction>),

View File

@ -13,3 +13,29 @@ impl Default for CompressionType {
Self::Lz4Fast(8)
}
}
impl CompressionType {
pub fn compress(&self, data: &[u8]) -> Vec<u8> {
if data.is_empty() {
return vec![];
}
match self {
CompressionType::None => data.to_vec(),
CompressionType::Lz4Fast(speed) => lz4::block::compress(
data,
Some(lz4::block::CompressionMode::FAST(*speed as i32)),
true,
)
.expect("Compression should work"),
CompressionType::Lz4(compression) => lz4::block::compress(
data,
Some(lz4::block::CompressionMode::HIGHCOMPRESSION(
*compression as i32,
)),
true,
)
.expect("compression should work"),
}
}
}

View File

@ -147,7 +147,7 @@ mod tests {
write_version: 0,
},
0,
false,
vec![2, 3, 4, 6],
);
let msg_2 = ChannelMessage::Account(
@ -157,7 +157,7 @@ mod tests {
write_version: 0,
},
0,
false,
vec![2, 3, 4, 6],
);
let msg_3 = ChannelMessage::Account(
@ -167,7 +167,7 @@ mod tests {
write_version: 0,
},
0,
false,
vec![2, 3, 4, 6],
);
let f1 = AccountFilter {

View File

@ -20,4 +20,5 @@ pub enum Message {
BlockMetaMsg(BlockMeta),
TransactionMsg(Box<Transaction>),
Filters(Vec<Filter>), // sent from client to server
Ping,
}

View File

@ -1,7 +1,7 @@
use boring::ssl::SslMethod;
pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"geyser";
pub const MAX_DATAGRAM_SIZE: usize = 65527;
pub const MAX_DATAGRAM_SIZE: usize = 65535;
pub fn configure_server(
max_concurrent_streams: u64,

View File

@ -1,13 +1,14 @@
use std::fmt::Debug;
use crate::{
channel_message::ChannelMessage, config::ConfigQuicPlugin, plugin_error::QuicGeyserError,
quic::configure_server::configure_server,
channel_message::ChannelMessage, compression::CompressionType, config::ConfigQuicPlugin,
plugin_error::QuicGeyserError, quic::configure_server::configure_server,
};
use super::quiche_server_loop::server_loop;
pub struct QuicServer {
data_channel_sender: mio_channel::Sender<ChannelMessage>,
compression_type: CompressionType,
}
impl Debug for QuicServer {
@ -43,6 +44,7 @@ impl QuicServer {
Ok(QuicServer {
data_channel_sender,
compression_type,
})
}
@ -51,4 +53,8 @@ impl QuicServer {
.send(message)
.map_err(|_| QuicGeyserError::MessageChannelClosed)
}
pub fn get_compression_type(&self) -> CompressionType {
self.compression_type
}
}

View File

@ -1,6 +1,7 @@
use std::{
net::SocketAddr,
sync::{atomic::AtomicBool, Arc},
time::{Duration, Instant},
};
use crate::{
@ -45,7 +46,7 @@ pub fn client_loop(
log::info!("connecing client with quiche");
let scid = quiche::ConnectionId::from_ref(&scid);
let local_addr = socket.local_addr().unwrap();
let local_addr = socket.local_addr()?;
let mut conn = quiche::connect(None, &scid, local_addr, server_address, &mut config)?;
@ -62,17 +63,17 @@ pub fn client_loop(
let mut partial_responses = PartialResponses::new();
let mut read_streams = ReadStreams::new();
let mut connected = false;
let mut instance = Instant::now();
let ping_binary = bincode::serialize(&Message::Ping)?;
loop {
poll.poll(&mut events, conn.timeout()).unwrap();
poll.poll(&mut events, conn.timeout())?;
let network_updates = true;
let channel_updates = true;
if network_updates {
'read: loop {
if events.is_empty() {
log::debug!("timed out");
conn.on_timeout();
break 'read;
}
@ -81,33 +82,24 @@ pub fn client_loop(
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock {
log::debug!("recv() would block");
break 'read;
}
panic!("recv() failed: {:?}", e);
bail!("recv() failed: {:?}", e);
}
};
log::debug!("got {} bytes", len);
let recv_info = quiche::RecvInfo {
to: socket.local_addr().unwrap(),
to: socket.local_addr()?,
from,
};
// Process potentially coalesced packets.
let read = match conn.recv(&mut buf[..len], recv_info) {
Ok(v) => v,
Err(e) => {
if let Err(e) = conn.recv(&mut buf[..len], recv_info) {
log::error!("recv failed: {:?}", e);
continue 'read;
}
};
log::debug!("processed {} bytes", read);
}
}
// io events
for stream_id in conn.readable() {
let message = recv_message(&mut conn, &mut read_streams, stream_id);
@ -115,7 +107,6 @@ pub fn client_loop(
Ok(Some(message)) => {
if let Err(e) = message_recv_queue.send(message) {
log::error!("Error sending message on the channel : {e}");
println!("Error sending message on the channel : {e}");
}
}
Ok(None) => {
@ -130,8 +121,6 @@ pub fn client_loop(
for stream_id in conn.writable() {
handle_writable(&mut conn, &mut partial_responses, stream_id);
}
}
if !connected && conn.is_established() {
is_connected.store(true, std::sync::atomic::Ordering::Relaxed);
connected = true;
@ -139,6 +128,22 @@ pub fn client_loop(
// chanel updates
if channel_updates && conn.is_established() {
let current_instance = Instant::now();
// sending ping filled
if current_instance.duration_since(instance) > Duration::from_secs(5) {
log::debug!("sending ping to the server");
instance = current_instance;
current_stream_id = get_next_unidi(current_stream_id, false, maximum_streams);
if let Err(e) = send_message(
&mut conn,
&mut partial_responses,
current_stream_id,
&ping_binary,
) {
log::error!("sending ping failed with error {e:?}");
}
}
// channel events
if let Ok(message_to_send) = message_send_queue.try_recv() {
current_stream_id = get_next_unidi(current_stream_id, false, maximum_streams);
@ -162,7 +167,6 @@ pub fn client_loop(
Ok(v) => v,
Err(quiche::Error::Done) => {
log::debug!("done writing");
break;
}
@ -173,21 +177,21 @@ pub fn client_loop(
}
};
if let Err(e) = socket.send_to(&out[..write], send_info.to) {
match socket.send_to(&out[..write], send_info.to) {
Ok(_len) => {}
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock {
log::debug!("send() would block");
break;
}
bail!("send fail");
log::error!("send() failed: {:?}", e);
}
}
log::debug!("written {}", write);
}
if conn.is_closed() {
is_connected.store(false, std::sync::atomic::Ordering::Relaxed);
log::info!("connection closed, {:?}", conn.stats());
println!("Connection closed {:?}", conn.stats());
break;
}
}
@ -248,7 +252,7 @@ mod tests {
write_version: 1,
},
5,
false,
vec![2, 3, 4, 6],
);
let message_3 = ChannelMessage::Account(
@ -264,7 +268,7 @@ mod tests {
write_version: 1,
},
5,
false,
vec![2, 3, 4, 6],
);
let message_4 = ChannelMessage::Account(
@ -280,7 +284,7 @@ mod tests {
write_version: 1,
},
5,
false,
vec![2, 3, 4, 6],
);
let message_5 = ChannelMessage::Account(
@ -296,7 +300,7 @@ mod tests {
write_version: 1,
},
5,
false,
vec![2, 3, 4, 6],
);
// server loop

View File

@ -25,11 +25,16 @@ pub fn recv_message(
let mut buf = [0; MAX_DATAGRAM_SIZE]; // 10kk buffer size
match connection.stream_recv(stream_id, &mut buf) {
Ok((read, fin)) => {
log::debug!("read {} on stream {}", read, stream_id);
log::trace!("read {} on stream {}", read, stream_id);
total_buf.extend_from_slice(&buf[..read]);
if fin {
log::debug!("fin stream : {}", stream_id);
return Ok(Some(bincode::deserialize::<Message>(&total_buf)?));
log::trace!("fin stream : {}", stream_id);
match bincode::deserialize::<Message>(&total_buf) {
Ok(message) => return Ok(Some(message)),
Err(e) => {
bail!("Error deserializing stream {stream_id} error: {e:?}");
}
}
}
}
Err(e) => {

View File

@ -1,9 +1,6 @@
use anyhow::bail;
use quiche::Connection;
use crate::{message::Message, quic::quiche_utils::PartialResponse};
use super::quiche_utils::PartialResponses;
use crate::{message::Message, quic::quiche_utils::PartialResponse};
use quiche::Connection;
pub fn convert_to_binary(message: &Message) -> anyhow::Result<Vec<u8>> {
Ok(bincode::serialize(&message)?)
@ -14,15 +11,15 @@ pub fn send_message(
partial_responses: &mut PartialResponses,
stream_id: u64,
message: &Vec<u8>,
) -> anyhow::Result<()> {
) -> std::result::Result<(), quiche::Error> {
let written = match connection.stream_send(stream_id, message, false) {
Ok(v) => v,
Err(quiche::Error::Done) => 0,
Err(e) => {
bail!("{} stream send failed {:?}", stream_id, e);
return Err(e);
}
};
log::debug!("dispatched {} on stream id : {}", written, stream_id);
log::trace!("dispatched {} on stream id : {}", written, stream_id);
if written < message.len() {
let response = PartialResponse {
@ -35,7 +32,7 @@ pub fn send_message(
Ok(_) => {}
Err(quiche::Error::Done) => {}
Err(e) => {
bail!("{} stream send failed {:?}", stream_id, e);
return Err(e);
}
}
}
@ -48,7 +45,7 @@ pub fn handle_writable(
partial_responses: &mut PartialResponses,
stream_id: u64,
) {
log::debug!("{} stream {} is writable", conn.trace_id(), stream_id);
log::trace!("{} stream {} is writable", conn.trace_id(), stream_id);
if !partial_responses.contains_key(&stream_id) {
return;

View File

@ -1,5 +1,6 @@
use std::{collections::HashMap, net::SocketAddr};
use anyhow::bail;
use itertools::Itertools;
use mio::{net::UdpSocket, Token};
use quiche::{Connection, ConnectionId};
@ -52,11 +53,8 @@ pub fn server_loop(
let mut poll = mio::Poll::new()?;
let mut events = mio::Events::with_capacity(1024);
poll.registry().register(
&mut socket,
NETWORK_TOKEN,
mio::Interest::READABLE | mio::Interest::WRITABLE,
)?;
poll.registry()
.register(&mut socket, NETWORK_TOKEN, mio::Interest::READABLE)?;
poll.registry().register(
&mut message_send_queue,
@ -71,16 +69,11 @@ pub fn server_loop(
loop {
let timeout = clients.values().filter_map(|c| c.conn.timeout()).min();
log::debug!("timeout : {}", timeout.unwrap_or_default().as_millis());
poll.poll(&mut events, timeout).unwrap();
let network_read = events
.iter()
.any(|x| x.token() == NETWORK_TOKEN && x.is_readable());
let network_read = true;
if events.is_empty() {
log::debug!("connection timed out");
clients.values_mut().for_each(|c| c.conn.on_timeout());
continue;
}
@ -91,15 +84,13 @@ pub fn server_loop(
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock {
log::debug!("recv() would block");
log::trace!("recv() would block");
break 'read;
}
panic!("recv() failed: {:?}", e);
bail!("recv() failed: {:?}", e);
}
};
log::debug!("got {} bytes", len);
let pkt_buf = &mut buf[..len];
// Parse the QUIC packet's header.
@ -112,8 +103,6 @@ pub fn server_loop(
}
};
log::trace!("got packet {:?}", hdr);
let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid);
let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN];
let conn_id: ConnectionId<'static> = conn_id.to_vec().into();
@ -133,7 +122,6 @@ pub fn server_loop(
if let Err(e) = socket.send_to(out, from) {
if e.kind() == std::io::ErrorKind::WouldBlock {
log::debug!("send() would block");
break;
}
panic!("send() failed: {:?}", e);
@ -169,7 +157,6 @@ pub fn server_loop(
if let Err(e) = socket.send_to(out, from) {
if e.kind() == std::io::ErrorKind::WouldBlock {
log::debug!("send() would block");
break;
}
@ -192,7 +179,7 @@ pub fn server_loop(
let scid = hdr.dcid.clone();
log::debug!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid);
log::info!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid);
let conn = quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config)
.unwrap();
@ -224,40 +211,22 @@ pub fn server_loop(
};
// Process potentially coalesced packets.
let read = match client.conn.recv(pkt_buf, recv_info) {
match client.conn.recv(pkt_buf, recv_info) {
Ok(v) => v,
Err(e) => {
log::error!("{} recv failed: {:?}", client.conn.trace_id(), e);
continue 'read;
}
};
log::debug!("{} processed {} bytes", client.conn.trace_id(), read);
if client.conn.is_in_early_data() || client.conn.is_established() {
// Process all readable streams.
for stream in client.conn.readable() {
let message =
recv_message(&mut client.conn, &mut client.read_streams, stream);
match message {
Ok(Some(message)) => match message {
Message::Filters(mut filters) => {
client.filters.append(&mut filters);
}
_ => {
log::error!("unknown message from the client");
}
},
Ok(None) => {}
Err(e) => {
log::error!("Error recieving message : {e}")
}
}
}
}
}
}
while let Ok(message) = message_send_queue.try_recv() {
// dispatch hundreds of messages at a time
for _ in 0..100 {
let Ok(message) = message_send_queue.try_recv() else {
break;
};
let dispatch_to = clients
.iter_mut()
.filter(|(_, client)| {
@ -314,7 +283,7 @@ pub fn server_loop(
);
}
}
log::debug!("dispatching {} on stream id : {}", binary.len(), stream_id);
log::trace!("dispatching {} on stream id : {}", binary.len(), stream_id);
if let Err(e) = send_message(
&mut client.conn,
@ -324,10 +293,17 @@ pub fn server_loop(
) {
log::error!("Error sending message : {e}");
if stop_laggy_client {
log::info!("Stopping laggy client : {}", client.conn.trace_id());
if let Err(e) = client.conn.close(true, 1, b"laggy client") {
if e != quiche::Error::Done {
log::error!("error closing client : {}", e);
}
} else {
log::info!(
"Stopping laggy client : {} because of error : {}",
client.conn.trace_id(),
e
);
}
}
}
}
@ -338,6 +314,30 @@ pub fn server_loop(
// them on the UDP socket, until quiche reports that there are no more
// packets to be sent.
for client in clients.values_mut() {
if client.conn.is_in_early_data() || client.conn.is_established() {
// Process all readable streams.
for stream in client.conn.readable() {
let message = recv_message(&mut client.conn, &mut client.read_streams, stream);
match message {
Ok(Some(message)) => match message {
Message::Filters(mut filters) => {
client.filters.append(&mut filters);
}
Message::Ping => {
// got ping
}
_ => {
log::error!("unknown message from the client");
}
},
Ok(None) => {}
Err(e) => {
log::error!("Error recieving message : {e}")
}
}
}
}
for stream_id in client.conn.writable() {
handle_writable(&mut client.conn, &mut client.partial_responses, stream_id);
}
@ -346,7 +346,7 @@ pub fn server_loop(
let (write, send_info) = match client.conn.send(&mut out) {
Ok(v) => v,
Err(quiche::Error::Done) => {
log::debug!("{} done writing", client.conn.trace_id());
log::trace!("{} done writing", client.conn.trace_id());
break;
}
Err(e) => {
@ -360,20 +360,22 @@ pub fn server_loop(
}
};
if let Err(e) = socket.send_to(&out[..write], send_info.to) {
match socket.send_to(&out[..write], send_info.to) {
Ok(_len) => {
// log::debug!("wrote: {} bytes", len);
}
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock {
log::debug!("send() would block");
break;
}
log::error!("send() failed: {:?}", e);
}
log::debug!("{} written {} bytes", client.conn.trace_id(), write);
}
}
}
// Garbage collect closed connections.
clients.retain(|_, ref mut c| {
clients.retain(|_, c| {
log::debug!("Collecting garbage");
if c.conn.is_closed() {

View File

@ -75,7 +75,7 @@ pub fn get_next_unidi(
break;
}
if is_unidi(stream_id, is_server) {
if is_unidi(stream_id, is_server) && !is_bidi(stream_id) {
return stream_id;
}
}

View File

@ -20,4 +20,3 @@ tracing-subscriber = { workspace = true }
quic-geyser-client = { workspace = true }
quic-geyser-common = { workspace = true }
quic-geyser-plugin = { workspace = true }

View File

@ -140,7 +140,7 @@ pub fn main() {
.fetch_add(message_size as u64, std::sync::atomic::Ordering::Relaxed);
match message {
quic_geyser_common::message::Message::AccountMsg(account) => {
log::debug!("got account notification : {} ", account.pubkey);
log::trace!("got account notification : {} ", account.pubkey);
account_notification.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let data_len = account.data_length as usize;
total_accounts_size
@ -166,17 +166,17 @@ pub fn main() {
);
}
quic_geyser_common::message::Message::SlotMsg(slot) => {
log::debug!("got slot notification : {} ", slot.slot);
log::trace!("got slot notification : {} ", slot.slot);
slot_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
slot_slot.store(slot.slot, std::sync::atomic::Ordering::Relaxed);
}
quic_geyser_common::message::Message::BlockMetaMsg(block_meta) => {
log::debug!("got blockmeta notification : {} ", block_meta.slot);
log::trace!("got blockmeta notification : {} ", block_meta.slot);
blockmeta_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
blockmeta_slot.store(block_meta.slot, std::sync::atomic::Ordering::Relaxed);
}
quic_geyser_common::message::Message::TransactionMsg(tx) => {
log::debug!(
log::trace!(
"got transaction notification: {}",
tx.signatures[0].to_string()
);
@ -186,6 +186,9 @@ pub fn main() {
quic_geyser_common::message::Message::Filters(_) => {
// Not supported
}
quic_geyser_common::message::Message::Ping => {
// not supported ping
}
}
}
Err(e) => {

View File

@ -20,6 +20,4 @@ tracing-subscriber = { workspace = true }
rand = "0.8.5"
quic-geyser-client = { workspace = true }
quic-geyser-common = { workspace = true }
quic-geyser-plugin = { workspace = true }

View File

@ -9,7 +9,7 @@ pub struct Args {
#[clap(short, long, default_value_t = 20_000)]
pub accounts_per_second: u32,
#[clap(short = 'l', long, default_value_t = 1_000_000)]
#[clap(short = 'l', long, default_value_t = 200)]
pub account_data_size: u32,
#[clap(short, long, default_value_t = 1_000)]

View File

@ -46,7 +46,10 @@ pub fn main() {
.map(|_| rand.gen::<u8>())
.collect_vec();
loop {
std::thread::sleep(Duration::from_secs(1) - Instant::now().duration_since(instant));
let diff = Instant::now().duration_since(instant);
if diff < Duration::from_secs(1) {
std::thread::sleep(diff);
}
instant = Instant::now();
slot += 1;
for _ in 0..args.accounts_per_second {
@ -62,7 +65,7 @@ pub fn main() {
},
write_version,
};
let channel_message = ChannelMessage::Account(account, slot, false);
let channel_message = ChannelMessage::Account(account, slot, vec![8, 4, 7, 3]);
quic_server.send_message(channel_message).unwrap();
}
}

View File

@ -50,7 +50,7 @@ impl GeyserPlugin for QuicGeyserPlugin {
&self,
account: ReplicaAccountInfoVersions,
slot: Slot,
is_startup: bool,
_is_startup: bool,
) -> PluginResult<()> {
let Some(quic_server) = &self.quic_server else {
return Ok(());
@ -68,6 +68,11 @@ impl GeyserPlugin for QuicGeyserPlugin {
rent_epoch: account_info.rent_epoch,
};
let pubkey: Pubkey = Pubkey::try_from(account_info.pubkey).expect("valid pubkey");
let compressed_data = quic_server
.get_compression_type()
.compress(account_info.data);
quic_server
.send_message(ChannelMessage::Account(
AccountData {
@ -76,7 +81,7 @@ impl GeyserPlugin for QuicGeyserPlugin {
write_version: account_info.write_version,
},
slot,
is_startup,
compressed_data,
))
.map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
Ok(())