Merge pull request #177 from blockworks-foundation/quic-proxy-gm-improvements

Quic proxy gm improvements
This commit is contained in:
galactus 2023-09-05 09:19:07 +02:00 committed by GitHub
commit ffc6a15e5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 95 additions and 116 deletions

1
Cargo.lock generated
View File

@ -4317,6 +4317,7 @@ dependencies = [
"rustls 0.20.8",
"serde",
"serde_json",
"solana-lite-rpc-core",
"solana-net-utils",
"solana-sdk",
"solana-streamer",

View File

@ -0,0 +1,26 @@
use solana_sdk::signature::Keypair;
use std::env;
// note this is duplicated from lite-rpc module
pub async fn load_identity_keypair(identity_from_cli: &String) -> Option<Keypair> {
if let Ok(identity_env_var) = env::var("IDENTITY") {
if let Ok(identity_bytes) = serde_json::from_str::<Vec<u8>>(identity_env_var.as_str()) {
Some(Keypair::from_bytes(identity_bytes.as_slice()).unwrap())
} else {
// must be a file
let identity_file = tokio::fs::read_to_string(identity_env_var.as_str())
.await
.expect("Cannot find the identity file provided");
let identity_bytes: Vec<u8> = serde_json::from_str(&identity_file).unwrap();
Some(Keypair::from_bytes(identity_bytes.as_slice()).unwrap())
}
} else if identity_from_cli.is_empty() {
None
} else {
let identity_file = tokio::fs::read_to_string(identity_from_cli.as_str())
.await
.expect("Cannot find the identity file provided");
let identity_bytes: Vec<u8> = serde_json::from_str(&identity_file).unwrap();
Some(Keypair::from_bytes(identity_bytes.as_slice()).unwrap())
}
}

View File

@ -1,6 +1,7 @@
pub mod block_information_store;
pub mod cluster_info;
pub mod data_cache;
pub mod keypair_loader;
pub mod leaders_fetcher_trait;
pub mod notifications;
pub mod proxy_request_format;

View File

@ -31,7 +31,7 @@ pub struct Args {
#[arg(long, default_value_t = DEFAULT_RETRY_TIMEOUT)]
pub transaction_retry_after_secs: u64,
#[arg(long)]
pub experimental_quic_proxy_addr: Option<String>,
pub quic_proxy_addr: Option<String>,
#[arg(short = 'g', long)]
pub use_grpc: bool,
/// grpc address

View File

@ -17,6 +17,7 @@ use solana_lite_rpc_cluster_endpoints::json_rpc_subscription::create_json_rpc_po
use solana_lite_rpc_core::block_information_store::{BlockInformation, BlockInformationStore};
use solana_lite_rpc_core::cluster_info::ClusterInfo;
use solana_lite_rpc_core::data_cache::{DataCache, SlotCache};
use solana_lite_rpc_core::keypair_loader::load_identity_keypair;
use solana_lite_rpc_core::notifications::NotificationSender;
use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters;
use solana_lite_rpc_core::streams::BlockStream;
@ -40,29 +41,6 @@ use tokio::sync::mpsc;
use crate::rpc_tester::RpcTester;
async fn get_identity_keypair(identity_from_cli: &str) -> Keypair {
if let Ok(identity_env_var) = env::var("IDENTITY") {
if let Ok(identity_bytes) = serde_json::from_str::<Vec<u8>>(identity_env_var.as_str()) {
Keypair::from_bytes(identity_bytes.as_slice()).unwrap()
} else {
// must be a file
let identity_file = tokio::fs::read_to_string(identity_env_var.as_str())
.await
.expect("Cannot find the identity file provided");
let identity_bytes: Vec<u8> = serde_json::from_str(&identity_file).unwrap();
Keypair::from_bytes(identity_bytes.as_slice()).unwrap()
}
} else if identity_from_cli.is_empty() {
Keypair::new()
} else {
let identity_file = tokio::fs::read_to_string(identity_from_cli)
.await
.expect("Cannot find the identity file provided");
let identity_bytes: Vec<u8> = serde_json::from_str(&identity_file).unwrap();
Keypair::from_bytes(identity_bytes.as_slice()).unwrap()
}
}
async fn get_latest_block(
mut block_stream: BlockStream,
commitment_config: CommitmentConfig,
@ -106,17 +84,21 @@ pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> {
identity_keypair,
maximum_retries_per_tx,
transaction_retry_after_secs,
experimental_quic_proxy_addr,
quic_proxy_addr,
use_grpc,
grpc_addr,
..
} = args;
let validator_identity = Arc::new(get_identity_keypair(&identity_keypair).await);
let validator_identity = Arc::new(
load_identity_keypair(&identity_keypair)
.await
.unwrap_or_else(Keypair::new),
);
let retry_after = Duration::from_secs(transaction_retry_after_secs);
let tpu_connection_path = configure_tpu_connection_path(experimental_quic_proxy_addr);
let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr);
// rpc client
let rpc_client = Arc::new(RpcClient::new(rpc_addr.clone()));
@ -278,14 +260,12 @@ pub async fn main() -> anyhow::Result<()> {
}
}
fn configure_tpu_connection_path(
experimental_quic_proxy_addr: Option<String>,
) -> TpuConnectionPath {
match experimental_quic_proxy_addr {
fn configure_tpu_connection_path(quic_proxy_addr: Option<String>) -> TpuConnectionPath {
match quic_proxy_addr {
None => TpuConnectionPath::QuicDirectPath,
Some(prox_address) => TpuConnectionPath::QuicForwardProxyPath {
// e.g. "127.0.0.1:11111"
forward_proxy_address: prox_address.parse().unwrap(),
forward_proxy_address: prox_address.parse().expect("Invalid proxy address"),
},
}
}

View File

@ -10,6 +10,7 @@ license = "AGPL"
publish = false
[dependencies]
solana-lite-rpc-core = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
solana-transaction-status = { workspace = true }

View File

@ -28,7 +28,7 @@ RUST_LOG=debug cargo run --bin solana-lite-rpc-quic-forward-proxy -- --proxy-lis
```
2. run lite-rpc
```bash
RUST_LOG=debug cargo run --bin lite-rpc -- --experimental-quic-proxy-addr 127.0.0.1:11111
RUST_LOG=debug cargo run --bin lite-rpc -- --quic-proxy-addr 127.0.0.1:11111
```
3. run rust bench tool in _lite-rpc_
```bash

View File

@ -6,10 +6,9 @@ use crate::tls_self_signed_pair_generator::SelfSignedTlsConfigProvider;
use crate::util::FALLBACK_TIMEOUT;
use anyhow::{anyhow, bail, Context};
use log::{debug, error, info, trace, warn};
use quinn::{Connection, Endpoint, ServerConfig, VarInt};
use quinn::{Connecting, Endpoint, ServerConfig, VarInt};
use solana_sdk::packet::PACKET_DATA_SIZE;
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Sender;
@ -34,11 +33,7 @@ impl ProxyListener {
}
}
pub async fn listen(
&self,
exit_signal: Arc<AtomicBool>,
forwarder_channel: &Sender<ForwardPacket>,
) -> anyhow::Result<()> {
pub async fn listen(&self, forwarder_channel: &Sender<ForwardPacket>) -> anyhow::Result<()> {
info!(
"TPU Quic Proxy server listening on {}",
self.proxy_listener_addr
@ -49,23 +44,15 @@ impl ProxyListener {
.await;
while let Some(connecting) = endpoint.accept().await {
let exit_signal = exit_signal.clone();
let forwarder_channel_copy = forwarder_channel.clone();
tokio::spawn(async move {
let connection = connecting.await.context("handshake").unwrap();
match Self::accept_client_connection(
connection,
forwarder_channel_copy,
exit_signal,
)
.await
{
match Self::handle_client_connection(connecting, forwarder_channel_copy).await {
Ok(()) => {
debug!("connection handles correctly");
debug!("connection handled correctly");
}
Err(err) => {
error!(
"failed to accept connection from client: {reason} - skip",
"failed handling connection from client: {reason} - skip",
reason = err
);
}
@ -99,11 +86,12 @@ impl ProxyListener {
}
#[tracing::instrument(skip_all, level = "debug")]
async fn accept_client_connection(
client_connection: Connection,
async fn handle_client_connection(
client_conn_handshake: Connecting,
forwarder_channel: Sender<ForwardPacket>,
_exit_signal: Arc<AtomicBool>,
) -> anyhow::Result<()> {
let client_connection = client_conn_handshake.await.context("handshake")?;
debug!(
"inbound connection established, client {}",
client_connection.remote_address()
@ -112,21 +100,6 @@ impl ProxyListener {
loop {
let maybe_stream = client_connection.accept_uni().await;
match maybe_stream {
Err(quinn::ConnectionError::ApplicationClosed(reason)) => {
debug!("connection closed by client - reason: {:?}", reason);
if reason.error_code != VarInt::from_u32(0) {
return Err(anyhow!(
"connection closed by client with unexpected reason: {:?}",
reason
));
}
debug!("connection gracefully closed by client");
return Ok(());
}
Err(e) => {
error!("failed to accept stream: {}", e);
bail!("error accepting stream");
}
Ok(recv_stream) => {
let forwarder_channel_copy = forwarder_channel.clone();
tokio::spawn(async move {
@ -180,6 +153,21 @@ impl ProxyListener {
connection_stats(&client_connection)
);
}
Err(quinn::ConnectionError::ApplicationClosed(reason)) => {
debug!("connection closed by client - reason: {:?}", reason);
if reason.error_code != VarInt::from_u32(0) {
return Err(anyhow!(
"connection closed by client with unexpected reason: {:?}",
reason
));
}
debug!("connection gracefully closed by client");
return Ok(());
}
Err(e) => {
error!("failed to accept stream: {}", e);
bail!("error accepting stream");
}
}; // -- result
} // -- loop
}

View File

@ -1,11 +1,11 @@
use crate::cli::Args;
use crate::proxy::QuicForwardProxy;
use crate::tls_self_signed_pair_generator::SelfSignedTlsConfigProvider;
use crate::util::get_identity_keypair;
use anyhow::bail;
use clap::Parser;
use dotenv::dotenv;
use log::info;
use solana_lite_rpc_core::keypair_loader::load_identity_keypair;
use std::sync::Arc;
use crate::validator_identity::ValidatorIdentity;
@ -35,7 +35,7 @@ pub async fn main() -> anyhow::Result<()> {
dotenv().ok();
let proxy_listener_addr = proxy_listen_addr.parse().unwrap();
let validator_identity = ValidatorIdentity::new(get_identity_keypair(&identity_keypair).await);
let validator_identity = ValidatorIdentity::new(load_identity_keypair(&identity_keypair).await);
let tls_config = Arc::new(SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost());
let main_services = QuicForwardProxy::new(proxy_listener_addr, tls_config, validator_identity)

View File

@ -16,27 +16,26 @@ use solana_streamer::nonblocking::quic::ALPN_TPU_PROTOCOL_ID;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::Receiver;
use tokio::sync::RwLock;
const MAX_PARALLEL_STREAMS: usize = 6;
pub const PARALLEL_TPU_CONNECTION_COUNT: usize = 4;
const AGENT_SHUTDOWN_IDLE: u64 = 2500; // ms; should be 4x400ms+buffer
const AGENT_SHUTDOWN_IDLE: Duration = Duration::from_millis(2500); // ms; should be 4x400ms+buffer
struct AgentHandle {
pub tpu_address: SocketAddr,
pub agent_exit_signal: Arc<AtomicBool>,
pub created_at: Instant,
// relative to start
pub age_ms: AtomicU64,
pub last_used_at: Arc<RwLock<Instant>>,
}
impl AgentHandle {
pub fn touch(&self) {
let age_ms = Instant::now().duration_since(self.created_at).as_millis() as u64;
self.age_ms.store(age_ms, Ordering::Relaxed);
pub async fn touch(&self) {
let mut timestamp = self.last_used_at.write().await;
*timestamp = Instant::now();
}
}
@ -194,16 +193,15 @@ pub async fn tx_forwarder(
AgentHandle {
tpu_address,
agent_exit_signal,
created_at: now,
age_ms: AtomicU64::new(0),
last_used_at: Arc::new(RwLock::new(now))
}
}); // -- new agent
let agent = agents.get(&tpu_address).unwrap();
agent.touch();
agent.touch().await;
if agent_shutdown_debouncer.can_fire() {
cleanup_agents(&mut agents, &tpu_address);
cleanup_agents(&mut agents, &tpu_address).await;
}
if broadcast_in.len() > 5 {
@ -218,16 +216,23 @@ pub async fn tx_forwarder(
// not reachable
}
fn cleanup_agents(agents: &mut HashMap<SocketAddr, AgentHandle>, current_tpu_address: &SocketAddr) {
async fn cleanup_agents(
agents: &mut HashMap<SocketAddr, AgentHandle>,
current_tpu_address: &SocketAddr,
) {
let now = Instant::now();
let mut to_shutdown = Vec::new();
for (tpu_address, handle) in &*agents {
if tpu_address == current_tpu_address {
continue;
}
let last_used_ms = handle.age_ms.load(Ordering::Relaxed);
let unused_period = {
let last_used_at = handle.last_used_at.read().await;
now - *last_used_at
};
if last_used_ms > AGENT_SHUTDOWN_IDLE {
if unused_period > AGENT_SHUTDOWN_IDLE {
to_shutdown.push(tpu_address.to_owned())
}
}
@ -239,10 +244,14 @@ fn cleanup_agents(agents: &mut HashMap<SocketAddr, AgentHandle>, current_tpu_add
.compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
.is_ok();
if was_signaled {
let last_used_ms = removed_agent.age_ms.load(Ordering::Relaxed);
let unused_period = {
let last_used_ts = removed_agent.last_used_at.read().await;
Instant::now() - *last_used_ts
};
debug!(
"Idle Agent for tpu node {} idle for {}ms - sending exit signal",
removed_agent.tpu_address, last_used_ms
removed_agent.tpu_address,
unused_period.as_millis()
);
}
}

View File

@ -41,10 +41,9 @@ impl QuicForwardProxy {
let proxy_listener =
proxy_listener::ProxyListener::new(self.proxy_listener_addr, self.tls_config);
let exit_signal_clone = exit_signal.clone();
let quic_proxy = tokio::spawn(async move {
proxy_listener
.listen(exit_signal_clone.clone(), &forwarder_channel)
.listen(&forwarder_channel)
.await
.expect("proxy listen service");
});

View File

@ -1,6 +1,4 @@
#![allow(dead_code)]
use solana_sdk::signature::Keypair;
use std::env;
use std::future::Future;
use std::time::Duration;
@ -15,27 +13,3 @@ where
{
tokio::time::timeout(FALLBACK_TIMEOUT, future)
}
// note this is duplicated from lite-rpc module
pub async fn get_identity_keypair(identity_from_cli: &String) -> Option<Keypair> {
if let Ok(identity_env_var) = env::var("IDENTITY") {
if let Ok(identity_bytes) = serde_json::from_str::<Vec<u8>>(identity_env_var.as_str()) {
Some(Keypair::from_bytes(identity_bytes.as_slice()).unwrap())
} else {
// must be a file
let identity_file = tokio::fs::read_to_string(identity_env_var.as_str())
.await
.expect("Cannot find the identity file provided");
let identity_bytes: Vec<u8> = serde_json::from_str(&identity_file).unwrap();
Some(Keypair::from_bytes(identity_bytes.as_slice()).unwrap())
}
} else if identity_from_cli.is_empty() {
None
} else {
let identity_file = tokio::fs::read_to_string(identity_from_cli.as_str())
.await
.expect("Cannot find the identity file provided");
let identity_bytes: Vec<u8> = serde_json::from_str(&identity_file).unwrap();
Some(Keypair::from_bytes(identity_bytes.as_slice()).unwrap())
}
}