Implementing plugin with quic

This commit is contained in:
Godmode Galactus 2023-10-16 10:12:05 +02:00
commit 665ebe1409
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
7 changed files with 5554 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

5297
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

9
Cargo.toml Normal file
View File

@ -0,0 +1,9 @@
[workspace]
members = [
"plugin"
]
[profile.release]
debug = true
lto = true
codegen-units = 1

41
plugin/Cargo.toml Normal file
View File

@ -0,0 +1,41 @@
[package]
name = "plugin"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1.21.2", features = ["rt-multi-thread", "macros", "time", "fs"] }
solana-sdk = { path="/home/galactus/solana_1/sdk" }
solana-geyser-plugin-interface = { path="/home/galactus/solana_1/geyser-plugin-interface" }
solana-streamer = { path="/home/galactus/solana_1/streamer" }
itertools = "0.10.5"
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.96"
bincode = "1.3.3"
bs58 = "0.4.0"
base64 = "0.21.0"
thiserror = "1.0.40"
futures = "0.3.28"
bytes = "1.4.0"
anyhow = "1.0.70"
log = "0.4.17"
clap = { version = "4.2.4", features = ["derive", "env"] }
dashmap = "5.4.0"
const_env = "0.1.2"
jsonrpsee = { version = "0.17.0", features = ["macros", "full"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
chrono = "0.4.24"
native-tls = "0.2.11"
postgres-native-tls = "0.5.0"
prometheus = "0.13.3"
lazy_static = "1.4.0"
dotenv = "0.15.0"
async-channel = "1.8.0"
quinn = "0.9.3"
rustls = { version = "=0.20.8", default-features = false }
rcgen = "0.10.0"
pkcs8 = "0.8.0"
pem = "1.1.1"

147
plugin/src/lib.rs Normal file
View File

@ -0,0 +1,147 @@
use std::{net::{IpAddr, Ipv4Addr, UdpSocket}, sync::Arc};
use pem::Pem;
use quinn::{ServerConfig, IdleTimeout, Endpoint, TokioRuntime, EndpointConfig};
use serde::{Serialize, Deserialize};
use solana_geyser_plugin_interface::geyser_plugin_interface::{GeyserPlugin, Result as PluginResult, GeyserPluginError};
use solana_sdk::{signature::{Signature, Keypair}, transaction::TransactionError, slot_history::Slot, quic::QUIC_MAX_TIMEOUT, packet::PACKET_DATA_SIZE};
use solana_streamer::{tls_certificates::new_self_signed_tls_certificate, quic::QuicServerError};
use tokio::{runtime::Runtime, task::JoinHandle, sync::mpsc::{UnboundedSender, UnboundedReceiver}};
use crate::skip_client_verification::SkipClientVerification;
mod skip_client_verification;
mod plugin_error;
pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"solana-geyser";
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TransactionResults {
signature: Signature,
error: Option<TransactionError>,
slot: Slot,
}
#[derive(Debug)]
pub struct PluginInner {
pub runtime: Runtime,
pub handle: JoinHandle<()>,
pub sender: Arc<UnboundedSender<TransactionResults>>,
}
#[derive(Debug)]
pub struct Plugin {
inner: Option<PluginInner>,
}
impl GeyserPlugin for Plugin {
fn name(&self) -> &'static str {
"geyser_quic_banking_transactions_result_sender"
}
fn banking_transaction_results_notifications_enabled(&self) -> bool {
true
}
#[allow(unused_variables)]
fn notify_banking_stage_transaction_results(
&self,
transaction: Signature,
error: Option<TransactionError>,
slot: Slot,
) -> PluginResult<()> {
if let Some(inner) = self.inner {
inner.sender.send(TransactionResults { signature: transaction, error, slot });
Ok(())
} else {
Ok(())
}
}
fn on_load(&mut self, _config_file: &str) -> solana_geyser_plugin_interface::geyser_plugin_interface::Result<()> {
let runtime = Runtime::new().map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
let res = configure_server(&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)));
let (config, _) = res.map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
let sock = UdpSocket::bind("127.0.0.1:18990").expect("couldn't bind to address");
let endpoint = Endpoint::new(EndpointConfig::default(), Some(config), sock, TokioRuntime)
.map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
let (sender, reciever) = tokio::sync::mpsc::unbounded_channel::<TransactionResults>();
let handle = tokio::spawn(async move {
let mut reciever = reciever;
loop {
let connecting = endpoint.accept().await;
if let Some(connecting) = connecting {
let connected = connecting.await;
let connection = match connected {
Ok(connection) => connection,
Err(e) => {
log::error!("geyser plugin connecting {} error", e);
continue;
}
};
let (mut send_stream, _) = match connection.accept_bi().await {
Ok(res) => res,
Err(e) => {
log::error!("geyser plugin accepting bi-channel {} error", e);
continue;
}
};
while let Some(msg) = reciever.recv().await {
let bytes = bincode::serialize(&msg).unwrap_or(vec![]);
if !bytes.is_empty() {
let _ = send_stream.write_all(&bytes).await;
}
}
}
}
});
self.inner = Some(PluginInner {
runtime,
handle,
sender: Arc::new(sender),
});
Ok(())
}
fn on_unload(&mut self) {}
}
pub(crate) fn configure_server(
identity_keypair: &Keypair,
host: IpAddr,
) -> Result<(ServerConfig, String), QuicServerError> {
let (cert, priv_key) = new_self_signed_tls_certificate(identity_keypair, host)?;
let cert_chain_pem_parts = vec![Pem {
tag: "CERTIFICATE".to_string(),
contents: cert.0.clone(),
}];
let cert_chain_pem = pem::encode_many(&cert_chain_pem_parts);
let mut server_tls_config = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_client_cert_verifier(SkipClientVerification::new())
.with_single_cert(vec![cert], priv_key)?;
server_tls_config.alpn_protocols = vec![ALPN_GEYSER_PROTOCOL_ID.to_vec()];
let mut server_config = ServerConfig::with_crypto(Arc::new(server_tls_config));
server_config.use_retry(true);
let config = Arc::get_mut(&mut server_config.transport).unwrap();
config.max_concurrent_uni_streams((0 as u32).into());
let recv_size = (PACKET_DATA_SIZE as u32).into();
config.stream_receive_window(recv_size);
config.receive_window(recv_size);
let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
config.max_idle_timeout(Some(timeout));
// disable bidi & datagrams
const MAX_CONCURRENT_BIDI_STREAMS: u32 = 1;
config.max_concurrent_bidi_streams(MAX_CONCURRENT_BIDI_STREAMS.into());
config.datagram_receive_buffer_size(None);
Ok((server_config, cert_chain_pem))
}

View File

@ -0,0 +1,33 @@
use std::{fmt::Display, error};
#[derive(Debug, Clone)]
pub struct PluginError {
msg: String,
}
impl PluginError {
pub fn new(msg: String) -> Self {
PluginError { msg }
}
}
impl Display for PluginError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Plugin error {}", self.msg)
}
}
impl error::Error for PluginError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
None
}
fn description(&self) -> &str {
"description() is deprecated; use Display"
}
fn cause(&self) -> Option<&dyn error::Error> {
self.source()
}
}

View File

@ -0,0 +1,26 @@
use std::{sync::Arc, time::SystemTime};
use rustls::{DistinguishedNames, server::ClientCertVerified, Certificate};
pub struct SkipClientVerification;
impl SkipClientVerification {
pub fn new() -> Arc<Self> {
Arc::new(Self)
}
}
impl rustls::server::ClientCertVerifier for SkipClientVerification {
fn client_auth_root_subjects(&self) -> Option<DistinguishedNames> {
Some(DistinguishedNames::new())
}
fn verify_client_cert(
&self,
_end_entity: &Certificate,
_intermediates: &[Certificate],
_now: SystemTime,
) -> Result<ClientCertVerified, rustls::Error> {
Ok(rustls::server::ClientCertVerified::assertion())
}
}