Merge pull request #135 from blockworks-foundation/block_listner_on_estimated_slots

Block listener on estimated slots
galactus 2023-05-02 15:46:19 +02:00 committed by GitHub
commit e5e3c164b1
12 changed files with 284 additions and 194 deletions

Cargo.lock

@ -427,6 +427,7 @@ dependencies = [
"anyhow",
"clap 4.2.4",
"csv",
"dashmap",
"dirs",
"futures",
"log",


@ -18,3 +18,4 @@ dirs = "5.0.0"
rand = "0.8.5"
rand_chacha = "0.3.1"
futures = { workspace = true }
dashmap = {workspace = true }


@ -1,5 +1,3 @@
use std::{str::FromStr, time::Duration};
use anyhow::Context;
use rand::{distributions::Alphanumeric, prelude::Distribution, SeedableRng};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
@ -14,6 +12,7 @@ use solana_sdk::{
system_instruction,
transaction::Transaction,
};
use std::{str::FromStr, time::Duration};
use tokio::time::Instant;
const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
@ -69,6 +68,14 @@ impl BenchHelper {
Transaction::new(&[funded_payer], message, blockhash)
}
pub fn generate_random_strings(num_of_txs: usize, random_seed: Option<u64>) -> Vec<Vec<u8>> {
let seed = random_seed.map_or(0, |x| x);
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(seed);
(0..num_of_txs)
.map(|_| Alphanumeric.sample_iter(&mut rng).take(10).collect())
.collect()
}
#[inline]
pub fn generate_txs(
num_of_txs: usize,


@ -1,20 +1,23 @@
use std::{sync::Arc, collections::HashMap};
use bench::{
cli::Args,
helpers::BenchHelper,
metrics::{AvgMetric, Metric},
};
use clap::Parser;
use dashmap::DashMap;
use futures::future::join_all;
use log::{error, info};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{commitment_config::CommitmentConfig, signer::Signer, signature::Keypair};
use solana_sdk::{
commitment_config::CommitmentConfig, hash::Hash, signature::Keypair, signer::Signer,
};
use std::sync::Arc;
use tokio::{
sync::RwLock,
time::{Duration, Instant},
};
#[tokio::main]
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
async fn main() {
tracing_subscriber::fmt::init();
@ -43,9 +46,36 @@ async fn main() {
lite_rpc_addr.clone(),
CommitmentConfig::confirmed(),
));
let bh = rpc_client.get_latest_blockhash().await.unwrap();
let block_hash: Arc<RwLock<Hash>> = Arc::new(RwLock::new(bh));
let _jh = {
// block hash updater task
let block_hash = block_hash.clone();
let rpc_client = rpc_client.clone();
tokio::spawn(async move {
loop {
let bh = rpc_client.get_latest_blockhash().await;
match bh {
Ok(bh) => {
let mut lock = block_hash.write().await;
*lock = bh;
},
Err(e) => println!("blockhash update error {}", e),
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
})
};
for seed in 0..runs {
let funded_payer = Keypair::from_bytes(funded_payer.to_bytes().as_slice()).unwrap();
tasks.push(tokio::spawn(bench(rpc_client.clone(), tx_count, funded_payer, seed as u64)));
tasks.push(tokio::spawn(bench(
rpc_client.clone(),
tx_count,
funded_payer,
seed as u64,
block_hash.clone(),
)));
// wait for an interval
run_interval_ms.tick().await;
}
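
The hunk above stops fetching a blockhash once per bench run and instead keeps an Arc<RwLock<Hash>> fresh from a background task. A reduced, self-contained sketch of that pattern follows; the function name, error handling, and anyhow return type are illustrative rather than taken from the PR.

use std::sync::Arc;

use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::hash::Hash;
use tokio::{sync::RwLock, time::Duration};

async fn spawn_blockhash_refresher(
    rpc_client: Arc<RpcClient>,
) -> anyhow::Result<Arc<RwLock<Hash>>> {
    // Seed the shared value once so readers never observe a default hash.
    let initial = rpc_client.get_latest_blockhash().await?;
    let shared = Arc::new(RwLock::new(initial));

    let writer = shared.clone();
    tokio::spawn(async move {
        loop {
            match rpc_client.get_latest_blockhash().await {
                Ok(bh) => *writer.write().await = bh,
                Err(e) => log::warn!("blockhash refresh failed: {e}"),
            }
            // 500 ms matches the refresh cadence used in the hunk above.
            tokio::time::sleep(Duration::from_millis(500)).await;
        }
    });
    Ok(shared)
}

Senders then take a cheap copy of the current value right before building each transaction, e.g. let blockhash = *shared.read().await;
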
@ -82,40 +112,73 @@ struct TxSendData {
sent_instant: Instant,
}
async fn bench(rpc_client: Arc<RpcClient>, tx_count: usize, funded_payer: Keypair, seed: u64) -> Metric {
let blockhash = rpc_client.get_latest_blockhash().await.unwrap();
async fn bench(
rpc_client: Arc<RpcClient>,
tx_count: usize,
funded_payer: Keypair,
seed: u64,
block_hash: Arc<RwLock<Hash>>,
) -> Metric {
let map_of_txs = Arc::new(DashMap::new());
// transaction sender task
{
let map_of_txs = map_of_txs.clone();
let rpc_client = rpc_client.clone();
tokio::spawn(async move {
let map_of_txs = map_of_txs.clone();
let rand_strings = BenchHelper::generate_random_strings(tx_count, Some(seed));
let txs = BenchHelper::generate_txs(tx_count, &funded_payer, blockhash, Some(seed));
for rand_string in rand_strings {
let blockhash = { *block_hash.read().await };
let tx = BenchHelper::create_memo_tx(&rand_string, &funded_payer, blockhash);
let start_time = Instant::now();
if let Ok(signature) = rpc_client.send_transaction(&tx).await {
map_of_txs.insert(
signature,
TxSendData {
sent_duration: start_time.elapsed(),
sent_instant: Instant::now(),
},
);
}
}
});
}
let mut metric = Metric::default();
let mut map_of_txs = HashMap::new();
for tx in txs {
let rpc_client = rpc_client.clone();
let start_time = Instant::now();
if let Ok(signature) = rpc_client.send_transaction(&tx).await {
map_of_txs.insert( signature, TxSendData {
sent_duration: start_time.elapsed(),
sent_instant: Instant::now(),
});
}
}
let confirmation_time = Instant::now();
while confirmation_time.elapsed() < Duration::from_secs(60) && !map_of_txs.is_empty() {
let signatures = map_of_txs.iter().map(|x| x.0.clone()).collect::<Vec<_>>();
let mut confirmed_count = 0;
while confirmation_time.elapsed() < Duration::from_secs(60)
&& !(map_of_txs.is_empty() && confirmed_count == tx_count)
{
let signatures = map_of_txs
.iter()
.map(|x| x.key().clone())
.collect::<Vec<_>>();
if signatures.is_empty() {
tokio::time::sleep(Duration::from_millis(1)).await;
continue;
}
if let Ok(res) = rpc_client.get_signature_statuses(&signatures).await {
for i in 0..signatures.len() {
let tx_status = &res.value[i];
if let Some(_) = tx_status {
let signature = signatures[i];
let tx_data = map_of_txs.get(&signature).unwrap();
metric.add_successful_transaction( tx_data.sent_duration, tx_data.sent_instant.elapsed());
metric.add_successful_transaction(
tx_data.sent_duration,
tx_data.sent_instant.elapsed(),
);
drop(tx_data);
map_of_txs.remove(&signature);
confirmed_count += 1;
}
}
}
}
for (_, tx) in map_of_txs {
for tx in map_of_txs.iter() {
metric.add_unsuccessful_transaction(tx.sent_duration);
}
metric.finalize();
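
The rewritten bench tracks in-flight signatures in a DashMap that a sender task fills while the main task polls get_signature_statuses, removing and counting entries as they confirm until everything lands or the 60 s window expires. Below is a reduced sketch of that polling loop; the helper name, argument list, and 10 ms poll interval are illustrative.

use std::sync::Arc;

use dashmap::DashMap;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::Signature;
use tokio::time::{Duration, Instant};

// Poll statuses until everything in `in_flight` confirms or `timeout` elapses.
// Whatever is still in the map afterwards counts as unconfirmed.
async fn poll_confirmations(
    rpc_client: Arc<RpcClient>,
    in_flight: Arc<DashMap<Signature, Instant>>,
    timeout: Duration,
) -> usize {
    let started = Instant::now();
    let mut confirmed = 0usize;
    while started.elapsed() < timeout && !in_flight.is_empty() {
        let signatures: Vec<Signature> = in_flight.iter().map(|e| *e.key()).collect();
        if let Ok(res) = rpc_client.get_signature_statuses(&signatures).await {
            for (sig, status) in signatures.iter().zip(res.value.iter()) {
                if status.is_some() {
                    // Confirmed: stop tracking it and count it exactly once.
                    in_flight.remove(sig);
                    confirmed += 1;
                }
            }
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    confirmed
}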


@ -38,11 +38,13 @@ impl Metric {
pub fn finalize(&mut self) {
if self.txs_sent > 0 {
self.average_time_to_send_txs = self.total_sent_time.as_millis() as f64 / self.txs_sent as f64;
self.average_time_to_send_txs =
self.total_sent_time.as_millis() as f64 / self.txs_sent as f64;
}
if self.txs_confirmed > 0 {
self.average_confirmation_time_ms = self.total_confirmation_time.as_millis() as f64 / self.txs_confirmed as f64;
self.average_confirmation_time_ms =
self.total_confirmation_time.as_millis() as f64 / self.txs_confirmed as f64;
}
}
}
@ -81,8 +83,10 @@ impl DivAssign<u64> for Metric {
self.txs_confirmed /= rhs;
self.txs_un_confirmed /= rhs;
self.total_confirmation_time = Duration::from_micros((self.total_confirmation_time.as_micros() / rhs as u128) as u64);
self.total_sent_time = Duration::from_micros((self.total_sent_time.as_micros() / rhs as u128) as u64);
self.total_confirmation_time =
Duration::from_micros((self.total_confirmation_time.as_micros() / rhs as u128) as u64);
self.total_sent_time =
Duration::from_micros((self.total_sent_time.as_micros() / rhs as u128) as u64);
self.finalize();
}
}


@ -5,7 +5,7 @@ use crate::{
rpc::LiteRpcServer,
workers::{
tpu_utils::tpu_service::TpuService, BlockListener, Cleaner, MetricsCapture, Postgres,
PrometheusSync, TransactionReplay, TransactionReplayer, TxSender, WireTransaction,
PrometheusSync, TransactionReplay, TransactionReplayer, TxProps, TxSender, WireTransaction,
MESSAGES_IN_REPLAY_QUEUE,
},
DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
@ -15,6 +15,7 @@ use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration};
use anyhow::bail;
use dashmap::DashMap;
use log::{error, info};
use jsonrpsee::{core::SubscriptionResult, server::ServerBuilder, PendingSubscriptionSink};
@ -82,17 +83,21 @@ impl LiteBridge {
let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
let current_slot = rpc_client.get_slot().await?;
let tx_store: Arc<DashMap<String, TxProps>> = Default::default();
let tpu_service = TpuService::new(
current_slot,
fanout_slots,
Arc::new(identity),
rpc_client.clone(),
ws_addr,
tx_store.clone(),
)
.await?;
let tpu_service = Arc::new(tpu_service);
let tx_sender = TxSender::new(tpu_service.clone());
let tx_sender = TxSender::new(tx_store, tpu_service.clone());
let block_store = BlockStore::new(&rpc_client).await?;
@ -152,15 +157,17 @@ impl LiteBridge {
let metrics_capture = MetricsCapture::new(self.tx_sender.clone()).capture();
let prometheus_sync = PrometheusSync.sync(prometheus_addr);
let finalized_block_listener = self
.block_listner
.clone()
.listen(CommitmentConfig::finalized(), postgres_send.clone());
let finalized_block_listener = self.block_listner.clone().listen(
CommitmentConfig::finalized(),
postgres_send.clone(),
self.tpu_service.get_estimated_slot_holder(),
);
let confirmed_block_listener = self
.block_listner
.clone()
.listen(CommitmentConfig::confirmed(), None);
let confirmed_block_listener = self.block_listner.clone().listen(
CommitmentConfig::confirmed(),
None,
self.tpu_service.get_estimated_slot_holder(),
);
let processed_block_listener = self.block_listner.clone().listen_processed();
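
The bridge changes above create one DashMap<String, TxProps> up front and clone the Arc into both the TpuService and the TxSender, while each block listener additionally receives the estimated-slot holder from the TPU service. A stripped-down sketch of that shared-ownership wiring; the structs and constructor below are stand-ins for the real types, kept only to show who holds which Arc.

use std::sync::{atomic::AtomicU64, Arc};

use dashmap::DashMap;

struct TxProps;
struct TpuService {
    tx_store: Arc<DashMap<String, TxProps>>,
    estimated_slot: Arc<AtomicU64>,
}
struct TxSender {
    tx_store: Arc<DashMap<String, TxProps>>,
}

fn wire_up() -> (TpuService, TxSender, Arc<AtomicU64>) {
    // One store, cloned (a cheap Arc clone) into every component that needs to
    // observe or mutate in-flight transactions.
    let tx_store: Arc<DashMap<String, TxProps>> = Default::default();
    let estimated_slot = Arc::new(AtomicU64::new(0));
    let tpu_service = TpuService {
        tx_store: tx_store.clone(),
        estimated_slot: estimated_slot.clone(),
    };
    let tx_sender = TxSender { tx_store };
    // The block listeners would receive estimated_slot.clone() in the same way.
    (tpu_service, tx_sender, estimated_slot)
}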


@ -1,5 +1,8 @@
use std::{
sync::{atomic::AtomicU64, Arc},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
@ -406,10 +409,11 @@ impl BlockListener {
self,
commitment_config: CommitmentConfig,
postgres: Option<PostgresMpscSend>,
estimated_slot: Arc<AtomicU64>,
) -> JoinHandle<anyhow::Result<()>> {
let (slot_retry_queue_sx, mut slot_retry_queue_rx) = tokio::sync::mpsc::unbounded_channel();
let (block_schedule_queue_sx, block_schedule_queue_rx) =
async_channel::unbounded::<(Slot, u8)>();
async_channel::unbounded::<Slot>();
// task to fetch blocks
for _i in 0..8 {
@ -420,7 +424,7 @@ impl BlockListener {
tokio::spawn(async move {
loop {
let (slot, error_count) = match block_schedule_queue_rx.recv().await {
let slot = match block_schedule_queue_rx.recv().await {
Ok(v) => v,
Err(e) => {
error!("Recv error on block channel {}", e);
@ -439,24 +443,12 @@ impl BlockListener {
.await
.is_err()
{
// usually as we index all the slots even if they are not been processed we get some errors for slot
// as they are not in long term storage of the rpc // we check 5 times before ignoring the slot
if error_count > 5 {
// retried for 10 times / there should be no block for this slot
warn!(
"unable to get block at slot {} and commitment {}",
slot, commitment_config.commitment
);
continue;
} else {
// add a task to be queued after a delay
let retry_at = tokio::time::Instant::now()
.checked_add(Duration::from_millis(100))
.unwrap();
let _ = slot_retry_queue_sx.send((slot, error_count, retry_at));
BLOCKS_IN_RETRY_QUEUE.inc();
}
// add a task to be queued after a delay
let retry_at = tokio::time::Instant::now()
.checked_add(Duration::from_millis(10))
.unwrap();
let _ = slot_retry_queue_sx.send((slot, retry_at));
BLOCKS_IN_RETRY_QUEUE.inc();
};
}
});
@ -469,11 +461,11 @@ impl BlockListener {
let block_schedule_queue_sx = block_schedule_queue_sx.clone();
let recent_slot = recent_slot.clone();
tokio::spawn(async move {
while let Some((slot, error_count, instant)) = slot_retry_queue_rx.recv().await {
while let Some((slot, instant)) = slot_retry_queue_rx.recv().await {
BLOCKS_IN_RETRY_QUEUE.dec();
let recent_slot = recent_slot.load(std::sync::atomic::Ordering::Relaxed);
// if slot is too old ignore
if recent_slot.saturating_sub(slot) > 256 {
if recent_slot.saturating_sub(slot) > 128 {
// slot too old to retry
// most probably its an empty slot
continue;
@ -483,7 +475,7 @@ impl BlockListener {
if now < instant {
tokio::time::sleep_until(instant).await;
}
if let Ok(_) = block_schedule_queue_sx.send((slot, error_count + 1)).await {
if let Ok(_) = block_schedule_queue_sx.send(slot).await {
if commitment_config.is_finalized() {
BLOCKS_IN_FINALIZED_QUEUE.inc();
} else {
@ -494,7 +486,6 @@ impl BlockListener {
});
}
let rpc_client = self.rpc_client.clone();
tokio::spawn(async move {
info!("{commitment_config:?} block listner started");
@ -504,24 +495,14 @@ impl BlockListener {
.await
.slot;
// -5 for warmup
let mut last_latest_slot = last_latest_slot - 5;
let mut last_latest_slot = last_latest_slot.saturating_sub(5);
recent_slot.store(last_latest_slot, std::sync::atomic::Ordering::Relaxed);
// storage for recent slots processed
let rpc_client = rpc_client.clone();
loop {
let new_slot = match rpc_client.get_slot_with_commitment(commitment_config).await {
Ok(new_slot) => new_slot,
Err(err) => {
warn!("Error while fetching slot {err:?}");
ERRORS_WHILE_FETCHING_SLOTS.inc();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
continue;
}
};
let new_slot = estimated_slot.load(Ordering::Relaxed);
if last_latest_slot == new_slot {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
continue;
}
@ -530,7 +511,7 @@ impl BlockListener {
// context for lock
{
for slot in new_block_slots {
if let Err(e) = block_schedule_queue_sx.send((slot, 0)).await {
if let Err(e) = block_schedule_queue_sx.send(slot).await {
error!("error sending of block schedule queue {}", e);
} else {
if commitment_config.is_finalized() {
@ -544,8 +525,6 @@ impl BlockListener {
last_latest_slot = new_slot;
recent_slot.store(last_latest_slot, std::sync::atomic::Ordering::Relaxed);
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
})
}
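
With this change the block listener no longer polls get_slot_with_commitment; it reads the slot estimate the TPU service keeps in an Arc<AtomicU64>, schedules every slot between the last processed slot and the estimate onto the fetch queue, re-queues failed fetches after a short delay, and abandons retries once a slot falls more than 128 slots behind. A reduced sketch of the scheduling side; the function name is illustrative, while the 1 ms back-off mirrors the hunk above.

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

use tokio::time::Duration;

type Slot = u64;

// Drive block scheduling off the shared slot estimate instead of an RPC poll.
// `schedule` stands in for the async_channel sender feeding the fetcher tasks.
async fn schedule_from_estimated_slot(
    estimated_slot: Arc<AtomicU64>,
    schedule: async_channel::Sender<Slot>,
    mut last_slot: Slot,
) {
    loop {
        let new_slot = estimated_slot.load(Ordering::Relaxed);
        if new_slot == last_slot {
            // Nothing new yet; back off briefly instead of spinning.
            tokio::time::sleep(Duration::from_millis(1)).await;
            continue;
        }
        // Enqueue every slot up to the estimate, not just the newest one,
        // so no block is skipped when the estimate jumps by several slots.
        for slot in (last_slot + 1)..=new_slot {
            if schedule.send(slot).await.is_err() {
                // All fetcher tasks are gone; stop scheduling.
                return;
            }
        }
        last_slot = new_slot;
    }
}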


@ -1,5 +1,5 @@
use std::{
collections::HashMap,
collections::{HashMap, VecDeque},
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
@ -17,7 +17,6 @@ use quinn::{
};
use solana_sdk::{
pubkey::Pubkey,
quic::{QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO},
};
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use tokio::{
@ -25,10 +24,12 @@ use tokio::{
time::timeout,
};
use crate::workers::TxProps;
use super::{rotating_queue::RotatingQueue, tpu_service::IdentityStakes};
pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(10);
const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(1);
const CONNECTION_RETRY_COUNT: usize = 10;
lazy_static::lazy_static! {
@ -43,19 +44,26 @@ lazy_static::lazy_static! {
}
struct ActiveConnection {
pub endpoint: Endpoint,
pub identity: Pubkey,
pub tpu_address: SocketAddr,
pub exit_signal: Arc<AtomicBool>,
endpoint: Endpoint,
identity: Pubkey,
tpu_address: SocketAddr,
exit_signal: Arc<AtomicBool>,
txs_sent_store: Arc<DashMap<String, TxProps>>,
}
impl ActiveConnection {
pub fn new(endpoint: Endpoint, tpu_address: SocketAddr, identity: Pubkey) -> Self {
pub fn new(
endpoint: Endpoint,
tpu_address: SocketAddr,
identity: Pubkey,
txs_sent_store: Arc<DashMap<String, TxProps>>,
) -> Self {
Self {
endpoint,
tpu_address,
identity,
exit_signal: Arc::new(AtomicBool::new(false)),
txs_sent_store,
}
}
@ -165,10 +173,11 @@ impl ActiveConnection {
identity,
e
);
return true;
}
}
Err(_) => {
warn!("timeout while writing transaction for {}", identity);
warn!("timeout while finishing transaction for {}", identity);
}
}
@ -204,8 +213,13 @@ impl ActiveConnection {
exit_signal: Arc<AtomicBool>,
last_stable_id: Arc<AtomicU64>,
) {
for _ in 0..3 {
if exit_signal.load(Ordering::Relaxed) {
let mut queue = VecDeque::new();
for tx in txs {
queue.push_back(tx);
}
for _ in 0..CONNECTION_RETRY_COUNT {
if queue.is_empty() || exit_signal.load(Ordering::Relaxed) {
// return
return;
}
@ -244,7 +258,8 @@ impl ActiveConnection {
}
};
let mut retry = false;
for tx in &txs {
while !queue.is_empty() {
let tx = queue.pop_front().unwrap();
let (stream, retry_conn) =
Self::open_unistream(conn.clone(), last_stable_id.clone()).await;
if let Some(send_stream) = stream {
@ -254,7 +269,7 @@ impl ActiveConnection {
retry = Self::write_all(
send_stream,
tx,
&tx,
identity,
last_stable_id.clone(),
conn.stable_id() as u64,
@ -263,6 +278,10 @@ impl ActiveConnection {
} else {
retry = retry_conn;
}
if retry {
queue.push_back(tx);
break;
}
}
if !retry {
break;
@ -270,36 +289,25 @@ impl ActiveConnection {
}
}
// copied from solana code base
fn compute_receive_window_ratio_for_staked_node(
max_stake: u64,
min_stake: u64,
stake: u64,
) -> u64 {
if stake > max_stake {
return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
}
let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO;
if max_stake > min_stake {
let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64;
let b = max_ratio as f64 - ((max_stake as f64) * a);
let ratio = (a * stake as f64) + b;
ratio.round() as u64
} else {
QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO
fn check_for_confirmation(
txs_sent_store: &Arc<DashMap<String, TxProps>>,
signature: String,
) -> bool {
match txs_sent_store.get(&signature) {
Some(props) => props.status.is_some(),
None => false,
}
}
async fn listen(
transaction_reciever: Receiver<Vec<u8>>,
transaction_reciever: Receiver<(String, Vec<u8>)>,
exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
endpoint: Endpoint,
addr: SocketAddr,
exit_signal: Arc<AtomicBool>,
identity: Pubkey,
identity_stakes: IdentityStakes,
txs_sent_store: Arc<DashMap<String, TxProps>>,
) {
NB_QUIC_ACTIVE_CONNECTIONS.inc();
let mut transaction_reciever = transaction_reciever;
@ -310,16 +318,7 @@ impl ActiveConnection {
identity_stakes.stakes,
identity_stakes.total_stakes,
) as u64;
let number_of_transactions_per_unistream = match identity_stakes.peer_type {
solana_streamer::nonblocking::quic::ConnectionPeerType::Staked => {
Self::compute_receive_window_ratio_for_staked_node(
identity_stakes.max_stakes,
identity_stakes.min_stakes,
identity_stakes.stakes,
)
}
solana_streamer::nonblocking::quic::ConnectionPeerType::Unstaked => 1,
};
let number_of_transactions_per_unistream = 5;
let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
let mut connection: Option<Arc<RwLock<Connection>>> = None;
@ -331,71 +330,72 @@ impl ActiveConnection {
break;
}
if task_counter.load(Ordering::Relaxed) > max_uni_stream_connections {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
if task_counter.load(Ordering::Relaxed) >= max_uni_stream_connections {
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
continue;
}
tokio::select! {
tx_or_timeout = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, transaction_reciever.recv() ) => {
tx = transaction_reciever.recv() => {
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
break;
}
match tx_or_timeout {
Ok(tx) => {
let first_tx: Vec<u8> = match tx {
Ok(tx) => tx,
Err(e) => {
error!(
"Broadcast channel error on recv for {} error {}",
identity, e
);
continue;
}
};
let mut txs = vec![first_tx];
for _ in 1..number_of_transactions_per_unistream {
if let Ok(tx) = transaction_reciever.try_recv() {
txs.push(tx);
}
let first_tx: Vec<u8> = match tx {
Ok((sig, tx)) => {
if Self::check_for_confirmation(&txs_sent_store, sig) {
// transaction is already confirmed/ no need to send
continue;
}
if connection.is_none() {
// initial connection
let conn = Self::connect(identity, false, endpoint.clone(), addr.clone(), exit_signal.clone()).await;
if let Some(conn) = conn {
// could connect
connection = Some(Arc::new(RwLock::new(conn)));
} else {
break;
}
}
let task_counter = task_counter.clone();
let endpoint = endpoint.clone();
let exit_signal = exit_signal.clone();
let addr = addr.clone();
let connection = connection.clone();
let last_stable_id = last_stable_id.clone();
tokio::spawn(async move {
task_counter.fetch_add(1, Ordering::Relaxed);
NB_QUIC_TASKS.inc();
let connection = connection.unwrap();
Self::send_transaction_batch(connection, txs, identity, endpoint, addr, exit_signal, last_stable_id).await;
NB_QUIC_TASKS.dec();
task_counter.fetch_sub(1, Ordering::Relaxed);
});
tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
tx
},
Err(_) => {
// timed out
Err(e) => {
error!(
"Broadcast channel error on recv for {} error {}",
identity, e
);
continue;
}
};
let mut txs = vec![first_tx];
for _ in 1..number_of_transactions_per_unistream {
if let Ok((signature, tx)) = transaction_reciever.try_recv() {
if Self::check_for_confirmation(&txs_sent_store, signature) {
continue;
}
txs.push(tx);
}
}
if connection.is_none() {
// initial connection
let conn = Self::connect(identity, false, endpoint.clone(), addr.clone(), exit_signal.clone()).await;
if let Some(conn) = conn {
// could connect
connection = Some(Arc::new(RwLock::new(conn)));
} else {
break;
}
}
let task_counter = task_counter.clone();
let endpoint = endpoint.clone();
let exit_signal = exit_signal.clone();
let addr = addr.clone();
let connection = connection.clone();
let last_stable_id = last_stable_id.clone();
tokio::spawn(async move {
task_counter.fetch_add(1, Ordering::Relaxed);
NB_QUIC_TASKS.inc();
let connection = connection.unwrap();
Self::send_transaction_batch(connection, txs, identity, endpoint, addr, exit_signal, last_stable_id).await;
NB_QUIC_TASKS.dec();
task_counter.fetch_sub(1, Ordering::Relaxed);
});
},
_ = exit_oneshot_channel.recv() => {
break;
@ -409,7 +409,7 @@ impl ActiveConnection {
pub fn start_listening(
&self,
transaction_reciever: Receiver<Vec<u8>>,
transaction_reciever: Receiver<(String, Vec<u8>)>,
exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
identity_stakes: IdentityStakes,
) {
@ -417,6 +417,7 @@ impl ActiveConnection {
let addr = self.tpu_address;
let exit_signal = self.exit_signal.clone();
let identity = self.identity;
let txs_sent_store = self.txs_sent_store.clone();
tokio::spawn(async move {
Self::listen(
transaction_reciever,
@ -426,6 +427,7 @@ impl ActiveConnection {
exit_signal,
identity,
identity_stakes,
txs_sent_store,
)
.await;
});
@ -488,16 +490,22 @@ impl TpuConnectionManager {
pub async fn update_connections(
&self,
transaction_sender: Arc<Sender<Vec<u8>>>,
transaction_sender: Arc<Sender<(String, Vec<u8>)>>,
connections_to_keep: HashMap<Pubkey, SocketAddr>,
identity_stakes: IdentityStakes,
txs_sent_store: Arc<DashMap<String, TxProps>>,
) {
NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64);
for (identity, socket_addr) in &connections_to_keep {
if self.identity_to_active_connection.get(identity).is_none() {
trace!("added a connection for {}, {}", identity, socket_addr);
let endpoint = self.endpoints.get();
let active_connection = ActiveConnection::new(endpoint, *socket_addr, *identity);
let active_connection = ActiveConnection::new(
endpoint,
*socket_addr,
*identity,
txs_sent_store.clone(),
);
// using mpsc as a oneshot channel/ because with one shot channel we cannot reuse the reciever
let (sx, rx) = tokio::sync::mpsc::channel(1);
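
The connection worker above now drains a VecDeque of transactions, pushes the failed one back and breaks out when a stream or connection must be retried (bounded by CONNECTION_RETRY_COUNT), and consults the shared DashMap<String, TxProps> to skip anything already confirmed. A reduced sketch of the drain-and-requeue loop; try_send is a placeholder for the real QUIC unistream write and connection handling.

use std::collections::VecDeque;

// Placeholder: returns true when the write failed in a way that warrants
// retrying the transaction on a fresh connection.
async fn try_send(_wire_tx: &[u8]) -> bool {
    false
}

// Drain the queue; on a retryable failure, push the transaction back and break
// so the caller can rebuild the connection before the next attempt.
async fn send_batch_with_retry(txs: Vec<Vec<u8>>, max_attempts: usize) {
    let mut queue: VecDeque<Vec<u8>> = txs.into();
    for _ in 0..max_attempts {
        if queue.is_empty() {
            return;
        }
        let mut retry = false;
        while let Some(tx) = queue.pop_front() {
            if try_send(&tx).await {
                queue.push_back(tx);
                retry = true;
                break;
            }
        }
        if !retry {
            break;
        }
    }
}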


@ -29,13 +29,15 @@ use tokio::{
time::{Duration, Instant},
};
use crate::workers::TxProps;
use super::tpu_connection_manager::TpuConnectionManager;
const CACHE_NEXT_SLOT_LEADERS_PUBKEY_SIZE: usize = 1024; // Save pubkey and contact info of next 1024 leaders in the queue
const CLUSTERINFO_REFRESH_TIME: u64 = 60; // refresh cluster every minute
const LEADER_SCHEDULE_UPDATE_INTERVAL: u64 = 10; // update leader schedule every 10s
const AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS: u64 = 400;
const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 16384;
const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000;
lazy_static::lazy_static! {
static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> =
@ -65,10 +67,11 @@ pub struct TpuService {
fanout_slots: u64,
rpc_client: Arc<RpcClient>,
pubsub_client: Arc<PubsubClient>,
broadcast_sender: Arc<tokio::sync::broadcast::Sender<Vec<u8>>>,
broadcast_sender: Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
tpu_connection_manager: Arc<TpuConnectionManager>,
identity: Arc<Keypair>,
identity_stakes: Arc<RwLock<IdentityStakes>>,
txs_sent_store: Arc<DashMap<String, TxProps>>,
}
#[derive(Debug, Copy, Clone)]
@ -99,6 +102,7 @@ impl TpuService {
identity: Arc<Keypair>,
rpc_client: Arc<RpcClient>,
rpc_ws_address: String,
txs_sent_store: Arc<DashMap<String, TxProps>>,
) -> anyhow::Result<Self> {
let pubsub_client = PubsubClient::new(&rpc_ws_address).await?;
let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE);
@ -123,6 +127,7 @@ impl TpuService {
tpu_connection_manager: Arc::new(tpu_connection_manager),
identity,
identity_stakes: Arc::new(RwLock::new(IdentityStakes::default())),
txs_sent_store,
})
}
@ -174,8 +179,8 @@ impl TpuService {
Ok(())
}
pub fn send_transaction(&self, transaction: Vec<u8>) -> anyhow::Result<()> {
self.broadcast_sender.send(transaction)?;
pub fn send_transaction(&self, signature: String, transaction: Vec<u8>) -> anyhow::Result<()> {
self.broadcast_sender.send((signature, transaction))?;
Ok(())
}
@ -271,6 +276,7 @@ impl TpuService {
self.broadcast_sender.clone(),
connections_to_keep,
*identity_stakes,
self.txs_sent_store.clone(),
)
.await;
}
@ -441,4 +447,8 @@ impl TpuService {
pub fn get_estimated_slot(&self) -> u64 {
self.estimated_slot.load(Ordering::Relaxed)
}
pub fn get_estimated_slot_holder(&self) -> Arc<AtomicU64> {
self.estimated_slot.clone()
}
}
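
TpuService's broadcast channel now carries (signature, wire bytes) pairs so each per-leader worker can identify what it is forwarding and drop transactions that are already confirmed, and get_estimated_slot_holder hands the block listener the same Arc<AtomicU64> the service updates. A minimal sketch of the channel side; the capacity argument and function names are illustrative.

use std::sync::Arc;

// Broadcast (signature, serialized transaction) pairs; every subscribed
// per-leader worker receives its own copy of each pair.
fn make_tx_channel(
    capacity: usize,
) -> Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>> {
    // Workers subscribe later via sender.subscribe().
    let (sender, _initial_receiver) = tokio::sync::broadcast::channel(capacity);
    Arc::new(sender)
}

fn send_transaction(
    sender: &tokio::sync::broadcast::Sender<(String, Vec<u8>)>,
    signature: String,
    wire_tx: Vec<u8>,
) -> anyhow::Result<()> {
    // broadcast::Sender::send only errors when no receiver is subscribed.
    sender.send((signature, wire_tx))?;
    Ok(())
}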


@ -63,7 +63,9 @@ impl TransactionReplayer {
continue;
}
// ignore reset error
let _ = tx_sender.tpu_service.send_transaction(tx_replay.tx.clone());
let _ = tx_sender
.tpu_service
.send_transaction(tx_replay.signature.clone(), tx_replay.tx.clone());
if tx_replay.replay_count < tx_replay.max_replay {
tx_replay.replay_count += 1;


@ -70,10 +70,13 @@ impl Default for TxProps {
}
impl TxSender {
pub fn new(tpu_service: Arc<TpuService>) -> Self {
pub fn new(
txs_sent_store: Arc<DashMap<String, TxProps>>,
tpu_service: Arc<TpuService>,
) -> Self {
Self {
tpu_service,
txs_sent_store: Default::default(),
txs_sent_store: txs_sent_store,
}
}
@ -105,8 +108,9 @@ impl TxSender {
let forwarded_local_time = Utc::now();
let mut quic_responses = vec![];
for tx in txs {
let quic_response = match tpu_client.send_transaction(tx) {
for (tx, (signature, _)) in txs.iter().zip(sigs_and_slots.clone()) {
txs_sent.insert(signature.to_owned(), TxProps::default());
let quic_response = match tpu_client.send_transaction(signature.clone(), tx.clone()) {
Ok(_) => {
TXS_SENT.inc_by(1);
1
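
TxSender now zips each wire transaction with its signature from sigs_and_slots, records the signature in the shared store before forwarding, and passes the signature along to the TPU service. A reduced sketch of that pairing; the helper and its closure parameter are illustrative.

use std::sync::Arc;

use dashmap::DashMap;

#[derive(Default)]
struct TxProps;

// Record each signature before handing the transaction to the sender so the
// confirmation tracker can never miss something that was actually forwarded.
fn register_and_send(
    txs: &[Vec<u8>],
    signatures: &[String],
    store: &Arc<DashMap<String, TxProps>>,
    mut send: impl FnMut(String, Vec<u8>) -> anyhow::Result<()>,
) -> usize {
    let mut forwarded = 0;
    for (tx, signature) in txs.iter().zip(signatures.iter()) {
        store.insert(signature.clone(), TxProps::default());
        if send(signature.clone(), tx.clone()).is_ok() {
            forwarded += 1;
        }
    }
    forwarded
}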


@ -1,11 +1,12 @@
use std::{sync::Arc, time::Duration};
use bench::helpers::BenchHelper;
use dashmap::DashMap;
use futures::future::try_join_all;
use lite_rpc::{
block_store::BlockStore,
encoding::BinaryEncoding,
workers::{tpu_utils::tpu_service::TpuService, BlockListener, TxSender},
workers::{tpu_utils::tpu_service::TpuService, BlockListener, TxProps, TxSender},
DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
@ -18,20 +19,21 @@ use tokio::sync::mpsc;
async fn send_and_confirm_txs() {
let rpc_client = Arc::new(RpcClient::new(DEFAULT_RPC_ADDR.to_string()));
let current_slot = rpc_client.get_slot().await.unwrap();
let txs_sent_store: Arc<DashMap<String, TxProps>> = Default::default();
let tpu_service = TpuService::new(
current_slot,
32,
Arc::new(Keypair::new()),
rpc_client.clone(),
DEFAULT_WS_ADDR.into(),
txs_sent_store.clone(),
)
.await
.unwrap();
let tpu_client = Arc::new(tpu_service);
let tpu_client = Arc::new(tpu_service.clone());
let tx_sender = TxSender::new(tpu_client);
let tx_sender = TxSender::new(txs_sent_store, tpu_client);
let block_store = BlockStore::new(&rpc_client).await.unwrap();
let block_listener = BlockListener::new(rpc_client.clone(), tx_sender.clone(), block_store);
@ -39,9 +41,11 @@ async fn send_and_confirm_txs() {
let (tx_send, tx_recv) = mpsc::channel(1024);
let services = try_join_all(vec![
block_listener
.clone()
.listen(CommitmentConfig::confirmed(), None),
block_listener.clone().listen(
CommitmentConfig::confirmed(),
None,
tpu_service.get_estimated_slot_holder(),
),
tx_sender.clone().execute(tx_recv, None),
]);