Merge pull request #16 from blockworks-foundation/max/batch

improve tx submission
This commit is contained in:
Maximilian Schneider 2023-04-25 12:47:03 +02:00 committed by GitHub
commit 5f76154894
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 90 additions and 46 deletions

1
Cargo.lock generated
View File

@ -3241,6 +3241,7 @@ dependencies = [
"async-channel",
"async-std",
"async-trait",
"bincode",
"borsh 0.9.3",
"bytemuck",
"chrono",

View File

@ -60,6 +60,7 @@ solana-program = "1.9.17"
mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0", default-features = false }
mango-common = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0" }
mango-feeds-connector = { git = "https://github.com/blockworks-foundation/mango-feeds.git", branch = "ckamm/solana-versions2", default-features = false, features = ["solana-1-15"] }
bincode = "1.3.3"
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]

View File

@ -106,8 +106,11 @@ pub fn start(
keeper_instruction: Some(KeeperInstruction::ConsumeEvents),
};
let ok = tpu_manager.send_transaction(&tx, tx_send_record).await;
trace!("send tx={:?} ok={ok}", tx.signatures[0]);
let tpu_manager = tpu_manager.clone();
tokio::spawn(async move {
let ok = tpu_manager.send_transaction(&tx, tx_send_record).await;
trace!("send tx={:?} ok={ok}", tx.signatures[0]);
});
}
}
});

View File

@ -1,4 +1,6 @@
use log::warn;
use solana_sdk::compute_budget::ComputeBudgetInstruction;
use tokio::spawn;
use {
crate::{
@ -103,15 +105,14 @@ fn create_cache_perp_markets_instructions(perp_markets: &[PerpMarketCache]) -> I
to_sdk_instruction(ix)
}
pub async fn send_transaction(
tpu_manager: TpuManager,
pub fn prepare_transaction(
mut ixs: Vec<Instruction>,
blockhash: Arc<RwLock<Hash>>,
recent_blockhash: &Hash,
current_slot: Arc<AtomicU64>,
payer: &Keypair,
prioritization_fee: u64,
keeper_instruction: KeeperInstruction,
) {
) -> (Transaction, TransactionSendRecord) {
// add a noop with a current timestamp to ensure unique txs
ixs.push(noop::timestamp());
// add priority fees
@ -119,7 +120,6 @@ pub async fn send_transaction(
prioritization_fee,
));
let mut tx = Transaction::new_unsigned(Message::new(&ixs, Some(&payer.pubkey())));
let recent_blockhash = blockhash.read().await;
tx.sign(&[payer], *recent_blockhash);
let tx_send_record = TransactionSendRecord {
@ -131,7 +131,7 @@ pub async fn send_transaction(
priority_fees: prioritization_fee,
keeper_instruction: Some(keeper_instruction),
};
tpu_manager.send_transaction(&tx, tx_send_record).await;
return (tx, tx_send_record);
}
pub fn create_update_and_cache_quote_banks(
@ -184,76 +184,74 @@ pub fn start_keepers(
let quote_root_bank_ix =
create_update_and_cache_quote_banks(&perp_markets, quote_root_bank, quote_node_banks);
let blockhash = blockhash.clone();
while !exit_signal.load(Ordering::Relaxed) {
send_transaction(
tpu_manager.clone(),
let recent_blockhash = blockhash.read().await.to_owned();
let mut tx_batch = vec![];
tx_batch.push(prepare_transaction(
cache_prices.clone(),
blockhash.clone(),
&recent_blockhash,
current_slot.clone(),
&authority,
prioritization_fee,
KeeperInstruction::CachePrice,
)
.await;
));
send_transaction(
tpu_manager.clone(),
tx_batch.push(prepare_transaction(
quote_root_bank_ix.clone(),
blockhash.clone(),
&recent_blockhash,
current_slot.clone(),
&authority,
prioritization_fee,
KeeperInstruction::UpdateAndCacheQuoteRootBank,
)
.await;
));
for updates in update_funding_ix.chunks(3) {
send_transaction(
tpu_manager.clone(),
tx_batch.push(prepare_transaction(
updates.to_vec(),
blockhash.clone(),
&recent_blockhash,
current_slot.clone(),
&authority,
prioritization_fee,
KeeperInstruction::UpdateFunding,
)
.await;
));
}
send_transaction(
tpu_manager.clone(),
tx_batch.push(prepare_transaction(
root_update_ixs.clone(),
blockhash.clone(),
&recent_blockhash,
current_slot.clone(),
&authority,
prioritization_fee,
KeeperInstruction::UpdateRootBanks,
)
.await;
));
send_transaction(
tpu_manager.clone(),
tx_batch.push(prepare_transaction(
update_perp_cache.clone(),
blockhash.clone(),
&recent_blockhash,
current_slot.clone(),
&authority,
prioritization_fee,
KeeperInstruction::UpdatePerpCache,
)
.await;
));
send_transaction(
tpu_manager.clone(),
tx_batch.push(prepare_transaction(
cache_root_bank_ix.clone(),
blockhash.clone(),
&recent_blockhash,
current_slot.clone(),
&authority,
prioritization_fee,
KeeperInstruction::CacheRootBanks,
)
.await;
));
let start_slot = current_slot.load(Ordering::Relaxed);
let start_time = Utc::now();
let tpu_manager = tpu_manager.clone();
spawn(async move {
if !tpu_manager.send_transaction_batch(&tx_batch).await {
warn!("issue when sending batch started slot={start_slot} time={start_time} hash={recent_blockhash:?}");
}
});
std::thread::sleep(std::time::Duration::from_secs(1));
}
})

View File

@ -123,7 +123,7 @@ pub async fn main() -> anyhow::Result<()> {
// continuosly fetch blockhash
let rpc_client = Arc::new(RpcClient::new_with_commitment(
json_rpc_url.to_string(),
CommitmentConfig::confirmed(),
CommitmentConfig::finalized(),
));
let exit_signal = Arc::new(AtomicBool::new(false));
let latest_blockhash = get_latest_blockhash(&rpc_client.clone()).await;

View File

@ -21,7 +21,10 @@ use solana_sdk::{
compute_budget, hash::Hash, instruction::Instruction, message::Message, signature::Keypair,
signer::Signer, transaction::Transaction,
};
use tokio::{sync::RwLock, task::JoinHandle};
use tokio::{
sync::RwLock,
task::{self, JoinHandle},
};
use crate::{
helpers::{to_sdk_instruction, to_sp_pk},
@ -170,6 +173,7 @@ pub async fn send_mm_transactions(
100,
1000,
);
let mut batch_to_send = Vec::with_capacity(perp_market_caches.len());
for (i, c) in perp_market_caches.iter().enumerate() {
let prioritization_fee = prioritization_fee_by_market[i];
let mut tx = create_ask_bid_transaction(
@ -182,7 +186,7 @@ pub async fn send_mm_transactions(
let recent_blockhash = *blockhash.read().await;
tx.sign(&[mango_account_signer], recent_blockhash);
let tx_send_record = TransactionSendRecord {
let record = TransactionSendRecord {
signature: tx.signatures[0],
sent_at: Utc::now(),
sent_slot: slot.load(Ordering::Acquire),
@ -191,10 +195,15 @@ pub async fn send_mm_transactions(
priority_fees: prioritization_fee,
keeper_instruction: None,
};
if !tpu_manager.send_transaction(&tx, tx_send_record).await {
batch_to_send.push((tx, record));
}
let tpu_manager = tpu_manager.clone();
task::spawn(async move {
if !tpu_manager.send_transaction_batch(&batch_to_send).await {
println!("sending failed on tpu client");
}
}
});
}
}

View File

@ -1,8 +1,10 @@
use bincode::serialize;
use log::{info, warn};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::{connection_cache::ConnectionCache, nonblocking::tpu_client::TpuClient};
use solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool};
use solana_sdk::signature::Keypair;
use solana_sdk::transaction::Transaction;
use std::{
net::{IpAddr, Ipv4Addr},
sync::{
@ -142,4 +144,34 @@ impl TpuManager {
tpu_client.send_transaction(transaction).await
}
pub async fn send_transaction_batch(
&self,
batch: &Vec<(Transaction, TransactionSendRecord)>,
) -> bool {
let tpu_client = self.get_tpu_client().await;
for (_tx, record) in batch {
self.stats.inc_send(&record.keeper_instruction);
let tx_sent_record = self.tx_send_record.clone();
let sent = tx_sent_record.send(record.clone());
if sent.is_err() {
warn!(
"sending error on channel : {}",
sent.err().unwrap().to_string()
);
}
}
tpu_client
.try_send_wire_transaction_batch(
batch
.iter()
.map(|(tx, _)| serialize(tx).expect("serialization should succeed"))
.collect(),
)
.await
.is_ok()
}
}