Merge pull request #119 from blockworks-foundation/optimize_custom_tpu_2

Optimize custom tpu 2
galactus 2023-04-14 13:57:21 +02:00 committed by GitHub
commit 4b61b33755
7 changed files with 342 additions and 136 deletions

Cargo.lock (generated)
View File

@ -379,6 +379,7 @@ dependencies = [
"clap 4.1.6",
"csv",
"dirs",
"futures",
"log",
"rand 0.8.5",
"rand_chacha 0.3.1",

View File

@ -17,4 +17,5 @@ csv = "1.2.1"
dirs = "5.0.0"
rand = "0.8.5"
rand_chacha = "0.3.1"
futures = { workspace = true }

View File

@ -10,9 +10,10 @@ use bench::{
metrics::{AvgMetric, Metric},
};
use clap::Parser;
use log::info;
use futures::future::join_all;
use log::{error, info};
use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature, signer::Signer};
#[tokio::main]
async fn main() {
@ -39,17 +40,33 @@ async fn main() {
let mut avg_metric = AvgMetric::default();
for run_num in 0..runs {
let metric = bench(rpc_client.clone(), tx_count).await;
info!("Run {run_num}: Sent and Confirmed {tx_count} tx(s) in {metric:?} with",);
// update avg metric
avg_metric += &metric;
// write metric to file
csv_writer.serialize(metric).unwrap();
let mut tasks = vec![];
for _ in 0..runs {
let rpc_client = rpc_client.clone();
tasks.push(tokio::spawn(bench(rpc_client.clone(), tx_count)));
// wait for an interval
run_interval_ms.tick().await;
}
let join_res = join_all(tasks).await;
let mut run_num = 1;
for res in join_res {
match res {
Ok(metric) => {
info!("Run {run_num}: Sent and Confirmed {tx_count} tx(s) in {metric:?} with",);
// update avg metric
avg_metric += &metric;
csv_writer.serialize(metric).unwrap();
}
Err(_) => {
error!("join error for run {}", run_num);
}
}
run_num += 1;
}
let avg_metric = Metric::from(avg_metric);
info!("Avg Metric {avg_metric:?}",);
@ -60,6 +77,8 @@ async fn main() {
async fn bench(rpc_client: Arc<RpcClient>, tx_count: usize) -> Metric {
let funded_payer = BenchHelper::get_payer().await.unwrap();
println!("payer {}", funded_payer.pubkey());
let blockhash = rpc_client.get_latest_blockhash().await.unwrap();
let txs = BenchHelper::generate_txs(tx_count, &funded_payer, blockhash, None);
@ -106,7 +125,7 @@ async fn bench(rpc_client: Arc<RpcClient>, tx_count: usize) -> Metric {
metrics.txs_confirmed += 1;
to_remove_txs.push(sig);
} else if time_elapsed_since_last_confirmed.unwrap().elapsed()
> Duration::from_secs(3)
> Duration::from_secs(30)
{
metrics.txs_un_confirmed += 1;
to_remove_txs.push(sig);

View File

@ -19,7 +19,7 @@ pub const DEFAULT_WS_ADDR: &str = "ws://0.0.0.0:8900";
#[from_env]
pub const DEFAULT_TX_MAX_RETRIES: u16 = 1;
#[from_env]
pub const DEFAULT_TX_BATCH_SIZE: usize = 32;
pub const DEFAULT_TX_BATCH_SIZE: usize = 100;
#[from_env]
pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 40_000;

View File

@ -333,7 +333,7 @@ impl BlockListener {
slot: slot as i64,
leader_id: 0, // TODO: lookup leader
parent_slot: parent_slot as i64,
cluster_time: Utc.timestamp_millis_opt(block_time*1000).unwrap(),
cluster_time: Utc.timestamp_millis_opt(block_time * 1000).unwrap(),
local_time: block_info.and_then(|b| b.processed_local_time),
}))
.expect("Error sending block to postgres service");

View File

@ -2,38 +2,44 @@ use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use dashmap::DashMap;
use log::{error, info, trace, warn};
use log::{error, trace, warn};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::{
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
TokioRuntime, TransportConfig,
};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::{
pubkey::Pubkey,
quic::{QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO},
};
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use tokio::{
sync::{broadcast::Receiver, broadcast::Sender},
sync::{broadcast::Receiver, broadcast::Sender, RwLock},
time::timeout,
};
use super::rotating_queue::RotatingQueue;
use super::{rotating_queue::RotatingQueue, tpu_service::IdentityStakes};
pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(5);
const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(10);
const CONNECTION_RETRY_COUNT: usize = 10;
lazy_static::lazy_static! {
static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
static ref NB_QUIC_TASKS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_quic_tasks", "Number quic tasks that are running")).unwrap();
static ref NB_QUIC_ACTIVE_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_active_connections", "Number of active connections")).unwrap();
static ref NB_CONNECTIONS_TO_KEEP: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_connections_to_keep", "Number of connections to keep asked by tpu service")).unwrap();
static ref NB_QUIC_TASKS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_tasks", "Number of quic tasks that are running")).unwrap();
}
struct ActiveConnection {
@ -91,19 +97,18 @@ impl ActiveConnection {
endpoint: Endpoint,
addr: SocketAddr,
exit_signal: Arc<AtomicBool>,
) -> Option<Arc<Connection>> {
) -> Option<Connection> {
for _i in 0..CONNECTION_RETRY_COUNT {
let conn = if already_connected {
info!("making make_connection_0rtt");
Self::make_connection_0rtt(endpoint.clone(), addr).await
} else {
info!("making make_connection");
Self::make_connection(endpoint.clone(), addr).await
let conn = Self::make_connection(endpoint.clone(), addr).await;
conn
};
match conn {
Ok(conn) => {
NB_QUIC_CONNECTIONS.inc();
return Some(Arc::new(conn));
return Some(conn);
}
Err(e) => {
trace!("Could not connect to {} because of error {}", identity, e);
@ -116,50 +121,162 @@ impl ActiveConnection {
None
}
async fn open_unistream(
connection: &mut Option<Arc<Connection>>,
mut reconnect: bool,
async fn write_all(
mut send_stream: SendStream,
tx: &Vec<u8>,
identity: Pubkey,
already_connected: bool,
endpoint: Endpoint,
addr: SocketAddr,
exit_signal: Arc<AtomicBool>,
) -> Option<SendStream> {
loop {
if let Some(connection) = connection {
match timeout(
QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
connection.open_uni(),
)
.await
{
Ok(Ok(unistream)) => return Some(unistream),
Ok(Err(_)) => (),
Err(_) => return None,
last_stable_id: Arc<AtomicU64>,
connection_stable_id: u64,
) -> bool {
let write_timeout_res = timeout(
QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
send_stream.write_all(tx.as_slice()),
)
.await;
match write_timeout_res {
Ok(write_res) => {
if let Err(e) = write_res {
trace!(
"Error while writing transaction for {}, error {}",
identity,
e
);
// retry
last_stable_id.store(connection_stable_id, Ordering::Relaxed);
return true;
}
} else {
reconnect = true
}
if !reconnect {
return None;
Err(_) => {
warn!("timeout while writing transaction for {}", identity);
}
}
// re connect
let Some(conn) = Self::connect(
identity,
already_connected,
endpoint.clone(),
addr,
exit_signal.clone(),
)
.await else {
return None;
let finish_timeout_res = timeout(
QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
send_stream.finish(),
)
.await;
match finish_timeout_res {
Ok(finish_res) => {
if let Err(e) = finish_res {
last_stable_id.store(connection_stable_id, Ordering::Relaxed);
trace!(
"Error while writing transaction for {}, error {}",
identity,
e
);
}
}
Err(_) => {
warn!("timeout while writing transaction for {}", identity);
}
}
false
}
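Note the contract here: only a failed write_all returns true (retry), and in that case the failing connection's stable_id is recorded in last_stable_id so the batch sender below will force a reconnect; a finish() error also records the stable_id but does not request a retry, and plain timeouts are only logged.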
async fn open_unistream(
connection: Connection,
last_stable_id: Arc<AtomicU64>,
) -> (Option<SendStream>, bool) {
match timeout(
QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
connection.open_uni(),
)
.await
{
Ok(Ok(unistream)) => return (Some(unistream), false),
Ok(Err(_)) => {
// reset connection for next retry
last_stable_id.store(connection.stable_id() as u64, Ordering::Relaxed);
(None, true)
}
Err(_) => return (None, false),
}
}
async fn send_transaction_batch(
connection: Arc<RwLock<Connection>>,
txs: Vec<Vec<u8>>,
identity: Pubkey,
endpoint: Endpoint,
socket_addr: SocketAddr,
exit_signal: Arc<AtomicBool>,
last_stable_id: Arc<AtomicU64>,
) {
for _ in 0..3 {
let conn = {
let last_stable_id = last_stable_id.load(Ordering::Relaxed) as usize;
let conn = connection.read().await;
if conn.stable_id() == last_stable_id {
// problematic connection
drop(conn);
let mut conn = connection.write().await;
let new_conn = Self::connect(
identity,
true,
endpoint.clone(),
socket_addr.clone(),
exit_signal.clone(),
)
.await;
if let Some(new_conn) = new_conn {
*conn = new_conn;
conn.clone()
} else {
// could not connect
return;
}
} else {
conn.clone()
}
};
let mut retry = false;
for tx in &txs {
let (stream, retry_conn) =
Self::open_unistream(conn.clone(), last_stable_id.clone()).await;
if let Some(send_stream) = stream {
if exit_signal.load(Ordering::Relaxed) {
return;
}
// new connection don't reconnect now
*connection = Some(conn);
reconnect = false;
retry = Self::write_all(
send_stream,
tx,
identity,
last_stable_id.clone(),
conn.stable_id() as u64,
)
.await;
} else {
retry = retry_conn;
}
}
if !retry {
break;
}
}
}
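The reconnect logic above hinges on quinn's Connection::stable_id(), a per-connection identifier: a sender that hits a write error stores that id, and the next sender compares it against the connection currently behind the RwLock to decide whether it is still the broken instance or has already been replaced. A minimal sketch of that check (simplified, not the exact code from this PR):

use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::RwLock;

// Returns a usable connection, replacing the shared one only if it is still
// the instance previously marked as broken in `last_stable_id`.
async fn get_or_replace(
    connection: &RwLock<quinn::Connection>,
    last_stable_id: &AtomicU64,
    reconnect: impl std::future::Future<Output = Option<quinn::Connection>>,
) -> Option<quinn::Connection> {
    let bad_id = last_stable_id.load(Ordering::Relaxed) as usize;
    {
        let conn = connection.read().await;
        if conn.stable_id() != bad_id {
            // another task already swapped in a fresh connection: reuse it
            return Some(conn.clone());
        }
    }
    // still holding the broken connection: take the write lock and reconnect
    let mut guard = connection.write().await;
    match reconnect.await {
        Some(new_conn) => {
            *guard = new_conn;
            Some(guard.clone())
        }
        None => None,
    }
}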
// copied from solana code base
fn compute_receive_window_ratio_for_staked_node(
max_stake: u64,
min_stake: u64,
stake: u64,
) -> u64 {
if stake > max_stake {
return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
}
let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO;
if max_stake > min_stake {
let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64;
let b = max_ratio as f64 - ((max_stake as f64) * a);
let ratio = (a * stake as f64) + b;
ratio.round() as u64
} else {
QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO
}
}
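As a worked example of the interpolation (the numbers below are made up and are not the real QUIC constants):

// min_ratio = 1, max_ratio = 10, min_stake = 100, max_stake = 1100
// a = (10 - 1) / (1100 - 100)      = 0.009
// b = 10 - (1100 * 0.009)          = 0.1
// stake = 700   => round(0.009 * 700 + 0.1) = round(6.4) = 6
// stake = 100   => round(1.0)  = 1  (min_ratio)
// stake = 1100  => round(10.0) = 10 (max_ratio)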
@ -170,19 +287,43 @@ impl ActiveConnection {
addr: SocketAddr,
exit_signal: Arc<AtomicBool>,
identity: Pubkey,
identity_stakes: IdentityStakes,
) {
NB_QUIC_TASKS.inc();
let mut already_connected = false;
let mut connection: Option<Arc<Connection>> = None;
NB_QUIC_ACTIVE_CONNECTIONS.inc();
let mut transaction_reciever = transaction_reciever;
let mut exit_oneshot_channel = exit_oneshot_channel;
let max_uni_stream_connections: u64 = compute_max_allowed_uni_streams(
identity_stakes.peer_type,
identity_stakes.stakes,
identity_stakes.total_stakes,
) as u64;
let number_of_transactions_per_unistream = match identity_stakes.peer_type {
solana_streamer::nonblocking::quic::ConnectionPeerType::Staked => {
Self::compute_receive_window_ratio_for_staked_node(
identity_stakes.max_stakes,
identity_stakes.min_stakes,
identity_stakes.stakes,
)
}
solana_streamer::nonblocking::quic::ConnectionPeerType::Unstaked => 1,
};
let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
let mut connection: Option<Arc<RwLock<Connection>>> = None;
let last_stable_id: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
loop {
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
break;
}
if task_counter.load(Ordering::Relaxed) > max_uni_stream_connections {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
continue;
}
tokio::select! {
tx_or_timeout = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, transaction_reciever.recv() ) => {
// exit signal set
@ -192,7 +333,7 @@ impl ActiveConnection {
match tx_or_timeout {
Ok(tx) => {
let tx: Vec<u8> = match tx {
let first_tx: Vec<u8> = match tx {
Ok(tx) => tx,
Err(e) => {
error!(
@ -202,73 +343,45 @@ impl ActiveConnection {
continue;
}
};
let unistream = Self::open_unistream(
&mut connection,
true,
identity,
already_connected,
endpoint.clone(),
addr,
exit_signal.clone(),
).await;
if !already_connected && connection.is_some() {
already_connected = true;
let mut txs = vec![first_tx];
for _ in 1..number_of_transactions_per_unistream {
if let Ok(tx) = transaction_reciever.try_recv() {
txs.push(tx);
}
}
match unistream {
Some(mut send_stream) => {
trace!("Sending {} transaction", identity);
let write_timeout_res = timeout( QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, send_stream.write_all(tx.as_slice())).await;
match write_timeout_res {
Ok(write_res) => {
if let Err(e) = write_res {
warn!(
"Error while writing transaction for {}, error {}",
identity,
e
);
}
},
Err(_) => {
warn!(
"timeout while writing transaction for {}",
identity
);
}
}
let finish_timeout_res = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, send_stream.finish()).await;
match finish_timeout_res {
Ok(finish_res) => {
if let Err(e) = finish_res {
warn!(
"Error while writing transaction for {}, error {}",
identity,
e
);
}
},
Err(_) => {
warn!(
"timeout while writing transaction for {}",
identity
);
}
}
},
None => {
trace!("could not create a unistream for {}", identity);
if connection.is_none() {
// initial connection
let conn = Self::connect(identity, false, endpoint.clone(), addr.clone(), exit_signal.clone()).await;
if let Some(conn) = conn {
// could connect
connection = Some(Arc::new(RwLock::new(conn)));
} else {
break;
}
}
let task_counter = task_counter.clone();
let endpoint = endpoint.clone();
let exit_signal = exit_signal.clone();
let addr = addr.clone();
let connection = connection.clone();
let last_stable_id = last_stable_id.clone();
tokio::spawn(async move {
task_counter.fetch_add(1, Ordering::Relaxed);
NB_QUIC_TASKS.inc();
let connection = connection.unwrap();
Self::send_transaction_batch(connection, txs, identity, endpoint, addr, exit_signal, last_stable_id).await;
NB_QUIC_TASKS.dec();
task_counter.fetch_sub(1, Ordering::Relaxed);
});
tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
},
Err(_) => {
// timed out
if connection.is_some() {
NB_QUIC_CONNECTIONS.dec();
connection = None;
}
}
}
},
@ -277,17 +390,14 @@ impl ActiveConnection {
}
};
}
if connection.is_some() {
NB_QUIC_CONNECTIONS.dec();
}
NB_QUIC_TASKS.dec();
NB_QUIC_ACTIVE_CONNECTIONS.dec();
}
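The batching introduced in listen() — take the first transaction from the broadcast receiver, then opportunistically drain a few more with try_recv before spawning a send task — can be sketched in isolation. A simplified stand-alone version (not the exact code; this one stops at the first empty poll):

// Sketch: build a batch from a tokio broadcast receiver without waiting,
// so one QUIC uni-stream carries several transactions instead of one.
fn drain_batch(
    receiver: &mut tokio::sync::broadcast::Receiver<Vec<u8>>,
    first_tx: Vec<u8>,
    batch_size: u64,
) -> Vec<Vec<u8>> {
    let mut txs = vec![first_tx];
    for _ in 1..batch_size {
        match receiver.try_recv() {
            Ok(tx) => txs.push(tx),
            Err(_) => break, // queue empty (or lagged): send what we have
        }
    }
    txs
}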
pub fn start_listening(
&self,
transaction_reciever: Receiver<Vec<u8>>,
exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
identity_stakes: IdentityStakes,
) {
let endpoint = self.endpoint.clone();
let addr = self.tpu_address;
@ -301,6 +411,7 @@ impl ActiveConnection {
addr,
exit_signal,
identity,
identity_stakes,
)
.await;
});
@ -365,6 +476,7 @@ impl TpuConnectionManager {
&self,
transaction_sender: Arc<Sender<Vec<u8>>>,
connections_to_keep: HashMap<Pubkey, SocketAddr>,
identity_stakes: IdentityStakes,
) {
NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64);
for (identity, socket_addr) in &connections_to_keep {
@ -376,7 +488,7 @@ impl TpuConnectionManager {
let (sx, rx) = tokio::sync::mpsc::channel(1);
let transaction_reciever = transaction_sender.subscribe();
active_connection.start_listening(transaction_reciever, rx);
active_connection.start_listening(transaction_reciever, rx, identity_stakes);
self.identity_to_active_connection.insert(
*identity,
Arc::new(ActiveConnectionWithExitChannel {

View File

@ -8,10 +8,14 @@ use solana_client::{
rpc_response::RpcContactInfo,
};
use solana_sdk::{pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair, slot_history::Slot};
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use solana_sdk::{
pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair, signer::Signer, slot_history::Slot,
};
use solana_streamer::{
nonblocking::quic::ConnectionPeerType, tls_certificates::new_self_signed_tls_certificate,
};
use std::{
collections::VecDeque,
collections::{HashMap, VecDeque},
net::{IpAddr, Ipv4Addr},
str::FromStr,
sync::{
@ -31,7 +35,7 @@ const CACHE_NEXT_SLOT_LEADERS_PUBKEY_SIZE: usize = 1024; // Save pubkey and cont
const CLUSTERINFO_REFRESH_TIME: u64 = 60; // refresh cluster every minute
const LEADER_SCHEDULE_UPDATE_INTERVAL: u64 = 10; // update leader schedule every 10s
const AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS: u64 = 400;
const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 1024;
const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 16384;
lazy_static::lazy_static! {
static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> =
@ -63,6 +67,29 @@ pub struct TpuService {
pubsub_client: Arc<PubsubClient>,
broadcast_sender: Arc<tokio::sync::broadcast::Sender<Vec<u8>>>,
tpu_connection_manager: Arc<TpuConnectionManager>,
identity: Arc<Keypair>,
identity_stakes: Arc<RwLock<IdentityStakes>>,
}
#[derive(Debug, Copy, Clone)]
pub struct IdentityStakes {
pub peer_type: ConnectionPeerType,
pub stakes: u64,
pub total_stakes: u64,
pub min_stakes: u64,
pub max_stakes: u64,
}
impl Default for IdentityStakes {
fn default() -> Self {
IdentityStakes {
peer_type: ConnectionPeerType::Unstaked,
stakes: 0,
total_stakes: 0,
max_stakes: 0,
min_stakes: 0,
}
}
}
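For illustration, this is roughly how the connection side consumes these stakes (same solana_streamer helper the listen() hunk calls above; the stake numbers are made up):

use solana_streamer::nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType};

// Sketch: turn IdentityStakes into a cap on concurrently open uni-streams.
fn example_stream_cap() -> u64 {
    let identity_stakes = IdentityStakes {
        peer_type: ConnectionPeerType::Staked,
        stakes: 1_000,
        total_stakes: 100_000,
        min_stakes: 10,
        max_stakes: 5_000,
    };
    compute_max_allowed_uni_streams(
        identity_stakes.peer_type,
        identity_stakes.stakes,
        identity_stakes.total_stakes,
    ) as u64
}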
impl TpuService {
@ -95,6 +122,8 @@ impl TpuService {
pubsub_client: Arc::new(pubsub_client),
broadcast_sender: Arc::new(sender),
tpu_connection_manager: Arc::new(tpu_connection_manager),
identity,
identity_stakes: Arc::new(RwLock::new(IdentityStakes::default())),
})
}
@ -106,6 +135,43 @@ impl TpuService {
}
});
NB_CLUSTER_NODES.set(self.cluster_nodes.len() as i64);
// update stakes for the identity
{
let vote_accounts = self.rpc_client.get_vote_accounts().await?;
let map_of_stakes: HashMap<String, u64> = vote_accounts
.current
.iter()
.map(|x| (x.node_pubkey.clone(), x.activated_stake))
.collect();
if let Some(stakes) = map_of_stakes.get(&self.identity.pubkey().to_string()) {
let all_stakes: Vec<u64> = vote_accounts
.current
.iter()
.map(|x| x.activated_stake)
.collect();
let identity_stakes = IdentityStakes {
peer_type: ConnectionPeerType::Staked,
stakes: *stakes,
min_stakes: all_stakes.iter().min().map_or(0, |x| *x),
max_stakes: all_stakes.iter().max().map_or(0, |x| *x),
total_stakes: all_stakes.iter().sum(),
};
info!(
"Idenity stakes {}, {}, {}, {}",
identity_stakes.total_stakes,
identity_stakes.min_stakes,
identity_stakes.max_stakes,
identity_stakes.stakes
);
let mut lock = self.identity_stakes.write().await;
*lock = identity_stakes;
}
}
Ok(())
}
@ -195,8 +261,15 @@ impl TpuService {
(Pubkey::from_str(x.pubkey.as_str()).unwrap(), addr)
})
.collect();
let identity_stakes = self.identity_stakes.read().await;
self.tpu_connection_manager
.update_connections(self.broadcast_sender.clone(), connections_to_keep)
.update_connections(
self.broadcast_sender.clone(),
connections_to_keep,
*identity_stakes,
)
.await;
}
@ -224,7 +297,7 @@ impl TpuService {
Duration::from_millis(2000),
self.pubsub_client.slot_subscribe(),
)
.await;
match res {
Ok(sub_res) => {
match sub_res {