cut dependency between quic-proxy and lite-rpc crate

This commit is contained in:
GroovieGermanikus 2023-07-20 09:28:55 +02:00
parent 7c3f7bea74
commit 228c58d861
17 changed files with 774 additions and 57 deletions

View File

@ -39,8 +39,6 @@ async-channel = { workspace = true }
quinn = { workspace = true }
solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-services = { workspace = true }
# TODO remove
solana-lite-rpc-quic-forward-proxy = { path = "../quic-forward-proxy" }
async-trait = { workspace = true }
tokio = { version = "1.28.2", features = ["full", "fs"]}
tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] }

View File

@ -9,9 +9,6 @@ use solana_sdk::signature::Keypair;
use std::env;
use std::sync::Arc;
use tokio::time::timeout;
use solana_lite_rpc_quic_forward_proxy::proxy::QuicForwardProxy;
use solana_lite_rpc_quic_forward_proxy::SelfSignedTlsConfigProvider;
use solana_lite_rpc_quic_forward_proxy::test_client::quic_test_client::QuicTestClient;
// use lite_rpc_quic_forward_proxy::tls_config::SelfSignedTlsConfigProvider;
// note: copy of this method is used in quic-forward-proxy
@ -71,11 +68,11 @@ pub async fn main() -> anyhow::Result<()> {
let retry_after = Duration::from_secs(transaction_retry_after_secs);
let proxy_listener_addr = "127.0.0.1:11111".parse().unwrap();
let tls_configuration = SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost();
let quicproxy_service = QuicForwardProxy::new(proxy_listener_addr, &tls_configuration, validator_identity.clone())
.await?
.start_services();
// let proxy_listener_addr = "127.0.0.1:11111".parse().unwrap();
//
// let quicproxy_service = QuicForwardProxy::new(proxy_listener_addr, validator_identity.clone())
// .await?
// .start_services();
let services = LiteBridge::new(
rpc_addr,
@ -106,9 +103,9 @@ pub async fn main() -> anyhow::Result<()> {
res = services => {
bail!("Services quit unexpectedly {res:?}");
},
res = quicproxy_service => {
bail!("Quic Proxy quit unexpectedly {res:?}");
},
// res = quicproxy_service => {
// bail!("Quic Proxy quit unexpectedly {res:?}");
// },
// res = test_client => {
// bail!("Test Client quit unexpectedly {res:?}");
// },

View File

@ -12,6 +12,8 @@ publish = false
[dependencies]
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
solana-transaction-status = { workspace = true }
solana-net-utils = { workspace = true }
rustls = { workspace = true, features = ["dangerous_configuration"]}
serde = { workspace = true }
serde_json = { workspace = true }
@ -32,8 +34,6 @@ lazy_static = { workspace = true }
dotenv = { workspace = true }
async-channel = { workspace = true }
quinn = { workspace = true }
solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-services = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
tokio = { version = "1.28.2", features = ["full", "fs"]}

View File

@ -22,9 +22,13 @@ Local Development / Testing
```bash
RUST_LOG="error,solana_streamer::nonblocking::quic=debug" solana-test-validator --log
```
3. run quic proxy
```bash
RUST_LOG=debug cargo run --bin solana-lite-rpc-quic-forward-proxy -- --identity-keypair /pathto-test-ledger/validator-keypair.json
```
2. run lite-rpc
```bash
RUST_LOG=info cargo run --bin lite-rpc -- --identity-keypair /pathto-test-ledger/validator-keypair.json
RUST_LOG=debug cargo run --bin lite-rpc
```
3. run rust bench tool in _lite-rpc_
```bash

View File

@ -0,0 +1,293 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use anyhow::bail;
use log::{error, info, warn};
use prometheus::{opts, register_int_gauge};
use prometheus::core::GenericGauge;
use quinn::{Connection, Endpoint};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::transaction::VersionedTransaction;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use tokio::sync::broadcast::Receiver;
use tokio::sync::RwLock;
use tokio::time::timeout;
use crate::identity_stakes::IdentityStakes;
use crate::proxy_request_format::TpuForwardingRequest;
use crate::quic_connection_utils::QuicConnectionUtils;
use crate::tpu_quic_connection::{CONNECTION_RETRY_COUNT, QUIC_CONNECTION_TIMEOUT};
use crate::tx_store::TxStore;
use itertools::Itertools;
lazy_static::lazy_static! {
// TODO rename / cleanup
static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
static ref NB_QUIC_ACTIVE_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_active_connections", "Number quic tasks that are running")).unwrap();
static ref NB_CONNECTIONS_TO_KEEP: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_connections_to_keep", "Number of connections to keep asked by tpu service")).unwrap();
static ref NB_QUIC_TASKS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_tasks", "Number of connections to keep asked by tpu service")).unwrap();
}
pub struct ActiveConnection {
endpoint: Endpoint,
identity: Pubkey,
tpu_address: SocketAddr,
exit_signal: Arc<AtomicBool>,
txs_sent_store: TxStore,
}
impl ActiveConnection {
pub fn new(
endpoint: Endpoint,
tpu_address: SocketAddr,
identity: Pubkey,
txs_sent_store: TxStore,
) -> Self {
Self {
endpoint,
tpu_address,
identity,
exit_signal: Arc::new(AtomicBool::new(false)),
txs_sent_store,
}
}
fn on_connect() {
NB_QUIC_CONNECTIONS.inc();
}
fn check_for_confirmation(txs_sent_store: &TxStore, signature: String) -> bool {
match txs_sent_store.get(&signature) {
Some(props) => props.status.is_some(),
None => false,
}
}
#[allow(clippy::too_many_arguments)]
async fn listen(
transaction_reciever: Receiver<(String, Vec<u8>)>,
exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
endpoint: Endpoint,
tpu_address: SocketAddr,
exit_signal: Arc<AtomicBool>,
identity: Pubkey,
identity_stakes: IdentityStakes,
txs_sent_store: TxStore,
) {
NB_QUIC_ACTIVE_CONNECTIONS.inc();
let mut transaction_reciever = transaction_reciever;
let mut exit_oneshot_channel = exit_oneshot_channel;
let max_uni_stream_connections: u64 = compute_max_allowed_uni_streams(
identity_stakes.peer_type,
identity_stakes.stakes,
identity_stakes.total_stakes,
) as u64;
let number_of_transactions_per_unistream = 5;
let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
let mut connection: Option<Arc<RwLock<Connection>>> = None;
let last_stable_id: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
loop {
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
break;
}
if task_counter.load(Ordering::Relaxed) >= max_uni_stream_connections {
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
continue;
}
tokio::select! {
tx = transaction_reciever.recv() => {
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
break;
}
let first_tx: Vec<u8> = match tx {
Ok((sig, tx)) => {
if Self::check_for_confirmation(&txs_sent_store, sig) {
// transaction is already confirmed/ no need to send
continue;
}
tx
},
Err(e) => {
error!(
"Broadcast channel error on recv for {} error {}",
identity, e
);
continue;
}
};
let mut txs = vec![first_tx];
for _ in 1..number_of_transactions_per_unistream {
if let Ok((signature, tx)) = transaction_reciever.try_recv() {
if Self::check_for_confirmation(&txs_sent_store, signature) {
continue;
}
txs.push(tx);
}
}
if connection.is_none() {
// initial connection
let conn = QuicConnectionUtils::connect(
identity,
false,
endpoint.clone(),
tpu_address,
QUIC_CONNECTION_TIMEOUT,
CONNECTION_RETRY_COUNT,
exit_signal.clone(),
Self::on_connect).await;
if let Some(conn) = conn {
// could connect
connection = Some(Arc::new(RwLock::new(conn)));
} else {
break;
}
}
let task_counter = task_counter.clone();
let endpoint = endpoint.clone();
let exit_signal = exit_signal.clone();
let connection = connection.clone();
let last_stable_id = last_stable_id.clone();
tokio::spawn(async move {
task_counter.fetch_add(1, Ordering::Relaxed);
NB_QUIC_TASKS.inc();
let connection = connection.unwrap();
if true {
// TODO split to new service
// SOS
info!("Sending copy of transaction batch of {} to tpu with identity {} to quic proxy",
txs.len(), identity);
Self::send_copy_of_txs_to_quicproxy(
&txs, endpoint.clone(),
// proxy address
"127.0.0.1:11111".parse().unwrap(),
tpu_address,
identity.clone()).await.unwrap();
}
if false {
QuicConnectionUtils::send_transaction_batch(
connection,
txs,
identity,
endpoint,
tpu_address,
exit_signal,
last_stable_id,
QUIC_CONNECTION_TIMEOUT,
CONNECTION_RETRY_COUNT,
|| {
// do nothing as we are using the same connection
}
).await;
}
NB_QUIC_TASKS.dec();
task_counter.fetch_sub(1, Ordering::Relaxed);
});
},
_ = exit_oneshot_channel.recv() => {
break;
}
};
}
drop(transaction_reciever);
NB_QUIC_CONNECTIONS.dec();
NB_QUIC_ACTIVE_CONNECTIONS.dec();
}
async fn send_copy_of_txs_to_quicproxy(raw_tx_batch: &Vec<Vec<u8>>, endpoint: Endpoint,
proxy_address: SocketAddr, tpu_target_address: SocketAddr,
identity: Pubkey) -> anyhow::Result<()> {
info!("sending vecvec: {}", raw_tx_batch.iter().map(|tx| tx.len()).into_iter().join(","));
let raw_tx_batch_copy = raw_tx_batch.clone();
let mut txs = vec![];
for raw_tx in raw_tx_batch_copy {
let tx = match bincode::deserialize::<VersionedTransaction>(&raw_tx) {
Ok(tx) => tx,
Err(err) => {
bail!(err.to_string());
}
};
txs.push(tx);
}
let forwarding_request = TpuForwardingRequest::new(tpu_target_address, identity, txs);
let proxy_request_raw = bincode::serialize(&forwarding_request).expect("Expect to serialize transactions");
let send_result = timeout(Duration::from_millis(3500), Self::send_proxy_request(endpoint, proxy_address, &proxy_request_raw));
match send_result.await {
Ok(..) => {
info!("Successfully sent data to quic proxy");
}
Err(e) => {
warn!("Failed to send data to quic proxy: {:?}", e);
}
}
Ok(())
}
async fn send_proxy_request(endpoint: Endpoint, proxy_address: SocketAddr, proxy_request_raw: &Vec<u8>) -> anyhow::Result<()> {
info!("sending {} bytes to proxy", proxy_request_raw.len());
let mut connecting = endpoint.connect(proxy_address, "localhost")?;
let connection = timeout(Duration::from_millis(500), connecting).await??;
let mut send = connection.open_uni().await?;
send.write_all(proxy_request_raw).await?;
send.finish().await?;
Ok(())
}
pub fn start_listening(
&self,
transaction_reciever: Receiver<(String, Vec<u8>)>,
exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
identity_stakes: IdentityStakes,
) {
let endpoint = self.endpoint.clone();
let tpu_address = self.tpu_address;
let exit_signal = self.exit_signal.clone();
let identity = self.identity;
let txs_sent_store = self.txs_sent_store.clone();
tokio::spawn(async move {
Self::listen(
transaction_reciever,
exit_oneshot_channel,
endpoint,
tpu_address,
exit_signal,
identity,
identity_stakes,
txs_sent_store,
)
.await;
});
}
}

View File

@ -0,0 +1,22 @@
use solana_streamer::nonblocking::quic::ConnectionPeerType;
#[derive(Debug, Copy, Clone)]
pub struct IdentityStakes {
pub peer_type: ConnectionPeerType,
pub stakes: u64,
pub total_stakes: u64,
pub min_stakes: u64,
pub max_stakes: u64,
}
impl Default for IdentityStakes {
fn default() -> Self {
IdentityStakes {
peer_type: ConnectionPeerType::Unstaked,
stakes: 0,
total_stakes: 0,
max_stakes: 0,
min_stakes: 0,
}
}
}

View File

@ -1,10 +0,0 @@
pub mod quic_util;
pub mod tls_config_provicer;
pub mod proxy;
pub mod tpu_quic_connection;
pub mod test_client;
pub mod cli;
pub use tls_config_provicer::SelfSignedTlsConfigProvider;
// pub mod tls_config;

View File

@ -7,14 +7,21 @@ use log::info;
use crate::cli::{Args, get_identity_keypair};
use crate::proxy::QuicForwardProxy;
use crate::test_client::quic_test_client::QuicTestClient;
use crate::tls_config_provicer::SelfSignedTlsConfigProvider;
pub use tls_config_provicer::SelfSignedTlsConfigProvider;
mod proxy;
mod tpu_quic_connection;
mod test_client;
mod quic_util;
mod tls_config_provicer;
mod cli;
pub mod quic_util;
pub mod tls_config_provicer;
pub mod proxy;
pub mod proxy_request_format;
pub mod tpu_quic_connection;
pub mod active_connection;
pub mod cli;
pub mod test_client;
mod util;
mod tx_store;
mod identity_stakes;
mod quic_connection_utils;
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
@ -33,15 +40,16 @@ pub async fn main() -> anyhow::Result<()> {
let validator_identity = Arc::new(get_identity_keypair(&identity_keypair).await);
let main_services = QuicForwardProxy::new(proxy_listener_addr, &tls_configuration, validator_identity)
let tls_config = SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost();
let main_services = QuicForwardProxy::new(proxy_listener_addr, &tls_config, validator_identity)
.await?
.start_services();
let proxy_addr = "127.0.0.1:11111".parse().unwrap();
let test_client = QuicTestClient::new_with_endpoint(
proxy_addr, &tls_configuration)
.await?
.start_services();
// let proxy_addr = "127.0.0.1:11111".parse().unwrap();
// let test_client = QuicTestClient::new_with_endpoint(
// proxy_addr, &tls_configuration)
// .await?
// .start_services();
let ctrl_c_signal = tokio::signal::ctrl_c();
@ -50,9 +58,9 @@ pub async fn main() -> anyhow::Result<()> {
res = main_services => {
bail!("Services quit unexpectedly {res:?}");
},
res = test_client => {
bail!("Test Client quit unexpectedly {res:?}");
},
// res = test_client => {
// bail!("Test Client quit unexpectedly {res:?}");
// },
_ = ctrl_c_signal => {
info!("Received ctrl+c signal");

View File

@ -17,15 +17,12 @@ use solana_sdk::signature::Keypair;
use solana_sdk::signer::Signer;
use solana_sdk::transaction::VersionedTransaction;
use tokio::net::ToSocketAddrs;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use tokio::sync::RwLock;
use solana_lite_rpc_core::leader_schedule::LeaderSchedule;
use solana_lite_rpc_core::proxy_request_format::TpuForwardingRequest;
use solana_lite_rpc_core::quic_connection_utils::QuicConnectionUtils;
use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::{ActiveConnection, CONNECTION_RETRY_COUNT, QUIC_CONNECTION_TIMEOUT};
use crate::proxy_request_format::TpuForwardingRequest;
use crate::tpu_quic_connection::TpuQuicConnection;
use crate::tls_config_provicer::{ProxyTlsConfigProvider, SelfSignedTlsConfigProvider};
use crate::util::AnyhowJoinHandle;
pub struct QuicForwardProxy {

View File

@ -0,0 +1,71 @@
use std::fmt;
use std::fmt::Display;
use std::net::{SocketAddr, SocketAddrV4};
use anyhow::Context;
use serde::{Deserialize, Serialize};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::transaction::VersionedTransaction;
///
/// lite-rpc to proxy wire format
/// compat info: non-public format ATM
/// initial version
const FORMAT_VERSION1: u16 = 2301;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TpuForwardingRequest {
format_version: u16,
tpu_socket_addr: SocketAddr, // TODO is that correct
identity_tpunode: Pubkey,
transactions: Vec<VersionedTransaction>,
}
impl Display for TpuForwardingRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TpuForwardingRequest for tpu target {} with identity {}",
&self.get_tpu_socket_addr(), &self.get_identity_tpunode())
}
}
impl TpuForwardingRequest {
pub fn new(tpu_socket_addr: SocketAddr, identity_tpunode: Pubkey,
transactions: Vec<VersionedTransaction>) -> Self {
TpuForwardingRequest {
format_version: FORMAT_VERSION1,
tpu_socket_addr,
identity_tpunode,
transactions,
}
}
pub fn serialize_wire_format(
&self) -> Vec<u8> {
bincode::serialize(&self).expect("Expect to serialize transactions")
}
// TODO reame
pub fn deserialize_from_raw_request(raw_proxy_request: &Vec<u8>) -> TpuForwardingRequest {
let request = bincode::deserialize::<TpuForwardingRequest>(&raw_proxy_request)
.context("deserialize proxy request")
.unwrap();
assert_eq!(request.format_version, 2301);
request
}
pub fn get_tpu_socket_addr(&self) -> SocketAddr {
self.tpu_socket_addr
}
pub fn get_identity_tpunode(&self) -> Pubkey {
self.identity_tpunode
}
pub fn get_transactions(&self) -> Vec<VersionedTransaction> {
self.transactions.clone()
}
}

View File

@ -0,0 +1,309 @@
use log::{info, trace, warn};
use quinn::{
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
TokioRuntime, TransportConfig,
};
use solana_sdk::pubkey::Pubkey;
use std::{
collections::VecDeque,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use anyhow::bail;
use tokio::{sync::RwLock, time::timeout};
const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
pub struct QuicConnectionUtils {}
impl QuicConnectionUtils {
pub fn create_endpoint(certificate: rustls::Certificate, key: rustls::PrivateKey) -> Endpoint {
let mut endpoint = {
let client_socket =
solana_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::UNSPECIFIED), (8000, 10000))
.expect("create_endpoint bind_in_range")
.1;
let config = EndpointConfig::default();
quinn::Endpoint::new(config, None, client_socket, TokioRuntime)
.expect("create_endpoint quinn::Endpoint::new")
};
let mut crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(SkipServerVerification::new())
.with_single_cert(vec![certificate], key)
.expect("Failed to set QUIC client certificates");
crypto.enable_early_data = true;
// FIXME TEMP HACK TO ALLOW PROXY PROTOCOL
const ALPN_TPU_FORWARDPROXY_PROTOCOL_ID: &[u8] = b"solana-tpu-forward-proxy";
crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec(), ALPN_TPU_FORWARDPROXY_PROTOCOL_ID.to_vec()];
let mut config = ClientConfig::new(Arc::new(crypto));
let mut transport_config = TransportConfig::default();
// TODO check timing
let timeout = IdleTimeout::try_from(Duration::from_secs(5)).unwrap();
transport_config.max_idle_timeout(Some(timeout));
transport_config.keep_alive_interval(Some(Duration::from_millis(500)));
config.transport_config(Arc::new(transport_config));
endpoint.set_default_client_config(config);
endpoint
}
pub async fn make_connection(
endpoint: Endpoint,
addr: SocketAddr,
connection_timeout: Duration,
) -> anyhow::Result<Connection> {
let connecting = endpoint.connect(addr, "connect")?;
let res = timeout(connection_timeout, connecting).await??;
Ok(res)
}
pub async fn make_connection_0rtt(
endpoint: Endpoint,
addr: SocketAddr,
connection_timeout: Duration,
) -> anyhow::Result<Connection> {
let connecting = endpoint.connect(addr, "connect")?;
let connection = match connecting.into_0rtt() {
Ok((connection, zero_rtt)) => {
if (timeout(connection_timeout, zero_rtt).await).is_ok() {
connection
} else {
return Err(ConnectionError::TimedOut.into());
}
}
Err(connecting) => {
if let Ok(connecting_result) = timeout(connection_timeout, connecting).await {
connecting_result?
} else {
return Err(ConnectionError::TimedOut.into());
}
}
};
Ok(connection)
}
#[allow(clippy::too_many_arguments)]
pub async fn connect(
identity: Pubkey,
already_connected: bool,
endpoint: Endpoint,
tpu_address: SocketAddr,
connection_timeout: Duration,
connection_retry_count: usize,
exit_signal: Arc<AtomicBool>,
on_connect: fn(),
) -> Option<Connection> {
for _ in 0..connection_retry_count {
let conn = if already_connected {
Self::make_connection_0rtt(endpoint.clone(), tpu_address, connection_timeout).await
} else {
Self::make_connection(endpoint.clone(), tpu_address, connection_timeout).await
};
match conn {
Ok(conn) => {
on_connect();
return Some(conn);
}
Err(e) => {
warn!("Could not connect to tpu {}/{}, error: {}", tpu_address, identity, e);
if exit_signal.load(Ordering::Relaxed) {
break;
}
}
}
}
None
}
pub async fn write_all(
mut send_stream: SendStream,
tx: &Vec<u8>,
identity: Pubkey,
last_stable_id: Arc<AtomicU64>,
connection_stable_id: u64,
connection_timeout: Duration,
) -> bool {
let write_timeout_res =
timeout(connection_timeout, send_stream.write_all(tx.as_slice())).await;
match write_timeout_res {
Ok(write_res) => {
if let Err(e) = write_res {
trace!(
"Error while writing transaction for {}, error {}",
identity,
e
);
// retry
last_stable_id.store(connection_stable_id, Ordering::Relaxed);
return true;
}
}
Err(_) => {
warn!("timeout while writing transaction for {}", identity);
}
}
let finish_timeout_res = timeout(connection_timeout, send_stream.finish()).await;
match finish_timeout_res {
Ok(finish_res) => {
if let Err(e) = finish_res {
last_stable_id.store(connection_stable_id, Ordering::Relaxed);
trace!(
"Error while writing transaction for {}, error {}",
identity,
e
);
return true;
}
}
Err(_) => {
warn!("timeout while finishing transaction for {}", identity);
}
}
false
}
pub async fn open_unistream(
connection: Connection,
last_stable_id: Arc<AtomicU64>,
connection_timeout: Duration,
) -> (Option<SendStream>, bool) {
match timeout(connection_timeout, connection.open_uni()).await {
Ok(Ok(unistream)) => (Some(unistream), false),
Ok(Err(_)) => {
// reset connection for next retry
last_stable_id.store(connection.stable_id() as u64, Ordering::Relaxed);
(None, true)
}
// timeout
Err(_) => (None, false),
}
}
#[allow(clippy::too_many_arguments)]
pub async fn send_transaction_batch(
connection: Arc<RwLock<Connection>>,
txs: Vec<Vec<u8>>,
identity: Pubkey,
endpoint: Endpoint,
tpu_address: SocketAddr,
exit_signal: Arc<AtomicBool>,
last_stable_id: Arc<AtomicU64>,
connection_timeout: Duration,
connection_retry_count: usize,
on_connect: fn(),
) {
info!("send transaction batch of size {} to address {}", txs.len(), tpu_address);
let mut queue = VecDeque::new();
for tx in txs {
queue.push_back(tx);
}
for _ in 0..connection_retry_count {
if queue.is_empty() || exit_signal.load(Ordering::Relaxed) {
// return
return;
}
// get new connection reset if necessary
let conn = {
let last_stable_id = last_stable_id.load(Ordering::Relaxed) as usize;
let conn = connection.read().await;
if conn.stable_id() == last_stable_id {
let current_stable_id = conn.stable_id();
// problematic connection
drop(conn);
let mut conn = connection.write().await;
// check may be already written by another thread
if conn.stable_id() != current_stable_id {
conn.clone()
} else {
let new_conn = Self::connect(
identity,
true,
endpoint.clone(),
tpu_address,
connection_timeout,
connection_retry_count,
exit_signal.clone(),
on_connect,
)
.await;
if let Some(new_conn) = new_conn {
*conn = new_conn;
conn.clone()
} else {
// could not connect
return;
}
}
} else {
conn.clone()
}
};
let mut retry = false;
while !queue.is_empty() {
let tx = queue.pop_front().unwrap();
let (stream, retry_conn) =
Self::open_unistream(conn.clone(), last_stable_id.clone(), connection_timeout)
.await;
if let Some(send_stream) = stream {
if exit_signal.load(Ordering::Relaxed) {
return;
}
retry = Self::write_all(
send_stream,
&tx,
identity,
last_stable_id.clone(),
conn.stable_id() as u64,
connection_timeout,
)
.await;
} else {
retry = retry_conn;
}
if retry {
queue.push_back(tx);
break;
}
}
if !retry {
break;
}
}
}
}
pub struct SkipServerVerification;
impl SkipServerVerification {
pub fn new() -> Arc<Self> {
Arc::new(Self)
}
}
impl rustls::client::ServerCertVerifier for SkipServerVerification {
fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: std::time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
}
}

View File

@ -10,12 +10,12 @@ use rustls::ClientConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::transaction::{Transaction, VersionedTransaction};
use tokio::io::AsyncWriteExt;
use solana_lite_rpc_core::AnyhowJoinHandle;
use crate::proxy_request_format::TpuForwardingRequest;
use crate::quic_connection_utils::SkipServerVerification;
use crate::quic_util::ALPN_TPU_FORWARDPROXY_PROTOCOL_ID;
use crate::tls_config_provicer::ProxyTlsConfigProvider;
use solana_lite_rpc_core::quic_connection_utils::SkipServerVerification;
use solana_lite_rpc_core::proxy_request_format::TpuForwardingRequest;
use crate::test_client::sample_data_factory::build_raw_sample_tx;
use crate::util::AnyhowJoinHandle;
pub struct QuicTestClient {
pub endpoint: Endpoint,

View File

@ -1,7 +1,7 @@
use std::sync::atomic::{AtomicU32, Ordering};
use rcgen::generate_simple_self_signed;
use rustls::{Certificate, ClientConfig, PrivateKey, ServerConfig};
use solana_lite_rpc_core::quic_connection_utils::SkipServerVerification;
use crate::quic_connection_utils::SkipServerVerification;
use crate::quic_util::ALPN_TPU_FORWARDPROXY_PROTOCOL_ID;
// TODO integrate with tpu_service + quic_connection_utils

View File

@ -16,14 +16,14 @@ use solana_sdk::signature::Keypair;
use solana_sdk::signer::Signer;
use solana_sdk::transaction::VersionedTransaction;
use tokio::net::ToSocketAddrs;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use tokio::sync::RwLock;
use solana_lite_rpc_core::proxy_request_format::TpuForwardingRequest;
use solana_lite_rpc_core::quic_connection_utils::QuicConnectionUtils;
use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::{ActiveConnection, CONNECTION_RETRY_COUNT, QUIC_CONNECTION_TIMEOUT};
use crate::quic_connection_utils::QuicConnectionUtils;
use crate::tls_config_provicer::{ProxyTlsConfigProvider, SelfSignedTlsConfigProvider};
pub const QUIC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
pub const CONNECTION_RETRY_COUNT: usize = 10;
/// stable connect to TPU to send transactions - optimized for proxy use case
#[derive(Clone)]
pub struct TpuQuicConnection {

View File

@ -0,0 +1,27 @@
use std::sync::Arc;
use dashmap::DashMap;
use solana_transaction_status::TransactionStatus;
use tokio::time::Instant;
/// Transaction Properties
pub struct TxProps {
pub status: Option<TransactionStatus>,
/// Time at which transaction was forwarded
pub sent_at: Instant,
}
impl Default for TxProps {
fn default() -> Self {
Self {
status: Default::default(),
sent_at: Instant::now(),
}
}
}
pub type TxStore = Arc<DashMap<String, TxProps>>;
pub fn empty_tx_store() -> TxStore {
Arc::new(DashMap::new())
}

View File

@ -0,0 +1,2 @@
pub type AnyhowJoinHandle = tokio::task::JoinHandle<anyhow::Result<()>>;

View File

@ -9,8 +9,7 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, Signer};
use solana_sdk::transaction::{Transaction, VersionedTransaction};
use spl_memo::solana_program::message::VersionedMessage;
use solana_lite_rpc_core::proxy_request_format::TpuForwardingRequest;
use solana_lite_rpc_core::proxy_request_format::*;
use solana_lite_rpc_quic_forward_proxy::proxy_request_format::TpuForwardingRequest;
#[test]
fn roundtrip() {