code format lite-rpc

This commit is contained in:
GroovieGermanikus 2023-08-02 15:22:59 +02:00
parent f00d174c30
commit a8863cd74d
13 changed files with 157 additions and 163 deletions

View File

@ -2,6 +2,7 @@ pub mod block_processor;
pub mod block_store;
pub mod leader_schedule;
pub mod notifications;
pub mod proxy_request_format;
pub mod quic_connection;
pub mod quic_connection_utils;
pub mod rotating_queue;
@ -10,6 +11,5 @@ pub mod structures;
pub mod subscription_handler;
pub mod subscription_sink;
pub mod tx_store;
pub mod proxy_request_format;
pub type AnyhowJoinHandle = tokio::task::JoinHandle<anyhow::Result<()>>;

View File

@ -1,10 +1,10 @@
use std::fmt;
use std::fmt::Display;
use std::net::{SocketAddr};
use anyhow::Context;
use serde::{Deserialize, Serialize};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::transaction::VersionedTransaction;
use std::fmt;
use std::fmt::Display;
use std::net::SocketAddr;
///
/// lite-rpc to proxy wire format
@ -22,14 +22,22 @@ pub struct TpuForwardingRequest {
impl Display for TpuForwardingRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TpuForwardingRequest for tpu target {} with identity {}: payload {} tx",
&self.get_tpu_socket_addr(), &self.get_identity_tpunode(), &self.get_transactions().len())
write!(
f,
"TpuForwardingRequest for tpu target {} with identity {}: payload {} tx",
&self.get_tpu_socket_addr(),
&self.get_identity_tpunode(),
&self.get_transactions().len()
)
}
}
impl TpuForwardingRequest {
pub fn new(tpu_socket_addr: SocketAddr, identity_tpunode: Pubkey,
transactions: Vec<VersionedTransaction>) -> Self {
pub fn new(
tpu_socket_addr: SocketAddr,
identity_tpunode: Pubkey,
transactions: Vec<VersionedTransaction>,
) -> Self {
TpuForwardingRequest {
format_version: FORMAT_VERSION1,
tpu_socket_addr,
@ -38,13 +46,12 @@ impl TpuForwardingRequest {
}
}
pub fn serialize_wire_format(
&self) -> Vec<u8> {
pub fn serialize_wire_format(&self) -> Vec<u8> {
bincode::serialize(&self).expect("Expect to serialize transactions")
}
pub fn deserialize_from_raw_request(raw_proxy_request: &Vec<u8>) -> TpuForwardingRequest {
let request = bincode::deserialize::<TpuForwardingRequest>(&raw_proxy_request)
pub fn deserialize_from_raw_request(raw_proxy_request: &[u8]) -> TpuForwardingRequest {
let request = bincode::deserialize::<TpuForwardingRequest>(raw_proxy_request)
.context("deserialize proxy request")
.unwrap();
@ -65,6 +72,3 @@ impl TpuForwardingRequest {
self.transactions.clone()
}
}

View File

@ -226,7 +226,10 @@ impl rustls::client::ServerCertVerifier for SkipServerVerification {
// rtt=1.08178ms
pub fn connection_stats(connection: &Connection) -> String {
// see https://www.rfc-editor.org/rfc/rfc9000.html#name-frame-types-and-formats
format!("stable_id {}, rtt={:?}, stats {:?}",
connection.stable_id(), connection.stats().path.rtt, connection.stats().frame_rx)
format!(
"stable_id {}, rtt={:?}, stats {:?}",
connection.stable_id(),
connection.stats().path.rtt,
connection.stats().frame_rx
)
}

View File

@ -1,7 +1,9 @@
use crate::structures::identity_stakes::IdentityStakes;
use anyhow::Context;
use log::info;
use serde::Serialize;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::ConnectionPeerType;
use std::{
@ -12,11 +14,9 @@ use std::{
},
time::Duration,
};
use serde::Serialize;
use solana_sdk::hash::Hash;
use solana_sdk::signature::Signature;
use solana_sdk::transaction::{Transaction, uses_durable_nonce, VersionedTransaction};
use solana_sdk::transaction::{uses_durable_nonce, Transaction, VersionedTransaction};
use tokio::sync::mpsc::UnboundedReceiver;
const AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS: u64 = 400;

View File

@ -39,11 +39,11 @@ use solana_sdk::{
use solana_transaction_status::TransactionStatus;
use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration};
use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath;
use tokio::{
net::ToSocketAddrs,
sync::mpsc::{self, Sender},
};
use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath;
lazy_static::lazy_static! {
static ref RPC_SEND_TX: IntCounter =

View File

@ -7,11 +7,11 @@ use clap::Parser;
use dotenv::dotenv;
use lite_rpc::{bridge::LiteBridge, cli::Args};
use clap::builder::TypedValueParser;
use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath;
use solana_sdk::signature::Keypair;
use std::env;
use std::sync::Arc;
use clap::builder::TypedValueParser;
use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath;
use crate::rpc_tester::RpcTester;
@ -66,7 +66,7 @@ pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> {
validator_identity,
retry_after,
maximum_retries_per_tx,
tpu_connection_path
tpu_connection_path,
)
.await
.context("Error building LiteBridge")?
@ -123,12 +123,14 @@ pub async fn main() -> anyhow::Result<()> {
}
}
fn configure_tpu_connection_path(experimental_quic_proxy_addr: Option<String>) -> TpuConnectionPath {
fn configure_tpu_connection_path(
experimental_quic_proxy_addr: Option<String>,
) -> TpuConnectionPath {
match experimental_quic_proxy_addr {
None => TpuConnectionPath::QuicDirectPath,
Some(prox_address) => TpuConnectionPath::QuicForwardProxyPath {
// e.g. "127.0.0.1:11111"
forward_proxy_address: prox_address.parse().unwrap()
forward_proxy_address: prox_address.parse().unwrap(),
},
}
}
}

View File

@ -1,8 +1,6 @@
pub mod tpu_service;
pub mod tpu_connection_path;
pub mod tpu_connection_manager;
pub mod quic_proxy_connection_manager;
pub mod quinn_auto_reconnect;
pub mod tpu_connection_manager;
pub mod tpu_connection_path;

View File

@ -1,28 +1,27 @@
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use anyhow::bail;
use std::time::Duration;
use anyhow::{bail};
use futures::FutureExt;
use itertools::Itertools;
use log::{debug, error, info, trace};
use quinn::{ClientConfig, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt};
use solana_sdk::packet::PACKET_DATA_SIZE;
use quinn::{
ClientConfig, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt,
};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use solana_sdk::transaction::VersionedTransaction;
use tokio::sync::{broadcast::Receiver, broadcast::Sender, RwLock};
use tokio::time::timeout;
use solana_lite_rpc_core::proxy_request_format::TpuForwardingRequest;
use solana_lite_rpc_core::quic_connection_utils::{connection_stats, QuicConnectionParameters, SkipServerVerification};
use solana_lite_rpc_core::quic_connection_utils::{
connection_stats, QuicConnectionParameters, SkipServerVerification,
};
use crate::tpu_utils::quinn_auto_reconnect::AutoReconnect;
@ -36,7 +35,7 @@ pub struct QuicProxyConnectionManager {
endpoint: Endpoint,
simple_thread_started: AtomicBool,
proxy_addr: SocketAddr,
current_tpu_nodes: Arc<RwLock<Vec<TpuNode>>>
current_tpu_nodes: Arc<RwLock<Vec<TpuNode>>>,
}
const CHUNK_SIZE_PER_STREAM: usize = 20;
@ -48,7 +47,7 @@ impl QuicProxyConnectionManager {
proxy_addr: SocketAddr,
) -> Self {
info!("Configure Quic proxy connection manager to {}", proxy_addr);
let endpoint = Self::create_proxy_client_endpoint(certificate.clone(), key.clone());
let endpoint = Self::create_proxy_client_endpoint(certificate, key);
Self {
endpoint,
@ -65,21 +64,24 @@ impl QuicProxyConnectionManager {
connections_to_keep: HashMap<Pubkey, SocketAddr>,
connection_parameters: QuicConnectionParameters,
) {
debug!("reconfigure quic proxy connection (# of tpu nodes: {})", connections_to_keep.len());
debug!(
"reconfigure quic proxy connection (# of tpu nodes: {})",
connections_to_keep.len()
);
{
let list_of_nodes = connections_to_keep.iter().map(|(identity, tpu_address)| {
TpuNode {
tpu_identity: identity.clone(),
tpu_address: tpu_address.clone(),
}
}).collect_vec();
let list_of_nodes = connections_to_keep
.iter()
.map(|(identity, tpu_address)| TpuNode {
tpu_identity: *identity,
tpu_address: *tpu_address,
})
.collect_vec();
let mut lock = self.current_tpu_nodes.write().await;
*lock = list_of_nodes;
}
if self.simple_thread_started.load(Relaxed) {
// already started
return;
@ -100,11 +102,12 @@ impl QuicProxyConnectionManager {
exit_signal,
connection_parameters,
));
}
fn create_proxy_client_endpoint(certificate: rustls::Certificate, key: rustls::PrivateKey) -> Endpoint {
fn create_proxy_client_endpoint(
certificate: rustls::Certificate,
key: rustls::PrivateKey,
) -> Endpoint {
const ALPN_TPU_FORWARDPROXY_PROTOCOL_ID: &[u8] = b"solana-tpu-forward-proxy";
let mut endpoint = {
@ -152,7 +155,6 @@ impl QuicProxyConnectionManager {
exit_signal: Arc<AtomicBool>,
connection_parameters: QuicConnectionParameters,
) {
let auto_connection = AutoReconnect::new(endpoint, proxy_addr);
loop {
@ -162,56 +164,58 @@ impl QuicProxyConnectionManager {
}
tokio::select! {
// TODO add timeout
tx = transaction_receiver.recv() => {
// TODO add timeout
tx = transaction_receiver.recv() => {
let first_tx: Vec<u8> = match tx {
Ok((_sig, tx)) => {
tx
},
Err(e) => {
error!(
"Broadcast channel error on recv error {}", e);
continue;
}
};
let mut txs = vec![first_tx];
for _ in 1..connection_parameters.number_of_transactions_per_unistream {
if let Ok((_signature, tx)) = transaction_receiver.try_recv() {
txs.push(tx);
}
let first_tx: Vec<u8> = match tx {
Ok((_sig, tx)) => {
tx
},
Err(e) => {
error!(
"Broadcast channel error on recv error {}", e);
continue;
}
};
let tpu_fanout_nodes = current_tpu_nodes.read().await.clone();
trace!("Sending copy of transaction batch of {} txs to {} tpu nodes via quic proxy",
txs.len(), tpu_fanout_nodes.len());
for target_tpu_node in tpu_fanout_nodes {
Self::send_copy_of_txs_to_quicproxy(
&txs, &auto_connection,
proxy_addr,
target_tpu_node.tpu_address,
target_tpu_node.tpu_identity)
.await.unwrap();
let mut txs = vec![first_tx];
for _ in 1..connection_parameters.number_of_transactions_per_unistream {
if let Ok((_signature, tx)) = transaction_receiver.try_recv() {
txs.push(tx);
}
}
},
};
let tpu_fanout_nodes = current_tpu_nodes.read().await.clone();
trace!("Sending copy of transaction batch of {} txs to {} tpu nodes via quic proxy",
txs.len(), tpu_fanout_nodes.len());
for target_tpu_node in tpu_fanout_nodes {
Self::send_copy_of_txs_to_quicproxy(
&txs, &auto_connection,
proxy_addr,
target_tpu_node.tpu_address,
target_tpu_node.tpu_identity)
.await.unwrap();
}
},
};
}
}
async fn send_copy_of_txs_to_quicproxy(raw_tx_batch: &Vec<Vec<u8>>, auto_connection: &AutoReconnect,
_proxy_address: SocketAddr, tpu_target_address: SocketAddr,
target_tpu_identity: Pubkey) -> anyhow::Result<()> {
let raw_tx_batch_copy = raw_tx_batch.clone();
async fn send_copy_of_txs_to_quicproxy(
raw_tx_batch: &[Vec<u8>],
auto_connection: &AutoReconnect,
_proxy_address: SocketAddr,
tpu_target_address: SocketAddr,
target_tpu_identity: Pubkey,
) -> anyhow::Result<()> {
let mut txs = vec![];
for raw_tx in raw_tx_batch_copy {
let tx = match bincode::deserialize::<VersionedTransaction>(&raw_tx) {
for raw_tx in raw_tx_batch {
let tx = match bincode::deserialize::<VersionedTransaction>(raw_tx) {
Ok(tx) => tx,
Err(err) => {
bail!(err.to_string());
@ -220,13 +224,13 @@ impl QuicProxyConnectionManager {
txs.push(tx);
}
for chunk in txs.chunks(CHUNK_SIZE_PER_STREAM) {
let forwarding_request = TpuForwardingRequest::new(tpu_target_address, target_tpu_identity, chunk.into());
let forwarding_request =
TpuForwardingRequest::new(tpu_target_address, target_tpu_identity, chunk.into());
debug!("forwarding_request: {}", forwarding_request);
let proxy_request_raw = bincode::serialize(&forwarding_request).expect("Expect to serialize transactions");
let proxy_request_raw =
bincode::serialize(&forwarding_request).expect("Expect to serialize transactions");
let send_result = auto_connection.send_uni(proxy_request_raw).await;
@ -238,29 +242,9 @@ impl QuicProxyConnectionManager {
bail!("Failed to send data to quic proxy: {:?}", e);
}
}
} // -- one chunk
Ok(())
}
async fn send_proxy_request(endpoint: Endpoint, proxy_address: SocketAddr, proxy_request_raw: &Vec<u8>) -> anyhow::Result<()> {
info!("sending {} bytes to proxy", proxy_request_raw.len());
let connecting = endpoint.connect(proxy_address, "localhost")?;
let connection = timeout(Duration::from_millis(500), connecting).await??;
let mut send = connection.open_uni().await?;
send.write_all(proxy_request_raw).await?;
send.finish().await?;
debug!("connection stats (lite-rpc to proxy): {}", connection_stats(&connection));
Ok(())
}
}

View File

@ -29,17 +29,14 @@ impl AutoReconnect {
pub async fn send_uni(&self, payload: Vec<u8>) -> anyhow::Result<()> {
// TOOD do smart error handling + reconnect
let mut send_stream = timeout(
Duration::from_secs(4), self.refresh()
.await.open_uni())
.await
.context("open uni stream for sending")??;
let mut send_stream = timeout(Duration::from_secs(4), self.refresh().await.open_uni())
.await
.context("open uni stream for sending")??;
send_stream.write_all(payload.as_slice()).await?;
send_stream.finish().await?;
Ok(())
}
pub async fn refresh(&self) -> Connection {
{
let lock = self.current.read().await;
@ -55,7 +52,7 @@ impl AutoReconnect {
}
let mut lock = self.current.write().await;
let maybe_conn = lock.as_ref();
return match maybe_conn {
match maybe_conn {
Some(current) => {
if current.close_reason().is_some() {
let old_stable_id = current.stable_id();
@ -66,7 +63,6 @@ impl AutoReconnect {
);
let new_connection = self.create_connection().await;
let prev_stable_id = current.stable_id();
*lock = Some(new_connection.clone());
// let old_conn = lock.replace(new_connection.clone());
self.reconnect_count.fetch_add(1, Ordering::SeqCst);
@ -78,7 +74,7 @@ impl AutoReconnect {
self.reconnect_count.load(Ordering::SeqCst)
);
new_connection.clone()
new_connection
} else {
debug!("Reuse connection {} with write-lock", current.stable_id());
current.clone()
@ -92,9 +88,9 @@ impl AutoReconnect {
// let old_conn = foo.replace(Some(new_connection.clone()));
debug!("Create initial connection {}", new_connection.stable_id());
new_connection.clone()
new_connection
}
};
}
}
async fn create_connection(&self) -> Connection {
@ -112,4 +108,3 @@ impl fmt::Display for AutoReconnect {
write!(f, "Connection to {}", self.target_address,)
}
}

View File

@ -1,4 +1,3 @@
use std::fmt::Display;
use std::net::SocketAddr;
@ -12,7 +11,9 @@ impl Display for TpuConnectionPath {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TpuConnectionPath::QuicDirectPath => write!(f, "Direct QUIC connection to TPU"),
TpuConnectionPath::QuicForwardProxyPath { forward_proxy_address } => {
TpuConnectionPath::QuicForwardProxyPath {
forward_proxy_address,
} => {
write!(f, "QUIC Forward Proxy on {}", forward_proxy_address)
}
}

View File

@ -10,6 +10,9 @@ use solana_lite_rpc_core::{
};
use super::tpu_connection_manager::TpuConnectionManager;
use crate::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager;
use crate::tpu_utils::tpu_connection_path::TpuConnectionPath;
use crate::tpu_utils::tpu_service::ConnectionManager::{DirectTpu, QuicProxy};
use solana_sdk::{
pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair, signer::Signer, slot_history::Slot,
};
@ -26,9 +29,6 @@ use tokio::{
sync::RwLock,
time::{Duration, Instant},
};
use crate::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager;
use crate::tpu_utils::tpu_connection_path::TpuConnectionPath;
use crate::tpu_utils::tpu_service::ConnectionManager::{DirectTpu, QuicProxy};
lazy_static::lazy_static! {
static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> =
@ -72,8 +72,12 @@ pub struct TpuService {
#[derive(Clone)]
enum ConnectionManager {
DirectTpu { tpu_connection_manager: Arc<TpuConnectionManager> },
QuicProxy { quic_proxy_connection_manager: Arc<QuicProxyConnectionManager> },
DirectTpu {
tpu_connection_manager: Arc<TpuConnectionManager>,
},
QuicProxy {
quic_proxy_connection_manager: Arc<QuicProxyConnectionManager>,
},
}
impl TpuService {
@ -91,25 +95,25 @@ impl TpuService {
)
.expect("Failed to initialize QUIC client certificates");
let connection_manager =
match config.tpu_connection_path {
TpuConnectionPath::QuicDirectPath => {
let tpu_connection_manager =
TpuConnectionManager::new(certificate, key,
config.fanout_slots as usize).await;
DirectTpu {
tpu_connection_manager: Arc::new(tpu_connection_manager),
}
let connection_manager = match config.tpu_connection_path {
TpuConnectionPath::QuicDirectPath => {
let tpu_connection_manager =
TpuConnectionManager::new(certificate, key, config.fanout_slots as usize).await;
DirectTpu {
tpu_connection_manager: Arc::new(tpu_connection_manager),
}
TpuConnectionPath::QuicForwardProxyPath { forward_proxy_address } => {
let quic_proxy_connection_manager =
QuicProxyConnectionManager::new(certificate, key, forward_proxy_address).await;
}
TpuConnectionPath::QuicForwardProxyPath {
forward_proxy_address,
} => {
let quic_proxy_connection_manager =
QuicProxyConnectionManager::new(certificate, key, forward_proxy_address).await;
QuicProxy {
quic_proxy_connection_manager: Arc::new(quic_proxy_connection_manager),
}
QuicProxy {
quic_proxy_connection_manager: Arc::new(quic_proxy_connection_manager),
}
};
}
};
Ok(Self {
current_slot: Arc::new(AtomicU64::new(current_slot)),
@ -194,13 +198,17 @@ impl TpuService {
self.config.quic_connection_params,
)
.await;
},
QuicProxy { quic_proxy_connection_manager } => {
quic_proxy_connection_manager.update_connection(
self.broadcast_sender.clone(),
connections_to_keep,
self.config.quic_connection_params,
).await;
}
QuicProxy {
quic_proxy_connection_manager,
} => {
quic_proxy_connection_manager
.update_connection(
self.broadcast_sender.clone(),
connections_to_keep,
self.config.quic_connection_params,
)
.await;
}
}
}

View File

@ -11,12 +11,12 @@ use crate::{
tx_sender::{TransactionInfo, TxSender},
};
use anyhow::bail;
use solana_lite_rpc_core::solana_utils::SerializableTransaction;
use solana_lite_rpc_core::{
block_store::{BlockInformation, BlockStore},
notifications::NotificationSender,
AnyhowJoinHandle,
};
use solana_lite_rpc_core::solana_utils::SerializableTransaction;
use solana_sdk::{commitment_config::CommitmentConfig, transaction::VersionedTransaction};
use tokio::{
sync::mpsc::{self, Sender, UnboundedSender},

View File

@ -33,7 +33,6 @@ use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing_subscriber::util::SubscriberInitExt;
#[derive(Copy, Clone, Debug)]
struct TestCaseParams {
sample_tx_count: u32,