Implementing quic server and client side

This commit is contained in:
Godmode Galactus 2023-11-08 14:46:32 +01:00
parent 72d7848bd3
commit 234b3e083e
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
8 changed files with 865 additions and 351 deletions

791
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -7,11 +7,12 @@ edition = "2021"
[dependencies]
tokio = { version = "1.21.2", features = ["rt-multi-thread", "macros", "time", "fs"] }
solana-sdk = { git = "https://github.com/blockworks-foundation/solana.git", branch = "geyser_send_transaction_result_plugin" }
solana-geyser-plugin-interface = { git = "https://github.com/blockworks-foundation/solana.git", branch = "geyser_send_transaction_result_plugin" }
solana-streamer = { git = "https://github.com/blockworks-foundation/solana.git", branch = "geyser_send_transaction_result_plugin" }
solana-quic-client = { git = "https://github.com/blockworks-foundation/solana.git", branch = "geyser_send_transaction_result_plugin" }
solana-net-utils = { git = "https://github.com/blockworks-foundation/solana.git", branch = "geyser_send_transaction_result_plugin" }
solana-sdk = "1.16.18"
solana-geyser-plugin-interface = "1.16.18"
solana-streamer = "1.16.18"
solana-quic-client = "1.16.18"
solana-net-utils = "1.16.18"
solana-connection-cache = "1.16.18"
geyser-quic-plugin = { path = "../plugin" }
@ -39,8 +40,8 @@ prometheus = "0.13.3"
lazy_static = "1.4.0"
dotenv = "0.15.0"
async-channel = "1.8.0"
quinn = "0.9.3"
rustls = { version = "=0.20.8", default-features = false }
quinn = "0.10.2"
rustls = { version = "0.21.8", default-features = false, features = ["quic"] }
rcgen = "0.10.0"
pkcs8 = "0.8.0"
pem = "1.1.1"

11
client/src/cli.rs Normal file
View File

@ -0,0 +1,11 @@
use clap::Parser;
#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(short, long, default_value_t = String::from("http://127.0.0.1:10000"))]
pub geyser_quic_address: String,
#[arg(short, long, default_value_t = String::from("connection_identity.json"))]
pub identity: String,
}

View File

@ -1,30 +1,88 @@
use std::time::{Instant, Duration};
use std::{time::Duration, net::{IpAddr, Ipv4Addr, SocketAddr}, sync::Arc};
use geyser_quic_plugin::TransactionResults;
use cli::Args;
use geyser_quic_plugin::{TransactionResults, ALPN_GEYSER_PROTOCOL_ID};
use quinn::{TokioRuntime, EndpointConfig, Endpoint, ClientConfig, TransportConfig, IdleTimeout};
use solana_quic_client::nonblocking::quic_client:: SkipServerVerification;
use solana_sdk::signature::Keypair;
use clap::Parser;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
mod quic_connection;
mod cli;
pub const PACKET_DATA_SIZE: usize = 1280 - 40 - 8;
pub async fn load_identity_keypair(identity_file: &String) -> Option<Keypair> {
let identity_file = tokio::fs::read_to_string(identity_file.as_str())
.await
.expect("Cannot find the identity file provided");
let identity_bytes: Vec<u8> = serde_json::from_str(&identity_file).unwrap();
Some(Keypair::from_bytes(identity_bytes.as_slice()).unwrap())
}
pub fn create_endpoint(certificate: rustls::Certificate, key: rustls::PrivateKey) -> Endpoint {
let mut endpoint = {
let client_socket =
solana_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::UNSPECIFIED), (8000, 10000))
.expect("create_endpoint bind_in_range")
.1;
let config = EndpointConfig::default();
quinn::Endpoint::new(config, None, client_socket, Arc::new(TokioRuntime))
.expect("create_endpoint quinn::Endpoint::new")
};
let mut crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(Arc::new(SkipServerVerification {}))
.with_client_auth_cert(vec![certificate], key)
.expect("Failed to set QUIC client certificates");
crypto.enable_early_data = true;
crypto.alpn_protocols = vec![ALPN_GEYSER_PROTOCOL_ID.to_vec()];
let mut config = ClientConfig::new(Arc::new(crypto));
let mut transport_config = TransportConfig::default();
let timeout = IdleTimeout::try_from(Duration::from_secs(3600 * 48)).unwrap();
transport_config.max_idle_timeout(Some(timeout));
transport_config.keep_alive_interval(Some(Duration::from_millis(500)));
config.transport_config(Arc::new(transport_config));
endpoint.set_default_client_config(config);
endpoint
}
#[tokio::main()]
pub async fn main() {
pub async fn main() -> anyhow::Result<()> {
let args = Args::parse();
let address: SocketAddr = args.geyser_quic_address.parse().expect("should be valid socket address");
let keypair = load_identity_keypair(&args.identity).await.expect("Identity file should be valid");
let (certificate, key) = new_self_signed_tls_certificate(
&keypair,
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
)
.expect("Failed to initialize QUIC client certificates");
let endpoint = create_endpoint(certificate, key);
let connection = endpoint.connect(address, "quic_geyser_plugin").expect("Should be connecting").await.expect("Should be able to connect to the plugin");
let (_send_stream, recv_stream) = connection.open_bi().await.expect("Should be able to create a bi directional connection");
tokio::spawn(async move {
// wait for 10 s max
let mut timeout: u64 = 10_000;
let mut start = Instant::now();
const LAST_BUFFER_SIZE: usize = QUIC_MESSAGE_SIZE + 1;
let mut last_buffer: [u8; LAST_BUFFER_SIZE] = [0; LAST_BUFFER_SIZE];
let mut buffer_written = 0;
let mut buffer: [u8; PACKET_DATA_SIZE] = [0; PACKET_DATA_SIZE];
let mut recv_stream = recv_stream;
loop {
if let Ok(chunk) = tokio::time::timeout(
Duration::from_millis(timeout),
recv_stream.read_chunk(PACKET_DATA_SIZE, false),
)
.await {
if let Ok(Some(size)) = recv_stream.read(&mut buffer ).await
{
let data = &buffer[0..size];
if let Ok(result) = bincode::deserialize::<TransactionResults>(&data) {
println!("Transaction Result \n s:{} e:{} slt:{}", result.signature, result.error.map(|x| x.to_string()).unwrap_or_default(), result.slot);
}
}
}
});
}
Ok(())
}

View File

@ -1,183 +0,0 @@
use std::{sync::Arc, net::{SocketAddr, UdpSocket, IpAddr, Ipv4Addr}, time::Duration};
use quinn::{Connection, Endpoint, ConnectionError, EndpointConfig, TokioRuntime, ClientConfig, TransportConfig, IdleTimeout};
use solana_net_utils::PortRange;
use solana_quic_client::nonblocking::quic_client::QuicClientCertificate;
use solana_sdk::{signature::Keypair, quic::{QUIC_MAX_TIMEOUT, QUIC_KEEP_ALIVE}};
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use tokio::{time::timeout, sync::OnceCell};
use geyser_quic_plugin::ALPN_GEYSER_PROTOCOL_ID;
pub const QUIC_CONNECTION_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(60);
pub const PORT_RANGE: PortRange = (8000, 20_000);
// This code is copied from solana
/// A lazy-initialized Quic Endpoint
pub struct QuicLazyInitializedEndpoint {
endpoint: OnceCell<Arc<Endpoint>>,
client_certificate: Arc<QuicClientCertificate>,
client_endpoint: Option<Endpoint>,
}
impl QuicLazyInitializedEndpoint {
pub fn new(
client_certificate: Arc<QuicClientCertificate>,
client_endpoint: Option<Endpoint>,
) -> Self {
Self {
endpoint: OnceCell::<Arc<Endpoint>>::new(),
client_certificate,
client_endpoint,
}
}
fn create_endpoint(&self) -> Endpoint {
let mut endpoint = if let Some(endpoint) = &self.client_endpoint {
endpoint.clone()
} else {
let client_socket = solana_net_utils::bind_in_range(
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
PORT_RANGE,
)
.expect("QuicLazyInitializedEndpoint::create_endpoint bind_in_range")
.1;
QuicNewConnection::create_endpoint(EndpointConfig::default(), client_socket)
};
let mut crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(SkipServerVerification::new())
.with_single_cert(
vec![self.client_certificate.certificate.clone()],
self.client_certificate.key.clone(),
)
.expect("Failed to set QUIC client certificates");
crypto.enable_early_data = true;
crypto.alpn_protocols = vec![ALPN_GEYSER_PROTOCOL_ID.to_vec()];
let mut config = ClientConfig::new(Arc::new(crypto));
let mut transport_config = TransportConfig::default();
let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
transport_config.max_idle_timeout(Some(timeout));
transport_config.keep_alive_interval(Some(QUIC_KEEP_ALIVE));
config.transport_config(Arc::new(transport_config));
endpoint.set_default_client_config(config);
endpoint
}
async fn get_endpoint(&self) -> Arc<Endpoint> {
self.endpoint
.get_or_init(|| async { Arc::new(self.create_endpoint()) })
.await
.clone()
}
}
impl Default for QuicLazyInitializedEndpoint {
fn default() -> Self {
let (cert, priv_key) =
new_self_signed_tls_certificate(&Keypair::new(), IpAddr::V4(Ipv4Addr::UNSPECIFIED))
.expect("Failed to create QUIC client certificate");
Self::new(
Arc::new(QuicClientCertificate {
certificate: cert,
key: priv_key,
}),
None,
)
}
}
#[derive(Clone)]
struct QuicNewConnection {
endpoint: Arc<Endpoint>,
connection: Arc<Connection>,
}
impl QuicNewConnection {
/// Create a QuicNewConnection given the remote address 'addr'.
async fn make_connection(
endpoint: Arc<QuicLazyInitializedEndpoint>,
addr: SocketAddr,
) -> anyhow::Result<Self> {
let endpoint = endpoint.get_endpoint().await;
let connecting = endpoint.connect(addr, "connect")?;
if let Ok(connecting_result) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await
{
let connection = connecting_result?;
Ok(Self {
endpoint,
connection: Arc::new(connection),
})
} else {
Err(ConnectionError::TimedOut.into())
}
}
fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint {
quinn::Endpoint::new(config, None, client_socket, TokioRuntime)
.expect("QuicNewConnection::create_endpoint quinn::Endpoint::new")
}
// Attempts to make a faster connection by taking advantage of pre-existing key material.
// Only works if connection to this endpoint was previously established.
async fn make_connection_0rtt(
&mut self,
addr: SocketAddr,
) -> anyhow::Result<Arc<Connection>> {
let connecting = self.endpoint.connect(addr, "connect")?;
let connection = match connecting.into_0rtt() {
Ok((connection, zero_rtt)) => {
if let Ok(zero_rtt) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, zero_rtt).await {
connection
} else {
return Err(ConnectionError::TimedOut.into());
}
}
Err(connecting) => {
if let Ok(connecting_result) =
timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await
{
connecting_result?
} else {
return Err(ConnectionError::TimedOut.into());
}
}
};
self.connection = Arc::new(connection);
Ok(self.connection.clone())
}
}
pub struct SkipServerVerification;
impl SkipServerVerification {
pub fn new() -> Arc<Self> {
Arc::new(Self)
}
}
impl rustls::client::ServerCertVerifier for SkipServerVerification {
fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: std::time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
}
}

View File

@ -8,5 +8,16 @@ CREATE TABLE banking_stage_results.transaction_infos (
is_confirmed BOOL,
first_notification_slot BIGINT NOT NULL,
cu_requested BIGINT,
prioritization_fees BIGINT
prioritization_fees BIGINT,
time_of_first_notification BIGINT,
);
CREATE TABLE banking_stage_results.blocks (
block_hash text not null,
notification_timestamp BIGINT,
confirmed_timestamp BIGINT,
number_of_transactions BIGINT,
number_of_transactions BIGINT,
total_cu BIGINT,
cu_by_accounts text,
)

View File

@ -1,12 +1,29 @@
use std::{net::{IpAddr, Ipv4Addr, UdpSocket}, sync::Arc, str::FromStr};
use std::{
net::{IpAddr, Ipv4Addr, UdpSocket},
str::FromStr,
sync::Arc,
};
use itertools::Itertools;
use pem::Pem;
use quinn::{ServerConfig, IdleTimeout, Endpoint, TokioRuntime, EndpointConfig};
use serde::{Serialize, Deserialize};
use solana_geyser_plugin_interface::geyser_plugin_interface::{GeyserPlugin, Result as PluginResult, GeyserPluginError};
use solana_sdk::{signature::{Signature, Keypair}, transaction::{TransactionError, SanitizedTransaction}, slot_history::Slot, quic::QUIC_MAX_TIMEOUT, packet::PACKET_DATA_SIZE, pubkey::Pubkey};
use solana_streamer::{tls_certificates::{new_self_signed_tls_certificate, get_pubkey_from_tls_certificate}, quic::QuicServerError};
use tokio::{runtime::Runtime, task::JoinHandle, sync::mpsc::UnboundedSender};
use quinn::{Endpoint, EndpointConfig, IdleTimeout, ServerConfig, TokioRuntime};
use serde::{Deserialize, Serialize};
use solana_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, Result as PluginResult,
};
use solana_sdk::{
packet::PACKET_DATA_SIZE,
pubkey::Pubkey,
quic::QUIC_MAX_TIMEOUT,
signature::{Keypair, Signature},
slot_history::Slot,
transaction::{SanitizedTransaction, TransactionError}, compute_budget::{self, ComputeBudgetInstruction}, borsh0_10::try_from_slice_unchecked,
};
use solana_streamer::{
quic::QuicServerError,
tls_certificates::{get_pubkey_from_tls_certificate, new_self_signed_tls_certificate},
};
use tokio::{runtime::Runtime, sync::mpsc::UnboundedSender, task::JoinHandle};
use crate::skip_client_verification::SkipClientVerification;
@ -16,9 +33,42 @@ pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"solana-geyser";
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TransactionResults {
signature: Signature,
error: Option<TransactionError>,
slot: Slot,
pub signature: Signature,
pub error: Option<TransactionError>,
pub slot: Slot,
pub writable_accounts: Vec<Pubkey>,
pub readable_accounts: Vec<Pubkey>,
pub cu_requested: u64,
pub prioritization_fees : u64,
}
fn decode_cu_requested_and_prioritization_fees(transaction: &SanitizedTransaction,) -> (u64, u64) {
let mut cu_requested:u64 = 200_000;
let mut prioritization_fees: u64 = 0;
let accounts = transaction.message().account_keys().iter().map(|x| *x).collect_vec();
for ix in transaction.message().instructions() {
if ix.program_id(accounts.as_slice())
.eq(&compute_budget::id())
{
let cb_ix = try_from_slice_unchecked::<ComputeBudgetInstruction>(ix.data.as_slice());
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
}) = cb_ix
{
if additional_fee > 0 {
return (units as u64, ((units * 1000) / additional_fee) as u64);
} else {
return (units as u64, 0);
}
} else if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(units)) = cb_ix {
cu_requested = units as u64;
} else if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) = cb_ix {
prioritization_fees = price;
}
}
}
(cu_requested, prioritization_fees)
}
#[derive(Debug)]
@ -50,7 +100,28 @@ impl GeyserPlugin for Plugin {
slot: Slot,
) -> PluginResult<()> {
if let Some(inner) = &self.inner {
if let Err(e) = inner.sender.send(TransactionResults { signature: transaction.signature().clone(), error, slot }) {
let message = transaction.message();
let accounts = message.account_keys();
let is_writable = accounts.iter().enumerate().map(|(index, _)| {
transaction.message().is_writable(index)
}).collect_vec();
let mut writable_accounts = is_writable.iter().enumerate().filter(|(_, v)| **v).map(|(index, get_mut)| accounts[index]).collect_vec();
let mut readable_accounts = is_writable.iter().enumerate().filter(|(_, v)| !**v).map(|(index, get_mut)| accounts[index]).collect_vec();
writable_accounts.truncate(32);
readable_accounts.truncate(32);
let (cu_requested, prioritization_fees) = decode_cu_requested_and_prioritization_fees(transaction);
if let Err(e) = inner.sender.send(TransactionResults {
signature: transaction.signature().clone(),
error,
slot,
writable_accounts,
readable_accounts,
cu_requested,
prioritization_fees,
}) {
log::error!("error sending on the channel {}", e);
}
Ok(())
@ -59,17 +130,21 @@ impl GeyserPlugin for Plugin {
}
}
fn on_load(&mut self, _config_file: &str) -> solana_geyser_plugin_interface::geyser_plugin_interface::Result<()> {
fn on_load(
&mut self,
_config_file: &str,
) -> solana_geyser_plugin_interface::geyser_plugin_interface::Result<()> {
let runtime = Runtime::new().map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
let res = configure_server(&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)));
let res = configure_server(&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)));
let (config, _) = res.map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
let sock = UdpSocket::bind("127.0.0.1:18990").expect("couldn't bind to address");
let endpoint = Endpoint::new(EndpointConfig::default(), Some(config), sock, TokioRuntime)
.map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
.map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
let (sender, reciever) = tokio::sync::mpsc::unbounded_channel::<TransactionResults>();
let allowed_connection = Pubkey::from_str("G8pLuvzarejjLuuPNVNR1gk9xiFKmAcs9J5LL3GZGM6F").unwrap();
let allowed_connection =
Pubkey::from_str("G8pLuvzarejjLuuPNVNR1gk9xiFKmAcs9J5LL3GZGM6F").unwrap();
let handle = tokio::spawn(async move {
let mut reciever = reciever;
@ -169,4 +244,4 @@ pub fn get_remote_pubkey(connection: &quinn::Connection) -> Option<Pubkey> {
.filter(|certs| certs.len() == 1)?
.first()
.and_then(get_pubkey_from_tls_certificate)
}
}

View File

@ -1,6 +1,6 @@
use std::{sync::Arc, time::SystemTime};
use rustls::{DistinguishedNames, server::ClientCertVerified, Certificate};
use rustls::{server::ClientCertVerified, Certificate, DistinguishedNames};
pub struct SkipClientVerification;