implementing client libraries

This commit is contained in:
Godmode Galactus 2023-10-16 15:58:08 +02:00
parent 0514689dc3
commit 72d7848bd3
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
9 changed files with 522 additions and 77 deletions

287
Cargo.lock generated
View File

@ -399,6 +399,15 @@ dependencies = [
"event-listener",
]
[[package]]
name = "async-mutex"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e"
dependencies = [
"event-listener",
]
[[package]]
name = "async-trait"
version = "0.1.74"
@ -772,6 +781,22 @@ dependencies = [
"generic-array",
]
[[package]]
name = "clap"
version = "3.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123"
dependencies = [
"atty",
"bitflags 1.3.2",
"clap_lex 0.2.4",
"indexmap 1.9.3",
"once_cell",
"strsim",
"termcolor",
"textwrap",
]
[[package]]
name = "clap"
version = "4.4.6"
@ -790,7 +815,7 @@ checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45"
dependencies = [
"anstream",
"anstyle",
"clap_lex",
"clap_lex 0.5.1",
"strsim",
]
@ -806,12 +831,63 @@ dependencies = [
"syn 2.0.38",
]
[[package]]
name = "clap_lex"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5"
dependencies = [
"os_str_bytes",
]
[[package]]
name = "clap_lex"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd7cc57abe963c6d3b9d8be5b06ba7c8957a930305ca90304f24ef040aa6f961"
[[package]]
name = "client"
version = "0.1.0"
dependencies = [
"anyhow",
"async-channel",
"base64 0.21.4",
"bincode",
"bs58",
"bytes",
"chrono",
"clap 4.4.6",
"const_env",
"dashmap",
"dotenv",
"futures",
"geyser-quic-plugin",
"itertools",
"jsonrpsee",
"lazy_static",
"log",
"native-tls",
"pem",
"pkcs8",
"postgres-native-tls",
"prometheus",
"quinn",
"rcgen",
"rustls 0.20.8",
"serde",
"serde_json",
"solana-geyser-plugin-interface",
"solana-net-utils",
"solana-quic-client",
"solana-sdk 1.16.15",
"solana-streamer",
"thiserror",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "colorchoice"
version = "1.0.0"
@ -1502,6 +1578,45 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "geyser-quic-plugin"
version = "0.1.0"
dependencies = [
"anyhow",
"async-channel",
"base64 0.21.4",
"bincode",
"bs58",
"bytes",
"chrono",
"clap 4.4.6",
"const_env",
"dashmap",
"dotenv",
"futures",
"itertools",
"jsonrpsee",
"lazy_static",
"log",
"native-tls",
"pem",
"pkcs8",
"postgres-native-tls",
"prometheus",
"quinn",
"rcgen",
"rustls 0.20.8",
"serde",
"serde_json",
"solana-geyser-plugin-interface",
"solana-sdk 1.16.15",
"solana-streamer",
"thiserror",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "gimli"
version = "0.28.0"
@ -1886,6 +2001,21 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "jsonrpc-core"
version = "18.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14f7f76aef2d054868398427f6c54943cf3d1caa9a7ec7d0c38d69df97a965eb"
dependencies = [
"futures",
"futures-executor",
"futures-util",
"log",
"serde",
"serde_derive",
"serde_json",
]
[[package]]
name = "jsonrpsee"
version = "0.17.1"
@ -2507,6 +2637,12 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "os_str_bytes"
version = "6.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1"
[[package]]
name = "overload"
version = "0.1.1"
@ -2657,45 +2793,6 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6"
[[package]]
name = "plugin"
version = "0.1.0"
dependencies = [
"anyhow",
"async-channel",
"base64 0.21.4",
"bincode",
"bs58",
"bytes",
"chrono",
"clap",
"const_env",
"dashmap",
"dotenv",
"futures",
"itertools",
"jsonrpsee",
"lazy_static",
"log",
"native-tls",
"pem",
"pkcs8",
"postgres-native-tls",
"prometheus",
"quinn",
"rcgen",
"rustls 0.20.8",
"serde",
"serde_json",
"solana-geyser-plugin-interface",
"solana-sdk 1.16.15",
"solana-streamer",
"thiserror",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "polyval"
version = "0.5.3"
@ -3608,6 +3705,26 @@ dependencies = [
"solana-sdk 1.16.15",
]
[[package]]
name = "solana-connection-cache"
version = "1.16.15"
source = "git+https://github.com/blockworks-foundation/solana.git?branch=geyser_send_transaction_result_plugin#4641300ad4ecb89340f1c7954be04a74dd2575f2"
dependencies = [
"async-trait",
"bincode",
"futures-util",
"indexmap 1.9.3",
"log",
"rand 0.7.3",
"rayon",
"rcgen",
"solana-measure",
"solana-metrics",
"solana-sdk 1.16.15",
"thiserror",
"tokio",
]
[[package]]
name = "solana-frozen-abi"
version = "1.16.15"
@ -3747,6 +3864,27 @@ dependencies = [
"solana-sdk 1.16.15",
]
[[package]]
name = "solana-net-utils"
version = "1.16.15"
source = "git+https://github.com/blockworks-foundation/solana.git?branch=geyser_send_transaction_result_plugin#4641300ad4ecb89340f1c7954be04a74dd2575f2"
dependencies = [
"bincode",
"clap 3.2.25",
"crossbeam-channel",
"log",
"nix",
"rand 0.7.3",
"serde",
"serde_derive",
"socket2 0.4.9",
"solana-logger 1.16.15",
"solana-sdk 1.16.15",
"solana-version",
"tokio",
"url",
]
[[package]]
name = "solana-perf"
version = "1.16.15"
@ -3908,6 +4046,33 @@ dependencies = [
"thiserror",
]
[[package]]
name = "solana-quic-client"
version = "1.16.15"
source = "git+https://github.com/blockworks-foundation/solana.git?branch=geyser_send_transaction_result_plugin#4641300ad4ecb89340f1c7954be04a74dd2575f2"
dependencies = [
"async-mutex",
"async-trait",
"futures",
"itertools",
"lazy_static",
"log",
"quinn",
"quinn-proto",
"quinn-udp",
"rcgen",
"rustls 0.20.8",
"solana-connection-cache",
"solana-measure",
"solana-metrics",
"solana-net-utils",
"solana-rpc-client-api",
"solana-sdk 1.16.15",
"solana-streamer",
"thiserror",
"tokio",
]
[[package]]
name = "solana-rayon-threadlimit"
version = "1.16.15"
@ -3917,6 +4082,27 @@ dependencies = [
"num_cpus",
]
[[package]]
name = "solana-rpc-client-api"
version = "1.16.15"
source = "git+https://github.com/blockworks-foundation/solana.git?branch=geyser_send_transaction_result_plugin#4641300ad4ecb89340f1c7954be04a74dd2575f2"
dependencies = [
"base64 0.21.4",
"bs58",
"jsonrpc-core",
"reqwest",
"semver",
"serde",
"serde_derive",
"serde_json",
"solana-account-decoder",
"solana-sdk 1.16.15",
"solana-transaction-status",
"solana-version",
"spl-token-2022",
"thiserror",
]
[[package]]
name = "solana-sdk"
version = "1.16.15"
@ -4105,6 +4291,21 @@ dependencies = [
"thiserror",
]
[[package]]
name = "solana-version"
version = "1.16.15"
source = "git+https://github.com/blockworks-foundation/solana.git?branch=geyser_send_transaction_result_plugin#4641300ad4ecb89340f1c7954be04a74dd2575f2"
dependencies = [
"log",
"rustc_version",
"semver",
"serde",
"serde_derive",
"solana-frozen-abi 1.16.15",
"solana-frozen-abi-macro 1.16.15",
"solana-sdk 1.16.15",
]
[[package]]
name = "solana-vote-program"
version = "1.16.15"
@ -4365,6 +4566,12 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "textwrap"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
[[package]]
name = "thiserror"
version = "1.0.49"

View File

@ -1,6 +1,7 @@
[workspace]
members = [
"plugin"
"plugin",
"client"
]
[profile.release]

46
client/Cargo.toml Normal file
View File

@ -0,0 +1,46 @@
[package]
name = "client"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1.21.2", features = ["rt-multi-thread", "macros", "time", "fs"] }
solana-sdk = { git = "https://github.com/blockworks-foundation/solana.git", branch = "geyser_send_transaction_result_plugin" }
solana-geyser-plugin-interface = { git = "https://github.com/blockworks-foundation/solana.git", branch = "geyser_send_transaction_result_plugin" }
solana-streamer = { git = "https://github.com/blockworks-foundation/solana.git", branch = "geyser_send_transaction_result_plugin" }
solana-quic-client = { git = "https://github.com/blockworks-foundation/solana.git", branch = "geyser_send_transaction_result_plugin" }
solana-net-utils = { git = "https://github.com/blockworks-foundation/solana.git", branch = "geyser_send_transaction_result_plugin" }
geyser-quic-plugin = { path = "../plugin" }
itertools = "0.10.5"
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.96"
bincode = "1.3.3"
bs58 = "0.4.0"
base64 = "0.21.0"
thiserror = "1.0.40"
futures = "0.3.28"
bytes = "1.4.0"
anyhow = "1.0.70"
log = "0.4.17"
clap = { version = "4.2.4", features = ["derive", "env"] }
dashmap = "5.4.0"
const_env = "0.1.2"
jsonrpsee = { version = "0.17.0", features = ["macros", "full"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
chrono = "0.4.24"
native-tls = "0.2.11"
postgres-native-tls = "0.5.0"
prometheus = "0.13.3"
lazy_static = "1.4.0"
dotenv = "0.15.0"
async-channel = "1.8.0"
quinn = "0.9.3"
rustls = { version = "=0.20.8", default-features = false }
rcgen = "0.10.0"
pkcs8 = "0.8.0"
pem = "1.1.1"

30
client/src/main.rs Normal file
View File

@ -0,0 +1,30 @@
use std::time::{Instant, Duration};
use geyser_quic_plugin::TransactionResults;
mod quic_connection;
pub const PACKET_DATA_SIZE: usize = 1280 - 40 - 8;
#[tokio::main()]
pub async fn main() {
tokio::spawn(async move {
// wait for 10 s max
let mut timeout: u64 = 10_000;
let mut start = Instant::now();
const LAST_BUFFER_SIZE: usize = QUIC_MESSAGE_SIZE + 1;
let mut last_buffer: [u8; LAST_BUFFER_SIZE] = [0; LAST_BUFFER_SIZE];
let mut buffer_written = 0;
let mut recv_stream = recv_stream;
loop {
if let Ok(chunk) = tokio::time::timeout(
Duration::from_millis(timeout),
recv_stream.read_chunk(PACKET_DATA_SIZE, false),
)
.await {
}
}
});
}

View File

@ -0,0 +1,183 @@
use std::{sync::Arc, net::{SocketAddr, UdpSocket, IpAddr, Ipv4Addr}, time::Duration};
use quinn::{Connection, Endpoint, ConnectionError, EndpointConfig, TokioRuntime, ClientConfig, TransportConfig, IdleTimeout};
use solana_net_utils::PortRange;
use solana_quic_client::nonblocking::quic_client::QuicClientCertificate;
use solana_sdk::{signature::Keypair, quic::{QUIC_MAX_TIMEOUT, QUIC_KEEP_ALIVE}};
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use tokio::{time::timeout, sync::OnceCell};
use geyser_quic_plugin::ALPN_GEYSER_PROTOCOL_ID;
pub const QUIC_CONNECTION_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(60);
pub const PORT_RANGE: PortRange = (8000, 20_000);
// This code is copied from solana
/// A lazy-initialized Quic Endpoint
pub struct QuicLazyInitializedEndpoint {
endpoint: OnceCell<Arc<Endpoint>>,
client_certificate: Arc<QuicClientCertificate>,
client_endpoint: Option<Endpoint>,
}
impl QuicLazyInitializedEndpoint {
pub fn new(
client_certificate: Arc<QuicClientCertificate>,
client_endpoint: Option<Endpoint>,
) -> Self {
Self {
endpoint: OnceCell::<Arc<Endpoint>>::new(),
client_certificate,
client_endpoint,
}
}
fn create_endpoint(&self) -> Endpoint {
let mut endpoint = if let Some(endpoint) = &self.client_endpoint {
endpoint.clone()
} else {
let client_socket = solana_net_utils::bind_in_range(
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
PORT_RANGE,
)
.expect("QuicLazyInitializedEndpoint::create_endpoint bind_in_range")
.1;
QuicNewConnection::create_endpoint(EndpointConfig::default(), client_socket)
};
let mut crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(SkipServerVerification::new())
.with_single_cert(
vec![self.client_certificate.certificate.clone()],
self.client_certificate.key.clone(),
)
.expect("Failed to set QUIC client certificates");
crypto.enable_early_data = true;
crypto.alpn_protocols = vec![ALPN_GEYSER_PROTOCOL_ID.to_vec()];
let mut config = ClientConfig::new(Arc::new(crypto));
let mut transport_config = TransportConfig::default();
let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
transport_config.max_idle_timeout(Some(timeout));
transport_config.keep_alive_interval(Some(QUIC_KEEP_ALIVE));
config.transport_config(Arc::new(transport_config));
endpoint.set_default_client_config(config);
endpoint
}
async fn get_endpoint(&self) -> Arc<Endpoint> {
self.endpoint
.get_or_init(|| async { Arc::new(self.create_endpoint()) })
.await
.clone()
}
}
impl Default for QuicLazyInitializedEndpoint {
fn default() -> Self {
let (cert, priv_key) =
new_self_signed_tls_certificate(&Keypair::new(), IpAddr::V4(Ipv4Addr::UNSPECIFIED))
.expect("Failed to create QUIC client certificate");
Self::new(
Arc::new(QuicClientCertificate {
certificate: cert,
key: priv_key,
}),
None,
)
}
}
#[derive(Clone)]
struct QuicNewConnection {
endpoint: Arc<Endpoint>,
connection: Arc<Connection>,
}
impl QuicNewConnection {
/// Create a QuicNewConnection given the remote address 'addr'.
async fn make_connection(
endpoint: Arc<QuicLazyInitializedEndpoint>,
addr: SocketAddr,
) -> anyhow::Result<Self> {
let endpoint = endpoint.get_endpoint().await;
let connecting = endpoint.connect(addr, "connect")?;
if let Ok(connecting_result) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await
{
let connection = connecting_result?;
Ok(Self {
endpoint,
connection: Arc::new(connection),
})
} else {
Err(ConnectionError::TimedOut.into())
}
}
fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint {
quinn::Endpoint::new(config, None, client_socket, TokioRuntime)
.expect("QuicNewConnection::create_endpoint quinn::Endpoint::new")
}
// Attempts to make a faster connection by taking advantage of pre-existing key material.
// Only works if connection to this endpoint was previously established.
async fn make_connection_0rtt(
&mut self,
addr: SocketAddr,
) -> anyhow::Result<Arc<Connection>> {
let connecting = self.endpoint.connect(addr, "connect")?;
let connection = match connecting.into_0rtt() {
Ok((connection, zero_rtt)) => {
if let Ok(zero_rtt) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, zero_rtt).await {
connection
} else {
return Err(ConnectionError::TimedOut.into());
}
}
Err(connecting) => {
if let Ok(connecting_result) =
timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await
{
connecting_result?
} else {
return Err(ConnectionError::TimedOut.into());
}
}
};
self.connection = Arc::new(connection);
Ok(self.connection.clone())
}
}
pub struct SkipServerVerification;
impl SkipServerVerification {
pub fn new() -> Arc<Self> {
Arc::new(Self)
}
}
impl rustls::client::ServerCertVerifier for SkipServerVerification {
fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: std::time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
}
}

12
migration.sql Normal file
View File

@ -0,0 +1,12 @@
CREATE SCHEMA banking_stage_results;
CREATE TABLE banking_stage_results.transaction_infos (
signature CHAR(88) NOT NULL,
message text,
errors text,
is_executed BOOL,
is_confirmed BOOL,
first_notification_slot BIGINT NOT NULL,
cu_requested BIGINT,
prioritization_fees BIGINT
);

View File

@ -1,5 +1,5 @@
[package]
name = "plugin"
name = "geyser-quic-plugin"
version = "0.1.0"
edition = "2021"

View File

@ -10,8 +10,7 @@ use tokio::{runtime::Runtime, task::JoinHandle, sync::mpsc::UnboundedSender};
use crate::skip_client_verification::SkipClientVerification;
mod skip_client_verification;
mod plugin_error;
pub mod skip_client_verification;
pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"solana-geyser";

View File

@ -1,33 +0,0 @@
use std::{fmt::Display, error};
#[derive(Debug, Clone)]
pub struct PluginError {
msg: String,
}
impl PluginError {
pub fn new(msg: String) -> Self {
PluginError { msg }
}
}
impl Display for PluginError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Plugin error {}", self.msg)
}
}
impl error::Error for PluginError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
None
}
fn description(&self) -> &str {
"description() is deprecated; use Display"
}
fn cause(&self) -> Option<&dyn error::Error> {
self.source()
}
}