updating server loop to make it similar to quiche example

This commit is contained in:
godmodegalactus 2024-06-10 10:21:27 +02:00
parent a9f8851a2f
commit 804c75a7cd
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
7 changed files with 528 additions and 481 deletions

1
Cargo.lock generated
View File

@ -2457,6 +2457,7 @@ dependencies = [
"bincode", "bincode",
"itertools", "itertools",
"log", "log",
"mio_channel",
"quic-geyser-common", "quic-geyser-common",
"quic-geyser-server", "quic-geyser-server",
"rand 0.8.5", "rand 0.8.5",

View File

@ -14,6 +14,7 @@ log = { workspace = true }
quic-geyser-common = { workspace = true } quic-geyser-common = { workspace = true }
bincode = { workspace = true } bincode = { workspace = true }
itertools = { workspace = true } itertools = { workspace = true }
mio_channel = { workspace = true }
[dev-dependencies] [dev-dependencies]
rand = { workspace = true } rand = { workspace = true }

View File

@ -1,6 +1,6 @@
use std::{ use std::{
collections::{BTreeMap, HashMap}, collections::{BTreeMap, HashMap},
sync::mpsc::{Receiver, Sender}, sync::mpsc::Receiver,
}; };
use itertools::Itertools; use itertools::Itertools;
@ -16,7 +16,7 @@ use solana_sdk::pubkey::Pubkey;
pub fn start_block_building_thread( pub fn start_block_building_thread(
channel_messages: Receiver<ChannelMessage>, channel_messages: Receiver<ChannelMessage>,
output: Sender<ChannelMessage>, output: mio_channel::Sender<ChannelMessage>,
compression_type: CompressionType, compression_type: CompressionType,
) { ) {
std::thread::spawn(move || { std::thread::spawn(move || {
@ -33,7 +33,7 @@ struct PartialBlock {
pub fn build_blocks( pub fn build_blocks(
channel_messages: Receiver<ChannelMessage>, channel_messages: Receiver<ChannelMessage>,
output: Sender<ChannelMessage>, output: mio_channel::Sender<ChannelMessage>,
compression_type: CompressionType, compression_type: CompressionType,
) { ) {
let mut partially_build_blocks = BTreeMap::<u64, PartialBlock>::new(); let mut partially_build_blocks = BTreeMap::<u64, PartialBlock>::new();
@ -156,7 +156,7 @@ pub fn build_blocks(
fn dispatch_partial_block( fn dispatch_partial_block(
partial_blocks: &mut BTreeMap<u64, PartialBlock>, partial_blocks: &mut BTreeMap<u64, PartialBlock>,
slot: u64, slot: u64,
output: &Sender<ChannelMessage>, output: &mio_channel::Sender<ChannelMessage>,
compression_type: CompressionType, compression_type: CompressionType,
) { ) {
if let Some(dispatched_partial_block) = partial_blocks.remove(&slot) { if let Some(dispatched_partial_block) = partial_blocks.remove(&slot) {

View File

@ -34,7 +34,7 @@ mod tests {
#[test] #[test]
fn test_block_creation_transactions_after_blockmeta() { fn test_block_creation_transactions_after_blockmeta() {
let (channelmsg_sx, cm_rx) = channel(); let (channelmsg_sx, cm_rx) = channel();
let (ms_sx, msg_rx) = channel(); let (ms_sx, msg_rx) = mio_channel::channel();
start_block_building_thread( start_block_building_thread(
cm_rx, cm_rx,
ms_sx, ms_sx,
@ -227,7 +227,8 @@ mod tests {
.send(ChannelMessage::Transaction(Box::new(tx3.clone()))) .send(ChannelMessage::Transaction(Box::new(tx3.clone())))
.unwrap(); .unwrap();
let block_message = msg_rx.recv().unwrap(); sleep(Duration::from_millis(1));
let block_message = msg_rx.try_recv().unwrap();
let ChannelMessage::Block(block) = block_message else { let ChannelMessage::Block(block) = block_message else {
unreachable!(); unreachable!();
}; };
@ -256,7 +257,7 @@ mod tests {
#[test] #[test]
fn test_block_creation_blockmeta_after_transactions() { fn test_block_creation_blockmeta_after_transactions() {
let (channelmsg_sx, cm_rx) = channel(); let (channelmsg_sx, cm_rx) = channel();
let (ms_sx, msg_rx) = channel(); let (ms_sx, msg_rx) = mio_channel::channel();
start_block_building_thread( start_block_building_thread(
cm_rx, cm_rx,
ms_sx, ms_sx,
@ -450,7 +451,8 @@ mod tests {
.send(ChannelMessage::BlockMeta(block_meta.clone())) .send(ChannelMessage::BlockMeta(block_meta.clone()))
.unwrap(); .unwrap();
let block_message = msg_rx.recv().unwrap(); sleep(Duration::from_millis(1));
let block_message = msg_rx.try_recv().unwrap();
let ChannelMessage::Block(block) = block_message else { let ChannelMessage::Block(block) = block_message else {
unreachable!(); unreachable!();
}; };
@ -479,7 +481,7 @@ mod tests {
#[test] #[test]
fn test_block_creation_incomplete_block_after_slot_notification() { fn test_block_creation_incomplete_block_after_slot_notification() {
let (channelmsg_sx, cm_rx) = channel(); let (channelmsg_sx, cm_rx) = channel();
let (ms_sx, msg_rx) = channel(); let (ms_sx, msg_rx) = mio_channel::channel();
start_block_building_thread( start_block_building_thread(
cm_rx, cm_rx,
ms_sx, ms_sx,
@ -672,7 +674,8 @@ mod tests {
.send(ChannelMessage::Transaction(Box::new(tx3.clone()))) .send(ChannelMessage::Transaction(Box::new(tx3.clone())))
.unwrap(); .unwrap();
let block_message = msg_rx.recv().unwrap(); sleep(Duration::from_millis(1));
let block_message = msg_rx.try_recv().unwrap();
let ChannelMessage::Block(block) = block_message else { let ChannelMessage::Block(block) = block_message else {
unreachable!(); unreachable!();
}; };
@ -701,7 +704,7 @@ mod tests {
#[test] #[test]
fn test_block_creation_incomplete_slot() { fn test_block_creation_incomplete_slot() {
let (channelmsg_sx, cm_rx) = channel(); let (channelmsg_sx, cm_rx) = channel();
let (ms_sx, msg_rx) = channel(); let (ms_sx, msg_rx) = mio_channel::channel();
start_block_building_thread( start_block_building_thread(
cm_rx, cm_rx,
ms_sx, ms_sx,

View File

@ -373,7 +373,7 @@ mod tests {
); );
// server loop // server loop
let (server_send_queue, rx_sent_queue) = mpsc::channel::<ChannelMessage>(); let (server_send_queue, rx_sent_queue) = mio_channel::channel::<ChannelMessage>();
let _server_loop_jh = std::thread::spawn(move || { let _server_loop_jh = std::thread::spawn(move || {
if let Err(e) = server_loop( if let Err(e) = server_loop(
QuicParameters::default(), QuicParameters::default(),

View File

@ -1,11 +1,11 @@
use quic_geyser_common::{ use quic_geyser_common::{
channel_message::ChannelMessage, config::ConfigQuicPlugin, plugin_error::QuicGeyserError, channel_message::ChannelMessage, config::ConfigQuicPlugin, plugin_error::QuicGeyserError,
}; };
use std::{fmt::Debug, sync::mpsc}; use std::fmt::Debug;
use super::quiche_server_loop::server_loop; use super::quiche_server_loop::server_loop;
pub struct QuicServer { pub struct QuicServer {
pub data_channel_sender: mpsc::Sender<ChannelMessage>, pub data_channel_sender: mio_channel::Sender<ChannelMessage>,
pub quic_plugin_config: ConfigQuicPlugin, pub quic_plugin_config: ConfigQuicPlugin,
} }
@ -20,7 +20,7 @@ impl QuicServer {
let socket = config.address; let socket = config.address;
let compression_type = config.compression_parameters.compression_type; let compression_type = config.compression_parameters.compression_type;
let (data_channel_sender, data_channel_tx) = mpsc::channel(); let (data_channel_sender, data_channel_tx) = mio_channel::channel();
let _server_loop_jh = std::thread::spawn(move || { let _server_loop_jh = std::thread::spawn(move || {
if let Err(e) = server_loop( if let Err(e) = server_loop(

File diff suppressed because it is too large Load Diff