make validator id optional from cli

This commit is contained in:
GroovieGermanikus 2023-07-28 23:04:53 +02:00
parent 385d94bc02
commit 816f978ee6
7 changed files with 29 additions and 27 deletions

View File

@ -41,6 +41,7 @@ use tracing_subscriber::{filter::LevelFilter, fmt};
use tracing_subscriber::fmt::format::FmtSpan;
use solana_lite_rpc_quic_forward_proxy::proxy::QuicForwardProxy;
use solana_lite_rpc_quic_forward_proxy::tls_config_provicer::SelfSignedTlsConfigProvider;
use solana_lite_rpc_quic_forward_proxy::ValidatorIdentity;
use solana_lite_rpc_services::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager;
#[derive(Copy, Clone, Debug)]
@ -366,7 +367,7 @@ fn configure_logging(verbose: bool) {
async fn start_literpc_client(
test_case_params: TestCaseParams,
streamer_listen_addrs: SocketAddr,
literpc_validator_identity: Arc<Keypair>,
literpc_validator_identity: ValidatorIdentity,
) -> anyhow::Result<()> {
info!("Start lite-rpc test client ...");
@ -376,7 +377,7 @@ async fn start_literpc_client(
let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE);
let broadcast_sender = Arc::new(sender);
let (certificate, key) = new_self_signed_tls_certificate(
literpc_validator_identity.as_ref(),
literpc_validator_identity.get_keypair_for_tls().as_ref(),
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
)
.expect("Failed to initialize QUIC connection certificates");
@ -405,7 +406,7 @@ async fn start_literpc_client(
);
// this is the real streamer
connections_to_keep.insert(literpc_validator_identity.pubkey(), streamer_listen_addrs);
connections_to_keep.insert(literpc_validator_identity.get_pubkey(), streamer_listen_addrs);
// get information about the optional validator identity stake
// populated from get_stakes_for_identity()
@ -677,7 +678,7 @@ async fn start_literpc_client_proxy_mode(
async fn start_quic_proxy(proxy_listen_addr: SocketAddr) -> anyhow::Result<()> {
let tls_configuration = SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost();
let random_unstaked_validator_identity = Arc::new(Keypair::new());
let random_unstaked_validator_identity = ValidatorIdentity::new(None);
let tls_config = SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost();
let proxy_service = QuicForwardProxy::new(proxy_listen_addr, &tls_config, random_unstaked_validator_identity)

View File

@ -10,25 +10,25 @@ pub struct Args {
}
// note this is duplicated from lite-rpc module
pub async fn get_identity_keypair(identity_from_cli: &String) -> Keypair {
pub async fn get_identity_keypair(identity_from_cli: &String) -> Option<Keypair> {
if let Ok(identity_env_var) = env::var("IDENTITY") {
if let Ok(identity_bytes) = serde_json::from_str::<Vec<u8>>(identity_env_var.as_str()) {
Keypair::from_bytes(identity_bytes.as_slice()).unwrap()
Some(Keypair::from_bytes(identity_bytes.as_slice()).unwrap())
} else {
// must be a file
let identity_file = tokio::fs::read_to_string(identity_env_var.as_str())
.await
.expect("Cannot find the identity file provided");
let identity_bytes: Vec<u8> = serde_json::from_str(&identity_file).unwrap();
Keypair::from_bytes(identity_bytes.as_slice()).unwrap()
Some(Keypair::from_bytes(identity_bytes.as_slice()).unwrap())
}
} else if identity_from_cli.is_empty() {
Keypair::new()
None
} else {
let identity_file = tokio::fs::read_to_string(identity_from_cli.as_str())
.await
.expect("Cannot find the identity file provided");
let identity_bytes: Vec<u8> = serde_json::from_str(&identity_file).unwrap();
Keypair::from_bytes(identity_bytes.as_slice()).unwrap()
Some(Keypair::from_bytes(identity_bytes.as_slice()).unwrap())
}
}

View File

@ -10,3 +10,4 @@ mod tx_store;
mod identity_stakes;
mod quic_connection_utils;
mod quinn_auto_reconnect;
mod validator_identity;

View File

@ -8,6 +8,7 @@ use crate::cli::{Args, get_identity_keypair};
use crate::proxy::QuicForwardProxy;
use crate::test_client::quic_test_client::QuicTestClient;
pub use tls_config_provicer::SelfSignedTlsConfigProvider;
use crate::validator_identity::ValidatorIdentity;
pub mod quic_util;
@ -22,6 +23,7 @@ mod tx_store;
mod identity_stakes;
mod quic_connection_utils;
mod quinn_auto_reconnect;
mod validator_identity;
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
@ -37,8 +39,8 @@ pub async fn main() -> anyhow::Result<()> {
// TODO build args struct dedicyted to proxy
let proxy_listener_addr = "127.0.0.1:11111".parse().unwrap();
let tls_configuration = SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost();
let validator_identity = Arc::new(get_identity_keypair(&identity_keypair).await);
let validator_identity =
ValidatorIdentity::new(get_identity_keypair(&identity_keypair).await);
let tls_config = SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost();
let main_services = QuicForwardProxy::new(proxy_listener_addr, &tls_config, validator_identity)

View File

@ -39,6 +39,7 @@ use crate::quinn_auto_reconnect::AutoReconnect;
use crate::tpu_quic_client::{send_txs_to_tpu_static, SingleTPUConnectionManager, TpuQuicClient};
use crate::tls_config_provicer::{ProxyTlsConfigProvider, SelfSignedTlsConfigProvider};
use crate::util::AnyhowJoinHandle;
use crate::validator_identity::ValidatorIdentity;
// TODO tweak this value - solana server sets 256
// setting this to "1" did not make a difference!
@ -46,7 +47,7 @@ const MAX_CONCURRENT_UNI_STREAMS: u32 = 24;
pub struct QuicForwardProxy {
endpoint: Endpoint,
validator_identity: Arc<Keypair>,
// validator_identity: ValidatorIdentity,
tpu_quic_client: TpuQuicClient,
}
@ -61,7 +62,7 @@ impl QuicForwardProxy {
pub async fn new(
proxy_listener_addr: SocketAddr,
tls_config: &SelfSignedTlsConfigProvider,
validator_identity: Arc<Keypair>) -> anyhow::Result<Self> {
validator_identity: ValidatorIdentity) -> anyhow::Result<Self> {
let server_tls_config = tls_config.get_server_tls_crypto_config();
let mut quinn_server_config = ServerConfig::with_crypto(Arc::new(server_tls_config));
@ -78,12 +79,12 @@ impl QuicForwardProxy {
transport_config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into());
let endpoint = Endpoint::server(quinn_server_config, proxy_listener_addr).unwrap();
info!("Quic proxy uses validator identity {}", validator_identity.pubkey());
info!("Quic proxy uses validator identity {}", validator_identity);
let tpu_quic_client =
TpuQuicClient::new_with_validator_identity(validator_identity.as_ref()).await;
TpuQuicClient::new_with_validator_identity(validator_identity).await;
Ok(Self { endpoint, validator_identity, tpu_quic_client })
Ok(Self { endpoint, tpu_quic_client })
}
@ -115,13 +116,12 @@ impl QuicForwardProxy {
while let Some(connecting) = endpoint.accept().await {
let exit_signal = exit_signal.clone();
let validator_identity_copy = self.validator_identity.clone();
let tpu_quic_client = self.tpu_quic_client.clone();
let forwarder_channel_copy = forwarder_channel.clone();
tokio::spawn(async move {
let connection = connecting.await.context("handshake").unwrap();
match accept_client_connection(connection, forwarder_channel_copy,
tpu_quic_client, exit_signal, validator_identity_copy)
tpu_quic_client, exit_signal)
.await {
Ok(()) => {}
Err(err) => {
@ -140,12 +140,9 @@ impl QuicForwardProxy {
#[tracing::instrument(skip_all, level = "debug")]
async fn accept_client_connection(client_connection: Connection, forwarder_channel: Sender<ForwardPacket>,
tpu_quic_client: TpuQuicClient,
exit_signal: Arc<AtomicBool>, validator_identity: Arc<Keypair>) -> anyhow::Result<()> {
exit_signal: Arc<AtomicBool>) -> anyhow::Result<()> {
debug!("inbound connection established, client {}", client_connection.remote_address());
// let active_tpu_connection =
// TpuQuicClient::new_with_validator_identity(validator_identity.as_ref()).await;
loop {
let maybe_stream = client_connection.accept_uni().await;
let result = match maybe_stream {
@ -163,7 +160,6 @@ async fn accept_client_connection(client_connection: Connection, forwarder_chann
}
Ok(recv_stream) => {
let exit_signal_copy = exit_signal.clone();
let validator_identity_copy = validator_identity.clone();
let tpu_quic_client_copy = tpu_quic_client.clone();
let forwarder_channel_copy = forwarder_channel.clone();

View File

@ -13,7 +13,7 @@ use futures::future::join_all;
use itertools::{any, Itertools};
use log::{debug, error, info, trace, warn};
use quinn::{Connecting, Connection, ConnectionError, Endpoint, SendStream, ServerConfig, VarInt};
use rcgen::generate_simple_self_signed;
use rcgen::{generate_simple_self_signed, KeyPair};
use rustls::{Certificate, PrivateKey};
use rustls::server::ResolvesServerCert;
use serde::{Deserialize, Serialize};
@ -27,6 +27,7 @@ use tokio::sync::RwLock;
use crate::quic_connection_utils::{connection_stats, QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils};
use crate::quinn_auto_reconnect::AutoReconnect;
use crate::tls_config_provicer::{ProxyTlsConfigProvider, SelfSignedTlsConfigProvider};
use crate::validator_identity::ValidatorIdentity;
const QUIC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
pub const CONNECTION_RETRY_COUNT: usize = 10;
@ -150,10 +151,10 @@ impl TpuQuicClient {
/// takes a validator identity and creates a new QUIC client; appears as staked peer to TPU
// note: ATM the provided identity might or might not be a valid validator keypair
pub async fn new_with_validator_identity(validator_identity: &Keypair) -> TpuQuicClient {
info!("Setup TPU Quic stable connection with validator identity {} ...", bs58::encode(validator_identity.pubkey()).into_string());
pub async fn new_with_validator_identity(validator_identity: ValidatorIdentity) -> TpuQuicClient {
info!("Setup TPU Quic stable connection with validator identity {} ...", validator_identity);
let (certificate, key) = new_self_signed_tls_certificate(
validator_identity,
&validator_identity.get_keypair_for_tls(),
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
)
.expect("Failed to initialize QUIC connection certificates");

View File

@ -34,6 +34,7 @@ pub struct TpuNode {
pub struct QuicProxyConnectionManager {
endpoint: Endpoint,
// TODO remove
validator_identity: Arc<Keypair>,
simple_thread_started: AtomicBool,
proxy_addr: SocketAddr,