Making maximum time allocated to create a batch to 100ms, reducing fanout to 30
This commit is contained in:
parent
6352b23847
commit
e3016fe2a8
|
@ -21,10 +21,12 @@ pub const DEFAULT_WS_ADDR: &str = "ws://0.0.0.0:8900";
|
|||
pub const DEFAULT_TX_MAX_RETRIES: u16 = 1;
|
||||
#[from_env]
|
||||
pub const DEFAULT_TX_BATCH_SIZE: usize = 512;
|
||||
|
||||
/// 25 slots in 10s send to little more leaders
|
||||
#[from_env]
|
||||
pub const DEFAULT_FANOUT_SIZE: u64 = 100;
|
||||
pub const DEFAULT_FANOUT_SIZE: u64 = 30;
|
||||
#[from_env]
|
||||
pub const DEFAULT_TX_BATCH_INTERVAL_MS: u64 = 10;
|
||||
pub const DEFAULT_TX_BATCH_INTERVAL_MS: u64 = 100;
|
||||
#[from_env]
|
||||
pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute
|
||||
pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus =
|
||||
|
|
|
@ -45,7 +45,7 @@ lazy_static::lazy_static! {
|
|||
}
|
||||
|
||||
pub type WireTransaction = Vec<u8>;
|
||||
const NUMBER_OF_TX_SENDERS: usize = 5;
|
||||
const NUMBER_OF_TX_SENDERS: usize = 7;
|
||||
// making 250 as sleep time will effectively make lite rpc send
|
||||
// (1000/250) * 5 * 512 = 10240 tps
|
||||
const SLEEP_TIME_FOR_SENDING_TASK_MS: u64 = 250;
|
||||
|
@ -186,9 +186,18 @@ impl TxSender {
|
|||
let mut txs = Vec::with_capacity(tx_batch_size);
|
||||
let mut permit = None;
|
||||
let tasks_counter = tasks_counter.clone();
|
||||
let mut timeout_interval = tx_send_interval.as_millis() as u64;
|
||||
|
||||
let mut try_get_permit = || {
|
||||
permit = semaphore.clone().try_acquire_owned().ok();
|
||||
permit.is_some()
|
||||
};
|
||||
|
||||
while txs.len() <= tx_batch_size {
|
||||
match tokio::time::timeout(tx_send_interval, recv.recv()).await {
|
||||
let instance = tokio::time::Instant::now();
|
||||
match tokio::time::timeout(Duration::from_millis(timeout_interval), recv.recv())
|
||||
.await
|
||||
{
|
||||
Ok(value) => match value {
|
||||
Some((sig, tx, slot)) => {
|
||||
TXS_IN_CHANNEL.dec();
|
||||
|
@ -200,13 +209,21 @@ impl TxSender {
|
|||
}
|
||||
},
|
||||
Err(_) => {
|
||||
permit = semaphore.clone().try_acquire_owned().ok();
|
||||
if permit.is_some() {
|
||||
// we have a permit we can send collected transaction batch
|
||||
if try_get_permit() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
timeout_interval =
|
||||
timeout_interval.saturating_sub(instance.elapsed().as_millis() as u64);
|
||||
if timeout_interval < 100 {
|
||||
if try_get_permit() {
|
||||
break;
|
||||
} else {
|
||||
// could not get a permit continue batching with a small timeout
|
||||
timeout_interval = 100;
|
||||
}
|
||||
}
|
||||
}
|
||||
assert_eq!(sigs_and_slots.len(), txs.len());
|
||||
|
||||
|
|
Loading…
Reference in New Issue