run batch execution async

This commit is contained in:
Maximilian Schneider 2023-04-18 23:20:13 +00:00
parent 465e628a64
commit 8b27a138c9
2 changed files with 27 additions and 20 deletions

View File

@ -21,13 +21,13 @@ use solana_sdk::{
compute_budget, hash::Hash, instruction::Instruction, message::Message, signature::Keypair, compute_budget, hash::Hash, instruction::Instruction, message::Message, signature::Keypair,
signer::Signer, transaction::Transaction, signer::Signer, transaction::Transaction,
}; };
use tokio::{sync::RwLock, task::JoinHandle}; use tokio::{sync::RwLock, task::{JoinHandle, self}};
use crate::{ use crate::{
helpers::{to_sdk_instruction, to_sp_pk}, helpers::{to_sdk_instruction, to_sp_pk},
mango::AccountKeys, mango::AccountKeys,
states::{PerpMarketCache, TransactionSendRecord}, states::{PerpMarketCache, TransactionSendRecord},
tpu_manager::TpuManager, tpu_manager::{TpuManager, self},
}; };
pub fn create_ask_bid_transaction( pub fn create_ask_bid_transaction(
@ -195,10 +195,13 @@ pub async fn send_mm_transactions(
batch_to_send.push((tx, record)); batch_to_send.push((tx, record));
} }
let tpu_manager = tpu_manager.clone();
task::spawn(async move{
if !tpu_manager.send_transaction_batch(&batch_to_send).await {
println!("sending failed on tpu client");
}
});
if !tpu_manager.send_transaction_batch(&batch_to_send).await {
println!("sending failed on tpu client");
}
} }
} }

View File

@ -144,29 +144,33 @@ impl TpuManager {
tpu_client.send_transaction(transaction).await tpu_client.send_transaction(transaction).await
} }
pub async fn send_transaction_batch( pub async fn send_transaction_batch(
&self, &self,
batch: &Vec<(solana_sdk::transaction::Transaction, TransactionSendRecord)>, batch: &Vec<(solana_sdk::transaction::Transaction, TransactionSendRecord)>,
) -> bool { ) -> bool {
let tpu_client = self.get_tpu_client().await; let tpu_client = self.get_tpu_client().await;
for (tx, record) in batch { for (tx, record) in batch {
self.stats.inc_send(&record.keeper_instruction);
self.stats let tx_sent_record = self.tx_send_record.clone();
.inc_send(&record.keeper_instruction); let sent = tx_sent_record.send(record.clone());
if sent.is_err() {
let tx_sent_record = self.tx_send_record.clone(); warn!(
let sent = tx_sent_record.send(record.clone()); "sending error on channel : {}",
if sent.is_err() { sent.err().unwrap().to_string()
warn!( );
"sending error on channel : {}", }
sent.err().unwrap().to_string()
);
}
} }
tpu_client.try_send_wire_transaction_batch(batch.iter().map(|(tx, _)| serialize(tx).expect("serialization should succeed")).collect()).await.is_ok() tpu_client
.try_send_wire_transaction_batch(
batch
.iter()
.map(|(tx, _)| serialize(tx).expect("serialization should succeed"))
.collect(),
)
.await
.is_ok()
} }
} }