From 465e628a643d91ee825c7a2502b520573f6d2ddb Mon Sep 17 00:00:00 2001 From: Maximilian Schneider Date: Tue, 18 Apr 2023 15:00:57 +0000 Subject: [PATCH 1/5] batch mm txs --- Cargo.lock | 1 + Cargo.toml | 1 + src/market_markers.rs | 12 ++++++++---- src/tpu_manager.rs | 27 +++++++++++++++++++++++++++ 4 files changed, 37 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b7e3ff6..437d12c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3241,6 +3241,7 @@ dependencies = [ "async-channel", "async-std", "async-trait", + "bincode", "borsh 0.9.3", "bytemuck", "chrono", diff --git a/Cargo.toml b/Cargo.toml index a016cf7..1af7467 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,7 @@ solana-program = "1.9.17" mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0", default-features = false } mango-common = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0" } mango-feeds-connector = { git = "https://github.com/blockworks-foundation/mango-feeds.git", branch = "ckamm/solana-versions2", default-features = false, features = ["solana-1-15"] } +bincode = "1.3.3" [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/src/market_markers.rs b/src/market_markers.rs index 028ce14..91e0788 100644 --- a/src/market_markers.rs +++ b/src/market_markers.rs @@ -170,6 +170,7 @@ pub async fn send_mm_transactions( 100, 1000, ); + let mut batch_to_send = Vec::with_capacity(perp_market_caches.len()); for (i, c) in perp_market_caches.iter().enumerate() { let prioritization_fee = prioritization_fee_by_market[i]; let mut tx = create_ask_bid_transaction( @@ -182,7 +183,7 @@ pub async fn send_mm_transactions( let recent_blockhash = *blockhash.read().await; tx.sign(&[mango_account_signer], recent_blockhash); - let tx_send_record = TransactionSendRecord { + let record = TransactionSendRecord { signature: tx.signatures[0], sent_at: Utc::now(), sent_slot: slot.load(Ordering::Acquire), @@ -191,9 +192,12 @@ pub async fn send_mm_transactions( priority_fees: prioritization_fee, keeper_instruction: None, }; - if !tpu_manager.send_transaction(&tx, tx_send_record).await { - println!("sending failed on tpu client"); - } + batch_to_send.push((tx, record)); + } + + + if !tpu_manager.send_transaction_batch(&batch_to_send).await { + println!("sending failed on tpu client"); } } } diff --git a/src/tpu_manager.rs b/src/tpu_manager.rs index 7ef735c..8083a0c 100644 --- a/src/tpu_manager.rs +++ b/src/tpu_manager.rs @@ -1,3 +1,4 @@ +use bincode::serialize; use log::{info, warn}; use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::{connection_cache::ConnectionCache, nonblocking::tpu_client::TpuClient}; @@ -142,4 +143,30 @@ impl TpuManager { tpu_client.send_transaction(transaction).await } + + pub async fn send_transaction_batch( + &self, + batch: &Vec<(solana_sdk::transaction::Transaction, TransactionSendRecord)>, + ) -> bool { + let tpu_client = self.get_tpu_client().await; + + + for (tx, record) in batch { + + self.stats + .inc_send(&record.keeper_instruction); + + let tx_sent_record = self.tx_send_record.clone(); + let sent = tx_sent_record.send(record.clone()); + if sent.is_err() { + warn!( + "sending error on channel : {}", + sent.err().unwrap().to_string() + ); + } + } + + tpu_client.try_send_wire_transaction_batch(batch.iter().map(|(tx, _)| serialize(tx).expect("serialization should succeed")).collect()).await.is_ok() + } + } From 8b27a138c93d52630903372a9bb0599e6cfbd9d2 Mon Sep 17 00:00:00 2001 From: Maximilian Schneider Date: Tue, 18 Apr 2023 23:20:13 +0000 Subject: [PATCH 2/5] run batch execution async --- src/market_markers.rs | 13 ++++++++----- src/tpu_manager.rs | 34 +++++++++++++++++++--------------- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/src/market_markers.rs b/src/market_markers.rs index 91e0788..a0732d3 100644 --- a/src/market_markers.rs +++ b/src/market_markers.rs @@ -21,13 +21,13 @@ use solana_sdk::{ compute_budget, hash::Hash, instruction::Instruction, message::Message, signature::Keypair, signer::Signer, transaction::Transaction, }; -use tokio::{sync::RwLock, task::JoinHandle}; +use tokio::{sync::RwLock, task::{JoinHandle, self}}; use crate::{ helpers::{to_sdk_instruction, to_sp_pk}, mango::AccountKeys, states::{PerpMarketCache, TransactionSendRecord}, - tpu_manager::TpuManager, + tpu_manager::{TpuManager, self}, }; pub fn create_ask_bid_transaction( @@ -195,10 +195,13 @@ pub async fn send_mm_transactions( batch_to_send.push((tx, record)); } + let tpu_manager = tpu_manager.clone(); + task::spawn(async move{ + if !tpu_manager.send_transaction_batch(&batch_to_send).await { + println!("sending failed on tpu client"); + } + }); - if !tpu_manager.send_transaction_batch(&batch_to_send).await { - println!("sending failed on tpu client"); - } } } diff --git a/src/tpu_manager.rs b/src/tpu_manager.rs index 8083a0c..8dcfa91 100644 --- a/src/tpu_manager.rs +++ b/src/tpu_manager.rs @@ -144,29 +144,33 @@ impl TpuManager { tpu_client.send_transaction(transaction).await } - pub async fn send_transaction_batch( + pub async fn send_transaction_batch( &self, batch: &Vec<(solana_sdk::transaction::Transaction, TransactionSendRecord)>, ) -> bool { let tpu_client = self.get_tpu_client().await; - for (tx, record) in batch { + self.stats.inc_send(&record.keeper_instruction); - self.stats - .inc_send(&record.keeper_instruction); - - let tx_sent_record = self.tx_send_record.clone(); - let sent = tx_sent_record.send(record.clone()); - if sent.is_err() { - warn!( - "sending error on channel : {}", - sent.err().unwrap().to_string() - ); - } + let tx_sent_record = self.tx_send_record.clone(); + let sent = tx_sent_record.send(record.clone()); + if sent.is_err() { + warn!( + "sending error on channel : {}", + sent.err().unwrap().to_string() + ); + } } - tpu_client.try_send_wire_transaction_batch(batch.iter().map(|(tx, _)| serialize(tx).expect("serialization should succeed")).collect()).await.is_ok() + tpu_client + .try_send_wire_transaction_batch( + batch + .iter() + .map(|(tx, _)| serialize(tx).expect("serialization should succeed")) + .collect(), + ) + .await + .is_ok() } - } From e653eb96a94ef23058f37968c474d00ff2f3f634 Mon Sep 17 00:00:00 2001 From: Maximilian Schneider Date: Wed, 19 Apr 2023 11:29:26 +0000 Subject: [PATCH 3/5] run all keeper functions in a batch --- src/keeper.rs | 76 +++++++++++++++++++++---------------------- src/market_markers.rs | 10 +++--- src/tpu_manager.rs | 5 +-- 3 files changed, 46 insertions(+), 45 deletions(-) diff --git a/src/keeper.rs b/src/keeper.rs index 65875ac..2017255 100644 --- a/src/keeper.rs +++ b/src/keeper.rs @@ -1,4 +1,6 @@ +use log::warn; use solana_sdk::compute_budget::ComputeBudgetInstruction; +use tokio::spawn; use { crate::{ @@ -103,15 +105,14 @@ fn create_cache_perp_markets_instructions(perp_markets: &[PerpMarketCache]) -> I to_sdk_instruction(ix) } -pub async fn send_transaction( - tpu_manager: TpuManager, +pub fn prepare_transaction( mut ixs: Vec, - blockhash: Arc>, + recent_blockhash: &Hash, current_slot: Arc, payer: &Keypair, prioritization_fee: u64, keeper_instruction: KeeperInstruction, -) { +) -> (Transaction, TransactionSendRecord) { // add a noop with a current timestamp to ensure unique txs ixs.push(noop::timestamp()); // add priority fees @@ -119,7 +120,6 @@ pub async fn send_transaction( prioritization_fee, )); let mut tx = Transaction::new_unsigned(Message::new(&ixs, Some(&payer.pubkey()))); - let recent_blockhash = blockhash.read().await; tx.sign(&[payer], *recent_blockhash); let tx_send_record = TransactionSendRecord { @@ -131,7 +131,7 @@ pub async fn send_transaction( priority_fees: prioritization_fee, keeper_instruction: Some(keeper_instruction), }; - tpu_manager.send_transaction(&tx, tx_send_record).await; + return (tx, tx_send_record); } pub fn create_update_and_cache_quote_banks( @@ -184,76 +184,74 @@ pub fn start_keepers( let quote_root_bank_ix = create_update_and_cache_quote_banks(&perp_markets, quote_root_bank, quote_node_banks); - let blockhash = blockhash.clone(); - while !exit_signal.load(Ordering::Relaxed) { - send_transaction( - tpu_manager.clone(), + let recent_blockhash = blockhash.read().await.to_owned(); + + let mut tx_batch = vec![]; + tx_batch.push(prepare_transaction( cache_prices.clone(), - blockhash.clone(), + &recent_blockhash, current_slot.clone(), &authority, prioritization_fee, KeeperInstruction::CachePrice, - ) - .await; + )); - send_transaction( - tpu_manager.clone(), + tx_batch.push(prepare_transaction( quote_root_bank_ix.clone(), - blockhash.clone(), + &recent_blockhash, current_slot.clone(), &authority, prioritization_fee, KeeperInstruction::UpdateAndCacheQuoteRootBank, - ) - .await; + )); for updates in update_funding_ix.chunks(3) { - send_transaction( - tpu_manager.clone(), + tx_batch.push(prepare_transaction( updates.to_vec(), - blockhash.clone(), + &recent_blockhash, current_slot.clone(), &authority, prioritization_fee, KeeperInstruction::UpdateFunding, - ) - .await; + )); } - - send_transaction( - tpu_manager.clone(), + tx_batch.push(prepare_transaction( root_update_ixs.clone(), - blockhash.clone(), + &recent_blockhash, current_slot.clone(), &authority, prioritization_fee, KeeperInstruction::UpdateRootBanks, - ) - .await; + )); - send_transaction( - tpu_manager.clone(), + tx_batch.push(prepare_transaction( update_perp_cache.clone(), - blockhash.clone(), + &recent_blockhash, current_slot.clone(), &authority, prioritization_fee, KeeperInstruction::UpdatePerpCache, - ) - .await; + )); - send_transaction( - tpu_manager.clone(), + tx_batch.push(prepare_transaction( cache_root_bank_ix.clone(), - blockhash.clone(), + &recent_blockhash, current_slot.clone(), &authority, prioritization_fee, KeeperInstruction::CacheRootBanks, - ) - .await; + )); + + let start_slot = current_slot.load(Ordering::Relaxed); + let start_time = Utc::now(); + let tpu_manager = tpu_manager.clone(); + spawn(async move { + if !tpu_manager.send_transaction_batch(&tx_batch).await { + warn!("issue when sending batch started slot={start_slot} time={start_time} hash={recent_blockhash:?}"); + } + }); + std::thread::sleep(std::time::Duration::from_secs(1)); } }) diff --git a/src/market_markers.rs b/src/market_markers.rs index a0732d3..ff95d4d 100644 --- a/src/market_markers.rs +++ b/src/market_markers.rs @@ -21,13 +21,16 @@ use solana_sdk::{ compute_budget, hash::Hash, instruction::Instruction, message::Message, signature::Keypair, signer::Signer, transaction::Transaction, }; -use tokio::{sync::RwLock, task::{JoinHandle, self}}; +use tokio::{ + sync::RwLock, + task::{self, JoinHandle}, +}; use crate::{ helpers::{to_sdk_instruction, to_sp_pk}, mango::AccountKeys, states::{PerpMarketCache, TransactionSendRecord}, - tpu_manager::{TpuManager, self}, + tpu_manager::TpuManager, }; pub fn create_ask_bid_transaction( @@ -196,12 +199,11 @@ pub async fn send_mm_transactions( } let tpu_manager = tpu_manager.clone(); - task::spawn(async move{ + task::spawn(async move { if !tpu_manager.send_transaction_batch(&batch_to_send).await { println!("sending failed on tpu client"); } }); - } } diff --git a/src/tpu_manager.rs b/src/tpu_manager.rs index 8dcfa91..dddc5a3 100644 --- a/src/tpu_manager.rs +++ b/src/tpu_manager.rs @@ -4,6 +4,7 @@ use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::{connection_cache::ConnectionCache, nonblocking::tpu_client::TpuClient}; use solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}; use solana_sdk::signature::Keypair; +use solana_sdk::transaction::Transaction; use std::{ net::{IpAddr, Ipv4Addr}, sync::{ @@ -146,11 +147,11 @@ impl TpuManager { pub async fn send_transaction_batch( &self, - batch: &Vec<(solana_sdk::transaction::Transaction, TransactionSendRecord)>, + batch: &Vec<(Transaction, TransactionSendRecord)>, ) -> bool { let tpu_client = self.get_tpu_client().await; - for (tx, record) in batch { + for (_tx, record) in batch { self.stats.inc_send(&record.keeper_instruction); let tx_sent_record = self.tx_send_record.clone(); From 49829d39c5fcfe8c83420731e47d08b0bbcb3418 Mon Sep 17 00:00:00 2001 From: Maximilian Schneider Date: Sun, 23 Apr 2023 13:07:44 +0000 Subject: [PATCH 4/5] make crank async --- src/crank.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/crank.rs b/src/crank.rs index c80b1a2..7a702e1 100644 --- a/src/crank.rs +++ b/src/crank.rs @@ -106,8 +106,11 @@ pub fn start( keeper_instruction: Some(KeeperInstruction::ConsumeEvents), }; - let ok = tpu_manager.send_transaction(&tx, tx_send_record).await; - trace!("send tx={:?} ok={ok}", tx.signatures[0]); + let tpu_manager = tpu_manager.clone(); + tokio::spawn(async move { + let ok = tpu_manager.send_transaction(&tx, tx_send_record).await; + trace!("send tx={:?} ok={ok}", tx.signatures[0]); + }); } } }); From 13ddf85330b138d9fa253cf893aa7f26add2bfe8 Mon Sep 17 00:00:00 2001 From: Maximilian Schneider Date: Sun, 23 Apr 2023 17:57:44 +0200 Subject: [PATCH 5/5] blockhash polling finalized --- src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 046397b..9448e82 100644 --- a/src/main.rs +++ b/src/main.rs @@ -123,7 +123,7 @@ pub async fn main() -> anyhow::Result<()> { // continuosly fetch blockhash let rpc_client = Arc::new(RpcClient::new_with_commitment( json_rpc_url.to_string(), - CommitmentConfig::confirmed(), + CommitmentConfig::finalized(), )); let exit_signal = Arc::new(AtomicBool::new(false)); let latest_blockhash = get_latest_blockhash(&rpc_client.clone()).await;