run all keeper functions in a batch

This commit is contained in:
Maximilian Schneider 2023-04-19 11:29:26 +00:00
parent 8b27a138c9
commit e653eb96a9
3 changed files with 46 additions and 45 deletions

View File

@ -1,4 +1,6 @@
use log::warn;
use solana_sdk::compute_budget::ComputeBudgetInstruction; use solana_sdk::compute_budget::ComputeBudgetInstruction;
use tokio::spawn;
use { use {
crate::{ crate::{
@ -103,15 +105,14 @@ fn create_cache_perp_markets_instructions(perp_markets: &[PerpMarketCache]) -> I
to_sdk_instruction(ix) to_sdk_instruction(ix)
} }
pub async fn send_transaction( pub fn prepare_transaction(
tpu_manager: TpuManager,
mut ixs: Vec<Instruction>, mut ixs: Vec<Instruction>,
blockhash: Arc<RwLock<Hash>>, recent_blockhash: &Hash,
current_slot: Arc<AtomicU64>, current_slot: Arc<AtomicU64>,
payer: &Keypair, payer: &Keypair,
prioritization_fee: u64, prioritization_fee: u64,
keeper_instruction: KeeperInstruction, keeper_instruction: KeeperInstruction,
) { ) -> (Transaction, TransactionSendRecord) {
// add a noop with a current timestamp to ensure unique txs // add a noop with a current timestamp to ensure unique txs
ixs.push(noop::timestamp()); ixs.push(noop::timestamp());
// add priority fees // add priority fees
@ -119,7 +120,6 @@ pub async fn send_transaction(
prioritization_fee, prioritization_fee,
)); ));
let mut tx = Transaction::new_unsigned(Message::new(&ixs, Some(&payer.pubkey()))); let mut tx = Transaction::new_unsigned(Message::new(&ixs, Some(&payer.pubkey())));
let recent_blockhash = blockhash.read().await;
tx.sign(&[payer], *recent_blockhash); tx.sign(&[payer], *recent_blockhash);
let tx_send_record = TransactionSendRecord { let tx_send_record = TransactionSendRecord {
@ -131,7 +131,7 @@ pub async fn send_transaction(
priority_fees: prioritization_fee, priority_fees: prioritization_fee,
keeper_instruction: Some(keeper_instruction), keeper_instruction: Some(keeper_instruction),
}; };
tpu_manager.send_transaction(&tx, tx_send_record).await; return (tx, tx_send_record);
} }
pub fn create_update_and_cache_quote_banks( pub fn create_update_and_cache_quote_banks(
@ -184,76 +184,74 @@ pub fn start_keepers(
let quote_root_bank_ix = let quote_root_bank_ix =
create_update_and_cache_quote_banks(&perp_markets, quote_root_bank, quote_node_banks); create_update_and_cache_quote_banks(&perp_markets, quote_root_bank, quote_node_banks);
let blockhash = blockhash.clone();
while !exit_signal.load(Ordering::Relaxed) { while !exit_signal.load(Ordering::Relaxed) {
send_transaction( let recent_blockhash = blockhash.read().await.to_owned();
tpu_manager.clone(),
let mut tx_batch = vec![];
tx_batch.push(prepare_transaction(
cache_prices.clone(), cache_prices.clone(),
blockhash.clone(), &recent_blockhash,
current_slot.clone(), current_slot.clone(),
&authority, &authority,
prioritization_fee, prioritization_fee,
KeeperInstruction::CachePrice, KeeperInstruction::CachePrice,
) ));
.await;
send_transaction( tx_batch.push(prepare_transaction(
tpu_manager.clone(),
quote_root_bank_ix.clone(), quote_root_bank_ix.clone(),
blockhash.clone(), &recent_blockhash,
current_slot.clone(), current_slot.clone(),
&authority, &authority,
prioritization_fee, prioritization_fee,
KeeperInstruction::UpdateAndCacheQuoteRootBank, KeeperInstruction::UpdateAndCacheQuoteRootBank,
) ));
.await;
for updates in update_funding_ix.chunks(3) { for updates in update_funding_ix.chunks(3) {
send_transaction( tx_batch.push(prepare_transaction(
tpu_manager.clone(),
updates.to_vec(), updates.to_vec(),
blockhash.clone(), &recent_blockhash,
current_slot.clone(), current_slot.clone(),
&authority, &authority,
prioritization_fee, prioritization_fee,
KeeperInstruction::UpdateFunding, KeeperInstruction::UpdateFunding,
) ));
.await;
} }
tx_batch.push(prepare_transaction(
send_transaction(
tpu_manager.clone(),
root_update_ixs.clone(), root_update_ixs.clone(),
blockhash.clone(), &recent_blockhash,
current_slot.clone(), current_slot.clone(),
&authority, &authority,
prioritization_fee, prioritization_fee,
KeeperInstruction::UpdateRootBanks, KeeperInstruction::UpdateRootBanks,
) ));
.await;
send_transaction( tx_batch.push(prepare_transaction(
tpu_manager.clone(),
update_perp_cache.clone(), update_perp_cache.clone(),
blockhash.clone(), &recent_blockhash,
current_slot.clone(), current_slot.clone(),
&authority, &authority,
prioritization_fee, prioritization_fee,
KeeperInstruction::UpdatePerpCache, KeeperInstruction::UpdatePerpCache,
) ));
.await;
send_transaction( tx_batch.push(prepare_transaction(
tpu_manager.clone(),
cache_root_bank_ix.clone(), cache_root_bank_ix.clone(),
blockhash.clone(), &recent_blockhash,
current_slot.clone(), current_slot.clone(),
&authority, &authority,
prioritization_fee, prioritization_fee,
KeeperInstruction::CacheRootBanks, KeeperInstruction::CacheRootBanks,
) ));
.await;
let start_slot = current_slot.load(Ordering::Relaxed);
let start_time = Utc::now();
let tpu_manager = tpu_manager.clone();
spawn(async move {
if !tpu_manager.send_transaction_batch(&tx_batch).await {
warn!("issue when sending batch started slot={start_slot} time={start_time} hash={recent_blockhash:?}");
}
});
std::thread::sleep(std::time::Duration::from_secs(1)); std::thread::sleep(std::time::Duration::from_secs(1));
} }
}) })

View File

@ -21,13 +21,16 @@ use solana_sdk::{
compute_budget, hash::Hash, instruction::Instruction, message::Message, signature::Keypair, compute_budget, hash::Hash, instruction::Instruction, message::Message, signature::Keypair,
signer::Signer, transaction::Transaction, signer::Signer, transaction::Transaction,
}; };
use tokio::{sync::RwLock, task::{JoinHandle, self}}; use tokio::{
sync::RwLock,
task::{self, JoinHandle},
};
use crate::{ use crate::{
helpers::{to_sdk_instruction, to_sp_pk}, helpers::{to_sdk_instruction, to_sp_pk},
mango::AccountKeys, mango::AccountKeys,
states::{PerpMarketCache, TransactionSendRecord}, states::{PerpMarketCache, TransactionSendRecord},
tpu_manager::{TpuManager, self}, tpu_manager::TpuManager,
}; };
pub fn create_ask_bid_transaction( pub fn create_ask_bid_transaction(
@ -196,12 +199,11 @@ pub async fn send_mm_transactions(
} }
let tpu_manager = tpu_manager.clone(); let tpu_manager = tpu_manager.clone();
task::spawn(async move{ task::spawn(async move {
if !tpu_manager.send_transaction_batch(&batch_to_send).await { if !tpu_manager.send_transaction_batch(&batch_to_send).await {
println!("sending failed on tpu client"); println!("sending failed on tpu client");
} }
}); });
} }
} }

View File

@ -4,6 +4,7 @@ use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::{connection_cache::ConnectionCache, nonblocking::tpu_client::TpuClient}; use solana_client::{connection_cache::ConnectionCache, nonblocking::tpu_client::TpuClient};
use solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}; use solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool};
use solana_sdk::signature::Keypair; use solana_sdk::signature::Keypair;
use solana_sdk::transaction::Transaction;
use std::{ use std::{
net::{IpAddr, Ipv4Addr}, net::{IpAddr, Ipv4Addr},
sync::{ sync::{
@ -146,11 +147,11 @@ impl TpuManager {
pub async fn send_transaction_batch( pub async fn send_transaction_batch(
&self, &self,
batch: &Vec<(solana_sdk::transaction::Transaction, TransactionSendRecord)>, batch: &Vec<(Transaction, TransactionSendRecord)>,
) -> bool { ) -> bool {
let tpu_client = self.get_tpu_client().await; let tpu_client = self.get_tpu_client().await;
for (tx, record) in batch { for (_tx, record) in batch {
self.stats.inc_send(&record.keeper_instruction); self.stats.inc_send(&record.keeper_instruction);
let tx_sent_record = self.tx_send_record.clone(); let tx_sent_record = self.tx_send_record.clone();