add cli option
This commit is contained in:
parent
0102058052
commit
f2dbdd8dea
|
@ -83,6 +83,7 @@ impl LiteBridge {
|
|||
validator_identity: Arc<Keypair>,
|
||||
retry_after: Duration,
|
||||
max_retries: usize,
|
||||
tpu_connection_path: TpuConnectionPath,
|
||||
) -> anyhow::Result<Self> {
|
||||
let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
|
||||
let current_slot = rpc_client
|
||||
|
@ -108,7 +109,7 @@ impl LiteBridge {
|
|||
write_timeout: Duration::from_secs(1),
|
||||
number_of_transactions_per_unistream: 8,
|
||||
},
|
||||
tpu_connection_path: TpuConnectionPath::QuicDirectPath,
|
||||
tpu_connection_path,
|
||||
};
|
||||
|
||||
let tpu_service = TpuService::new(
|
||||
|
|
|
@ -29,4 +29,6 @@ pub struct Args {
|
|||
pub maximum_retries_per_tx: usize,
|
||||
#[arg(long, default_value_t = DEFAULT_RETRY_TIMEOUT)]
|
||||
pub transaction_retry_after_secs: u64,
|
||||
#[arg(long)]
|
||||
pub experimental_quic_proxy_addr: Option<String>,
|
||||
}
|
||||
|
|
|
@ -10,6 +10,8 @@ use lite_rpc::{bridge::LiteBridge, cli::Args};
|
|||
use solana_sdk::signature::Keypair;
|
||||
use std::env;
|
||||
use std::sync::Arc;
|
||||
use clap::builder::TypedValueParser;
|
||||
use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath;
|
||||
|
||||
use crate::rpc_tester::RpcTester;
|
||||
|
||||
|
@ -48,12 +50,15 @@ pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> {
|
|||
identity_keypair,
|
||||
maximum_retries_per_tx,
|
||||
transaction_retry_after_secs,
|
||||
experimental_quic_proxy_addr,
|
||||
} = args;
|
||||
|
||||
let validator_identity = Arc::new(get_identity_keypair(&identity_keypair).await);
|
||||
|
||||
let retry_after = Duration::from_secs(transaction_retry_after_secs);
|
||||
|
||||
let tpu_connection_path = configure_tpu_connection_path(experimental_quic_proxy_addr);
|
||||
|
||||
LiteBridge::new(
|
||||
rpc_addr,
|
||||
ws_addr,
|
||||
|
@ -61,6 +66,7 @@ pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> {
|
|||
validator_identity,
|
||||
retry_after,
|
||||
maximum_retries_per_tx,
|
||||
tpu_connection_path
|
||||
)
|
||||
.await
|
||||
.context("Error building LiteBridge")?
|
||||
|
@ -73,6 +79,16 @@ pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> {
|
|||
.await
|
||||
}
|
||||
|
||||
fn configure_tpu_connection_path(experimental_quic_proxy_addr: Option<String>) -> TpuConnectionPath {
|
||||
match experimental_quic_proxy_addr {
|
||||
None => TpuConnectionPath::QuicDirectPath,
|
||||
Some(prox_address) => TpuConnectionPath::QuicForwardProxyPath {
|
||||
// e.g. "127.0.0.1:11111"
|
||||
forward_proxy_address: prox_address.parse().unwrap()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn get_args() -> Args {
|
||||
let mut args = Args::parse();
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ log = { workspace = true }
|
|||
clap = { workspace = true }
|
||||
dashmap = { workspace = true }
|
||||
itertools = { workspace = true }
|
||||
tracing = "0.1.37"
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
native-tls = { workspace = true }
|
||||
prometheus = { workspace = true }
|
||||
|
|
|
@ -60,8 +60,7 @@ impl QuicForwardProxy {
|
|||
transport_config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into());
|
||||
|
||||
let endpoint = Endpoint::server(quinn_server_config, proxy_listener_addr).unwrap();
|
||||
info!("tpu forward proxy listening on {}", endpoint.local_addr()?);
|
||||
info!("staking from validator identity {}", validator_identity.pubkey());
|
||||
info!("Quic proxy uses validator identity {}", validator_identity.pubkey());
|
||||
|
||||
let tpu_quic_client =
|
||||
TpuQuicClient::new_with_validator_identity(validator_identity.as_ref()).await;
|
||||
|
@ -77,7 +76,7 @@ impl QuicForwardProxy {
|
|||
|
||||
let endpoint = self.endpoint.clone();
|
||||
let quic_proxy: AnyhowJoinHandle = tokio::spawn(async move {
|
||||
info!("TPU Quic Proxy server start on {}", endpoint.local_addr()?);
|
||||
info!("TPU Quic Proxy server listening on {}", endpoint.local_addr()?);
|
||||
|
||||
|
||||
while let Some(connecting) = endpoint.accept().await {
|
||||
|
@ -87,7 +86,7 @@ impl QuicForwardProxy {
|
|||
let tpu_quic_client = self.tpu_quic_client.clone();
|
||||
tokio::spawn(async move {
|
||||
|
||||
let connection = connecting.await.context("accept connection").unwrap();
|
||||
let connection = connecting.await.context("handshake").unwrap();
|
||||
match accept_client_connection(connection, tpu_quic_client, exit_signal, validator_identity_copy)
|
||||
.await {
|
||||
Ok(()) => {}
|
||||
|
|
|
@ -93,11 +93,23 @@ impl SingleTPUConnectionManager for TpuQuicClient {
|
|||
}
|
||||
}
|
||||
|
||||
let connection = match self.create_new(tpu_address).await {
|
||||
Ok(value) => value,
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
let connection =
|
||||
// TODO try 0rff
|
||||
match QuicConnectionUtils::make_connection_0rtt(
|
||||
self.endpoint.clone(), tpu_address, QUIC_CONNECTION_TIMEOUT)
|
||||
.await {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
warn!("Failed to open Quic connection to TPU {}: {}", tpu_address, err);
|
||||
return Err(anyhow!("Failed to create Quic connection to TPU {}: {}", tpu_address, err));
|
||||
},
|
||||
};
|
||||
|
||||
let old_value = self.connection_per_tpunode.insert(tpu_address, connection.clone());
|
||||
assert!(old_value.is_none(), "no prev value must be overridden");
|
||||
|
||||
debug!("Created new Quic connection {} to TPU node {}, total connections is now {}",
|
||||
connection.stable_id(), tpu_address, self.connection_per_tpunode.len());
|
||||
return Ok(connection);
|
||||
}
|
||||
|
||||
|
@ -256,26 +268,6 @@ impl TpuQuicClient {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn create_new(&self, tpu_address: SocketAddr) -> anyhow::Result<Connection> {
|
||||
let connection =
|
||||
// TODO try 0rff
|
||||
match QuicConnectionUtils::make_connection_0rtt(
|
||||
self.endpoint.clone(), tpu_address, QUIC_CONNECTION_TIMEOUT)
|
||||
.await {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
warn!("Failed to open Quic connection to TPU {}: {}", tpu_address, err);
|
||||
return Err(anyhow!("Failed to create Quic connection to TPU {}: {}", tpu_address, err));
|
||||
},
|
||||
};
|
||||
|
||||
let old_value = self.connection_per_tpunode.insert(tpu_address, connection.clone());
|
||||
assert!(old_value.is_none(), "no prev value must be overridden");
|
||||
|
||||
debug!("Created new Quic connection {} to TPU node {}, total connections is now {}",
|
||||
connection.stable_id(), tpu_address, self.connection_per_tpunode.len());
|
||||
Ok(connection)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -46,6 +46,7 @@ impl QuicProxyConnectionManager {
|
|||
validator_identity: Arc<Keypair>,
|
||||
proxy_addr: SocketAddr,
|
||||
) -> Self {
|
||||
info!("Configure Quic proxy connection manager to {}", proxy_addr);
|
||||
let endpoint = Self::create_proxy_client_endpoint(certificate.clone(), key.clone());
|
||||
|
||||
Self {
|
||||
|
|
|
@ -88,8 +88,6 @@ impl ActiveConnection {
|
|||
identity_stakes.stakes,
|
||||
identity_stakes.total_stakes,
|
||||
) as u64;
|
||||
// TODO remove
|
||||
println!("max_uni_stream_connections {}", max_uni_stream_connections);
|
||||
let number_of_transactions_per_unistream = self
|
||||
.connection_parameters
|
||||
.number_of_transactions_per_unistream;
|
||||
|
|
Loading…
Reference in New Issue