Merge pull request #105 from blockworks-foundation/custom_tpu_client

Custom tpu client
galactus 2023-04-04 18:10:12 +02:00 committed by GitHub
commit baec1d3894
20 changed files with 1331 additions and 861 deletions

Cargo.lock (generated): 854 lines changed; file diff suppressed because it is too large.

View File

@ -3,33 +3,31 @@ name = "lite-rpc"
version = "0.1.0"
edition = "2021"
description = "A lite version of solana rpc to send and confirm transactions"
rust-version = "1.67.1"
[workspace]
members = [
"bench"
]
[dev-dependencies]
bench = { path = "./bench" }
[dependencies]
solana-sdk = { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-rpc-client = { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-rpc-client-api= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-tpu-client= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-quic-client= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-pubsub-client= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-transaction-status = { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-version= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-client= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
[workspace.dependencies]
solana-sdk = "1.15.2"
solana-rpc-client = "1.15.2"
solana-rpc-client-api = "1.15.2"
solana-transaction-status = "1.15.2"
solana-version = "1.15.2"
solana-client = "1.15.2"
solana-net-utils = "1.15.2"
solana-pubsub-client = "1.15.2"
solana-streamer = "1.15.2"
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.93"
tokio = { version = "1.25.0", features = ["full"]}
tokio = { version = "1.27.0", features = ["full", "fs"]}
bincode = "1.3.3"
bs58 = "0.4.0"
base64 = "0.21.0"
thiserror = "1.0.38"
futures = "0.3.26"
thiserror = "1.0.40"
futures = "0.3.28"
bytes = "1.4.0"
anyhow = "1.0.69"
log = "0.4.17"
@ -45,3 +43,44 @@ prometheus = "0.13.3"
lazy_static = "1.4.0"
dotenv = "0.15.0"
async-channel = "1.8.0"
quinn = "0.9.3"
rustls = { version = "0.20.6", default-features = false }
[dev-dependencies]
bench = { path = "./bench" }
[dependencies]
solana-sdk = { workspace = true }
solana-rpc-client = { workspace = true }
solana-rpc-client-api = { workspace = true }
solana-transaction-status = { workspace = true }
solana-version = { workspace = true }
solana-client = { workspace = true }
solana-net-utils = { workspace = true }
solana-pubsub-client = { workspace = true }
solana-streamer = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
bincode = { workspace = true }
bs58 = { workspace = true }
base64 = { workspace = true }
thiserror = { workspace = true }
futures = { workspace = true }
bytes = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
clap = { workspace = true }
dashmap = { workspace = true }
const_env = { workspace = true }
jsonrpsee = { workspace = true }
tracing-subscriber = { workspace = true }
tokio-postgres = { workspace = true }
native-tls = { workspace = true }
postgres-native-tls = { workspace = true }
prometheus = { workspace = true }
lazy_static = { workspace = true }
dotenv = { workspace = true }
async-channel = { workspace = true }
quinn = { workspace = true }
rustls = { workspace = true }

View File

@ -4,16 +4,17 @@ version = "0.1.0"
edition = "2021"
[dependencies]
solana-sdk = { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-rpc-client = { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
log = "0.4.17"
anyhow = "1.0.69"
serde = "1.0.152"
serde_json = "1.0.93"
csv = "1.2.0"
clap = { version = "4.1.6", features = ["derive"] }
tokio = { version = "1.25.0", features = ["full", "fs"]}
tracing-subscriber = "0.3.16"
dirs = "4.0.0"
solana-sdk = { workspace = true }
solana-rpc-client = { workspace = true }
log = { workspace = true }
anyhow = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
clap = { workspace = true }
tokio = { workspace = true }
tracing-subscriber = { workspace = true }
csv = "1.2.1"
dirs = "5.0.0"
rand = "0.8.5"
rand_chacha = "0.3.1"

View File

@ -1,7 +1,7 @@
use std::str::FromStr;
use std::{str::FromStr, time::Duration};
use anyhow::Context;
use rand::{distributions::Alphanumeric, prelude::Distribution};
use rand::{distributions::Alphanumeric, prelude::Distribution, SeedableRng};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
commitment_config::CommitmentConfig,
@ -14,8 +14,10 @@ use solana_sdk::{
system_instruction,
transaction::Transaction,
};
use tokio::time::Instant;
const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
const WAIT_LIMIT_IN_SECONDS: u64 = 60;
pub struct BenchHelper;
@ -40,7 +42,11 @@ impl BenchHelper {
sig: &Signature,
commitment_config: CommitmentConfig,
) -> anyhow::Result<()> {
let instant = Instant::now();
loop {
if instant.elapsed() > Duration::from_secs(WAIT_LIMIT_IN_SECONDS) {
return Err(anyhow::Error::msg("Timed out waiting"));
}
if let Some(err) = rpc_client
.get_signature_status_with_commitment(sig, commitment_config)
.await?
@ -68,13 +74,13 @@ impl BenchHelper {
num_of_txs: usize,
funded_payer: &Keypair,
blockhash: Hash,
random_seed: Option<u64>,
) -> Vec<Transaction> {
let seed = random_seed.map_or(0, |x| x);
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(seed);
(0..num_of_txs)
.map(|_| {
let random_bytes: Vec<u8> = Alphanumeric
.sample_iter(rand::thread_rng())
.take(10)
.collect();
let random_bytes: Vec<u8> = Alphanumeric.sample_iter(&mut rng).take(10).collect();
Self::create_memo_tx(&random_bytes, funded_payer, blockhash)
})
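Because the generator is now driven by a seeded ChaCha8 RNG, passing the same seed reproduces the same memo payloads. A small illustrative call in the spirit of the integration tests later in this diff; `funded_payer` and `blockhash` are assumed to be in scope as in the surrounding bench code:

// Illustrative only: identical seeds yield identical transactions,
// because payer, blockhash and memo bytes are all fixed.
let txs_a = BenchHelper::generate_txs(5, &funded_payer, blockhash, Some(1));
let txs_b = BenchHelper::generate_txs(5, &funded_payer, blockhash, Some(1));
assert_eq!(txs_a[0].signatures[0], txs_b[0].signatures[0]);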

View File

@ -62,7 +62,7 @@ async fn bench(rpc_client: Arc<RpcClient>, tx_count: usize) -> Metric {
let funded_payer = BenchHelper::get_payer().await.unwrap();
let blockhash = rpc_client.get_latest_blockhash().await.unwrap();
let txs = BenchHelper::generate_txs(tx_count, &funded_payer, blockhash);
let txs = BenchHelper::generate_txs(tx_count, &funded_payer, blockhash, None);
let mut un_confirmed_txs: HashMap<Signature, Option<Instant>> =
HashMap::with_capacity(txs.len());

View File

@ -3,17 +3,18 @@ use crate::{
configs::{IsBlockHashValidConfig, SendTransactionConfig},
encoding::BinaryEncoding,
rpc::LiteRpcServer,
tpu_manager::TpuManager,
workers::{
BlockListener, Cleaner, MetricsCapture, Postgres, PrometheusSync, TxSender, WireTransaction,
tpu_utils::tpu_service::TpuService, BlockListener, Cleaner, MetricsCapture, Postgres,
PrometheusSync, TxSender, WireTransaction,
},
DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
};
use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration};
use anyhow::bail;
use log::info;
use log::{error, info};
use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink};
@ -30,7 +31,7 @@ use solana_sdk::{
use solana_transaction_status::TransactionStatus;
use tokio::{
net::ToSocketAddrs,
sync::mpsc::{self, UnboundedSender},
sync::mpsc::{self, Sender},
task::JoinHandle,
};
@ -55,9 +56,9 @@ lazy_static::lazy_static! {
/// A bridge between clients and tpu
pub struct LiteBridge {
pub rpc_client: Arc<RpcClient>,
pub tpu_manager: Arc<TpuManager>,
pub tpu_service: Arc<TpuService>,
// None if LiteBridge is not executed
pub tx_send_channel: Option<UnboundedSender<(String, WireTransaction, u64)>>,
pub tx_send_channel: Option<Sender<(String, WireTransaction, u64)>>,
pub tx_sender: TxSender,
pub block_listner: BlockListener,
pub block_store: BlockStore,
@ -71,11 +72,19 @@ impl LiteBridge {
identity: Keypair,
) -> anyhow::Result<Self> {
let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
let current_slot = rpc_client.get_slot().await?;
let tpu_manager =
Arc::new(TpuManager::new(rpc_client.clone(), ws_addr, fanout_slots, identity).await?);
let tpu_service = TpuService::new(
Arc::new(std::sync::atomic::AtomicU64::new(current_slot)),
fanout_slots,
Arc::new(identity),
rpc_client.clone(),
ws_addr,
)
.await?;
let tpu_service = Arc::new(tpu_service);
let tx_sender = TxSender::new(tpu_manager.clone());
let tx_sender = TxSender::new(tpu_service.clone());
let block_store = BlockStore::new(&rpc_client).await?;
@ -84,7 +93,7 @@ impl LiteBridge {
Ok(Self {
rpc_client,
tpu_manager,
tpu_service,
tx_send_channel: None,
tx_sender,
block_listner,
@ -98,8 +107,6 @@ impl LiteBridge {
mut self,
http_addr: T,
ws_addr: T,
tx_batch_size: usize,
tx_send_interval: Duration,
clean_interval: Duration,
enable_postgres: bool,
prometheus_addr: T,
@ -114,15 +121,15 @@ impl LiteBridge {
(None, None)
};
let (tx_send, tx_recv) = mpsc::unbounded_channel();
let mut tpu_services = self.tpu_service.start().await?;
let (tx_send, tx_recv) = mpsc::channel(DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE);
self.tx_send_channel = Some(tx_send);
let tx_sender = self.tx_sender.clone().execute(
tx_recv,
tx_batch_size,
tx_send_interval,
postgres_send.clone(),
);
let tx_sender = self
.tx_sender
.clone()
.execute(tx_recv, postgres_send.clone());
let metrics_capture = MetricsCapture::new(self.tx_sender.clone()).capture();
let prometheus_sync = PrometheusSync.sync(prometheus_addr);
@ -141,7 +148,6 @@ impl LiteBridge {
self.tx_sender.clone(),
self.block_listner.clone(),
self.block_store.clone(),
self.tpu_manager.clone(),
)
.start(clean_interval);
@ -186,6 +192,8 @@ impl LiteBridge {
cleaner,
];
services.append(&mut tpu_services);
if let Some(postgres) = postgres {
services.push(postgres);
}
@ -231,11 +239,18 @@ impl LiteRpcServer for LiteBridge {
return Err(jsonrpsee::core::Error::Custom("Blockhash not found in block store".to_string()));
};
self.tx_send_channel
if let Err(e) = self
.tx_send_channel
.as_ref()
.expect("Lite Bridge Not Executed")
.send((sig.to_string(), raw_tx, slot))
.unwrap();
.await
{
error!(
"Internal error sending transaction on send channel error {}",
e
);
}
TXS_IN_CHANNEL.inc();
Ok(BinaryEncoding::Base58.encode(sig))
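The queue between the RPC handler and the transaction sender is now a bounded tokio channel of `DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE` (40_000 in lib.rs) entries, which is why the `send` above is awaited and its error logged rather than unwrapped. A minimal sketch of that behaviour with placeholder values:

use tokio::sync::mpsc;

// Illustrative sketch: the bounded channel applies backpressure once
// 40_000 transactions are queued, instead of growing without bound.
async fn demo_bounded_queue() {
    let (tx_send, mut tx_recv) = mpsc::channel::<(String, Vec<u8>, u64)>(40_000);
    // `send` is async on a bounded channel; the bridge awaits it and logs
    // failures (receiver dropped) instead of calling `unwrap`.
    if let Err(e) = tx_send.send(("signature".into(), vec![0u8; 32], 0)).await {
        eprintln!("send channel error {e}");
    }
    let _queued = tx_recv.recv().await;
}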

View File

@ -1,7 +1,4 @@
use crate::{
DEFAULT_CLEAN_INTERVAL_MS, DEFAULT_FANOUT_SIZE, DEFAULT_RPC_ADDR, DEFAULT_TX_BATCH_INTERVAL_MS,
DEFAULT_TX_BATCH_SIZE, DEFAULT_WS_ADDR,
};
use crate::{DEFAULT_CLEAN_INTERVAL_MS, DEFAULT_FANOUT_SIZE, DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR};
use clap::Parser;
#[derive(Parser, Debug)]
@ -15,15 +12,9 @@ pub struct Args {
pub lite_rpc_http_addr: String,
#[arg(short = 's', long, default_value_t = String::from("[::]:8891"))]
pub lite_rpc_ws_addr: String,
/// batch size of each batch forward
#[arg(short = 'b', long, default_value_t = DEFAULT_TX_BATCH_SIZE)]
pub tx_batch_size: usize,
/// tpu fanout
#[arg(short = 'f', long, default_value_t = DEFAULT_FANOUT_SIZE) ]
pub fanout_size: u64,
/// interval between each batch forward
#[arg(short = 'i', long, default_value_t = DEFAULT_TX_BATCH_INTERVAL_MS)]
pub tx_batch_interval_ms: u64,
/// interval between clean
#[arg(short = 'c', long, default_value_t = DEFAULT_CLEAN_INTERVAL_MS)]
pub clean_interval_ms: u64,

View File

@ -8,7 +8,6 @@ pub mod configs;
pub mod encoding;
pub mod errors;
pub mod rpc;
pub mod tpu_manager;
pub mod workers;
#[from_env]
@ -20,13 +19,14 @@ pub const DEFAULT_WS_ADDR: &str = "ws://0.0.0.0:8900";
#[from_env]
pub const DEFAULT_TX_MAX_RETRIES: u16 = 1;
#[from_env]
pub const DEFAULT_TX_BATCH_SIZE: usize = 512;
pub const DEFAULT_TX_BATCH_SIZE: usize = 32;
#[from_env]
pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 40_000;
/// ~25 slots in 10s; send to a little more leaders than that
#[from_env]
pub const DEFAULT_FANOUT_SIZE: u64 = 30;
#[from_env]
pub const DEFAULT_TX_BATCH_INTERVAL_MS: u64 = 100;
pub const DEFAULT_FANOUT_SIZE: u64 = 100;
#[from_env]
pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minutes
pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus =

View File

@ -38,10 +38,8 @@ pub async fn main() -> anyhow::Result<()> {
let Args {
rpc_addr,
ws_addr,
tx_batch_size,
lite_rpc_ws_addr,
lite_rpc_http_addr,
tx_batch_interval_ms,
clean_interval_ms,
fanout_size,
enable_postgres,
@ -53,7 +51,6 @@ pub async fn main() -> anyhow::Result<()> {
let identity = get_identity_keypair(&identity_keypair).await;
let tx_batch_interval_ms = Duration::from_millis(tx_batch_interval_ms);
let clean_interval_ms = Duration::from_millis(clean_interval_ms);
let light_bridge = LiteBridge::new(rpc_addr, ws_addr, fanout_size, identity).await?;
@ -62,8 +59,6 @@ pub async fn main() -> anyhow::Result<()> {
.start_services(
lite_rpc_http_addr,
lite_rpc_ws_addr,
tx_batch_size,
tx_batch_interval_ms,
clean_interval_ms,
enable_postgres,
prometheus_addr,

View File

@ -1,145 +0,0 @@
use std::{
net::{IpAddr, Ipv4Addr},
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
};
use log::info;
use prometheus::{opts, register_int_counter, IntCounter};
use solana_quic_client::{QuicConfig, QuicPool};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::Keypair;
use solana_tpu_client::{
nonblocking::tpu_client::TpuClient,
tpu_client::TpuClientConfig,
tpu_connection_cache::{NewTpuConfig, TpuConnectionCache},
};
use tokio::sync::RwLock;
pub type QuicTpuClient = TpuClient<QuicPool>;
pub type QuicConnectionCache = TpuConnectionCache<QuicPool>;
const TPU_CONNECTION_CACHE_SIZE: usize = 4;
lazy_static::lazy_static! {
static ref TPU_CONNECTION_RESET: IntCounter =
register_int_counter!(opts!("literpc_tpu_connection_reset", "Number of times tpu connection was reseted")).unwrap();
}
#[derive(Clone)]
pub struct TpuManager {
error_count: Arc<AtomicU32>,
rpc_client: Arc<RpcClient>,
// why arc twice / one is so that we clone rwlock and other so that we can clone tpu client
tpu_client: Arc<RwLock<Arc<QuicTpuClient>>>,
pub ws_addr: String,
fanout_slots: u64,
identity: Arc<Keypair>,
}
impl TpuManager {
pub async fn new(
rpc_client: Arc<RpcClient>,
ws_addr: String,
fanout_slots: u64,
identity: Keypair,
) -> anyhow::Result<Self> {
let mut tpu_config = QuicConfig::new().unwrap();
tpu_config
.update_client_certificate(&identity, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
.unwrap();
let connection_cache =
QuicConnectionCache::new_with_config(TPU_CONNECTION_CACHE_SIZE, tpu_config);
let connection_cache = Arc::new(connection_cache);
let tpu_client =
Self::new_tpu_client(rpc_client.clone(), &ws_addr, fanout_slots, connection_cache)
.await?;
let tpu_client = Arc::new(RwLock::new(Arc::new(tpu_client)));
Ok(Self {
rpc_client,
tpu_client,
ws_addr,
fanout_slots,
error_count: Default::default(),
identity: Arc::new(identity),
})
}
pub async fn new_tpu_client(
rpc_client: Arc<RpcClient>,
ws_addr: &str,
fanout_slots: u64,
connection_cache: Arc<QuicConnectionCache>,
) -> anyhow::Result<QuicTpuClient> {
Ok(TpuClient::new_with_connection_cache(
rpc_client.clone(),
ws_addr,
TpuClientConfig { fanout_slots },
connection_cache,
)
.await?)
}
pub async fn reset_tpu_client(&self) -> anyhow::Result<()> {
let mut tpu_config = QuicConfig::new().unwrap();
tpu_config
.update_client_certificate(&self.identity, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
.unwrap();
let connection_cache =
QuicConnectionCache::new_with_config(TPU_CONNECTION_CACHE_SIZE, tpu_config);
let connection_cache = Arc::new(connection_cache);
let tpu_client = Self::new_tpu_client(
self.rpc_client.clone(),
&self.ws_addr,
self.fanout_slots,
connection_cache,
)
.await?;
self.error_count.store(0, Ordering::Relaxed);
*self.tpu_client.write().await = Arc::new(tpu_client);
TPU_CONNECTION_RESET.inc();
Ok(())
}
pub async fn reset(&self) -> anyhow::Result<()> {
self.error_count.fetch_add(1, Ordering::Relaxed);
if self.error_count.load(Ordering::Relaxed) > 5 {
self.reset_tpu_client().await?;
info!("TPU Reset after 5 errors");
}
Ok(())
}
async fn get_tpu_client(&self) -> Arc<QuicTpuClient> {
self.tpu_client.read().await.clone()
}
pub async fn try_send_wire_transaction_batch(
&self,
wire_transactions: Vec<Vec<u8>>,
) -> anyhow::Result<()> {
let tpu_client = self.get_tpu_client().await;
match tpu_client
.try_send_wire_transaction_batch(wire_transactions)
.await
{
Ok(_) => Ok(()),
Err(err) => {
self.reset().await?;
Err(err.into())
}
}
}
pub async fn estimated_current_slot(&self) -> u64 {
let tpu_client = self.get_tpu_client().await;
tpu_client.estimated_current_slot()
}
}

View File

@ -6,7 +6,7 @@ use std::{
use dashmap::DashMap;
use jsonrpsee::SubscriptionSink;
use log::{info, warn};
use log::{info, trace, warn};
use prometheus::{
core::GenericGauge, histogram_opts, opts, register_histogram, register_int_counter,
register_int_gauge, Histogram, IntCounter,
@ -243,6 +243,11 @@ impl BlockListener {
TXS_CONFIRMED.inc();
}
info!(
"got transaction {} confrimation level {}",
sig, commitment_config.commitment
);
tx_status.value_mut().status = Some(TransactionStatus {
slot,
confirmations: None,
@ -290,7 +295,7 @@ impl BlockListener {
}
}
info!(
trace!(
"Number of transactions processed {} for slot {} for commitment {} time taken {} ms",
transactions_processed,
slot,

View File

@ -1,9 +1,9 @@
use std::{sync::Arc, time::Duration};
use std::time::Duration;
use log::info;
use tokio::task::JoinHandle;
use crate::{block_store::BlockStore, tpu_manager::TpuManager};
use crate::block_store::BlockStore;
use super::{BlockListener, TxSender};
@ -13,7 +13,6 @@ pub struct Cleaner {
tx_sender: TxSender,
block_listenser: BlockListener,
block_store: BlockStore,
tpu_manager: Arc<TpuManager>,
}
impl Cleaner {
@ -21,13 +20,11 @@ impl Cleaner {
tx_sender: TxSender,
block_listenser: BlockListener,
block_store: BlockStore,
tpu_manager: Arc<TpuManager>,
) -> Self {
Self {
tx_sender,
block_listenser,
block_store,
tpu_manager,
}
}
@ -56,7 +53,6 @@ impl Cleaner {
self.clean_tx_sender(ttl_duration);
self.clean_block_listeners(ttl_duration);
self.clean_block_store(ttl_duration).await;
let _ = self.tpu_manager.reset_tpu_client().await;
}
})
}

View File

@ -3,6 +3,7 @@ mod cleaner;
mod metrics_capture;
mod postgres;
mod prometheus_sync;
pub mod tpu_utils;
mod tx_sender;
pub use block_listenser::*;

View File

@ -0,0 +1,3 @@
pub mod rotating_queue;
pub mod tpu_connection_manager;
pub mod tpu_service;

View File

@ -0,0 +1,34 @@
use std::{
collections::VecDeque,
sync::{Arc, RwLock},
};
#[derive(Clone)]
pub struct RotatingQueue<T> {
deque: Arc<RwLock<VecDeque<T>>>,
}
impl<T: Clone> RotatingQueue<T> {
pub fn new<F>(size: usize, creator_functor: F) -> Self
where
F: Fn() -> T,
{
let item = Self {
deque: Arc::new(RwLock::new(VecDeque::<T>::new())),
};
{
let mut deque = item.deque.write().unwrap();
for _i in 0..size {
deque.push_back(creator_functor());
}
}
item
}
pub fn get(&self) -> T {
let mut deque = self.deque.write().unwrap();
let current = deque.pop_front().unwrap();
deque.push_back(current.clone());
current
}
}
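A minimal usage sketch of the queue above, assuming `RotatingQueue` is in scope from this module and using placeholder integer items: the creator closure fills the queue once, and each `get` pops the front item, pushes a clone back, and returns it, so repeated calls cycle round-robin. This is how `TpuConnectionManager` below rotates over its pool of QUIC endpoints.

use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let counter = AtomicUsize::new(0);
    // The creator closure runs `size` times up front to fill the queue: items 0, 1, 2, 3.
    let pool = RotatingQueue::new(4, || counter.fetch_add(1, Ordering::Relaxed));
    // `get` pops the front item, re-appends a clone, and returns it,
    // so successive calls yield 0, 1, 2, 3, 0, 1, ...
    assert_eq!(pool.get(), 0);
    assert_eq!(pool.get(), 1);
    assert_eq!(pool.get(), 2);
}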

View File

@ -0,0 +1,362 @@
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use dashmap::DashMap;
use log::{error, info, trace};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::{
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime,
TransportConfig,
};
use solana_sdk::pubkey::Pubkey;
use tokio::{
sync::{broadcast::Receiver, broadcast::Sender},
time::timeout,
};
use super::rotating_queue::RotatingQueue;
pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: u64 = 5;
lazy_static::lazy_static! {
static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
static ref NB_QUIC_TASKS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_quic_tasks", "Number quic tasks that are running")).unwrap();
}
struct ActiveConnection {
pub endpoint: Endpoint,
pub identity: Pubkey,
pub tpu_address: SocketAddr,
pub exit_signal: Arc<AtomicBool>,
}
impl ActiveConnection {
pub fn new(endpoint: Endpoint, tpu_address: SocketAddr, identity: Pubkey) -> Self {
Self {
endpoint,
tpu_address,
identity,
exit_signal: Arc::new(AtomicBool::new(false)),
}
}
async fn make_connection(endpoint: Endpoint, addr: SocketAddr) -> anyhow::Result<Connection> {
let connecting = endpoint.connect(addr, "connect")?;
// let res = timeout(
// Duration::from_secs(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC),
// connecting,
// )
// .await?;
let res = connecting.await;
Ok(res.unwrap())
}
async fn make_connection_0rtt(
endpoint: Endpoint,
addr: SocketAddr,
) -> anyhow::Result<Connection> {
let connecting = endpoint.connect(addr, "connect")?;
let connection = match connecting.into_0rtt() {
Ok((connection, zero_rtt)) => {
if let Ok(_) = timeout(
Duration::from_secs(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC),
zero_rtt,
)
.await
{
connection
} else {
error!("timeout while connecting");
return Err(ConnectionError::TimedOut.into());
}
}
Err(connecting) => {
if let Ok(connecting_result) = timeout(
Duration::from_millis(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC),
connecting,
)
.await
{
connecting_result?
} else {
error!("timeout while connecting");
return Err(ConnectionError::TimedOut.into());
}
}
};
Ok(connection)
}
async fn listen(
transaction_reciever: Receiver<Vec<u8>>,
exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
endpoint: Endpoint,
addr: SocketAddr,
exit_signal: Arc<AtomicBool>,
identity: Pubkey,
) {
NB_QUIC_TASKS.inc();
let mut already_connected = false;
let mut connection: Option<Connection> = None;
let mut transaction_reciever = transaction_reciever;
let mut exit_oneshot_channel = exit_oneshot_channel;
loop {
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
break;
}
tokio::select! {
tx_or_timeout = timeout(Duration::from_secs(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC), transaction_reciever.recv() ) => {
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
break;
}
match tx_or_timeout {
Ok(tx) => {
let tx: Vec<u8> = match tx {
Ok(tx) => tx,
Err(e) => {
error!(
"Broadcast channel error on recv for {} error {}",
identity, e
);
continue;
}
};
let mut send_stream = match &connection {
Some(conn) => {
let unistream = conn.open_uni().await;
if let Err(e) = unistream {
error!("error opening a unistream for {} error {}", identity, e);
continue;
}
unistream.unwrap()
},
None => {
let conn = if already_connected {
info!("making make_connection_0rtt");
Self::make_connection_0rtt(endpoint.clone(), addr.clone()).await
} else {
info!("making make_connection");
Self::make_connection(endpoint.clone(), addr.clone()).await
};
match conn {
Ok(conn) => {
NB_QUIC_CONNECTIONS.inc();
already_connected = true;
let unistream = conn.open_uni().await;
if let Err(e) = unistream {
error!("error opening a unistream for {} error {}", identity, e);
continue;
}
connection = Some(conn);
unistream.unwrap()
},
Err(e) => {
error!("Could not connect to {} because of error {}", identity, e);
continue;
}
}
}
};
trace!("Sending {} transaction", identity);
if let Err(e) = send_stream.write_all(tx.as_slice()).await {
error!(
"Error while writing transaction for {} error {}",
identity, e
);
}
if let Err(e) = send_stream.finish().await {
error!(
"Error finishing for {}, error {}",
identity, e,
)
}
},
Err(_) => {
// timed out
if let Some(_) = &mut connection {
NB_QUIC_CONNECTIONS.dec();
connection = None;
}
}
}
},
_ = exit_oneshot_channel.recv() => {
if let Some(_) = &mut connection {
NB_QUIC_CONNECTIONS.dec();
connection = None;
}
break;
}
};
}
if let Some(_) = &mut connection {
NB_QUIC_CONNECTIONS.dec();
}
NB_QUIC_TASKS.dec();
}
pub fn start_listening(
&self,
transaction_reciever: Receiver<Vec<u8>>,
exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
) {
let endpoint = self.endpoint.clone();
let addr = self.tpu_address.clone();
let exit_signal = self.exit_signal.clone();
let identity = self.identity.clone();
tokio::spawn(async move {
Self::listen(
transaction_reciever,
exit_oneshot_channel,
endpoint,
addr,
exit_signal,
identity,
)
.await;
});
}
}
struct ActiveConnectionWithExitChannel {
pub active_connection: ActiveConnection,
pub exit_channel: tokio::sync::mpsc::Sender<()>,
}
pub struct TpuConnectionManager {
endpoints: RotatingQueue<Endpoint>,
identity_to_active_connection: Arc<DashMap<Pubkey, Arc<ActiveConnectionWithExitChannel>>>,
}
impl TpuConnectionManager {
pub fn new(certificate: rustls::Certificate, key: rustls::PrivateKey, fanout: usize) -> Self {
let number_of_clients = if fanout > 5 { fanout / 4 } else { 1 };
Self {
endpoints: RotatingQueue::new(number_of_clients, || {
Self::create_endpoint(certificate.clone(), key.clone())
}),
identity_to_active_connection: Arc::new(DashMap::new()),
}
}
fn create_endpoint(certificate: rustls::Certificate, key: rustls::PrivateKey) -> Endpoint {
let mut endpoint = {
let client_socket =
solana_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::UNSPECIFIED), (8000, 10000))
.expect("create_endpoint bind_in_range")
.1;
let config = EndpointConfig::default();
quinn::Endpoint::new(config, None, client_socket, TokioRuntime)
.expect("create_endpoint quinn::Endpoint::new")
};
let mut crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(SkipServerVerification::new())
.with_single_cert(vec![certificate], key)
.expect("Failed to set QUIC client certificates");
crypto.enable_early_data = true;
crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
let mut config = ClientConfig::new(Arc::new(crypto));
let mut transport_config = TransportConfig::default();
let timeout = IdleTimeout::try_from(Duration::from_secs(1)).unwrap();
transport_config.max_idle_timeout(Some(timeout));
transport_config.keep_alive_interval(Some(Duration::from_millis(500)));
config.transport_config(Arc::new(transport_config));
endpoint.set_default_client_config(config);
endpoint
}
pub async fn update_connections(
&self,
transaction_sender: Arc<Sender<Vec<u8>>>,
connections_to_keep: HashMap<Pubkey, SocketAddr>,
) {
for (identity, socket_addr) in &connections_to_keep {
if self.identity_to_active_connection.get(&identity).is_none() {
info!("added a connection for {}, {}", identity, socket_addr);
let endpoint = self.endpoints.get();
let active_connection =
ActiveConnection::new(endpoint, socket_addr.clone(), identity.clone());
// using mpsc as a oneshot channel, because with a oneshot channel we cannot reuse the receiver
let (sx, rx) = tokio::sync::mpsc::channel(1);
let transaction_reciever = transaction_sender.subscribe();
active_connection.start_listening(transaction_reciever, rx);
self.identity_to_active_connection.insert(
identity.clone(),
Arc::new(ActiveConnectionWithExitChannel {
active_connection,
exit_channel: sx,
}),
);
}
}
// remove connections which are no longer needed
let collect_current_active_connections = self
.identity_to_active_connection
.iter()
.map(|x| (x.key().clone(), x.value().clone()))
.collect::<Vec<_>>();
for (identity, value) in collect_current_active_connections.iter() {
if !connections_to_keep.contains_key(identity) {
info!("removing a connection for {}", identity);
// ignore error for exit channel
value
.active_connection
.exit_signal
.store(true, Ordering::Relaxed);
let _ = value.exit_channel.send(()).await;
self.identity_to_active_connection.remove(identity);
}
}
}
}
struct SkipServerVerification;
impl SkipServerVerification {
pub fn new() -> Arc<Self> {
Arc::new(Self)
}
}
impl rustls::client::ServerCertVerifier for SkipServerVerification {
fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: std::time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
}
}
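A rough sketch of how this manager is meant to be driven, mirroring the construction in `TpuService::new` in the next file; `TpuConnectionManager` is assumed to be in scope, and the identity keypair, leader map, and fanout of 100 are placeholders. A self-signed QUIC client certificate is derived from the identity, and `update_connections` receives the broadcast sender plus the set of upcoming leaders whose connections should be kept, spawning one listener task per new leader and tearing down the rest.

use std::{collections::HashMap, net::{IpAddr, Ipv4Addr, SocketAddr}, sync::Arc};
use solana_sdk::{pubkey::Pubkey, signature::Keypair};
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;

async fn connect_to_upcoming_leaders(
    identity: &Keypair,
    leaders: HashMap<Pubkey, SocketAddr>, // placeholder: leader identity -> QUIC TPU address
) {
    // Same certificate construction as TpuService::new below.
    let (certificate, key) =
        new_self_signed_tls_certificate(identity, IpAddr::V4(Ipv4Addr::UNSPECIFIED))
            .expect("Failed to initialize QUIC client certificates");
    let manager = TpuConnectionManager::new(certificate, key, 100);
    // Every transaction written to `sender` is fanned out to all kept connections.
    let (sender, _keep_alive) = tokio::sync::broadcast::channel::<Vec<u8>>(1024);
    manager.update_connections(Arc::new(sender), leaders).await;
}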

View File

@ -0,0 +1,319 @@
use anyhow::Result;
use dashmap::DashMap;
use futures::StreamExt;
use log::{error, info, warn};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_client::{
nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient},
rpc_response::RpcContactInfo,
};
use solana_sdk::{pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair, slot_history::Slot};
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use std::{
collections::VecDeque,
net::{IpAddr, Ipv4Addr},
str::FromStr,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use tokio::{
sync::RwLock,
task::JoinHandle,
time::{Duration, Instant},
};
use super::tpu_connection_manager::TpuConnectionManager;
const CACHE_NEXT_SLOT_LEADERS_PUBKEY_SIZE: usize = 1024; // Save pubkey and contact info of next 1024 leaders in the queue
const CLUSTERINFO_REFRESH_TIME: u64 = 60; // refresh cluster every minute
const LEADER_SCHEDULE_UPDATE_INTERVAL: u64 = 10; // update leader schedule every 10s
const AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS: u64 = 400;
const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 1024;
lazy_static::lazy_static! {
static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_cluster_nodes", "Number of cluster nodes in saved")).unwrap();
static ref NB_OF_LEADERS_IN_SCHEDULE: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_cached_leader", "Number of leaders in schedule cache")).unwrap();
static ref CURRENT_SLOT: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_current_slot", "Current slot seen by last rpc")).unwrap();
static ref ESTIMATED_SLOT: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_estimated_slot", "Estimated slot seen by last rpc")).unwrap();
}
pub struct LeaderData {
contact_info: Arc<RpcContactInfo>,
leader_slot: Slot,
}
#[derive(Clone)]
pub struct TpuService {
cluster_nodes: Arc<DashMap<Pubkey, Arc<RpcContactInfo>>>,
current_slot: Arc<AtomicU64>,
estimated_slot: Arc<AtomicU64>,
leader_schedule: Arc<RwLock<VecDeque<LeaderData>>>,
fanout_slots: u64,
rpc_client: Arc<RpcClient>,
pubsub_client: Arc<PubsubClient>,
broadcast_sender: Arc<tokio::sync::broadcast::Sender<Vec<u8>>>,
tpu_connection_manager: Arc<TpuConnectionManager>,
}
impl TpuService {
pub async fn new(
current_slot: Arc<AtomicU64>,
fanout_slots: u64,
identity: Arc<Keypair>,
rpc_client: Arc<RpcClient>,
rpc_ws_address: String,
) -> anyhow::Result<Self> {
let slot = current_slot.load(Ordering::Relaxed);
let pubsub_client = PubsubClient::new(&rpc_ws_address).await?;
let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE);
let (certificate, key) = new_self_signed_tls_certificate(
identity.as_ref(),
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
)
.expect("Failed to initialize QUIC client certificates");
let tpu_connection_manager =
TpuConnectionManager::new(certificate, key, fanout_slots as usize);
Ok(Self {
cluster_nodes: Arc::new(DashMap::new()),
current_slot: current_slot,
estimated_slot: Arc::new(AtomicU64::new(slot)),
leader_schedule: Arc::new(RwLock::new(VecDeque::new())),
fanout_slots,
rpc_client,
pubsub_client: Arc::new(pubsub_client),
broadcast_sender: Arc::new(sender),
tpu_connection_manager: Arc::new(tpu_connection_manager),
})
}
pub async fn update_cluster_nodes(&self) -> Result<()> {
let cluster_nodes = self.rpc_client.get_cluster_nodes().await?;
cluster_nodes.iter().for_each(|x| {
if let Ok(pubkey) = Pubkey::from_str(x.pubkey.as_str()) {
self.cluster_nodes.insert(pubkey, Arc::new(x.clone()));
}
});
NB_CLUSTER_NODES.set(self.cluster_nodes.len() as i64);
Ok(())
}
pub fn send_transaction(&self, transaction: Vec<u8>) -> anyhow::Result<()> {
self.broadcast_sender.send(transaction)?;
Ok(())
}
pub async fn update_leader_schedule(&self) -> Result<()> {
let current_slot = self.current_slot.load(Ordering::Relaxed);
let (queue_begin_slot, queue_end_slot) = {
let mut leader_queue = self.leader_schedule.write().await;
// remove old leaders
while leader_queue.front().map_or(current_slot, |x| x.leader_slot) < current_slot {
leader_queue.pop_front();
}
let last_element = leader_queue.back().map_or(current_slot, |x| x.leader_slot);
(current_slot, last_element)
};
let last_slot_needed = queue_begin_slot + CACHE_NEXT_SLOT_LEADERS_PUBKEY_SIZE as u64;
if last_slot_needed > queue_end_slot + 1 {
let first_slot_to_fetch = queue_end_slot + 1;
let leaders = self
.rpc_client
.get_slot_leaders(first_slot_to_fetch, last_slot_needed - first_slot_to_fetch)
.await?;
let mut leader_queue = self.leader_schedule.write().await;
for i in first_slot_to_fetch..last_slot_needed {
let current_leader = (i - first_slot_to_fetch) as usize;
let leader = leaders[current_leader];
match self.cluster_nodes.get(&leader) {
Some(r) => {
// push back the leader in the queue
leader_queue.push_back(LeaderData {
contact_info: r.value().clone(),
leader_slot: i,
});
}
None => {
warn!("leader not found in cluster info : {}", leader.to_string());
}
}
}
NB_OF_LEADERS_IN_SCHEDULE.set(leader_queue.len() as i64);
}
Ok(())
}
async fn update_quic_connections(&self) {
let estimated_slot = self.estimated_slot.load(Ordering::Relaxed);
let current_slot = self.current_slot.load(Ordering::Relaxed);
let load_slot = if estimated_slot <= current_slot {
current_slot
} else {
if estimated_slot - current_slot > 8 {
estimated_slot - 8
} else {
current_slot
}
};
let fanout = self.fanout_slots;
let next_leaders = {
let leader_schedule = self.leader_schedule.read().await;
let mut next_leaders = vec![];
for leader in leader_schedule.iter() {
if leader.leader_slot >= load_slot && leader.leader_slot <= load_slot + fanout {
next_leaders.push(leader.contact_info.clone());
} else if leader.leader_slot > load_slot + fanout {
break;
}
}
next_leaders
};
let connections_to_keep = next_leaders
.iter()
.filter(|x| x.tpu.is_some())
.map(|x| {
let mut addr = x.tpu.unwrap().clone();
// add quic port offset
addr.set_port(addr.port() + QUIC_PORT_OFFSET);
(Pubkey::from_str(x.pubkey.as_str()).unwrap(), addr)
})
.collect();
self.tpu_connection_manager
.update_connections(self.broadcast_sender.clone(), connections_to_keep)
.await;
}
pub async fn start(&self) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
self.update_cluster_nodes().await?;
self.update_leader_schedule().await?;
self.update_quic_connections().await;
let this = self.clone();
let jh_update_leaders = tokio::spawn(async move {
let mut last_cluster_info_update = Instant::now();
let leader_schedule_update_interval =
Duration::from_secs(LEADER_SCHEDULE_UPDATE_INTERVAL);
let cluster_info_update_interval = Duration::from_secs(CLUSTERINFO_REFRESH_TIME);
loop {
tokio::time::sleep(leader_schedule_update_interval).await;
info!("update leader schedule and cluster nodes");
if this.update_leader_schedule().await.is_err() {
error!("unable to update leader shedule");
}
if last_cluster_info_update.elapsed() > cluster_info_update_interval {
if this.update_cluster_nodes().await.is_err() {
error!("unable to update cluster infos");
} else {
last_cluster_info_update = Instant::now();
}
}
}
});
let pubsub_client = self.pubsub_client.clone();
let current_slot = self.current_slot.clone();
let (slot_sender, slot_reciever) = tokio::sync::mpsc::unbounded_channel::<Slot>();
let slot_sub_task = tokio::spawn(async move {
let pubsub_client = pubsub_client.clone();
let current_slot = current_slot.clone();
loop {
let res = pubsub_client.slot_subscribe().await;
if let Ok((mut client, unsub)) = res {
loop {
let next =
tokio::time::timeout(Duration::from_millis(2000), client.next()).await;
match next {
Ok(slot_info) => {
if let Some(slot_info) = slot_info {
if slot_info.slot > current_slot.load(Ordering::Relaxed) {
current_slot.store(slot_info.slot, Ordering::Relaxed);
CURRENT_SLOT.set(slot_info.slot as i64);
let _ = slot_sender.send(slot_info.slot);
}
}
}
Err(_) => {
// timed out, reconnect to pubsub
warn!("slot pubsub disconnected, reconnecting");
break;
}
}
}
unsub();
} else if let Err(e) = res {
error!("could not subsribe to the slot {}", e);
}
}
});
let estimated_slot = self.estimated_slot.clone();
let current_slot = self.current_slot.clone();
let this = self.clone();
let estimated_slot_calculation = tokio::spawn(async move {
// this is an estimated slot: we take the current slot and, if we do not receive any notification in 400ms, we update it manually
let mut slot_reciever = slot_reciever;
loop {
let update_connections = match tokio::time::timeout(
Duration::from_millis(AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS),
slot_reciever.recv(),
)
.await
{
Ok(recv) => {
if let Some(slot) = recv {
if slot > estimated_slot.load(Ordering::Relaxed) {
// in case of multiple slot update events, take the current slot
let current_slot = current_slot.load(Ordering::Relaxed);
estimated_slot.store(current_slot, Ordering::Relaxed);
ESTIMATED_SLOT.set(current_slot as i64);
true
} else {
// queue is late, estimated slot is already ahead
false
}
} else {
false
}
}
Err(_) => {
let slot = estimated_slot.fetch_add(1, Ordering::Relaxed);
ESTIMATED_SLOT.set((slot + 1) as i64);
true
}
};
if update_connections {
this.update_quic_connections().await;
}
}
});
Ok(vec![
jh_update_leaders,
slot_sub_task,
estimated_slot_calculation,
])
}
pub fn get_estimated_slot(&self) -> u64 {
self.estimated_slot.load(Ordering::Relaxed)
}
}
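A condensed sketch of how the service is wired up, taken from the calls that appear in `bridge.rs` and the integration test elsewhere in this diff; the RPC and websocket endpoints and the identity are placeholders, and `TpuService` is assumed to be in scope:

use std::sync::{atomic::AtomicU64, Arc};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::Keypair;

async fn run_tpu_service(serialized_tx: Vec<u8>) -> anyhow::Result<()> {
    let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));
    let current_slot = rpc_client.get_slot().await?;

    let tpu_service = TpuService::new(
        Arc::new(AtomicU64::new(current_slot)),
        100,                      // fanout_slots (DEFAULT_FANOUT_SIZE in lib.rs)
        Arc::new(Keypair::new()), // placeholder identity
        rpc_client,
        "ws://127.0.0.1:8900".to_string(),
    )
    .await?;

    // `start` spawns the leader-schedule, slot-subscription and slot-estimation tasks.
    let _background_tasks = tpu_service.start().await?;

    // Raw wire transactions are broadcast to the QUIC connections of upcoming leaders.
    tpu_service.send_transaction(serialized_tx)?;
    Ok(())
}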

View File

@ -1,8 +1,5 @@
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
sync::Arc,
time::{Duration, Instant},
};
@ -15,16 +12,13 @@ use prometheus::{
register_int_gauge, Histogram, IntCounter,
};
use solana_transaction_status::TransactionStatus;
use tokio::{
sync::Semaphore,
sync::{mpsc::UnboundedReceiver, OwnedSemaphorePermit},
task::JoinHandle,
};
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
use crate::{
bridge::TXS_IN_CHANNEL,
tpu_manager::TpuManager,
workers::{PostgresMsg, PostgresTx, MESSAGES_IN_POSTGRES_CHANNEL},
workers::{
tpu_utils::tpu_service::TpuService, PostgresMsg, PostgresTx, MESSAGES_IN_POSTGRES_CHANNEL,
},
};
use super::PostgresMpscSend;
@ -41,15 +35,13 @@ lazy_static::lazy_static! {
))
.unwrap();
static ref TX_TIMED_OUT: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_tx_timeout", "Number of transactions that timeout")).unwrap();
static ref TOKIO_SEND_TASKS: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_send_tasks_count", "Number of tasks sending confirmed transactions")).unwrap();
}
pub type WireTransaction = Vec<u8>;
const NUMBER_OF_TX_SENDERS: usize = 7;
// making 250 as sleep time will effectively make lite rpc send
// (1000/250) * 5 * 512 = 10240 tps
const SLEEP_TIME_FOR_SENDING_TASK_MS: u64 = 250;
const MAX_NUMBER_OF_TOKIO_TASKS_SENDING_TXS: u64 = 1024;
const INTERVAL_PER_BATCH_IN_MS: u64 = 50;
const MAX_BATCH_SIZE_IN_PER_INTERVAL: usize = 2000;
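// With an interval of 50 ms and at most 2000 transactions per interval, the
// sender is capped at roughly (1000 / 50) * 2000 = 40_000 tx/s, the sigverify
// rate referenced in `execute` below.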
/// Retry transactions to a maximum of `u16` times, keep a track of confirmed transactions
#[derive(Clone)]
@ -57,7 +49,7 @@ pub struct TxSender {
/// Tx(s) forwarded to tpu
pub txs_sent_store: Arc<DashMap<String, TxProps>>,
/// TpuClient to call the tpu port
pub tpu_manager: Arc<TpuManager>,
pub tpu_service: Arc<TpuService>,
}
/// Transaction Properties
@ -77,9 +69,9 @@ impl Default for TxProps {
}
impl TxSender {
pub fn new(tpu_manager: Arc<TpuManager>) -> Self {
pub fn new(tpu_service: Arc<TpuService>) -> Self {
Self {
tpu_manager,
tpu_service,
txs_sent_store: Default::default(),
}
}
@ -90,8 +82,6 @@ impl TxSender {
sigs_and_slots: Vec<(String, u64)>,
txs: Vec<WireTransaction>,
postgres: Option<PostgresMpscSend>,
permit: OwnedSemaphorePermit,
tasks_counter: Arc<AtomicU64>,
) {
assert_eq!(sigs_and_slots.len(), txs.len());
@ -102,125 +92,99 @@ impl TxSender {
let histo_timer = TT_SENT_TIMER.start_timer();
let start = Instant::now();
let tpu_client = self.tpu_manager.clone();
let tpu_client = self.tpu_service.clone();
let txs_sent = self.txs_sent_store.clone();
for (sig, _) in &sigs_and_slots {
info!("sending transaction {}", sig);
txs_sent.insert(sig.to_owned(), TxProps::default());
}
let forwarded_slot = tpu_client.estimated_current_slot().await;
let transaction_batch_size = txs.len() as u64;
let current_nb_tasks = tasks_counter.fetch_add(1, Ordering::Relaxed);
TOKIO_SEND_TASKS.set((current_nb_tasks + 1) as i64);
let tasks_counter_clone = tasks_counter.clone();
let forwarded_slot = tpu_client.get_estimated_slot();
tokio::spawn(async move {
let quic_response = match tpu_client.try_send_wire_transaction_batch(txs).await {
let mut quic_responses = vec![];
for tx in txs {
let quic_response = match tpu_client.send_transaction(tx) {
Ok(_) => {
TXS_SENT.inc_by(transaction_batch_size);
TXS_SENT.inc_by(1);
1
}
Err(err) => {
TXS_SENT_ERRORS.inc_by(transaction_batch_size);
TXS_SENT_ERRORS.inc_by(1);
warn!("{err}");
0
}
};
tasks_counter.fetch_sub(1, Ordering::Relaxed);
if let Some(postgres) = postgres {
let postgres_msgs = sigs_and_slots
.iter()
.map(|(sig, recent_slot)| PostgresTx {
signature: sig.clone(),
recent_slot: *recent_slot as i64,
forwarded_slot: forwarded_slot as i64,
processed_slot: None,
cu_consumed: None,
cu_requested: None,
quic_response,
})
.collect();
postgres
.send(PostgresMsg::PostgresTx(postgres_msgs))
.expect("Error writing to postgres service");
MESSAGES_IN_POSTGRES_CHANNEL.inc();
}
histo_timer.observe_duration();
info!(
"It took {} ms to send a batch of {} transaction(s)",
start.elapsed().as_millis(),
sigs_and_slots.len()
);
});
loop {
tokio::time::sleep(Duration::from_millis(SLEEP_TIME_FOR_SENDING_TASK_MS)).await;
if tasks_counter_clone.load(std::sync::atomic::Ordering::Relaxed)
< MAX_NUMBER_OF_TOKIO_TASKS_SENDING_TXS
{
break;
}
// else currently tokio has lanched too many tasks wait for some of them to get finished
quic_responses.push(quic_response);
}
drop(permit);
if let Some(postgres) = &postgres {
let postgres_msgs = sigs_and_slots
.iter()
.enumerate()
.map(|(index, (sig, recent_slot))| PostgresTx {
signature: sig.clone(),
recent_slot: *recent_slot as i64,
forwarded_slot: forwarded_slot as i64,
processed_slot: None,
cu_consumed: None,
cu_requested: None,
quic_response: quic_responses[index],
})
.collect();
postgres
.send(PostgresMsg::PostgresTx(postgres_msgs))
.expect("Error writing to postgres service");
MESSAGES_IN_POSTGRES_CHANNEL.inc();
}
histo_timer.observe_duration();
info!(
"It took {} ms to send a batch of {} transaction(s)",
start.elapsed().as_millis(),
sigs_and_slots.len()
);
}
/// retry and confirm transactions every 2ms (avg time to confirm tx)
pub fn execute(
self,
mut recv: UnboundedReceiver<(String, WireTransaction, u64)>,
tx_batch_size: usize,
tx_send_interval: Duration,
mut recv: Receiver<(String, WireTransaction, u64)>,
postgres_send: Option<PostgresMpscSend>,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
info!(
"Batching tx(s) with batch size of {tx_batch_size} every {}ms",
tx_send_interval.as_millis()
);
let semaphore = Arc::new(Semaphore::new(NUMBER_OF_TX_SENDERS));
// To limit the maximum number of background tasks sending transactions to MAX_NUMBER_OF_TOKIO_TASKS_SENDING_TXS
let tasks_counter = Arc::new(AtomicU64::new(0));
let tx_sender = self.clone();
loop {
let mut sigs_and_slots = Vec::with_capacity(tx_batch_size);
let mut txs = Vec::with_capacity(tx_batch_size);
let mut permit = None;
let tasks_counter = tasks_counter.clone();
let mut timeout_interval = tx_send_interval.as_millis() as u64;
let mut sigs_and_slots = Vec::with_capacity(MAX_BATCH_SIZE_IN_PER_INTERVAL);
let mut txs = Vec::with_capacity(MAX_BATCH_SIZE_IN_PER_INTERVAL);
let mut timeout_interval = INTERVAL_PER_BATCH_IN_MS;
while txs.len() <= tx_batch_size {
// In solana, the sigverify stage is rate limited to 2000 txs per 50ms;
// we take that as the reference here
while txs.len() <= MAX_BATCH_SIZE_IN_PER_INTERVAL {
let instance = tokio::time::Instant::now();
match tokio::time::timeout(Duration::from_millis(timeout_interval), recv.recv())
.await
{
Ok(value) => match value {
Some((sig, tx, slot)) => {
if self.txs_sent_store.contains_key(&sig) {
// duplicate transaction
continue;
}
TXS_IN_CHANNEL.dec();
sigs_and_slots.push((sig, slot));
txs.push(tx);
// update the timeout interval
timeout_interval = timeout_interval
.saturating_sub(instance.elapsed().as_millis() as u64).max(1);
.saturating_sub(instance.elapsed().as_millis() as u64)
.max(1);
}
None => {
bail!("Channel Disconnected");
}
},
Err(_) => {
permit = semaphore.clone().try_acquire_owned().ok();
if permit.is_some() {
break;
} else {
// already timed out, but could not get a permit
timeout_interval = 1;
}
break;
}
}
}
@ -231,30 +195,10 @@ impl TxSender {
continue;
}
let permit = match permit {
Some(permit) => permit,
None => {
// get the permit
semaphore.clone().acquire_owned().await.unwrap()
}
};
if !txs.is_empty() {
TX_BATCH_SIZES.set(txs.len() as i64);
let postgres = postgres_send.clone();
let tx_sender = self.clone();
tokio::spawn(async move {
tx_sender
.forward_txs(
sigs_and_slots,
txs,
postgres,
permit,
tasks_counter.clone(),
)
.await;
});
}
TX_BATCH_SIZES.set(txs.len() as i64);
tx_sender
.forward_txs(sigs_and_slots, txs, postgres_send.clone())
.await;
}
})
}

View File

@ -1,7 +1,7 @@
use bench::helpers::BenchHelper;
use lite_rpc::DEFAULT_LITE_RPC_ADDR;
use log::info;
use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
#[tokio::test]
@ -13,15 +13,17 @@ async fn send_and_confirm_txs_get_signature_statuses() {
let funded_payer = BenchHelper::get_payer().await.unwrap();
let blockhash = rpc_client.get_latest_blockhash().await.unwrap();
let tx = &BenchHelper::generate_txs(1, &funded_payer, blockhash)[0];
let sig = tx.get_signature();
let txs = BenchHelper::generate_txs(5, &funded_payer, blockhash, Some(1));
let signatures = txs.iter().map(|x| x.signatures[0]).collect::<Vec<_>>();
rpc_client.send_transaction(tx).await.unwrap();
info!("{sig}");
BenchHelper::wait_till_signature_status(&rpc_client, sig, CommitmentConfig::confirmed())
.await
.unwrap();
for tx in txs {
rpc_client.send_transaction(&tx).await.unwrap();
}
for sig in signatures {
BenchHelper::wait_till_signature_status(&rpc_client, &sig, CommitmentConfig::confirmed())
.await
.unwrap();
}
}
#[tokio::test]
@ -31,10 +33,11 @@ async fn send_and_confirm_tx_rpc_client() {
let funded_payer = BenchHelper::get_payer().await.unwrap();
let blockhash = rpc_client.get_latest_blockhash().await.unwrap();
let tx = &BenchHelper::generate_txs(1, &funded_payer, blockhash)[0];
let sig = tx.get_signature();
let txs = BenchHelper::generate_txs(5, &funded_payer, blockhash, Some(2));
rpc_client.send_and_confirm_transaction(tx).await.unwrap();
for tx in txs {
rpc_client.send_and_confirm_transaction(&tx).await.unwrap();
info!("Sent and Confirmed {sig}");
info!("Sent and Confirmed {}", tx.signatures[0]);
}
}

View File

@ -1,13 +1,15 @@
use std::{sync::Arc, time::Duration};
use std::{
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use bench::helpers::BenchHelper;
use futures::future::try_join_all;
use lite_rpc::{
block_store::BlockStore,
encoding::BinaryEncoding,
tpu_manager::TpuManager,
workers::{BlockListener, TxSender},
DEFAULT_RPC_ADDR, DEFAULT_TX_BATCH_INTERVAL_MS, DEFAULT_TX_BATCH_SIZE, DEFAULT_WS_ADDR,
workers::{tpu_utils::tpu_service::TpuService, BlockListener, TxSender},
DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
@ -18,35 +20,32 @@ use tokio::sync::mpsc;
#[tokio::test]
async fn send_and_confirm_txs() {
let rpc_client = Arc::new(RpcClient::new(DEFAULT_RPC_ADDR.to_string()));
let current_slot = rpc_client.get_slot().await.unwrap();
let tpu_client = Arc::new(
TpuManager::new(
rpc_client.clone(),
DEFAULT_WS_ADDR.into(),
Default::default(),
Keypair::new(),
)
.await
.unwrap(),
);
let tpu_service = TpuService::new(
Arc::new(AtomicU64::new(current_slot)),
32,
Arc::new(Keypair::new()),
rpc_client.clone(),
DEFAULT_WS_ADDR.into(),
)
.await
.unwrap();
let tpu_client = Arc::new(tpu_service);
let tx_sender = TxSender::new(tpu_client);
let block_store = BlockStore::new(&rpc_client).await.unwrap();
let block_listener = BlockListener::new(rpc_client.clone(), tx_sender.clone(), block_store);
let (tx_send, tx_recv) = mpsc::unbounded_channel();
let (tx_send, tx_recv) = mpsc::channel(1024);
let services = try_join_all(vec![
block_listener
.clone()
.listen(CommitmentConfig::confirmed(), None),
tx_sender.clone().execute(
tx_recv,
DEFAULT_TX_BATCH_SIZE,
Duration::from_millis(DEFAULT_TX_BATCH_INTERVAL_MS),
None,
),
tx_sender.clone().execute(tx_recv, None),
]);
let confirm = tokio::spawn(async move {
@ -59,9 +58,7 @@ async fn send_and_confirm_txs() {
let tx = BinaryEncoding::Base58.encode(bincode::serialize(&tx).unwrap());
let sig = sig.to_string();
tx_send
.send((sig.clone(), tx.as_bytes().to_vec(), 0))
.unwrap();
let _ = tx_send.send((sig.clone(), tx.as_bytes().to_vec(), 0)).await;
for _ in 0..2 {
let tx_status = tx_sender.txs_sent_store.get(&sig).unwrap();