diff --git a/src/keeper.rs b/src/keeper.rs index 65875ac..2017255 100644 --- a/src/keeper.rs +++ b/src/keeper.rs @@ -1,4 +1,6 @@ +use log::warn; use solana_sdk::compute_budget::ComputeBudgetInstruction; +use tokio::spawn; use { crate::{ @@ -103,15 +105,14 @@ fn create_cache_perp_markets_instructions(perp_markets: &[PerpMarketCache]) -> I to_sdk_instruction(ix) } -pub async fn send_transaction( - tpu_manager: TpuManager, +pub fn prepare_transaction( mut ixs: Vec, - blockhash: Arc>, + recent_blockhash: &Hash, current_slot: Arc, payer: &Keypair, prioritization_fee: u64, keeper_instruction: KeeperInstruction, -) { +) -> (Transaction, TransactionSendRecord) { // add a noop with a current timestamp to ensure unique txs ixs.push(noop::timestamp()); // add priority fees @@ -119,7 +120,6 @@ pub async fn send_transaction( prioritization_fee, )); let mut tx = Transaction::new_unsigned(Message::new(&ixs, Some(&payer.pubkey()))); - let recent_blockhash = blockhash.read().await; tx.sign(&[payer], *recent_blockhash); let tx_send_record = TransactionSendRecord { @@ -131,7 +131,7 @@ pub async fn send_transaction( priority_fees: prioritization_fee, keeper_instruction: Some(keeper_instruction), }; - tpu_manager.send_transaction(&tx, tx_send_record).await; + return (tx, tx_send_record); } pub fn create_update_and_cache_quote_banks( @@ -184,76 +184,74 @@ pub fn start_keepers( let quote_root_bank_ix = create_update_and_cache_quote_banks(&perp_markets, quote_root_bank, quote_node_banks); - let blockhash = blockhash.clone(); - while !exit_signal.load(Ordering::Relaxed) { - send_transaction( - tpu_manager.clone(), + let recent_blockhash = blockhash.read().await.to_owned(); + + let mut tx_batch = vec![]; + tx_batch.push(prepare_transaction( cache_prices.clone(), - blockhash.clone(), + &recent_blockhash, current_slot.clone(), &authority, prioritization_fee, KeeperInstruction::CachePrice, - ) - .await; + )); - send_transaction( - tpu_manager.clone(), + tx_batch.push(prepare_transaction( quote_root_bank_ix.clone(), - blockhash.clone(), + &recent_blockhash, current_slot.clone(), &authority, prioritization_fee, KeeperInstruction::UpdateAndCacheQuoteRootBank, - ) - .await; + )); for updates in update_funding_ix.chunks(3) { - send_transaction( - tpu_manager.clone(), + tx_batch.push(prepare_transaction( updates.to_vec(), - blockhash.clone(), + &recent_blockhash, current_slot.clone(), &authority, prioritization_fee, KeeperInstruction::UpdateFunding, - ) - .await; + )); } - - send_transaction( - tpu_manager.clone(), + tx_batch.push(prepare_transaction( root_update_ixs.clone(), - blockhash.clone(), + &recent_blockhash, current_slot.clone(), &authority, prioritization_fee, KeeperInstruction::UpdateRootBanks, - ) - .await; + )); - send_transaction( - tpu_manager.clone(), + tx_batch.push(prepare_transaction( update_perp_cache.clone(), - blockhash.clone(), + &recent_blockhash, current_slot.clone(), &authority, prioritization_fee, KeeperInstruction::UpdatePerpCache, - ) - .await; + )); - send_transaction( - tpu_manager.clone(), + tx_batch.push(prepare_transaction( cache_root_bank_ix.clone(), - blockhash.clone(), + &recent_blockhash, current_slot.clone(), &authority, prioritization_fee, KeeperInstruction::CacheRootBanks, - ) - .await; + )); + + let start_slot = current_slot.load(Ordering::Relaxed); + let start_time = Utc::now(); + let tpu_manager = tpu_manager.clone(); + spawn(async move { + if !tpu_manager.send_transaction_batch(&tx_batch).await { + warn!("issue when sending batch started slot={start_slot} time={start_time} hash={recent_blockhash:?}"); + } + }); + std::thread::sleep(std::time::Duration::from_secs(1)); } }) diff --git a/src/market_markers.rs b/src/market_markers.rs index a0732d3..ff95d4d 100644 --- a/src/market_markers.rs +++ b/src/market_markers.rs @@ -21,13 +21,16 @@ use solana_sdk::{ compute_budget, hash::Hash, instruction::Instruction, message::Message, signature::Keypair, signer::Signer, transaction::Transaction, }; -use tokio::{sync::RwLock, task::{JoinHandle, self}}; +use tokio::{ + sync::RwLock, + task::{self, JoinHandle}, +}; use crate::{ helpers::{to_sdk_instruction, to_sp_pk}, mango::AccountKeys, states::{PerpMarketCache, TransactionSendRecord}, - tpu_manager::{TpuManager, self}, + tpu_manager::TpuManager, }; pub fn create_ask_bid_transaction( @@ -196,12 +199,11 @@ pub async fn send_mm_transactions( } let tpu_manager = tpu_manager.clone(); - task::spawn(async move{ + task::spawn(async move { if !tpu_manager.send_transaction_batch(&batch_to_send).await { println!("sending failed on tpu client"); } }); - } } diff --git a/src/tpu_manager.rs b/src/tpu_manager.rs index 8dcfa91..dddc5a3 100644 --- a/src/tpu_manager.rs +++ b/src/tpu_manager.rs @@ -4,6 +4,7 @@ use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::{connection_cache::ConnectionCache, nonblocking::tpu_client::TpuClient}; use solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}; use solana_sdk::signature::Keypair; +use solana_sdk::transaction::Transaction; use std::{ net::{IpAddr, Ipv4Addr}, sync::{ @@ -146,11 +147,11 @@ impl TpuManager { pub async fn send_transaction_batch( &self, - batch: &Vec<(solana_sdk::transaction::Transaction, TransactionSendRecord)>, + batch: &Vec<(Transaction, TransactionSendRecord)>, ) -> bool { let tpu_client = self.get_tpu_client().await; - for (tx, record) in batch { + for (_tx, record) in batch { self.stats.inc_send(&record.keeper_instruction); let tx_sent_record = self.tx_send_record.clone();