Merge pull request #95 from blockworks-foundation/lite-rpc-send-task-async-batches

send_wire_transaction_batch is now spawned with a control on the number of …
galactus 2023-03-24 08:24:17 +01:00 committed by GitHub
commit 7b4af7a301
2 changed files with 72 additions and 38 deletions
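
The heart of the change: instead of awaiting `try_send_wire_transaction_batch` inline while holding one of the five sender permits, `forward_txs` now spawns the send as its own tokio task and tracks the number of in-flight tasks in a shared `AtomicU64`, capped at 1024. A minimal, self-contained sketch of that pattern, with hypothetical names rather than the lite-rpc code itself:

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::time::Duration;

// Assumed limits, mirroring the constants this PR introduces.
const MAX_TASKS: u64 = 1024;
const POLL_INTERVAL_MS: u64 = 250;

async fn send_batch_spawned(tasks_counter: Arc<AtomicU64>, batch: Vec<Vec<u8>>) {
    // Account for the task before spawning it.
    tasks_counter.fetch_add(1, Ordering::Relaxed);
    let counter = tasks_counter.clone();
    tokio::spawn(async move {
        // ... send `batch` over the wire here ...
        drop(batch);
        // Task done: free a slot.
        counter.fetch_sub(1, Ordering::Relaxed);
    });
    // Back-pressure: hold up the caller until the in-flight count drops
    // below the cap (the commit's loop sleeps first, then checks).
    while tasks_counter.load(Ordering::Relaxed) >= MAX_TASKS {
        tokio::time::sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
    }
}

#[tokio::main]
async fn main() {
    let counter = Arc::new(AtomicU64::new(0));
    send_batch_spawned(counter, vec![vec![0u8; 8]]).await;
}
```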


@@ -22,9 +22,9 @@ pub const DEFAULT_TX_MAX_RETRIES: u16 = 1;
 #[from_env]
 pub const DEFAULT_TX_BATCH_SIZE: usize = 512;
 #[from_env]
-pub const DEFAULT_FANOUT_SIZE: u64 = 32;
+pub const DEFAULT_FANOUT_SIZE: u64 = 100;
 #[from_env]
-pub const DEFAULT_TX_BATCH_INTERVAL_MS: u64 = 100;
+pub const DEFAULT_TX_BATCH_INTERVAL_MS: u64 = 10;
 #[from_env]
 pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute
 pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus =
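
Both defaults get more aggressive: `DEFAULT_FANOUT_SIZE` rises from 32 to 100, so each batch is forwarded to more leaders, and `DEFAULT_TX_BATCH_INTERVAL_MS` falls from 100 ms to 10 ms, cutting how long a partially filled batch waits before being flushed. A sketch of the role the interval plays in the batching loop shown further down (simplified, with illustrative names):

```rust
use std::time::Duration;
use tokio::sync::mpsc;

const BATCH_SIZE: usize = 512; // DEFAULT_TX_BATCH_SIZE
const BATCH_INTERVAL: Duration = Duration::from_millis(10); // DEFAULT_TX_BATCH_INTERVAL_MS

async fn batch_loop(mut recv: mpsc::Receiver<Vec<u8>>) {
    loop {
        let mut batch = Vec::with_capacity(BATCH_SIZE);
        let mut closed = false;
        while batch.len() < BATCH_SIZE {
            match tokio::time::timeout(BATCH_INTERVAL, recv.recv()).await {
                Ok(Some(tx)) => batch.push(tx), // keep filling the batch
                Ok(None) => {
                    closed = true; // channel closed, flush and stop
                    break;
                }
                Err(_elapsed) => break, // interval hit: flush what we have
            }
        }
        if !batch.is_empty() {
            println!("flushing {} transaction(s)", batch.len());
            // ... forward `batch` to the TPU here ...
        }
        if closed {
            return;
        }
    }
}

#[tokio::main]
async fn main() {
    let (send, recv) = mpsc::channel(1024);
    send.send(vec![0u8; 8]).await.unwrap();
    drop(send); // close the channel so batch_loop returns
    batch_loop(recv).await;
}
```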


@@ -1,5 +1,8 @@
 use std::{
-    sync::Arc,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
     time::{Duration, Instant},
 };
@@ -38,10 +41,15 @@ lazy_static::lazy_static! {
     ))
     .unwrap();
     static ref TX_TIMED_OUT: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_tx_timeout", "Number of transactions that timeout")).unwrap();
+    static ref TOKIO_SEND_TASKS: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_send_tasks_count", "Number of tasks sending confirmed transactions")).unwrap();
 }
 
 pub type WireTransaction = Vec<u8>;
 const NUMBER_OF_TX_SENDERS: usize = 5;
+// with a sleep time of 250 ms, lite-rpc will effectively send
+// (1000/250) * 5 * 512 = 10240 tps
+const SLEEP_TIME_FOR_SENDING_TASK_MS: u64 = 250;
+const MAX_NUMBER_OF_TOKIO_TASKS_SENDING_TXS: u64 = 1024;
 
 /// Retry transactions to a maximum of `u16` times, keep a track of confirmed transactions
 #[derive(Clone)]
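
Unpacking the arithmetic in that comment: the back-pressure loop added below sleeps for at least one 250 ms interval while holding one of the `NUMBER_OF_TX_SENDERS = 5` permits, so at most (1000 / 250) * 5 batches of 512 transactions leave per second, i.e. 10240 tps. A one-liner to check the figure (constants copied from above; only the assertion is new):

```rust
const NUMBER_OF_TX_SENDERS: u64 = 5;
const TX_BATCH_SIZE: u64 = 512; // DEFAULT_TX_BATCH_SIZE
const SLEEP_TIME_FOR_SENDING_TASK_MS: u64 = 250;

fn main() {
    // batches per second per permit * permits * txs per batch
    let max_tps =
        (1000 / SLEEP_TIME_FOR_SENDING_TASK_MS) * NUMBER_OF_TX_SENDERS * TX_BATCH_SIZE;
    assert_eq!(max_tps, 10_240);
    println!("ceiling: {max_tps} tps");
}
```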
@@ -83,6 +91,7 @@ impl TxSender {
         txs: Vec<WireTransaction>,
         postgres: Option<PostgresMpscSend>,
         permit: OwnedSemaphorePermit,
+        tasks_counter: Arc<AtomicU64>,
     ) {
         assert_eq!(sigs_and_slots.len(), txs.len());
@@ -101,43 +110,58 @@ impl TxSender {
         }
 
         let forwarded_slot = tpu_client.estimated_current_slot().await;
-        let quic_response = match tpu_client.try_send_wire_transaction_batch(txs).await {
-            Ok(_) => {
-                // metrics
-                TXS_SENT.inc_by(sigs_and_slots.len() as u64);
-                1
-            }
-            Err(err) => {
-                TXS_SENT_ERRORS.inc_by(sigs_and_slots.len() as u64);
-                warn!("{err}");
-                0
-            }
-        };
-        drop(permit);
+        let transaction_batch_size = txs.len() as u64;
+        let current_nb_tasks = tasks_counter.fetch_add(1, Ordering::Relaxed);
+        TOKIO_SEND_TASKS.set((current_nb_tasks + 1) as i64);
+        let tasks_counter_clone = tasks_counter.clone();
-        if let Some(postgres) = postgres {
-            for (sig, recent_slot) in &sigs_and_slots {
-                MESSAGES_IN_POSTGRES_CHANNEL.inc();
-                postgres
-                    .send(PostgresMsg::PostgresTx(PostgresTx {
-                        signature: sig.clone(),
-                        recent_slot: *recent_slot as i64,
-                        forwarded_slot: forwarded_slot as i64,
-                        processed_slot: None,
-                        cu_consumed: None,
-                        cu_requested: None,
-                        quic_response,
-                    }))
-                    .expect("Error writing to postgres service");
-            }
-        }
+        tokio::spawn(async move {
+            let quic_response = match tpu_client.try_send_wire_transaction_batch(txs).await {
+                Ok(_) => {
+                    TXS_SENT.inc_by(transaction_batch_size);
+                    1
+                }
+                Err(err) => {
+                    TXS_SENT_ERRORS.inc_by(transaction_batch_size);
+                    warn!("{err}");
+                    0
+                }
+            };
+            tasks_counter.fetch_sub(1, Ordering::Relaxed);
+            if let Some(postgres) = postgres {
+                for (sig, recent_slot) in &sigs_and_slots {
+                    MESSAGES_IN_POSTGRES_CHANNEL.inc();
+                    postgres
+                        .send(PostgresMsg::PostgresTx(PostgresTx {
+                            signature: sig.clone(),
+                            recent_slot: *recent_slot as i64,
+                            forwarded_slot: forwarded_slot as i64,
+                            processed_slot: None,
+                            cu_consumed: None,
+                            cu_requested: None,
+                            quic_response,
+                        }))
+                        .expect("Error writing to postgres service");
+                }
+            }
+            histo_timer.observe_duration();
+            info!(
+                "It took {} ms to send a batch of {} transaction(s)",
+                start.elapsed().as_millis(),
+                sigs_and_slots.len()
+            );
+        });
+        loop {
+            tokio::time::sleep(Duration::from_millis(SLEEP_TIME_FOR_SENDING_TASK_MS)).await;
+            if tasks_counter_clone.load(std::sync::atomic::Ordering::Relaxed)
+                < MAX_NUMBER_OF_TOKIO_TASKS_SENDING_TXS
+            {
+                break;
+            }
+            // else tokio has currently launched too many tasks; wait for some of them to finish
+        }
-        histo_timer.observe_duration();
-        info!(
-            "It took {} ms to send a batch of {} transaction(s)",
-            start.elapsed().as_millis(),
-            sigs_and_slots.len()
-        );
+        drop(permit);
     }
 
     /// retry and confirm transactions every 2ms (avg time to confirm tx)
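
One design note on the hunk above: the cap is enforced by polling, so a producer that hits the limit waits in full 250 ms steps even when a slot frees up right away. A counted `tokio::sync::Semaphore` would wake it as soon as a task finishes; a sketch of that alternative, as an aside and not what this commit does:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

const MAX_TASKS: usize = 1024;

async fn spawn_bounded(sem: Arc<Semaphore>, batch: Vec<Vec<u8>>) {
    // Blocks (without polling) until fewer than MAX_TASKS sends are in flight.
    let permit = sem.clone().acquire_owned().await.expect("semaphore closed");
    tokio::spawn(async move {
        // ... send `batch` here ...
        drop(batch);
        drop(permit); // releasing the permit frees a slot for the next batch
    });
}

#[tokio::main]
async fn main() {
    let sem = Arc::new(Semaphore::new(MAX_TASKS));
    spawn_bounded(sem, vec![vec![0u8; 8]]).await;
}
```

Either way the in-flight count could still feed the `TOKIO_SEND_TASKS` gauge, e.g. via `Semaphore::available_permits`.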
@@ -154,10 +178,14 @@ impl TxSender {
             tx_send_interval.as_millis()
         );
         let semaphore = Arc::new(Semaphore::new(NUMBER_OF_TX_SENDERS));
+        // To limit the maximum number of background tasks sending transactions to MAX_NUMBER_OF_TOKIO_TASKS_SENDING_TXS
+        let tasks_counter = Arc::new(AtomicU64::new(0));
         loop {
             let mut sigs_and_slots = Vec::with_capacity(tx_batch_size);
             let mut txs = Vec::with_capacity(tx_batch_size);
             let mut permit = None;
+            let tasks_counter = tasks_counter.clone();
 
             while txs.len() <= tx_batch_size {
                 match tokio::time::timeout(tx_send_interval, recv.recv()).await {
@@ -200,7 +228,13 @@ impl TxSender {
             let tx_sender = self.clone();
             tokio::spawn(async move {
                 tx_sender
-                    .forward_txs(sigs_and_slots, txs, postgres, permit)
+                    .forward_txs(
+                        sigs_and_slots,
+                        txs,
+                        postgres,
+                        permit,
+                        tasks_counter.clone(),
+                    )
                     .await;
             });
         }