fmt+clippy

This commit is contained in:
GroovieGermanikus 2023-07-31 14:26:21 +02:00
parent 4844844e21
commit 035ce6e3d5
18 changed files with 291 additions and 237 deletions

View File

@ -1,6 +1,6 @@
use std::env;
use clap::Parser;
use solana_sdk::signature::Keypair;
use std::env;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]

View File

@ -1 +1 @@
pub(crate) mod proxy_listener;
pub(crate) mod proxy_listener;

View File

@ -1,40 +1,51 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use anyhow::{anyhow, bail, Context};
use log::{debug, error, info, trace};
use quinn::{Connection, Endpoint, ServerConfig, VarInt};
use solana_sdk::packet::PACKET_DATA_SIZE;
use tokio::sync::mpsc::Sender;
use crate::proxy_request_format::TpuForwardingRequest;
use crate::shared::ForwardPacket;
use crate::tls_config_provider_server::ProxyTlsConfigProvider;
use crate::tls_self_signed_pair_generator::SelfSignedTlsConfigProvider;
use crate::util::FALLBACK_TIMEOUT;
use anyhow::{anyhow, bail, Context};
use log::{debug, error, info, trace};
use quinn::{Connection, Endpoint, ServerConfig, VarInt};
use solana_sdk::packet::PACKET_DATA_SIZE;
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Sender;
// TODO tweak this value - solana server sets 256
// setting this to "1" did not make a difference!
const MAX_CONCURRENT_UNI_STREAMS: u32 = 24;
pub struct ProxyListener {
tls_config: Arc<SelfSignedTlsConfigProvider>,
proxy_listener_addr: SocketAddr,
}
impl ProxyListener {
pub fn new(
proxy_listener_addr: SocketAddr,
tls_config: Arc<SelfSignedTlsConfigProvider>) -> Self {
Self { proxy_listener_addr, tls_config }
tls_config: Arc<SelfSignedTlsConfigProvider>,
) -> Self {
Self {
proxy_listener_addr,
tls_config,
}
}
pub async fn listen(&self, exit_signal: Arc<AtomicBool>, forwarder_channel: Sender<ForwardPacket>) -> anyhow::Result<()> {
info!("TPU Quic Proxy server listening on {}", self.proxy_listener_addr);
pub async fn listen(
&self,
exit_signal: Arc<AtomicBool>,
forwarder_channel: Sender<ForwardPacket>,
) -> anyhow::Result<()> {
info!(
"TPU Quic Proxy server listening on {}",
self.proxy_listener_addr
);
let endpoint = Self::new_proxy_listen_server_endpoint(&self.tls_config, self.proxy_listener_addr).await;
let endpoint =
Self::new_proxy_listen_server_endpoint(&self.tls_config, self.proxy_listener_addr)
.await;
while let Some(connecting) = endpoint.accept().await {
let exit_signal = exit_signal.clone();
@ -42,13 +53,20 @@ impl ProxyListener {
tokio::spawn(async move {
let connection = connecting.await.context("handshake").unwrap();
match Self::accept_client_connection(
connection, forwarder_channel_copy, exit_signal)
.await {
connection,
forwarder_channel_copy,
exit_signal,
)
.await
{
Ok(()) => {
debug!("connection handles correctly");
}
Err(err) => {
error!("failed to accect connection from client: {reason} - skip", reason = err);
error!(
"failed to accect connection from client: {reason} - skip",
reason = err
);
}
}
});
@ -57,9 +75,10 @@ impl ProxyListener {
bail!("TPU Quic Proxy server stopped");
}
async fn new_proxy_listen_server_endpoint(tls_config: &SelfSignedTlsConfigProvider, proxy_listener_addr: SocketAddr) -> Endpoint {
async fn new_proxy_listen_server_endpoint(
tls_config: &SelfSignedTlsConfigProvider,
proxy_listener_addr: SocketAddr,
) -> Endpoint {
let server_tls_config = tls_config.get_server_tls_crypto_config();
let mut quinn_server_config = ServerConfig::with_crypto(Arc::new(server_tls_config));
@ -73,17 +92,23 @@ impl ProxyListener {
transport_config.max_idle_timeout(Some(timeout));
transport_config.keep_alive_interval(Some(Duration::from_millis(500)));
transport_config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
transport_config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into());
transport_config
.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into());
Endpoint::server(quinn_server_config, proxy_listener_addr).unwrap()
}
// TODO use interface abstraction for connection_per_tpunode
#[tracing::instrument(skip_all, level = "debug")]
async fn accept_client_connection(client_connection: Connection, forwarder_channel: Sender<ForwardPacket>,
exit_signal: Arc<AtomicBool>) -> anyhow::Result<()> {
debug!("inbound connection established, client {}", client_connection.remote_address());
async fn accept_client_connection(
client_connection: Connection,
forwarder_channel: Sender<ForwardPacket>,
_exit_signal: Arc<AtomicBool>,
) -> anyhow::Result<()> {
debug!(
"inbound connection established, client {}",
client_connection.remote_address()
);
loop {
let maybe_stream = client_connection.accept_uni().await;
@ -91,11 +116,14 @@ impl ProxyListener {
Err(quinn::ConnectionError::ApplicationClosed(reason)) => {
debug!("connection closed by client - reason: {:?}", reason);
if reason.error_code != VarInt::from_u32(0) {
return Err(anyhow!("connection closed by client with unexpected reason: {:?}", reason));
return Err(anyhow!(
"connection closed by client with unexpected reason: {:?}",
reason
));
}
debug!("connection gracefully closed by client");
return Ok(());
},
}
Err(e) => {
error!("failed to accept stream: {}", e);
return Err(anyhow::Error::msg("error accepting stream"));
@ -103,37 +131,41 @@ impl ProxyListener {
Ok(recv_stream) => {
let forwarder_channel_copy = forwarder_channel.clone();
tokio::spawn(async move {
let raw_request = recv_stream.read_to_end(10_000_000).await.unwrap();
let raw_request = recv_stream.read_to_end(10_000_000).await
.unwrap();
let proxy_request = TpuForwardingRequest::deserialize_from_raw_request(&raw_request);
let proxy_request =
TpuForwardingRequest::deserialize_from_raw_request(&raw_request);
trace!("proxy request details: {}", proxy_request);
let _tpu_identity = proxy_request.get_identity_tpunode();
let tpu_address = proxy_request.get_tpu_socket_addr();
let txs = proxy_request.get_transactions();
debug!("enqueue transaction batch of size {} to address {}", txs.len(), tpu_address);
forwarder_channel_copy.send_timeout(ForwardPacket { transactions: txs, tpu_address },
FALLBACK_TIMEOUT)
.await
debug!(
"enqueue transaction batch of size {} to address {}",
txs.len(),
tpu_address
);
forwarder_channel_copy
.send_timeout(
ForwardPacket {
transactions: txs,
tpu_address,
},
FALLBACK_TIMEOUT,
)
.await
.context("sending internal packet from proxy to forwarder")
.unwrap();
});
Ok(())
},
}
}; // -- result
if let Err(e) = result {
return Err(e);
}
} // -- loop
}
}

View File

@ -1,16 +1,16 @@
// lib definition is only required for 'quic-forward-proxy-integration-test' to work
mod cli;
mod inbound;
mod outbound;
pub mod proxy;
pub mod proxy_request_format;
mod quic_util;
mod quinn_auto_reconnect;
mod shared;
mod test_client;
pub mod tls_config_provider_client;
pub mod tls_config_provider_server;
pub mod tls_self_signed_pair_generator;
pub mod proxy;
pub mod validator_identity;
pub mod proxy_request_format;
mod cli;
mod test_client;
mod util;
mod quinn_auto_reconnect;
mod outbound;
mod inbound;
mod shared;
pub mod validator_identity;

View File

@ -1,31 +1,29 @@
use std::sync::Arc;
use crate::cli::{get_identity_keypair, Args};
use crate::proxy::QuicForwardProxy;
use crate::tls_self_signed_pair_generator::SelfSignedTlsConfigProvider;
use anyhow::bail;
use clap::Parser;
use dotenv::dotenv;
use log::info;
use crate::cli::{Args, get_identity_keypair};
use crate::proxy::QuicForwardProxy;
use crate::tls_self_signed_pair_generator::SelfSignedTlsConfigProvider;
use std::sync::Arc;
use crate::validator_identity::ValidatorIdentity;
pub mod cli;
mod inbound;
mod outbound;
pub mod proxy;
pub mod proxy_request_format;
pub mod quic_util;
mod quinn_auto_reconnect;
mod shared;
pub mod test_client;
pub mod tls_config_provider_client;
pub mod tls_config_provider_server;
pub mod tls_self_signed_pair_generator;
pub mod proxy;
pub mod proxy_request_format;
pub mod cli;
pub mod test_client;
mod util;
mod quinn_auto_reconnect;
mod outbound;
mod inbound;
mod shared;
mod validator_identity;
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
pub async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
@ -40,8 +38,7 @@ pub async fn main() -> anyhow::Result<()> {
// TODO build args struct dedicated to proxy
let proxy_listener_addr = proxy_rpc_addr.parse().unwrap();
let _tls_configuration = SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost();
let validator_identity =
ValidatorIdentity::new(get_identity_keypair(&identity_keypair).await);
let validator_identity = ValidatorIdentity::new(get_identity_keypair(&identity_keypair).await);
let tls_config = Arc::new(SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost());
let main_services = QuicForwardProxy::new(proxy_listener_addr, tls_config, validator_identity)
@ -54,7 +51,6 @@ pub async fn main() -> anyhow::Result<()> {
// .await?
// .start_services();
let ctrl_c_signal = tokio::signal::ctrl_c();
tokio::select! {
@ -70,5 +66,4 @@ pub async fn main() -> anyhow::Result<()> {
Ok(())
}
}
}

View File

@ -1,25 +1,26 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use log::{debug, info, warn};
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use fan::tokio::mpsc::FanOut;
use std::time::Duration;
use anyhow::{bail, Context};
use futures::future::join_all;
use itertools::Itertools;
use quinn::{ClientConfig, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt};
use solana_sdk::quic::QUIC_MAX_TIMEOUT_MS;
use solana_sdk::transaction::VersionedTransaction;
use solana_streamer::nonblocking::quic::ALPN_TPU_PROTOCOL_ID;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use tokio::sync::mpsc::{channel, Receiver};
use crate::quic_util::SkipServerVerification;
use crate::quinn_auto_reconnect::AutoReconnect;
use crate::shared::ForwardPacket;
use crate::util::timeout_fallback;
use crate::validator_identity::ValidatorIdentity;
use anyhow::{bail, Context};
use fan::tokio::mpsc::FanOut;
use futures::future::join_all;
use itertools::Itertools;
use log::{debug, info, warn};
use quinn::{
ClientConfig, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt,
};
use solana_sdk::quic::QUIC_MAX_TIMEOUT_MS;
use solana_sdk::transaction::VersionedTransaction;
use solana_streamer::nonblocking::quic::ALPN_TPU_PROTOCOL_ID;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{channel, Receiver};
const QUIC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
pub const CONNECTION_RETRY_COUNT: usize = 10;
@ -30,7 +31,11 @@ const MAX_PARALLEL_STREAMS: usize = 6;
pub const PARALLEL_TPU_CONNECTION_COUNT: usize = 4;
// takes transactions from upstream clients and forwards them to the TPU
pub async fn tx_forwarder(validator_identity: ValidatorIdentity, mut transaction_channel: Receiver<ForwardPacket>, exit_signal: Arc<AtomicBool>) -> anyhow::Result<()> {
pub async fn tx_forwarder(
validator_identity: ValidatorIdentity,
mut transaction_channel: Receiver<ForwardPacket>,
exit_signal: Arc<AtomicBool>,
) -> anyhow::Result<()> {
info!("TPU Quic forwarder started");
let endpoint = new_endpoint_with_validator_identity(validator_identity).await;
@ -42,7 +47,10 @@ pub async fn tx_forwarder(validator_identity: ValidatorIdentity, mut transaction
bail!("exit signal received");
}
let forward_packet = transaction_channel.recv().await.expect("channel closed unexpectedly");
let forward_packet = transaction_channel
.recv()
.await
.expect("channel closed unexpectedly");
// TODO drain the queue with .try_recv() and batch the transactions
let tpu_address = forward_packet.tpu_address;
@ -56,14 +64,17 @@ pub async fn tx_forwarder(validator_identity: ValidatorIdentity, mut transaction
let exit_signal = exit_signal.clone();
let endpoint_copy = endpoint.clone();
tokio::spawn(async move {
debug!("Start Quic forwarder agent #{} for TPU {}", connection_idx, tpu_address);
debug!(
"Start Quic forwarder agent #{} for TPU {}",
connection_idx, tpu_address
);
if exit_signal.load(Ordering::Relaxed) {
return;
}
let auto_connection = AutoReconnect::new(endpoint_copy, tpu_address);
let exit_signal_copy = exit_signal.clone();
let _exit_signal_copy = exit_signal.clone();
while let Some(packet) = receiver.recv().await {
assert_eq!(packet.tpu_address, tpu_address, "routing error");
@ -75,35 +86,49 @@ pub async fn tx_forwarder(validator_identity: ValidatorIdentity, mut transaction
batch_size += 1;
}
debug!("forwarding transaction batch of size {} to address {}", transactions_batch.len(), packet.tpu_address);
debug!(
"forwarding transaction batch of size {} to address {}",
transactions_batch.len(),
packet.tpu_address
);
let result = timeout_fallback(send_tx_batch_to_tpu(&auto_connection, &transactions_batch)).await
.context(format!("send txs to tpu node {}", auto_connection.target_address));
let result = timeout_fallback(send_tx_batch_to_tpu(
&auto_connection,
&transactions_batch,
))
.await
.context(format!(
"send txs to tpu node {}",
auto_connection.target_address
));
if result.is_err() {
warn!("got send_txs_to_tpu_static error {:?} - loop over errors", result);
warn!(
"got send_txs_to_tpu_static error {:?} - loop over errors",
result
);
} else {
debug!("send_txs_to_tpu_static sent {}", transactions_batch.len());
}
} // -- while all packtes from channel
info!("Quic forwarder agent #{} for TPU {} exited", connection_idx, tpu_address);
info!(
"Quic forwarder agent #{} for TPU {} exited",
connection_idx, tpu_address
);
});
}
let fanout = FanOut::new(senders);
agents.insert(tpu_address, fanout);
} // -- new agent
let agent_channel = agents.get(&tpu_address).unwrap();
timeout_fallback(agent_channel.send(forward_packet)).await
timeout_fallback(agent_channel.send(forward_packet))
.await
.context("send to agent channel")??;
} // -- loop over transactions from upstream channels
// not reachable
@ -112,19 +137,25 @@ pub async fn tx_forwarder(validator_identity: ValidatorIdentity, mut transaction
/// takes a validator identity and creates a new QUIC client; appears as staked peer to TPU
// note: ATM the provided identity might or might not be a valid validator keypair
async fn new_endpoint_with_validator_identity(validator_identity: ValidatorIdentity) -> Endpoint {
info!("Setup TPU Quic stable connection with validator identity {} ...", validator_identity);
info!(
"Setup TPU Quic stable connection with validator identity {} ...",
validator_identity
);
let (certificate, key) = new_self_signed_tls_certificate(
&validator_identity.get_keypair_for_tls(),
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
)
.expect("Failed to initialize QUIC connection certificates");
.expect("Failed to initialize QUIC connection certificates");
let endpoint_outbound = create_tpu_client_endpoint(certificate.clone(), key.clone());
endpoint_outbound
}
fn create_tpu_client_endpoint(certificate: rustls::Certificate, key: rustls::PrivateKey) -> Endpoint {
fn create_tpu_client_endpoint(
certificate: rustls::Certificate,
key: rustls::PrivateKey,
) -> Endpoint {
let mut endpoint = {
let client_socket =
solana_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::UNSPECIFIED), (8000, 10000))
@ -165,21 +196,18 @@ fn create_tpu_client_endpoint(certificate: rustls::Certificate, key: rustls::Pri
// send potentially large amount of transactions to a single TPU
#[tracing::instrument(skip_all, level = "debug")]
async fn send_tx_batch_to_tpu(
auto_connection: &AutoReconnect,
txs: &Vec<VersionedTransaction>,
) {
async fn send_tx_batch_to_tpu(auto_connection: &AutoReconnect, txs: &Vec<VersionedTransaction>) {
for chunk in txs.chunks(MAX_PARALLEL_STREAMS) {
let all_send_fns = chunk.iter().map(|tx| {
let tx_raw = bincode::serialize(tx).unwrap();
tx_raw
})
let all_send_fns = chunk
.iter()
.map(|tx| {
let tx_raw = bincode::serialize(tx).unwrap();
tx_raw
})
.map(|tx_raw| {
auto_connection.send_uni(tx_raw) // ignores error
});
join_all(all_send_fns).await;
}
}

View File

@ -1,16 +1,15 @@
use std::net::SocketAddr;
use std::sync::Arc;
use anyhow::bail;
use std::sync::atomic::AtomicBool;
use anyhow::{anyhow, bail, Context};
use std::sync::Arc;
use log::{debug, error, info, trace};
use crate::inbound::proxy_listener;
use crate::outbound::tx_forward::tx_forwarder;
use crate::tls_self_signed_pair_generator::SelfSignedTlsConfigProvider;
use crate::util::AnyhowJoinHandle;
use crate::validator_identity::ValidatorIdentity;
use log::info;
pub struct QuicForwardProxy {
// endpoint: Endpoint,
@ -23,36 +22,40 @@ impl QuicForwardProxy {
pub async fn new(
proxy_listener_addr: SocketAddr,
tls_config: Arc<SelfSignedTlsConfigProvider>,
validator_identity: ValidatorIdentity) -> anyhow::Result<Self> {
validator_identity: ValidatorIdentity,
) -> anyhow::Result<Self> {
info!("Quic proxy uses validator identity {}", validator_identity);
Ok(Self { proxy_listener_addr, validator_identity, tls_config })
Ok(Self {
proxy_listener_addr,
validator_identity,
tls_config,
})
}
pub async fn start_services(
self,
) -> anyhow::Result<()> {
pub async fn start_services(self) -> anyhow::Result<()> {
let exit_signal = Arc::new(AtomicBool::new(false));
let (forwarder_channel, forward_receiver) = tokio::sync::mpsc::channel(100_000);
let proxy_listener = proxy_listener::ProxyListener::new(
self.proxy_listener_addr,
self.tls_config);
let proxy_listener =
proxy_listener::ProxyListener::new(self.proxy_listener_addr, self.tls_config);
let exit_signal_clone = exit_signal.clone();
let quic_proxy = tokio::spawn(async move {
proxy_listener.listen(exit_signal_clone.clone(), forwarder_channel).await
proxy_listener
.listen(exit_signal_clone.clone(), forwarder_channel)
.await
.expect("proxy listen service");
});
let validator_identity = self.validator_identity.clone();
let exit_signal_clone = exit_signal.clone();
let forwarder: AnyhowJoinHandle = tokio::spawn(tx_forwarder(validator_identity,
forward_receiver, exit_signal_clone));
let forwarder: AnyhowJoinHandle = tokio::spawn(tx_forwarder(
validator_identity,
forward_receiver,
exit_signal_clone,
));
tokio::select! {
res = quic_proxy => {
@ -63,9 +66,4 @@ impl QuicForwardProxy {
},
}
}
}

View File

@ -1,10 +1,10 @@
use std::fmt;
use std::fmt::Display;
use std::net::{SocketAddr};
use anyhow::Context;
use serde::{Deserialize, Serialize};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::transaction::VersionedTransaction;
use std::fmt;
use std::fmt::Display;
use std::net::SocketAddr;
///
/// lite-rpc to proxy wire format
@ -16,21 +16,29 @@ pub const FORMAT_VERSION1: u16 = 2301;
pub struct TpuForwardingRequest {
format_version: u16,
tpu_socket_addr: SocketAddr, // TODO is that correct
identity_tpunode: Pubkey, // note: this is only used fro
identity_tpunode: Pubkey, // note: this is only used fro
// TODO consider not deserializing transactions in proxy
transactions: Vec<VersionedTransaction>,
}
impl Display for TpuForwardingRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TpuForwardingRequest for tpu target {} with identity {}: payload {} tx",
&self.get_tpu_socket_addr(), &self.get_identity_tpunode(), &self.get_transactions().len())
write!(
f,
"TpuForwardingRequest for tpu target {} with identity {}: payload {} tx",
&self.get_tpu_socket_addr(),
&self.get_identity_tpunode(),
&self.get_transactions().len()
)
}
}
impl TpuForwardingRequest {
pub fn new(tpu_socket_addr: SocketAddr, identity_tpunode: Pubkey,
transactions: Vec<VersionedTransaction>) -> Self {
pub fn new(
tpu_socket_addr: SocketAddr,
identity_tpunode: Pubkey,
transactions: Vec<VersionedTransaction>,
) -> Self {
TpuForwardingRequest {
format_version: FORMAT_VERSION1,
tpu_socket_addr,
@ -39,8 +47,7 @@ impl TpuForwardingRequest {
}
}
pub fn serialize_wire_format(
&self) -> Vec<u8> {
pub fn serialize_wire_format(&self) -> Vec<u8> {
bincode::serialize(&self).expect("Expect to serialize transactions")
}
@ -67,6 +74,3 @@ impl TpuForwardingRequest {
self.transactions.clone()
}
}

View File

@ -1,9 +1,8 @@
use std::sync::Arc;
use quinn::Connection;
use std::sync::Arc;
pub const ALPN_TPU_FORWARDPROXY_PROTOCOL_ID: &[u8] = b"solana-tpu-forward-proxy";
pub struct SkipServerVerification;
impl SkipServerVerification {
@ -34,6 +33,10 @@ impl rustls::client::ServerCertVerifier for SkipServerVerification {
// RETIRE_CONNECTION_ID: 1, STREAM_DATA_BLOCKED: 0, STREAMS_BLOCKED_BIDI: 0,
// STREAMS_BLOCKED_UNI: 0, STOP_SENDING: 0, STREAM: 0 }
pub fn connection_stats(connection: &Connection) -> String {
format!("stable_id {} stats {:?}, rtt={:?}",
connection.stable_id(), connection.stats().frame_rx, connection.stats().path.rtt)
format!(
"stable_id {} stats {:?}, rtt={:?}",
connection.stable_id(),
connection.stats().frame_rx,
connection.stats().path.rtt
)
}

View File

@ -1,13 +1,12 @@
use crate::util::timeout_fallback;
use anyhow::Context;
use log::warn;
use quinn::{Connection, Endpoint};
use std::fmt;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU32, Ordering};
use anyhow::Context;
use log::warn;
use tracing::{debug, info};
use quinn::{Connection, Endpoint};
use tokio::sync::{RwLock};
use crate::util::timeout_fallback;
use tokio::sync::RwLock;
use tracing::debug;
pub struct AutoReconnect {
// endoint should be configures with keep-alive and idle timeout
@ -29,19 +28,22 @@ impl AutoReconnect {
pub async fn send_uni(&self, payload: Vec<u8>) -> anyhow::Result<()> {
// TOOD do smart error handling + reconnect
let mut send_stream = timeout_fallback(self.refresh().await.open_uni()).await
let mut send_stream = timeout_fallback(self.refresh().await.open_uni())
.await
.context("open uni stream for sending")??;
send_stream.write_all(payload.as_slice()).await?;
send_stream.finish().await?;
Ok(())
}
pub async fn refresh(&self) -> Connection {
{
let lock = self.current.read().await;
let maybe_conn = lock.as_ref();
if maybe_conn.filter(|conn| conn.close_reason().is_none()).is_some() {
if maybe_conn
.filter(|conn| conn.close_reason().is_none())
.is_some()
{
let reuse = maybe_conn.unwrap();
debug!("Reuse connection {}", reuse.stable_id());
return reuse.clone();
@ -53,17 +55,23 @@ impl AutoReconnect {
Some(current) => {
if current.close_reason().is_some() {
let old_stable_id = current.stable_id();
warn!("Connection {} is closed for reason: {:?}", old_stable_id, current.close_reason());
warn!(
"Connection {} is closed for reason: {:?}",
old_stable_id,
current.close_reason()
);
let new_connection = self.create_connection().await;
*lock = Some(new_connection.clone());
// let old_conn = lock.replace(new_connection.clone());
self.reconnect_count.fetch_add(1, Ordering::SeqCst);
debug!("Replace closed connection {} with {} (retry {})",
debug!(
"Replace closed connection {} with {} (retry {})",
old_stable_id,
new_connection.stable_id(),
self.reconnect_count.load(Ordering::SeqCst));
self.reconnect_count.load(Ordering::SeqCst)
);
new_connection.clone()
} else {
@ -81,12 +89,14 @@ impl AutoReconnect {
new_connection.clone()
}
}
};
}
async fn create_connection(&self) -> Connection {
let connection =
self.endpoint.connect(self.target_address, "localhost").expect("handshake");
let connection = self
.endpoint
.connect(self.target_address, "localhost")
.expect("handshake");
connection.await.expect("connection")
}
@ -94,9 +104,6 @@ impl AutoReconnect {
impl fmt::Display for AutoReconnect {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Connection to {}",
self.target_address,
)
write!(f, "Connection to {}", self.target_address,)
}
}

View File

@ -1,19 +1,19 @@
use std::net::{SocketAddr};
use anyhow::bail;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::bail;
use crate::proxy_request_format::TpuForwardingRequest;
use crate::quic_util::{SkipServerVerification, ALPN_TPU_FORWARDPROXY_PROTOCOL_ID};
use crate::tls_config_provider_client::TpuCLientTlsConfigProvider;
use crate::util::AnyhowJoinHandle;
use log::{info, trace};
use quinn::{Endpoint, VarInt};
use rustls::ClientConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::transaction::{Transaction, VersionedTransaction};
use tokio::io::AsyncWriteExt;
use crate::proxy_request_format::TpuForwardingRequest;
use crate::quic_util::{ALPN_TPU_FORWARDPROXY_PROTOCOL_ID, SkipServerVerification};
use crate::tls_config_provider_client::TpuCLientTlsConfigProvider;
use crate::util::AnyhowJoinHandle;
pub struct QuicTestClient {
pub endpoint: Endpoint,
@ -23,19 +23,20 @@ pub struct QuicTestClient {
impl QuicTestClient {
pub async fn new_with_endpoint(
proxy_addr: SocketAddr,
tls_config: &impl TpuCLientTlsConfigProvider
tls_config: &impl TpuCLientTlsConfigProvider,
) -> anyhow::Result<Self> {
let client_crypto = tls_config.get_client_tls_crypto_config();
let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())?;
endpoint.set_default_client_config(quinn::ClientConfig::new(Arc::new(client_crypto)));
Ok(Self { proxy_addr, endpoint })
Ok(Self {
proxy_addr,
endpoint,
})
}
// connect to a server
pub async fn start_services(
self,
) -> anyhow::Result<()> {
pub async fn start_services(self) -> anyhow::Result<()> {
let endpoint_copy = self.endpoint.clone();
let test_client_service: AnyhowJoinHandle = tokio::spawn(async move {
info!("Sample Quic Client starting ...");
@ -64,8 +65,6 @@ impl QuicTestClient {
ticker.tick().await;
}
Ok(())
});
@ -75,7 +74,6 @@ impl QuicTestClient {
},
}
}
}
fn build_tls_config() -> ClientConfig {
@ -94,7 +92,6 @@ fn build_tls_config() -> ClientConfig {
return client_crypto;
}
fn build_memo_tx_raw() -> Vec<u8> {
let payer_pubkey = Pubkey::new_unique();
let signer_pubkey = Pubkey::new_unique();
@ -107,19 +104,19 @@ fn build_memo_tx_raw() -> Vec<u8> {
// FIXME hardcoded to local test-validator
"127.0.0.1:1027".parse().unwrap(),
Pubkey::from_str("EPLzGRhibYmZ7qysF9BiPmSTRaL8GiLhrQdFTfL8h2fy").unwrap(),
vec![tx.into()]);
vec![tx.into()],
);
println!("wire_data: {:02X?}", wire_data);
wire_data
}
fn serialize_tpu_forwarding_request(
tpu_socket_addr: SocketAddr,
tpu_identity: Pubkey,
transactions: Vec<VersionedTransaction>) -> Vec<u8> {
transactions: Vec<VersionedTransaction>,
) -> Vec<u8> {
let request = TpuForwardingRequest::new(tpu_socket_addr, tpu_identity, transactions);
bincode::serialize(&request).expect("Expect to serialize transactions")

View File

@ -1,21 +1,18 @@
use std::path::Path;
use std::str::FromStr;
use solana_sdk::hash::Hash;
use solana_sdk::instruction::Instruction;
use solana_sdk::message::Message;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, keypair, Signer};
use solana_sdk::signature::{keypair, Keypair, Signer};
use solana_sdk::transaction::{Transaction, VersionedTransaction};
use std::path::Path;
use std::str::FromStr;
const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
pub fn build_raw_sample_tx() -> Vec<u8> {
let payer_keypair = keypair::read_keypair_file(
Path::new("/Users/stefan/mango/solana-wallet/solana-testnet-stefantest.json")
).unwrap();
let payer_keypair = keypair::read_keypair_file(Path::new(
"/Users/stefan/mango/solana-wallet/solana-testnet-stefantest.json",
))
.unwrap();
let tx = build_sample_tx(&payer_keypair);
@ -37,5 +34,3 @@ pub fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transacti
let message = Message::new(&[instruction], Some(&payer.pubkey()));
Transaction::new(&[payer], message, blockhash)
}

View File

@ -1,9 +1,7 @@
use rustls::{Certificate, ClientConfig, PrivateKey, ServerConfig};
use rustls::ClientConfig;
// TODO integrate with tpu_service + quic_connection_utils
pub trait TpuCLientTlsConfigProvider {
fn get_client_tls_crypto_config(&self) -> ClientConfig;
}

View File

@ -3,7 +3,5 @@ use rustls::ServerConfig;
// TODO integrate with tpu_service + quic_connection_utils
pub trait ProxyTlsConfigProvider {
fn get_server_tls_crypto_config(&self) -> ServerConfig;
}

View File

@ -1,9 +1,9 @@
use std::sync::atomic::{AtomicU32, Ordering};
use rcgen::generate_simple_self_signed;
use rustls::{Certificate, ClientConfig, PrivateKey, ServerConfig};
use crate::quic_util::{ALPN_TPU_FORWARDPROXY_PROTOCOL_ID, SkipServerVerification};
use crate::quic_util::{SkipServerVerification, ALPN_TPU_FORWARDPROXY_PROTOCOL_ID};
use crate::tls_config_provider_client::TpuCLientTlsConfigProvider;
use crate::tls_config_provider_server::ProxyTlsConfigProvider;
use rcgen::generate_simple_self_signed;
use rustls::{Certificate, ClientConfig, PrivateKey, ServerConfig};
use std::sync::atomic::{AtomicU32, Ordering};
impl ProxyTlsConfigProvider for SelfSignedTlsConfigProvider {
fn get_server_tls_crypto_config(&self) -> ServerConfig {
@ -30,7 +30,11 @@ const INSTANCES: AtomicU32 = AtomicU32::new(0);
impl SelfSignedTlsConfigProvider {
pub fn new_singleton_self_signed_localhost() -> Self {
// note: this check could be relaxed when you know what you are doing!
assert_eq!(INSTANCES.fetch_add(1, Ordering::Relaxed), 0, "should be a singleton");
assert_eq!(
INSTANCES.fetch_add(1, Ordering::Relaxed),
0,
"should be a singleton"
);
let hostnames = vec!["localhost".to_string()];
let (certificate, private_key) = Self::gen_tls_certificate_and_key(hostnames.clone());
let server_crypto = Self::build_server_crypto(certificate.clone(), private_key.clone());
@ -76,7 +80,4 @@ impl SelfSignedTlsConfigProvider {
pub fn get_client_tls_crypto_config(&self) -> &ClientConfig {
&self.client_crypto
}
}

View File

@ -1,14 +1,14 @@
use std::future::Future;
use std::time::Duration;
use futures::TryFutureExt;
use tokio::time::{Timeout, timeout};
use tokio::time::Timeout;
pub type AnyhowJoinHandle = tokio::task::JoinHandle<anyhow::Result<()>>;
pub const FALLBACK_TIMEOUT: Duration = Duration::from_secs(5);
pub fn timeout_fallback<F>(future: F) -> Timeout<F>
where
F: Future,
where
F: Future,
{
tokio::time::timeout(FALLBACK_TIMEOUT, future)
}

View File

@ -1,8 +1,8 @@
use std::fmt::Display;
use std::sync::Arc;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use solana_sdk::signer::Signer;
use std::fmt::Display;
use std::sync::Arc;
#[derive(Clone)]
pub struct ValidatorIdentity {

View File

@ -1,14 +1,14 @@
use std::str::FromStr;
use solana_lite_rpc_quic_forward_proxy::proxy_request_format::TpuForwardingRequest;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, Signer};
use solana_sdk::transaction::{Transaction};
use solana_lite_rpc_quic_forward_proxy::proxy_request_format::TpuForwardingRequest;
use solana_sdk::transaction::Transaction;
use std::str::FromStr;
#[test]
fn roundtrip() {
let payer = Keypair::from_base58_string("rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr");
let payer = Keypair::from_base58_string(
"rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr",
);
let payer_pubkey = payer.pubkey();
let memo_ix = spl_memo::build_memo("Hello world".as_bytes(), &[&payer_pubkey]);
@ -18,8 +18,9 @@ fn roundtrip() {
let wire_data = TpuForwardingRequest::new(
"127.0.0.1:5454".parse().unwrap(),
Pubkey::from_str("Bm8rtweCQ19ksNebrLY92H7x4bCaeDJSSmEeWqkdCeop").unwrap(),
vec![tx.into()]
).serialize_wire_format();
vec![tx.into()],
)
.serialize_wire_format();
println!("wire_data: {:02X?}", wire_data);
@ -27,7 +28,4 @@ fn roundtrip() {
assert_eq!(request.get_tpu_socket_addr().is_ipv4(), true);
assert_eq!(request.get_transactions().len(), 1);
}