Move to a runtime in geyser plugin

This commit is contained in:
Godmode Galactus 2024-05-15 16:18:20 +02:00
parent 9703e7e8f9
commit 4816f43bea
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
3 changed files with 46 additions and 53 deletions

View File

@ -44,7 +44,6 @@ pub enum ChannelMessage {
#[derive(Debug)]
pub struct QuicServer {
runtime: Runtime,
_quic_connection_manager: ConnectionManager,
data_channel_sender: UnboundedSender<ChannelMessage>,
}
@ -63,35 +62,43 @@ impl QuicServer {
let socket = UdpSocket::bind(config.quic_plugin.address)?;
let compression_type = config.quic_plugin.compression_parameters.compression_type;
let endpoint = Endpoint::new(
EndpointConfig::default(),
Some(server_config),
socket,
Arc::new(TokioRuntime),
);
let endpoint = match endpoint {
Ok(e) => e,
Err(e) => {
let s = e.to_string();
log::info!("{}", s);
panic!("todo")
}
};
let retry_count = config.quic_plugin.number_of_retries;
let runtime = Builder::new_multi_thread()
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
format!("solGeyserGrpc{id:02}")
})
.enable_all()
.build()
.map_err(|error| {
let s = error.to_string();
log::info!("Runtime Error : {}", s);
GeyserPluginError::Custom(Box::new(error))
})?;
let (quic_connection_manager, _jh) = ConnectionManager::new(
endpoint,
config
.quic_plugin
.quic_parameters
.max_number_of_streams_per_client as usize,
);
let (data_channel_sender, mut data_channel_tx) = tokio::sync::mpsc::unbounded_channel();
{
let quic_connection_manager = quic_connection_manager.clone();
tokio::spawn(async move {
runtime.spawn(async move {
let endpoint = Endpoint::new(
EndpointConfig::default(),
Some(server_config),
socket,
Arc::new(TokioRuntime),
)
.unwrap();
let retry_count = config.quic_plugin.number_of_retries;
let (quic_connection_manager, _jh) = ConnectionManager::new(
endpoint,
config
.quic_plugin
.quic_parameters
.max_number_of_streams_per_client as usize,
);
log::info!("Connection manager sucessfully started");
while let Some(channel_message) = data_channel_tx.recv().await {
log::info!("recieved message");
match channel_message {
ChannelMessage::Account(account, slot, is_startup) => {
// avoid sending messages at startup
@ -123,7 +130,6 @@ impl QuicServer {
}
Ok(QuicServer {
_quic_connection_manager: quic_connection_manager,
data_channel_sender,
runtime,
})

View File

@ -28,6 +28,7 @@ impl GeyserPlugin for QuicGeyserPlugin {
log::info!("loading quic_geyser plugin");
let config = Config::load_from_file(config_file)?;
log::info!("Quic plugin config correctly loaded");
let quic_server = QuicServer::new(Keypair::new(), config).map_err(|_| {
GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer))
})?;

View File

@ -1,5 +1,4 @@
use std::{
net::{Ipv4Addr, SocketAddrV4},
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
@ -9,8 +8,6 @@ use cli::Args;
use futures::StreamExt;
use quic_geyser_client::{client::Client, DEFAULT_MAX_STREAM};
use quic_geyser_common::filters::{AccountFilter, Filter};
use quic_geyser_plugin::config::{CompressionParameters, Config, ConfigQuicPlugin, QuicParameters};
use serde_json::json;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair};
use tokio::pin;
@ -18,6 +15,9 @@ use tokio::pin;
pub mod cli;
// to create a config json
// use std::net::{Ipv4Addr, SocketAddrV4};
// use quic_geyser_plugin::config::{CompressionParameters, Config, ConfigQuicPlugin, QuicParameters};
// use serde_json::json;
// let config = Config {
// libpath: "temp".to_string(),
// quic_plugin: ConfigQuicPlugin {
@ -38,28 +38,12 @@ pub mod cli;
#[tokio::main]
async fn main() {
let config = Config {
libpath: "temp".to_string(),
quic_plugin: ConfigQuicPlugin {
address: std::net::SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 10800)),
quic_parameters: QuicParameters {
max_number_of_streams_per_client: 1024,
recieve_window_size: 1_000_000,
connection_timeout: 600,
},
compression_parameters: CompressionParameters {
compression_type: quic_geyser_common::compression::CompressionType::Lz4Fast(8),
},
number_of_retries: 100,
},
};
let config_json = json!(config);
println!("{}", config_json);
let args = Args::parse();
println!("Connecting");
let client = Client::new(args.url, &Keypair::new(), DEFAULT_MAX_STREAM)
.await
.unwrap();
println!("Connected");
let bytes_transfered = Arc::new(AtomicU64::new(0));
let slot_notifications = Arc::new(AtomicU64::new(0));
@ -101,26 +85,27 @@ async fn main() {
tokio::time::sleep(Duration::from_secs(1)).await;
let bytes_transfered =
bytes_transfered.swap(0, std::sync::atomic::Ordering::Relaxed);
log::info!("------------------------------------------");
log::info!(" Bytes Transfered : {}", bytes_transfered);
log::info!(
println!("------------------------------------------");
println!(" Bytes Transfered : {}", bytes_transfered);
println!(
" Accounts Notified : {}",
account_notification.swap(0, std::sync::atomic::Ordering::Relaxed)
);
log::info!(
println!(
" Slots Notified : {}",
slot_notifications.swap(0, std::sync::atomic::Ordering::Relaxed)
);
log::info!(
println!(
" Blockmeta notified : {}",
blockmeta_notifications.swap(0, std::sync::atomic::Ordering::Relaxed)
);
log::info!(" Cluster Slots: {}, Account Slot: {}, Slot Notification slot: {}, BlockMeta slot: {} ", cluster_slot.load(std::sync::atomic::Ordering::Relaxed), account_slot.load(std::sync::atomic::Ordering::Relaxed), slot_slot.load(std::sync::atomic::Ordering::Relaxed), blockmeta_slot.load(std::sync::atomic::Ordering::Relaxed));
println!(" Cluster Slots: {}, Account Slot: {}, Slot Notification slot: {}, BlockMeta slot: {} ", cluster_slot.load(std::sync::atomic::Ordering::Relaxed), account_slot.load(std::sync::atomic::Ordering::Relaxed), slot_slot.load(std::sync::atomic::Ordering::Relaxed), blockmeta_slot.load(std::sync::atomic::Ordering::Relaxed));
}
});
}
println!("Subscribing");
client
.subscribe(vec![
Filter::Account(AccountFilter {
@ -132,6 +117,7 @@ async fn main() {
])
.await
.unwrap();
println!("Subscribed");
let stream = client.get_stream();
pin!(stream);