From 72d7848bd3ceb8d416c76f7226d9551125757008 Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Mon, 16 Oct 2023 15:58:08 +0200 Subject: [PATCH] implementing client libraries --- Cargo.lock | 287 +++++++++++++++++++++++++++++----- Cargo.toml | 3 +- client/Cargo.toml | 46 ++++++ client/src/main.rs | 30 ++++ client/src/quic_connection.rs | 183 ++++++++++++++++++++++ migration.sql | 12 ++ plugin/Cargo.toml | 2 +- plugin/src/lib.rs | 3 +- plugin/src/plugin_error.rs | 33 ---- 9 files changed, 522 insertions(+), 77 deletions(-) create mode 100644 client/Cargo.toml create mode 100644 client/src/main.rs create mode 100644 client/src/quic_connection.rs create mode 100644 migration.sql delete mode 100644 plugin/src/plugin_error.rs diff --git a/Cargo.lock b/Cargo.lock index 085481b..ff599c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -399,6 +399,15 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-mutex" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" +dependencies = [ + "event-listener", +] + [[package]] name = "async-trait" version = "0.1.74" @@ -772,6 +781,22 @@ dependencies = [ "generic-array", ] +[[package]] +name = "clap" +version = "3.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" +dependencies = [ + "atty", + "bitflags 1.3.2", + "clap_lex 0.2.4", + "indexmap 1.9.3", + "once_cell", + "strsim", + "termcolor", + "textwrap", +] + [[package]] name = "clap" version = "4.4.6" @@ -790,7 +815,7 @@ checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45" dependencies = [ "anstream", "anstyle", - "clap_lex", + "clap_lex 0.5.1", "strsim", ] @@ -806,12 +831,63 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "clap_lex" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5" +dependencies = [ + "os_str_bytes", +] + [[package]] name = "clap_lex" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd7cc57abe963c6d3b9d8be5b06ba7c8957a930305ca90304f24ef040aa6f961" +[[package]] +name = "client" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-channel", + "base64 0.21.4", + "bincode", + "bs58", + "bytes", + "chrono", + "clap 4.4.6", + "const_env", + "dashmap", + "dotenv", + "futures", + "geyser-quic-plugin", + "itertools", + "jsonrpsee", + "lazy_static", + "log", + "native-tls", + "pem", + "pkcs8", + "postgres-native-tls", + "prometheus", + "quinn", + "rcgen", + "rustls 0.20.8", + "serde", + "serde_json", + "solana-geyser-plugin-interface", + "solana-net-utils", + "solana-quic-client", + "solana-sdk 1.16.15", + "solana-streamer", + "thiserror", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "colorchoice" version = "1.0.0" @@ -1502,6 +1578,45 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "geyser-quic-plugin" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-channel", + "base64 0.21.4", + "bincode", + "bs58", + "bytes", + "chrono", + "clap 4.4.6", + "const_env", + "dashmap", + "dotenv", + "futures", + "itertools", + "jsonrpsee", + "lazy_static", + "log", + "native-tls", + "pem", + "pkcs8", + "postgres-native-tls", + "prometheus", + "quinn", + "rcgen", + "rustls 0.20.8", + "serde", + "serde_json", + "solana-geyser-plugin-interface", + "solana-sdk 1.16.15", + "solana-streamer", + "thiserror", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "gimli" version = "0.28.0" @@ -1886,6 +2001,21 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonrpc-core" +version = "18.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f7f76aef2d054868398427f6c54943cf3d1caa9a7ec7d0c38d69df97a965eb" +dependencies = [ + "futures", + "futures-executor", + "futures-util", + "log", + "serde", + "serde_derive", + "serde_json", +] + [[package]] name = "jsonrpsee" version = "0.17.1" @@ -2507,6 +2637,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "os_str_bytes" +version = "6.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" + [[package]] name = "overload" version = "0.1.1" @@ -2657,45 +2793,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" -[[package]] -name = "plugin" -version = "0.1.0" -dependencies = [ - "anyhow", - "async-channel", - "base64 0.21.4", - "bincode", - "bs58", - "bytes", - "chrono", - "clap", - "const_env", - "dashmap", - "dotenv", - "futures", - "itertools", - "jsonrpsee", - "lazy_static", - "log", - "native-tls", - "pem", - "pkcs8", - "postgres-native-tls", - "prometheus", - "quinn", - "rcgen", - "rustls 0.20.8", - "serde", - "serde_json", - "solana-geyser-plugin-interface", - "solana-sdk 1.16.15", - "solana-streamer", - "thiserror", - "tokio", - "tracing", - "tracing-subscriber", -] - [[package]] name = "polyval" version = "0.5.3" @@ -3608,6 +3705,26 @@ dependencies = [ "solana-sdk 1.16.15", ] +[[package]] +name = "solana-connection-cache" +version = "1.16.15" +source = "git+https://github.com/blockworks-foundation/solana.git?branch=geyser_send_transaction_result_plugin#4641300ad4ecb89340f1c7954be04a74dd2575f2" +dependencies = [ + "async-trait", + "bincode", + "futures-util", + "indexmap 1.9.3", + "log", + "rand 0.7.3", + "rayon", + "rcgen", + "solana-measure", + "solana-metrics", + "solana-sdk 1.16.15", + "thiserror", + "tokio", +] + [[package]] name = "solana-frozen-abi" version = "1.16.15" @@ -3747,6 +3864,27 @@ dependencies = [ "solana-sdk 1.16.15", ] +[[package]] +name = "solana-net-utils" +version = "1.16.15" +source = "git+https://github.com/blockworks-foundation/solana.git?branch=geyser_send_transaction_result_plugin#4641300ad4ecb89340f1c7954be04a74dd2575f2" +dependencies = [ + "bincode", + "clap 3.2.25", + "crossbeam-channel", + "log", + "nix", + "rand 0.7.3", + "serde", + "serde_derive", + "socket2 0.4.9", + "solana-logger 1.16.15", + "solana-sdk 1.16.15", + "solana-version", + "tokio", + "url", +] + [[package]] name = "solana-perf" version = "1.16.15" @@ -3908,6 +4046,33 @@ dependencies = [ "thiserror", ] +[[package]] +name = "solana-quic-client" +version = "1.16.15" +source = "git+https://github.com/blockworks-foundation/solana.git?branch=geyser_send_transaction_result_plugin#4641300ad4ecb89340f1c7954be04a74dd2575f2" +dependencies = [ + "async-mutex", + "async-trait", + "futures", + "itertools", + "lazy_static", + "log", + "quinn", + "quinn-proto", + "quinn-udp", + "rcgen", + "rustls 0.20.8", + "solana-connection-cache", + "solana-measure", + "solana-metrics", + "solana-net-utils", + "solana-rpc-client-api", + "solana-sdk 1.16.15", + "solana-streamer", + "thiserror", + "tokio", +] + [[package]] name = "solana-rayon-threadlimit" version = "1.16.15" @@ -3917,6 +4082,27 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "solana-rpc-client-api" +version = "1.16.15" +source = "git+https://github.com/blockworks-foundation/solana.git?branch=geyser_send_transaction_result_plugin#4641300ad4ecb89340f1c7954be04a74dd2575f2" +dependencies = [ + "base64 0.21.4", + "bs58", + "jsonrpc-core", + "reqwest", + "semver", + "serde", + "serde_derive", + "serde_json", + "solana-account-decoder", + "solana-sdk 1.16.15", + "solana-transaction-status", + "solana-version", + "spl-token-2022", + "thiserror", +] + [[package]] name = "solana-sdk" version = "1.16.15" @@ -4105,6 +4291,21 @@ dependencies = [ "thiserror", ] +[[package]] +name = "solana-version" +version = "1.16.15" +source = "git+https://github.com/blockworks-foundation/solana.git?branch=geyser_send_transaction_result_plugin#4641300ad4ecb89340f1c7954be04a74dd2575f2" +dependencies = [ + "log", + "rustc_version", + "semver", + "serde", + "serde_derive", + "solana-frozen-abi 1.16.15", + "solana-frozen-abi-macro 1.16.15", + "solana-sdk 1.16.15", +] + [[package]] name = "solana-vote-program" version = "1.16.15" @@ -4365,6 +4566,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "textwrap" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" + [[package]] name = "thiserror" version = "1.0.49" diff --git a/Cargo.toml b/Cargo.toml index 1ce4a8b..c7c2b96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ - "plugin" + "plugin", + "client" ] [profile.release] diff --git a/client/Cargo.toml b/client/Cargo.toml new file mode 100644 index 0000000..5c0c4e7 --- /dev/null +++ b/client/Cargo.toml @@ -0,0 +1,46 @@ +[package] +name = "client" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = { version = "1.21.2", features = ["rt-multi-thread", "macros", "time", "fs"] } +solana-sdk = { git = "https://github.com/blockworks-foundation/solana.git", branch = "geyser_send_transaction_result_plugin" } +solana-geyser-plugin-interface = { git = "https://github.com/blockworks-foundation/solana.git", branch = "geyser_send_transaction_result_plugin" } +solana-streamer = { git = "https://github.com/blockworks-foundation/solana.git", branch = "geyser_send_transaction_result_plugin" } +solana-quic-client = { git = "https://github.com/blockworks-foundation/solana.git", branch = "geyser_send_transaction_result_plugin" } +solana-net-utils = { git = "https://github.com/blockworks-foundation/solana.git", branch = "geyser_send_transaction_result_plugin" } + +geyser-quic-plugin = { path = "../plugin" } + +itertools = "0.10.5" +serde = { version = "1.0.160", features = ["derive"] } +serde_json = "1.0.96" +bincode = "1.3.3" +bs58 = "0.4.0" +base64 = "0.21.0" +thiserror = "1.0.40" +futures = "0.3.28" +bytes = "1.4.0" +anyhow = "1.0.70" +log = "0.4.17" +clap = { version = "4.2.4", features = ["derive", "env"] } +dashmap = "5.4.0" +const_env = "0.1.2" +jsonrpsee = { version = "0.17.0", features = ["macros", "full"] } +tracing = "0.1.37" +tracing-subscriber = "0.3.16" +chrono = "0.4.24" +native-tls = "0.2.11" +postgres-native-tls = "0.5.0" +prometheus = "0.13.3" +lazy_static = "1.4.0" +dotenv = "0.15.0" +async-channel = "1.8.0" +quinn = "0.9.3" +rustls = { version = "=0.20.8", default-features = false } +rcgen = "0.10.0" +pkcs8 = "0.8.0" +pem = "1.1.1" \ No newline at end of file diff --git a/client/src/main.rs b/client/src/main.rs new file mode 100644 index 0000000..453870c --- /dev/null +++ b/client/src/main.rs @@ -0,0 +1,30 @@ +use std::time::{Instant, Duration}; + +use geyser_quic_plugin::TransactionResults; + +mod quic_connection; + +pub const PACKET_DATA_SIZE: usize = 1280 - 40 - 8; + +#[tokio::main()] +pub async fn main() { + tokio::spawn(async move { + // wait for 10 s max + let mut timeout: u64 = 10_000; + let mut start = Instant::now(); + + const LAST_BUFFER_SIZE: usize = QUIC_MESSAGE_SIZE + 1; + let mut last_buffer: [u8; LAST_BUFFER_SIZE] = [0; LAST_BUFFER_SIZE]; + let mut buffer_written = 0; + let mut recv_stream = recv_stream; + loop { + if let Ok(chunk) = tokio::time::timeout( + Duration::from_millis(timeout), + recv_stream.read_chunk(PACKET_DATA_SIZE, false), + ) + .await { + + } + } + }); +} \ No newline at end of file diff --git a/client/src/quic_connection.rs b/client/src/quic_connection.rs new file mode 100644 index 0000000..c5a8094 --- /dev/null +++ b/client/src/quic_connection.rs @@ -0,0 +1,183 @@ +use std::{sync::Arc, net::{SocketAddr, UdpSocket, IpAddr, Ipv4Addr}, time::Duration}; + +use quinn::{Connection, Endpoint, ConnectionError, EndpointConfig, TokioRuntime, ClientConfig, TransportConfig, IdleTimeout}; +use solana_net_utils::PortRange; +use solana_quic_client::nonblocking::quic_client::QuicClientCertificate; +use solana_sdk::{signature::Keypair, quic::{QUIC_MAX_TIMEOUT, QUIC_KEEP_ALIVE}}; +use solana_streamer::tls_certificates::new_self_signed_tls_certificate; +use tokio::{time::timeout, sync::OnceCell}; +use geyser_quic_plugin::ALPN_GEYSER_PROTOCOL_ID; + +pub const QUIC_CONNECTION_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(60); +pub const PORT_RANGE: PortRange = (8000, 20_000); + +// This code is copied from solana + +/// A lazy-initialized Quic Endpoint +pub struct QuicLazyInitializedEndpoint { + endpoint: OnceCell>, + client_certificate: Arc, + client_endpoint: Option, +} + + +impl QuicLazyInitializedEndpoint { + pub fn new( + client_certificate: Arc, + client_endpoint: Option, + ) -> Self { + Self { + endpoint: OnceCell::>::new(), + client_certificate, + client_endpoint, + } + } + + fn create_endpoint(&self) -> Endpoint { + let mut endpoint = if let Some(endpoint) = &self.client_endpoint { + endpoint.clone() + } else { + let client_socket = solana_net_utils::bind_in_range( + IpAddr::V4(Ipv4Addr::UNSPECIFIED), + PORT_RANGE, + ) + .expect("QuicLazyInitializedEndpoint::create_endpoint bind_in_range") + .1; + + QuicNewConnection::create_endpoint(EndpointConfig::default(), client_socket) + }; + + let mut crypto = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_custom_certificate_verifier(SkipServerVerification::new()) + .with_single_cert( + vec![self.client_certificate.certificate.clone()], + self.client_certificate.key.clone(), + ) + .expect("Failed to set QUIC client certificates"); + + crypto.enable_early_data = true; + crypto.alpn_protocols = vec![ALPN_GEYSER_PROTOCOL_ID.to_vec()]; + + let mut config = ClientConfig::new(Arc::new(crypto)); + let mut transport_config = TransportConfig::default(); + + let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap(); + transport_config.max_idle_timeout(Some(timeout)); + transport_config.keep_alive_interval(Some(QUIC_KEEP_ALIVE)); + config.transport_config(Arc::new(transport_config)); + + endpoint.set_default_client_config(config); + + endpoint + } + + async fn get_endpoint(&self) -> Arc { + self.endpoint + .get_or_init(|| async { Arc::new(self.create_endpoint()) }) + .await + .clone() + } +} + +impl Default for QuicLazyInitializedEndpoint { + fn default() -> Self { + let (cert, priv_key) = + new_self_signed_tls_certificate(&Keypair::new(), IpAddr::V4(Ipv4Addr::UNSPECIFIED)) + .expect("Failed to create QUIC client certificate"); + Self::new( + Arc::new(QuicClientCertificate { + certificate: cert, + key: priv_key, + }), + None, + ) + } +} + + +#[derive(Clone)] +struct QuicNewConnection { + endpoint: Arc, + connection: Arc, +} + +impl QuicNewConnection { + /// Create a QuicNewConnection given the remote address 'addr'. + async fn make_connection( + endpoint: Arc, + addr: SocketAddr, + ) -> anyhow::Result { + let endpoint = endpoint.get_endpoint().await; + + let connecting = endpoint.connect(addr, "connect")?; + if let Ok(connecting_result) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await + { + let connection = connecting_result?; + + Ok(Self { + endpoint, + connection: Arc::new(connection), + }) + } else { + Err(ConnectionError::TimedOut.into()) + } + } + + fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint { + quinn::Endpoint::new(config, None, client_socket, TokioRuntime) + .expect("QuicNewConnection::create_endpoint quinn::Endpoint::new") + } + + // Attempts to make a faster connection by taking advantage of pre-existing key material. + // Only works if connection to this endpoint was previously established. + async fn make_connection_0rtt( + &mut self, + addr: SocketAddr, + ) -> anyhow::Result> { + let connecting = self.endpoint.connect(addr, "connect")?; + let connection = match connecting.into_0rtt() { + Ok((connection, zero_rtt)) => { + if let Ok(zero_rtt) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, zero_rtt).await { + connection + } else { + return Err(ConnectionError::TimedOut.into()); + } + } + Err(connecting) => { + + if let Ok(connecting_result) = + timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await + { + connecting_result? + } else { + return Err(ConnectionError::TimedOut.into()); + } + } + }; + self.connection = Arc::new(connection); + Ok(self.connection.clone()) + } +} + +pub struct SkipServerVerification; + +impl SkipServerVerification { + pub fn new() -> Arc { + Arc::new(Self) + } +} + +impl rustls::client::ServerCertVerifier for SkipServerVerification { + fn verify_server_cert( + &self, + _end_entity: &rustls::Certificate, + _intermediates: &[rustls::Certificate], + _server_name: &rustls::ServerName, + _scts: &mut dyn Iterator, + _ocsp_response: &[u8], + _now: std::time::SystemTime, + ) -> Result { + Ok(rustls::client::ServerCertVerified::assertion()) + } +} \ No newline at end of file diff --git a/migration.sql b/migration.sql new file mode 100644 index 0000000..ee15d0b --- /dev/null +++ b/migration.sql @@ -0,0 +1,12 @@ +CREATE SCHEMA banking_stage_results; + +CREATE TABLE banking_stage_results.transaction_infos ( + signature CHAR(88) NOT NULL, + message text, + errors text, + is_executed BOOL, + is_confirmed BOOL, + first_notification_slot BIGINT NOT NULL, + cu_requested BIGINT, + prioritization_fees BIGINT +); diff --git a/plugin/Cargo.toml b/plugin/Cargo.toml index 4e73e72..6f82652 100644 --- a/plugin/Cargo.toml +++ b/plugin/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "plugin" +name = "geyser-quic-plugin" version = "0.1.0" edition = "2021" diff --git a/plugin/src/lib.rs b/plugin/src/lib.rs index e9788d7..ba2ce24 100644 --- a/plugin/src/lib.rs +++ b/plugin/src/lib.rs @@ -10,8 +10,7 @@ use tokio::{runtime::Runtime, task::JoinHandle, sync::mpsc::UnboundedSender}; use crate::skip_client_verification::SkipClientVerification; -mod skip_client_verification; -mod plugin_error; +pub mod skip_client_verification; pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"solana-geyser"; diff --git a/plugin/src/plugin_error.rs b/plugin/src/plugin_error.rs deleted file mode 100644 index 154100a..0000000 --- a/plugin/src/plugin_error.rs +++ /dev/null @@ -1,33 +0,0 @@ -use std::{fmt::Display, error}; - - -#[derive(Debug, Clone)] -pub struct PluginError { - msg: String, -} - -impl PluginError { - pub fn new(msg: String) -> Self { - PluginError { msg } - } -} - -impl Display for PluginError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "Plugin error {}", self.msg) - } -} - -impl error::Error for PluginError { - fn source(&self) -> Option<&(dyn error::Error + 'static)> { - None - } - - fn description(&self) -> &str { - "description() is deprecated; use Display" - } - - fn cause(&self) -> Option<&dyn error::Error> { - self.source() - } -} \ No newline at end of file