add batched requests

This commit is contained in:
Kirill Lykov 2022-11-14 17:46:04 +00:00
parent b9dabc27c2
commit e6ead408a0
1 changed files with 83 additions and 10 deletions

View File

@ -321,6 +321,63 @@ fn send_mm_transactions(
}
}
fn send_mm_transactions_batched(
txs_batch_size: usize,
quotes_per_second: u64,
perp_market_caches: &Vec<PerpMarketCache>,
tx_record_sx: &Sender<TransactionSendRecord>,
tpu_client_pool: Arc<RotatingQueue<Arc<TpuClient>>>,
mango_account_pk: Pubkey,
mango_account_signer: &Keypair,
blockhash: Arc<RwLock<Hash>>,
slot: &AtomicU64,
) {
let mut transactions = Vec::<_>::with_capacity(txs_batch_size);
// update quotes 2x per second
for _ in 0..quotes_per_second {
for c in perp_market_caches.iter() {
for _ in 0..txs_batch_size {
transactions.push(create_ask_bid_transaction(
c,
mango_account_pk,
mango_account_signer.pubkey(),
));
}
if let Ok(recent_blockhash) = blockhash.read() {
for tx in &mut transactions {
tx.sign(&[mango_account_signer], *recent_blockhash);
}
}
let tpu_client = tpu_client_pool.get();
if tpu_client
.try_send_transaction_batch(&transactions)
.is_err()
{
error!("Sending batch failed");
continue;
}
for tx in &transactions {
let sent = tx_record_sx.send(TransactionSendRecord {
signature: tx.signatures[0],
sent_at: Utc::now(),
sent_slot: slot.load(Ordering::Acquire),
market_maker: mango_account_signer.pubkey(),
market: c.perp_market_pk,
});
if sent.is_err() {
error!(
"sending error on channel : {}",
sent.err().unwrap().to_string()
);
}
}
transactions.clear();
}
}
}
fn process_signature_confirmation_batch(
rpc_client: &RpcClient,
batch: &Vec<TransactionSendRecord>,
@ -803,6 +860,7 @@ fn main() {
block_data_save_file,
airdrop_accounts,
mango_cluster,
txs_batch_size,
..
} = &cli_config;
@ -984,6 +1042,7 @@ fn main() {
//sleep(Duration::from_secs(10));
let tx_record_sx = tx_record_sx.clone();
let txs_batch_size: Option<usize> = *txs_batch_size;
Builder::new()
.name("solana-client-sender".to_string())
@ -991,16 +1050,30 @@ fn main() {
for _i in 0..duration.as_secs() {
let start = Instant::now();
// send market maker transactions
send_mm_transactions(
quotes_per_second,
&perp_market_caches,
&tx_record_sx,
tpu_client_pool.clone(),
mango_account_pk,
&mango_account_signer,
blockhash.clone(),
current_slot.as_ref(),
);
if let Some(txs_batch_size) = txs_batch_size.clone() {
send_mm_transactions_batched(
txs_batch_size,
quotes_per_second,
&perp_market_caches,
&tx_record_sx,
tpu_client_pool.clone(),
mango_account_pk,
&mango_account_signer,
blockhash.clone(),
current_slot.as_ref(),
);
} else {
send_mm_transactions(
quotes_per_second,
&perp_market_caches,
&tx_record_sx,
tpu_client_pool.clone(),
mango_account_pk,
&mango_account_signer,
blockhash.clone(),
current_slot.as_ref(),
);
}
let elapsed_millis: u64 = start.elapsed().as_millis() as u64;
if elapsed_millis < 950 {