From f6676665367543eeb446b9780b710ac27ef23beb Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Fri, 24 May 2024 11:16:11 +0200 Subject: [PATCH] lot of changes related to optimization of quiche --- Cargo.lock | 3 - client/src/client.rs | 3 +- common/src/channel_message.rs | 2 +- common/src/compression.rs | 26 ++++++ common/src/filters.rs | 6 +- common/src/message.rs | 1 + common/src/quic/configure_server.rs | 2 +- common/src/quic/quic_server.rs | 10 ++- common/src/quic/quiche_client_loop.rs | 106 ++++++++++++----------- common/src/quic/quiche_reciever.rs | 11 ++- common/src/quic/quiche_sender.rs | 17 ++-- common/src/quic/quiche_server_loop.rs | 116 +++++++++++++------------- common/src/quic/quiche_utils.rs | 2 +- examples/tester-client/Cargo.toml | 3 +- examples/tester-client/src/main.rs | 11 ++- examples/tester-server/Cargo.toml | 4 +- examples/tester-server/src/cli.rs | 2 +- examples/tester-server/src/main.rs | 7 +- plugin/src/quic_plugin.rs | 9 +- 19 files changed, 193 insertions(+), 148 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 545d10e..0697ddf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2473,7 +2473,6 @@ dependencies = [ "log", "quic-geyser-client", "quic-geyser-common", - "quic-geyser-plugin", "serde", "serde_json", "solana-rpc-client", @@ -2490,9 +2489,7 @@ dependencies = [ "clap", "itertools", "log", - "quic-geyser-client", "quic-geyser-common", - "quic-geyser-plugin", "rand 0.8.5", "serde", "serde_json", diff --git a/client/src/client.rs b/client/src/client.rs index 470ddca..95a16ce 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -42,7 +42,6 @@ impl Client { connection_parameters.max_number_of_streams, ) { log::error!("client stopped with error {e}"); - println!("client stopped with error {e}"); } is_connected_client.store(false, std::sync::atomic::Ordering::Relaxed); }); @@ -146,7 +145,7 @@ mod tests { write_version: account.write_version, }, account.slot_identifier.slot, - false, + vec![8, 6, 3, 1], ), ) .unwrap(); diff --git a/common/src/channel_message.rs b/common/src/channel_message.rs index 2ef7400..6b16264 100644 --- a/common/src/channel_message.rs +++ b/common/src/channel_message.rs @@ -13,7 +13,7 @@ pub struct AccountData { #[derive(Debug, Clone, PartialEq, Eq)] pub enum ChannelMessage { - Account(AccountData, Slot, bool), + Account(AccountData, Slot, Vec), Slot(u64, u64, CommitmentLevel), BlockMeta(BlockMeta), Transaction(Box), diff --git a/common/src/compression.rs b/common/src/compression.rs index 0927ecf..eff48fe 100644 --- a/common/src/compression.rs +++ b/common/src/compression.rs @@ -13,3 +13,29 @@ impl Default for CompressionType { Self::Lz4Fast(8) } } + +impl CompressionType { + pub fn compress(&self, data: &[u8]) -> Vec { + if data.is_empty() { + return vec![]; + } + + match self { + CompressionType::None => data.to_vec(), + CompressionType::Lz4Fast(speed) => lz4::block::compress( + data, + Some(lz4::block::CompressionMode::FAST(*speed as i32)), + true, + ) + .expect("Compression should work"), + CompressionType::Lz4(compression) => lz4::block::compress( + data, + Some(lz4::block::CompressionMode::HIGHCOMPRESSION( + *compression as i32, + )), + true, + ) + .expect("compression should work"), + } + } +} diff --git a/common/src/filters.rs b/common/src/filters.rs index 718d49c..ccb3c2b 100644 --- a/common/src/filters.rs +++ b/common/src/filters.rs @@ -147,7 +147,7 @@ mod tests { write_version: 0, }, 0, - false, + vec![2, 3, 4, 6], ); let msg_2 = ChannelMessage::Account( @@ -157,7 +157,7 @@ mod tests { write_version: 0, }, 0, - false, + vec![2, 3, 4, 6], ); let msg_3 = ChannelMessage::Account( @@ -167,7 +167,7 @@ mod tests { write_version: 0, }, 0, - false, + vec![2, 3, 4, 6], ); let f1 = AccountFilter { diff --git a/common/src/message.rs b/common/src/message.rs index 8936ac7..383cd18 100644 --- a/common/src/message.rs +++ b/common/src/message.rs @@ -20,4 +20,5 @@ pub enum Message { BlockMetaMsg(BlockMeta), TransactionMsg(Box), Filters(Vec), // sent from client to server + Ping, } diff --git a/common/src/quic/configure_server.rs b/common/src/quic/configure_server.rs index 803ce1a..5c1d333 100644 --- a/common/src/quic/configure_server.rs +++ b/common/src/quic/configure_server.rs @@ -1,7 +1,7 @@ use boring::ssl::SslMethod; pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"geyser"; -pub const MAX_DATAGRAM_SIZE: usize = 65527; +pub const MAX_DATAGRAM_SIZE: usize = 65535; pub fn configure_server( max_concurrent_streams: u64, diff --git a/common/src/quic/quic_server.rs b/common/src/quic/quic_server.rs index 4d08f00..804f217 100644 --- a/common/src/quic/quic_server.rs +++ b/common/src/quic/quic_server.rs @@ -1,13 +1,14 @@ use std::fmt::Debug; use crate::{ - channel_message::ChannelMessage, config::ConfigQuicPlugin, plugin_error::QuicGeyserError, - quic::configure_server::configure_server, + channel_message::ChannelMessage, compression::CompressionType, config::ConfigQuicPlugin, + plugin_error::QuicGeyserError, quic::configure_server::configure_server, }; use super::quiche_server_loop::server_loop; pub struct QuicServer { data_channel_sender: mio_channel::Sender, + compression_type: CompressionType, } impl Debug for QuicServer { @@ -43,6 +44,7 @@ impl QuicServer { Ok(QuicServer { data_channel_sender, + compression_type, }) } @@ -51,4 +53,8 @@ impl QuicServer { .send(message) .map_err(|_| QuicGeyserError::MessageChannelClosed) } + + pub fn get_compression_type(&self) -> CompressionType { + self.compression_type + } } diff --git a/common/src/quic/quiche_client_loop.rs b/common/src/quic/quiche_client_loop.rs index 6f2ebf8..d431895 100644 --- a/common/src/quic/quiche_client_loop.rs +++ b/common/src/quic/quiche_client_loop.rs @@ -1,6 +1,7 @@ use std::{ net::SocketAddr, sync::{atomic::AtomicBool, Arc}, + time::{Duration, Instant}, }; use crate::{ @@ -45,7 +46,7 @@ pub fn client_loop( log::info!("connecing client with quiche"); let scid = quiche::ConnectionId::from_ref(&scid); - let local_addr = socket.local_addr().unwrap(); + let local_addr = socket.local_addr()?; let mut conn = quiche::connect(None, &scid, local_addr, server_address, &mut config)?; @@ -62,17 +63,17 @@ pub fn client_loop( let mut partial_responses = PartialResponses::new(); let mut read_streams = ReadStreams::new(); let mut connected = false; + let mut instance = Instant::now(); + let ping_binary = bincode::serialize(&Message::Ping)?; loop { - poll.poll(&mut events, conn.timeout()).unwrap(); + poll.poll(&mut events, conn.timeout())?; let network_updates = true; let channel_updates = true; if network_updates { 'read: loop { if events.is_empty() { - log::debug!("timed out"); - conn.on_timeout(); break 'read; } @@ -81,57 +82,45 @@ pub fn client_loop( Ok(v) => v, Err(e) => { if e.kind() == std::io::ErrorKind::WouldBlock { - log::debug!("recv() would block"); break 'read; } - panic!("recv() failed: {:?}", e); + bail!("recv() failed: {:?}", e); } }; - log::debug!("got {} bytes", len); - let recv_info = quiche::RecvInfo { - to: socket.local_addr().unwrap(), + to: socket.local_addr()?, from, }; // Process potentially coalesced packets. - let read = match conn.recv(&mut buf[..len], recv_info) { - Ok(v) => v, - - Err(e) => { - log::error!("recv failed: {:?}", e); - continue 'read; - } + if let Err(e) = conn.recv(&mut buf[..len], recv_info) { + log::error!("recv failed: {:?}", e); + continue 'read; }; - - log::debug!("processed {} bytes", read); } - - // io events - for stream_id in conn.readable() { - let message = recv_message(&mut conn, &mut read_streams, stream_id); - match message { - Ok(Some(message)) => { - if let Err(e) = message_recv_queue.send(message) { - log::error!("Error sending message on the channel : {e}"); - println!("Error sending message on the channel : {e}"); - } - } - Ok(None) => { - // do nothing - } - Err(e) => { - log::error!("Error recieving message : {e}") + } + // io events + for stream_id in conn.readable() { + let message = recv_message(&mut conn, &mut read_streams, stream_id); + match message { + Ok(Some(message)) => { + if let Err(e) = message_recv_queue.send(message) { + log::error!("Error sending message on the channel : {e}"); } } - } - - for stream_id in conn.writable() { - handle_writable(&mut conn, &mut partial_responses, stream_id); + Ok(None) => { + // do nothing + } + Err(e) => { + log::error!("Error recieving message : {e}") + } } } + for stream_id in conn.writable() { + handle_writable(&mut conn, &mut partial_responses, stream_id); + } if !connected && conn.is_established() { is_connected.store(true, std::sync::atomic::Ordering::Relaxed); connected = true; @@ -139,6 +128,22 @@ pub fn client_loop( // chanel updates if channel_updates && conn.is_established() { + let current_instance = Instant::now(); + // sending ping filled + if current_instance.duration_since(instance) > Duration::from_secs(5) { + log::debug!("sending ping to the server"); + instance = current_instance; + current_stream_id = get_next_unidi(current_stream_id, false, maximum_streams); + if let Err(e) = send_message( + &mut conn, + &mut partial_responses, + current_stream_id, + &ping_binary, + ) { + log::error!("sending ping failed with error {e:?}"); + } + } + // channel events if let Ok(message_to_send) = message_send_queue.try_recv() { current_stream_id = get_next_unidi(current_stream_id, false, maximum_streams); @@ -162,7 +167,6 @@ pub fn client_loop( Ok(v) => v, Err(quiche::Error::Done) => { - log::debug!("done writing"); break; } @@ -173,21 +177,21 @@ pub fn client_loop( } }; - if let Err(e) = socket.send_to(&out[..write], send_info.to) { - if e.kind() == std::io::ErrorKind::WouldBlock { - log::debug!("send() would block"); - break; + match socket.send_to(&out[..write], send_info.to) { + Ok(_len) => {} + Err(e) => { + if e.kind() == std::io::ErrorKind::WouldBlock { + log::debug!("send() would block"); + break; + } + log::error!("send() failed: {:?}", e); } - - bail!("send fail"); } - log::debug!("written {}", write); } if conn.is_closed() { is_connected.store(false, std::sync::atomic::Ordering::Relaxed); log::info!("connection closed, {:?}", conn.stats()); - println!("Connection closed {:?}", conn.stats()); break; } } @@ -248,7 +252,7 @@ mod tests { write_version: 1, }, 5, - false, + vec![2, 3, 4, 6], ); let message_3 = ChannelMessage::Account( @@ -264,7 +268,7 @@ mod tests { write_version: 1, }, 5, - false, + vec![2, 3, 4, 6], ); let message_4 = ChannelMessage::Account( @@ -280,7 +284,7 @@ mod tests { write_version: 1, }, 5, - false, + vec![2, 3, 4, 6], ); let message_5 = ChannelMessage::Account( @@ -296,7 +300,7 @@ mod tests { write_version: 1, }, 5, - false, + vec![2, 3, 4, 6], ); // server loop diff --git a/common/src/quic/quiche_reciever.rs b/common/src/quic/quiche_reciever.rs index 5fee9fb..1b30db7 100644 --- a/common/src/quic/quiche_reciever.rs +++ b/common/src/quic/quiche_reciever.rs @@ -25,11 +25,16 @@ pub fn recv_message( let mut buf = [0; MAX_DATAGRAM_SIZE]; // 10kk buffer size match connection.stream_recv(stream_id, &mut buf) { Ok((read, fin)) => { - log::debug!("read {} on stream {}", read, stream_id); + log::trace!("read {} on stream {}", read, stream_id); total_buf.extend_from_slice(&buf[..read]); if fin { - log::debug!("fin stream : {}", stream_id); - return Ok(Some(bincode::deserialize::(&total_buf)?)); + log::trace!("fin stream : {}", stream_id); + match bincode::deserialize::(&total_buf) { + Ok(message) => return Ok(Some(message)), + Err(e) => { + bail!("Error deserializing stream {stream_id} error: {e:?}"); + } + } } } Err(e) => { diff --git a/common/src/quic/quiche_sender.rs b/common/src/quic/quiche_sender.rs index afce5ec..de6dcb4 100644 --- a/common/src/quic/quiche_sender.rs +++ b/common/src/quic/quiche_sender.rs @@ -1,9 +1,6 @@ -use anyhow::bail; -use quiche::Connection; - -use crate::{message::Message, quic::quiche_utils::PartialResponse}; - use super::quiche_utils::PartialResponses; +use crate::{message::Message, quic::quiche_utils::PartialResponse}; +use quiche::Connection; pub fn convert_to_binary(message: &Message) -> anyhow::Result> { Ok(bincode::serialize(&message)?) @@ -14,15 +11,15 @@ pub fn send_message( partial_responses: &mut PartialResponses, stream_id: u64, message: &Vec, -) -> anyhow::Result<()> { +) -> std::result::Result<(), quiche::Error> { let written = match connection.stream_send(stream_id, message, false) { Ok(v) => v, Err(quiche::Error::Done) => 0, Err(e) => { - bail!("{} stream send failed {:?}", stream_id, e); + return Err(e); } }; - log::debug!("dispatched {} on stream id : {}", written, stream_id); + log::trace!("dispatched {} on stream id : {}", written, stream_id); if written < message.len() { let response = PartialResponse { @@ -35,7 +32,7 @@ pub fn send_message( Ok(_) => {} Err(quiche::Error::Done) => {} Err(e) => { - bail!("{} stream send failed {:?}", stream_id, e); + return Err(e); } } } @@ -48,7 +45,7 @@ pub fn handle_writable( partial_responses: &mut PartialResponses, stream_id: u64, ) { - log::debug!("{} stream {} is writable", conn.trace_id(), stream_id); + log::trace!("{} stream {} is writable", conn.trace_id(), stream_id); if !partial_responses.contains_key(&stream_id) { return; diff --git a/common/src/quic/quiche_server_loop.rs b/common/src/quic/quiche_server_loop.rs index 91850d4..7e9a4a7 100644 --- a/common/src/quic/quiche_server_loop.rs +++ b/common/src/quic/quiche_server_loop.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, net::SocketAddr}; +use anyhow::bail; use itertools::Itertools; use mio::{net::UdpSocket, Token}; use quiche::{Connection, ConnectionId}; @@ -52,11 +53,8 @@ pub fn server_loop( let mut poll = mio::Poll::new()?; let mut events = mio::Events::with_capacity(1024); - poll.registry().register( - &mut socket, - NETWORK_TOKEN, - mio::Interest::READABLE | mio::Interest::WRITABLE, - )?; + poll.registry() + .register(&mut socket, NETWORK_TOKEN, mio::Interest::READABLE)?; poll.registry().register( &mut message_send_queue, @@ -71,16 +69,11 @@ pub fn server_loop( loop { let timeout = clients.values().filter_map(|c| c.conn.timeout()).min(); - log::debug!("timeout : {}", timeout.unwrap_or_default().as_millis()); - poll.poll(&mut events, timeout).unwrap(); - let network_read = events - .iter() - .any(|x| x.token() == NETWORK_TOKEN && x.is_readable()); + let network_read = true; if events.is_empty() { - log::debug!("connection timed out"); clients.values_mut().for_each(|c| c.conn.on_timeout()); continue; } @@ -91,15 +84,13 @@ pub fn server_loop( Ok(v) => v, Err(e) => { if e.kind() == std::io::ErrorKind::WouldBlock { - log::debug!("recv() would block"); + log::trace!("recv() would block"); break 'read; } - panic!("recv() failed: {:?}", e); + bail!("recv() failed: {:?}", e); } }; - log::debug!("got {} bytes", len); - let pkt_buf = &mut buf[..len]; // Parse the QUIC packet's header. @@ -112,8 +103,6 @@ pub fn server_loop( } }; - log::trace!("got packet {:?}", hdr); - let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid); let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN]; let conn_id: ConnectionId<'static> = conn_id.to_vec().into(); @@ -133,7 +122,6 @@ pub fn server_loop( if let Err(e) = socket.send_to(out, from) { if e.kind() == std::io::ErrorKind::WouldBlock { - log::debug!("send() would block"); break; } panic!("send() failed: {:?}", e); @@ -169,7 +157,6 @@ pub fn server_loop( if let Err(e) = socket.send_to(out, from) { if e.kind() == std::io::ErrorKind::WouldBlock { - log::debug!("send() would block"); break; } @@ -192,7 +179,7 @@ pub fn server_loop( let scid = hdr.dcid.clone(); - log::debug!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid); + log::info!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid); let conn = quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config) .unwrap(); @@ -224,40 +211,22 @@ pub fn server_loop( }; // Process potentially coalesced packets. - let read = match client.conn.recv(pkt_buf, recv_info) { + match client.conn.recv(pkt_buf, recv_info) { Ok(v) => v, Err(e) => { log::error!("{} recv failed: {:?}", client.conn.trace_id(), e); continue 'read; } }; - log::debug!("{} processed {} bytes", client.conn.trace_id(), read); - - if client.conn.is_in_early_data() || client.conn.is_established() { - // Process all readable streams. - for stream in client.conn.readable() { - let message = - recv_message(&mut client.conn, &mut client.read_streams, stream); - match message { - Ok(Some(message)) => match message { - Message::Filters(mut filters) => { - client.filters.append(&mut filters); - } - _ => { - log::error!("unknown message from the client"); - } - }, - Ok(None) => {} - Err(e) => { - log::error!("Error recieving message : {e}") - } - } - } - } } } - while let Ok(message) = message_send_queue.try_recv() { + // dispatch hundreds of messages at a time + for _ in 0..100 { + let Ok(message) = message_send_queue.try_recv() else { + break; + }; + let dispatch_to = clients .iter_mut() .filter(|(_, client)| { @@ -314,7 +283,7 @@ pub fn server_loop( ); } } - log::debug!("dispatching {} on stream id : {}", binary.len(), stream_id); + log::trace!("dispatching {} on stream id : {}", binary.len(), stream_id); if let Err(e) = send_message( &mut client.conn, @@ -324,9 +293,16 @@ pub fn server_loop( ) { log::error!("Error sending message : {e}"); if stop_laggy_client { - log::info!("Stopping laggy client : {}", client.conn.trace_id()); if let Err(e) = client.conn.close(true, 1, b"laggy client") { - log::error!("error closing client : {}", e); + if e != quiche::Error::Done { + log::error!("error closing client : {}", e); + } + } else { + log::info!( + "Stopping laggy client : {} because of error : {}", + client.conn.trace_id(), + e + ); } } } @@ -338,6 +314,30 @@ pub fn server_loop( // them on the UDP socket, until quiche reports that there are no more // packets to be sent. for client in clients.values_mut() { + if client.conn.is_in_early_data() || client.conn.is_established() { + // Process all readable streams. + for stream in client.conn.readable() { + let message = recv_message(&mut client.conn, &mut client.read_streams, stream); + match message { + Ok(Some(message)) => match message { + Message::Filters(mut filters) => { + client.filters.append(&mut filters); + } + Message::Ping => { + // got ping + } + _ => { + log::error!("unknown message from the client"); + } + }, + Ok(None) => {} + Err(e) => { + log::error!("Error recieving message : {e}") + } + } + } + } + for stream_id in client.conn.writable() { handle_writable(&mut client.conn, &mut client.partial_responses, stream_id); } @@ -346,7 +346,7 @@ pub fn server_loop( let (write, send_info) = match client.conn.send(&mut out) { Ok(v) => v, Err(quiche::Error::Done) => { - log::debug!("{} done writing", client.conn.trace_id()); + log::trace!("{} done writing", client.conn.trace_id()); break; } Err(e) => { @@ -360,20 +360,22 @@ pub fn server_loop( } }; - if let Err(e) = socket.send_to(&out[..write], send_info.to) { - if e.kind() == std::io::ErrorKind::WouldBlock { - log::debug!("send() would block"); - break; + match socket.send_to(&out[..write], send_info.to) { + Ok(_len) => { + // log::debug!("wrote: {} bytes", len); + } + Err(e) => { + if e.kind() == std::io::ErrorKind::WouldBlock { + break; + } + log::error!("send() failed: {:?}", e); } - log::error!("send() failed: {:?}", e); } - - log::debug!("{} written {} bytes", client.conn.trace_id(), write); } } // Garbage collect closed connections. - clients.retain(|_, ref mut c| { + clients.retain(|_, c| { log::debug!("Collecting garbage"); if c.conn.is_closed() { diff --git a/common/src/quic/quiche_utils.rs b/common/src/quic/quiche_utils.rs index b88a5b3..eb5e7f7 100644 --- a/common/src/quic/quiche_utils.rs +++ b/common/src/quic/quiche_utils.rs @@ -75,7 +75,7 @@ pub fn get_next_unidi( break; } - if is_unidi(stream_id, is_server) { + if is_unidi(stream_id, is_server) && !is_bidi(stream_id) { return stream_id; } } diff --git a/examples/tester-client/Cargo.toml b/examples/tester-client/Cargo.toml index a54f630..c73c729 100644 --- a/examples/tester-client/Cargo.toml +++ b/examples/tester-client/Cargo.toml @@ -19,5 +19,4 @@ bincode = { workspace = true } tracing-subscriber = { workspace = true } quic-geyser-client = { workspace = true } -quic-geyser-common = { workspace = true } -quic-geyser-plugin = { workspace = true } \ No newline at end of file +quic-geyser-common = { workspace = true } \ No newline at end of file diff --git a/examples/tester-client/src/main.rs b/examples/tester-client/src/main.rs index 7626b88..a9d0af7 100644 --- a/examples/tester-client/src/main.rs +++ b/examples/tester-client/src/main.rs @@ -140,7 +140,7 @@ pub fn main() { .fetch_add(message_size as u64, std::sync::atomic::Ordering::Relaxed); match message { quic_geyser_common::message::Message::AccountMsg(account) => { - log::debug!("got account notification : {} ", account.pubkey); + log::trace!("got account notification : {} ", account.pubkey); account_notification.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let data_len = account.data_length as usize; total_accounts_size @@ -166,17 +166,17 @@ pub fn main() { ); } quic_geyser_common::message::Message::SlotMsg(slot) => { - log::debug!("got slot notification : {} ", slot.slot); + log::trace!("got slot notification : {} ", slot.slot); slot_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed); slot_slot.store(slot.slot, std::sync::atomic::Ordering::Relaxed); } quic_geyser_common::message::Message::BlockMetaMsg(block_meta) => { - log::debug!("got blockmeta notification : {} ", block_meta.slot); + log::trace!("got blockmeta notification : {} ", block_meta.slot); blockmeta_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed); blockmeta_slot.store(block_meta.slot, std::sync::atomic::Ordering::Relaxed); } quic_geyser_common::message::Message::TransactionMsg(tx) => { - log::debug!( + log::trace!( "got transaction notification: {}", tx.signatures[0].to_string() ); @@ -186,6 +186,9 @@ pub fn main() { quic_geyser_common::message::Message::Filters(_) => { // Not supported } + quic_geyser_common::message::Message::Ping => { + // not supported ping + } } } Err(e) => { diff --git a/examples/tester-server/Cargo.toml b/examples/tester-server/Cargo.toml index b63d98e..404ddca 100644 --- a/examples/tester-server/Cargo.toml +++ b/examples/tester-server/Cargo.toml @@ -20,6 +20,4 @@ tracing-subscriber = { workspace = true } rand = "0.8.5" -quic-geyser-client = { workspace = true } -quic-geyser-common = { workspace = true } -quic-geyser-plugin = { workspace = true } \ No newline at end of file +quic-geyser-common = { workspace = true } \ No newline at end of file diff --git a/examples/tester-server/src/cli.rs b/examples/tester-server/src/cli.rs index fc7969f..353891b 100644 --- a/examples/tester-server/src/cli.rs +++ b/examples/tester-server/src/cli.rs @@ -9,7 +9,7 @@ pub struct Args { #[clap(short, long, default_value_t = 20_000)] pub accounts_per_second: u32, - #[clap(short = 'l', long, default_value_t = 1_000_000)] + #[clap(short = 'l', long, default_value_t = 200)] pub account_data_size: u32, #[clap(short, long, default_value_t = 1_000)] diff --git a/examples/tester-server/src/main.rs b/examples/tester-server/src/main.rs index 7ece98a..9b6ca42 100644 --- a/examples/tester-server/src/main.rs +++ b/examples/tester-server/src/main.rs @@ -46,7 +46,10 @@ pub fn main() { .map(|_| rand.gen::()) .collect_vec(); loop { - std::thread::sleep(Duration::from_secs(1) - Instant::now().duration_since(instant)); + let diff = Instant::now().duration_since(instant); + if diff < Duration::from_secs(1) { + std::thread::sleep(diff); + } instant = Instant::now(); slot += 1; for _ in 0..args.accounts_per_second { @@ -62,7 +65,7 @@ pub fn main() { }, write_version, }; - let channel_message = ChannelMessage::Account(account, slot, false); + let channel_message = ChannelMessage::Account(account, slot, vec![8, 4, 7, 3]); quic_server.send_message(channel_message).unwrap(); } } diff --git a/plugin/src/quic_plugin.rs b/plugin/src/quic_plugin.rs index 83a7ff5..bd59f8f 100644 --- a/plugin/src/quic_plugin.rs +++ b/plugin/src/quic_plugin.rs @@ -50,7 +50,7 @@ impl GeyserPlugin for QuicGeyserPlugin { &self, account: ReplicaAccountInfoVersions, slot: Slot, - is_startup: bool, + _is_startup: bool, ) -> PluginResult<()> { let Some(quic_server) = &self.quic_server else { return Ok(()); @@ -68,6 +68,11 @@ impl GeyserPlugin for QuicGeyserPlugin { rent_epoch: account_info.rent_epoch, }; let pubkey: Pubkey = Pubkey::try_from(account_info.pubkey).expect("valid pubkey"); + + let compressed_data = quic_server + .get_compression_type() + .compress(account_info.data); + quic_server .send_message(ChannelMessage::Account( AccountData { @@ -76,7 +81,7 @@ impl GeyserPlugin for QuicGeyserPlugin { write_version: account_info.write_version, }, slot, - is_startup, + compressed_data, )) .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; Ok(())