wip works

This commit is contained in:
GroovieGermanikus 2023-07-25 09:04:09 +02:00
parent fd2cf5a0eb
commit a2207da37a
6 changed files with 72 additions and 34 deletions

View File

@ -15,7 +15,6 @@ pub mod tls_config_provicer;
pub mod proxy;
pub mod proxy_request_format;
pub mod tpu_quic_client;
pub mod active_connection;
pub mod cli;
pub mod test_client;
mod util;

View File

@ -124,6 +124,7 @@ async fn accept_client_connection(client_connection: Connection, tpu_quic_client
let exit_signal_copy = exit_signal.clone();
let validator_identity_copy = validator_identity.clone();
let tpu_quic_client_copy = tpu_quic_client.clone();
tokio::spawn(async move {
let raw_request = recv_stream.read_to_end(10_000_000).await // TODO extract to const
@ -137,15 +138,16 @@ async fn accept_client_connection(client_connection: Connection, tpu_quic_client
let tpu_address = proxy_request.get_tpu_socket_addr();
let txs = proxy_request.get_transactions();
// TODO join get_or_create_connection future and read_to_end
let tpu_connection = tpu_quic_client_copy.get_or_create_connection(tpu_address).await;
info!("send transaction batch of size {} to address {}", txs.len(), tpu_address);
tpu_quic_client_copy.send_txs_to_tpu(tpu_connection, &txs, exit_signal_copy).await;
// active_tpu_connection_copy.send_txs_to_tpu(exit_signal_copy, validator_identity_copy, tpu_identity, tpu_address, &txs).await;
}).instrument(debug_span!("send_txs_to_tpu")).await.unwrap();
})
.await.unwrap();
} // -- loop
}

View File

@ -13,6 +13,7 @@ use std::{
use anyhow::bail;
use tokio::{sync::RwLock, time::timeout};
use tokio::time::error::Elapsed;
use tracing::instrument;
const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
@ -185,6 +186,7 @@ impl QuicConnectionUtils {
}
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip_all, level = "debug")]
pub async fn send_transaction_batch(
connection: Connection,
txs: Vec<Vec<u8>>,

View File

@ -1,7 +1,9 @@
use std::collections::HashMap;
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::time::Duration;
use anyhow::{anyhow, bail};
@ -31,6 +33,7 @@ pub const CONNECTION_RETRY_COUNT: usize = 10;
pub struct TpuQuicClient {
endpoint: Endpoint,
// naive single non-recoverable connection - TODO moke it smarter
// TODO consider using DashMap again
connection_per_tpunode: Arc<DashMap<SocketAddr, Connection>>,
}
@ -58,8 +61,18 @@ impl TpuQuicClient {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn get_or_create_connection(&self, tpu_address: SocketAddr) -> Connection {
if let Some(conn) = self.connection_per_tpunode.get(&tpu_address) {
return conn.clone();
info!("looking up {}", tpu_address);
// TODO try 0rff
// QuicConnectionUtils::make_connection(
// self.endpoint.clone(), tpu_address, QUIC_CONNECTION_TIMEOUT)
// .await.unwrap()
{
if let Some(conn) = self.connection_per_tpunode.get(&tpu_address) {
debug!("reusing connection {:?}", conn);
return conn.clone();
}
}
let connection =
@ -68,8 +81,8 @@ impl TpuQuicClient {
self.endpoint.clone(), tpu_address, QUIC_CONNECTION_TIMEOUT)
.await.unwrap();
self.connection_per_tpunode.insert(tpu_address, connection.clone());
let old_value = self.connection_per_tpunode.insert(tpu_address, connection.clone());
assert!(old_value.is_none(), "no prev value must be overridden");
debug!("Created new Quic connection to TPU node {}, total connections is now {}", tpu_address, self.connection_per_tpunode.len());
return connection;

View File

@ -11,7 +11,7 @@ use async_trait::async_trait;
use futures::FutureExt;
use itertools::Itertools;
use log::{debug, error, info, warn};
use quinn::{ClientConfig, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig};
use quinn::{ClientConfig, Connection, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signer::Signer;
use solana_sdk::signature::Keypair;
@ -65,8 +65,6 @@ impl QuicProxyConnectionManager {
{
let mut lock = self.current_tpu_nodes.write().await;
let list_of_nodes = connections_to_keep.iter().map(|(identity, tpu_address)| {
TpuNode {
tpu_identity: identity.clone(),
@ -74,6 +72,7 @@ impl QuicProxyConnectionManager {
}
}).collect_vec();
let mut lock = self.current_tpu_nodes.write().await;
*lock = list_of_nodes;
}
@ -82,6 +81,7 @@ impl QuicProxyConnectionManager {
// already started
return;
}
self.simple_thread_started.store(true, Relaxed);
info!("Starting very simple proxy thread");
@ -141,6 +141,10 @@ impl QuicProxyConnectionManager {
proxy_addr: SocketAddr,
endpoint: Endpoint,
) {
let mut connection = endpoint.connect(proxy_addr, "localhost").unwrap()
.await.unwrap();
loop {
// TODO exit signal ???
@ -150,7 +154,7 @@ impl QuicProxyConnectionManager {
// exit signal???
let the_tx: Vec<u8> = match tx {
let first_tx: Vec<u8> = match tx {
Ok((sig, tx)) => {
// if Self::check_for_confirmation(&txs_sent_store, sig) {
// // transaction is already confirmed/ no need to send
@ -165,20 +169,31 @@ impl QuicProxyConnectionManager {
}
};
// TODO read all txs from channel (see "let mut txs = vec![first_tx];")
let txs = vec![the_tx];
let number_of_transactions_per_unistream = 8; // TODO read from QuicConnectionParameters
let tpu_fanout_nodes = current_tpu_nodes.read().await;
let mut txs = vec![first_tx];
// TODO comment in
// for _ in 1..number_of_transactions_per_unistream {
// if let Ok((signature, tx)) = transaction_receiver.try_recv() {
// // if Self::check_for_confirmation(&txs_sent_store, signature) {
// // continue;
// // }
// txs.push(tx);
// }
// }
info!("Sending copy of transaction batch of {} to {} tpu nodes via quic proxy",
let tpu_fanout_nodes = current_tpu_nodes.read().await.clone();
info!("Sending copy of transaction batch of {} txs to {} tpu nodes via quic proxy",
txs.len(), tpu_fanout_nodes.len());
for target_tpu_node in &*tpu_fanout_nodes {
for target_tpu_node in tpu_fanout_nodes {
Self::send_copy_of_txs_to_quicproxy(
&txs, endpoint.clone(),
proxy_addr,
proxy_addr,
target_tpu_node.tpu_address,
target_tpu_node.tpu_identity).await.unwrap();
target_tpu_node.tpu_identity)
.await.unwrap();
}
},
@ -190,7 +205,11 @@ impl QuicProxyConnectionManager {
proxy_address: SocketAddr, tpu_target_address: SocketAddr,
target_tpu_identity: Pubkey) -> anyhow::Result<()> {
info!("sending vecvec: {}", raw_tx_batch.iter().map(|tx| tx.len()).into_iter().join(","));
info!("sending vecvec {} to quic proxy for TPU node {}",
raw_tx_batch.iter().map(|tx| tx.len()).into_iter().join(","), tpu_target_address);
// TODO add timeout
// let mut send_stream = timeout(Duration::from_millis(500), connection.open_uni()).await??;
let raw_tx_batch_copy = raw_tx_batch.clone();
@ -207,6 +226,7 @@ impl QuicProxyConnectionManager {
}
let forwarding_request = TpuForwardingRequest::new(tpu_target_address, target_tpu_identity, txs);
debug!("forwarding_request: {}", forwarding_request);
let proxy_request_raw = bincode::serialize(&forwarding_request).expect("Expect to serialize transactions");
@ -222,24 +242,25 @@ impl QuicProxyConnectionManager {
bail!("Failed to send data to quic proxy: {:?}", e);
}
}
Ok(())
}
async fn send_proxy_request(endpoint: Endpoint, proxy_address: SocketAddr, proxy_request_raw: &Vec<u8>) -> anyhow::Result<()> {
info!("sending {} bytes to proxy", proxy_request_raw.len());
let mut connecting = endpoint.connect(proxy_address, "localhost")?;
let connection = timeout(Duration::from_millis(500), connecting).await??;
let mut send = connection.open_uni().await?;
send.write_all(proxy_request_raw).await?;
send.finish().await?;
Ok(())
}
async fn send_proxy_request(endpoint: Endpoint, proxy_address: SocketAddr, proxy_request_raw: &Vec<u8>) -> anyhow::Result<()> {
info!("sending {} bytes to proxy", proxy_request_raw.len());
let mut connecting = endpoint.connect(proxy_address, "localhost")?;
let connection = timeout(Duration::from_millis(500), connecting).await??;
let mut send = connection.open_uni().await?;
send.write_all(proxy_request_raw).await?;
send.finish().await?;
Ok(())
}
}

View File

@ -237,6 +237,7 @@ impl TpuConnectionManager {
) -> Self {
let number_of_clients = fanout * 2;
Self {
// TODO
endpoints: RotatingQueue::new(number_of_clients, || {
QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone())
})