Merge pull request #3 from KirillLykov/send_batch-3

Send transactions in batches of configurable size
This commit is contained in:
galactus 2022-11-18 14:47:58 +00:00 committed by GitHub
commit c91147c363
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 194 additions and 101 deletions

View File

@ -20,6 +20,7 @@ pub struct Config {
pub block_data_save_file: String,
pub airdrop_accounts: bool,
pub mango_cluster: String,
pub txs_batch_size: Option<usize>,
}
impl Default for Config {
@ -37,6 +38,7 @@ impl Default for Config {
block_data_save_file: String::new(),
airdrop_accounts: false,
mango_cluster: "testnet.0".to_string(),
txs_batch_size: None,
}
}
}
@ -168,6 +170,14 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
.takes_value(true)
.help("Name of mango cluster from ids.json"),
)
.arg(
Arg::with_name("batch-size")
.long("batch-size")
.value_name("UINT")
.takes_value(true)
.required(false)
.help("If specified, transactions are send in batches of specified size"),
)
}
/// Parses a clap `ArgMatches` structure into a `Config`
@ -241,5 +251,8 @@ pub fn extract_args(matches: &ArgMatches) -> Config {
Some(x) => x.to_string(),
None => "testnet.0".to_string(),
};
args.txs_batch_size = matches
.value_of("batch-size")
.map(|batch_size_str| batch_size_str.parse().expect("can't parse batch-size"));
args
}

View File

@ -187,6 +187,102 @@ fn seconds_since(dt: DateTime<Utc>) -> i64 {
Utc::now().signed_duration_since(dt).num_seconds()
}
fn create_ask_bid_transaction(
c: &PerpMarketCache,
mango_account_pk: Pubkey,
mango_account_signer_pk: Pubkey,
) -> Transaction {
let offset = rand::random::<i8>() as i64;
let spread = rand::random::<u8>() as i64;
debug!(
"price:{:?} price_quote_lots:{:?} order_base_lots:{:?} offset:{:?} spread:{:?}",
c.price, c.price_quote_lots, c.order_base_lots, offset, spread
);
let cancel_ix: Instruction = serde_json::from_str(
&serde_json::to_string(
&cancel_all_perp_orders(
&pk_from_str_like(&c.mango_program_pk),
&pk_from_str_like(&c.mango_group_pk),
&pk_from_str_like(&mango_account_pk),
&(pk_from_str_like(&mango_account_signer_pk)),
&(pk_from_str_like(&c.perp_market_pk)),
&c.perp_market.bids,
&c.perp_market.asks,
10,
)
.unwrap(),
)
.unwrap(),
)
.unwrap();
let place_bid_ix: Instruction = serde_json::from_str(
&serde_json::to_string(
&place_perp_order2(
&pk_from_str_like(&c.mango_program_pk),
&pk_from_str_like(&c.mango_group_pk),
&pk_from_str_like(&mango_account_pk),
&(pk_from_str_like(&mango_account_signer_pk)),
&pk_from_str_like(&c.mango_cache_pk),
&(pk_from_str_like(&c.perp_market_pk)),
&c.perp_market.bids,
&c.perp_market.asks,
&c.perp_market.event_queue,
None,
&[],
Side::Bid,
c.price_quote_lots + offset - spread,
c.order_base_lots,
i64::MAX,
1,
mango::matching::OrderType::Limit,
false,
None,
64,
mango::matching::ExpiryType::Absolute,
)
.unwrap(),
)
.unwrap(),
)
.unwrap();
let place_ask_ix: Instruction = serde_json::from_str(
&serde_json::to_string(
&place_perp_order2(
&pk_from_str_like(&c.mango_program_pk),
&pk_from_str_like(&c.mango_group_pk),
&pk_from_str_like(&mango_account_pk),
&(pk_from_str_like(&mango_account_signer_pk)),
&pk_from_str_like(&c.mango_cache_pk),
&(pk_from_str_like(&c.perp_market_pk)),
&c.perp_market.bids,
&c.perp_market.asks,
&c.perp_market.event_queue,
None,
&[],
Side::Ask,
c.price_quote_lots + offset + spread,
c.order_base_lots,
i64::MAX,
2,
mango::matching::OrderType::Limit,
false,
None,
64,
mango::matching::ExpiryType::Absolute,
)
.unwrap(),
)
.unwrap(),
)
.unwrap();
Transaction::new_unsigned(Message::new(
&[cancel_ix, place_bid_ix, place_ask_ix],
Some(&mango_account_signer_pk),
))
}
fn send_mm_transactions(
quotes_per_second: u64,
perp_market_caches: &Vec<PerpMarketCache>,
@ -200,97 +296,8 @@ fn send_mm_transactions(
// update quotes 2x per second
for _ in 0..quotes_per_second {
for c in perp_market_caches.iter() {
let offset = rand::random::<i8>() as i64;
let spread = rand::random::<u8>() as i64;
debug!(
"price:{:?} price_quote_lots:{:?} order_base_lots:{:?} offset:{:?} spread:{:?}",
c.price, c.price_quote_lots, c.order_base_lots, offset, spread
);
let cancel_ix: Instruction = serde_json::from_str(
&serde_json::to_string(
&cancel_all_perp_orders(
&pk_from_str_like(&c.mango_program_pk),
&pk_from_str_like(&c.mango_group_pk),
&pk_from_str_like(&mango_account_pk),
&(pk_from_str_like(&mango_account_signer.pubkey())),
&(pk_from_str_like(&c.perp_market_pk)),
&c.perp_market.bids,
&c.perp_market.asks,
10,
)
.unwrap(),
)
.unwrap(),
)
.unwrap();
let place_bid_ix: Instruction = serde_json::from_str(
&serde_json::to_string(
&place_perp_order2(
&pk_from_str_like(&c.mango_program_pk),
&pk_from_str_like(&c.mango_group_pk),
&pk_from_str_like(&mango_account_pk),
&(pk_from_str_like(&mango_account_signer.pubkey())),
&pk_from_str_like(&c.mango_cache_pk),
&(pk_from_str_like(&c.perp_market_pk)),
&c.perp_market.bids,
&c.perp_market.asks,
&c.perp_market.event_queue,
None,
&[],
Side::Bid,
c.price_quote_lots + offset - spread,
c.order_base_lots,
i64::MAX,
1,
mango::matching::OrderType::Limit,
false,
None,
64,
mango::matching::ExpiryType::Absolute,
)
.unwrap(),
)
.unwrap(),
)
.unwrap();
let place_ask_ix: Instruction = serde_json::from_str(
&serde_json::to_string(
&place_perp_order2(
&pk_from_str_like(&c.mango_program_pk),
&pk_from_str_like(&c.mango_group_pk),
&pk_from_str_like(&mango_account_pk),
&(pk_from_str_like(&mango_account_signer.pubkey())),
&pk_from_str_like(&c.mango_cache_pk),
&(pk_from_str_like(&c.perp_market_pk)),
&c.perp_market.bids,
&c.perp_market.asks,
&c.perp_market.event_queue,
None,
&[],
Side::Ask,
c.price_quote_lots + offset + spread,
c.order_base_lots,
i64::MAX,
2,
mango::matching::OrderType::Limit,
false,
None,
64,
mango::matching::ExpiryType::Absolute,
)
.unwrap(),
)
.unwrap(),
)
.unwrap();
let mut tx = Transaction::new_unsigned(Message::new(
&[cancel_ix, place_bid_ix, place_ask_ix],
Some(&mango_account_signer.pubkey()),
));
let mut tx =
create_ask_bid_transaction(c, mango_account_pk, mango_account_signer.pubkey());
if let Ok(recent_blockhash) = blockhash.read() {
tx.sign(&[mango_account_signer], *recent_blockhash);
@ -314,6 +321,63 @@ fn send_mm_transactions(
}
}
fn send_mm_transactions_batched(
txs_batch_size: usize,
quotes_per_second: u64,
perp_market_caches: &Vec<PerpMarketCache>,
tx_record_sx: &Sender<TransactionSendRecord>,
tpu_client_pool: Arc<RotatingQueue<Arc<TpuClient>>>,
mango_account_pk: Pubkey,
mango_account_signer: &Keypair,
blockhash: Arc<RwLock<Hash>>,
slot: &AtomicU64,
) {
let mut transactions = Vec::<_>::with_capacity(txs_batch_size);
// update quotes 2x per second
for _ in 0..quotes_per_second {
for c in perp_market_caches.iter() {
for _ in 0..txs_batch_size {
transactions.push(create_ask_bid_transaction(
c,
mango_account_pk,
mango_account_signer.pubkey(),
));
}
if let Ok(recent_blockhash) = blockhash.read() {
for tx in &mut transactions {
tx.sign(&[mango_account_signer], *recent_blockhash);
}
}
let tpu_client = tpu_client_pool.get();
if tpu_client
.try_send_transaction_batch(&transactions)
.is_err()
{
error!("Sending batch failed");
continue;
}
for tx in &transactions {
let sent = tx_record_sx.send(TransactionSendRecord {
signature: tx.signatures[0],
sent_at: Utc::now(),
sent_slot: slot.load(Ordering::Acquire),
market_maker: mango_account_signer.pubkey(),
market: c.perp_market_pk,
});
if sent.is_err() {
error!(
"sending error on channel : {}",
sent.err().unwrap().to_string()
);
}
}
transactions.clear();
}
}
}
fn process_signature_confirmation_batch(
rpc_client: &RpcClient,
batch: &Vec<TransactionSendRecord>,
@ -796,6 +860,7 @@ fn main() {
block_data_save_file,
airdrop_accounts,
mango_cluster,
txs_batch_size,
..
} = &cli_config;
@ -977,6 +1042,7 @@ fn main() {
//sleep(Duration::from_secs(10));
let tx_record_sx = tx_record_sx.clone();
let txs_batch_size: Option<usize> = *txs_batch_size;
Builder::new()
.name("solana-client-sender".to_string())
@ -984,16 +1050,30 @@ fn main() {
for _i in 0..duration.as_secs() {
let start = Instant::now();
// send market maker transactions
send_mm_transactions(
quotes_per_second,
&perp_market_caches,
&tx_record_sx,
tpu_client_pool.clone(),
mango_account_pk,
&mango_account_signer,
blockhash.clone(),
current_slot.as_ref(),
);
if let Some(txs_batch_size) = txs_batch_size.clone() {
send_mm_transactions_batched(
txs_batch_size,
quotes_per_second,
&perp_market_caches,
&tx_record_sx,
tpu_client_pool.clone(),
mango_account_pk,
&mango_account_signer,
blockhash.clone(),
current_slot.as_ref(),
);
} else {
send_mm_transactions(
quotes_per_second,
&perp_market_caches,
&tx_record_sx,
tpu_client_pool.clone(),
mango_account_pk,
&mango_account_signer,
blockhash.clone(),
current_slot.as_ref(),
);
}
let elapsed_millis: u64 = start.elapsed().as_millis() as u64;
if elapsed_millis < 950 {