From e765215beb6fab5d244fa2d16f54c81556072cb9 Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Thu, 15 Jun 2023 10:41:58 +0200 Subject: [PATCH] making mango simulation work with custom tpu and verify transactions using notifications --- Cargo.lock | 78 +++++++-------- Cargo.toml | 5 +- README.md | 4 +- src/cli.rs | 6 +- src/confirmation_strategies.rs | 144 ++++++++++++++++++++++++++++ src/crank.rs | 5 +- src/helpers.rs | 122 ++++++++++++----------- src/main.rs | 116 +++++++++++++--------- src/mango_v3_perp_crank_sink.rs | 45 +++++---- src/market_markers.rs | 75 ++++++++------- src/noop.rs | 2 +- src/stats.rs | 21 ++-- src/tpu_manager.rs | 165 +++++--------------------------- 13 files changed, 417 insertions(+), 371 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d6f6e56..0d0f055 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -433,9 +433,9 @@ checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545" [[package]] name = "arrayvec" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" +checksum = "8868f09ff8cea88b079da74ae569d9b8c62a23c68c746240b704ee6f7525c89c" [[package]] name = "ascii" @@ -590,7 +590,7 @@ dependencies = [ "async-global-executor", "async-io", "async-lock", - "crossbeam-utils 0.8.15", + "crossbeam-utils 0.8.16", "futures-channel", "futures-core", "futures-io", @@ -1297,7 +1297,7 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" dependencies = [ - "crossbeam-utils 0.8.15", + "crossbeam-utils 0.8.16", ] [[package]] @@ -1410,7 +1410,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" dependencies = [ "cfg-if 1.0.0", - "crossbeam-utils 0.8.15", + "crossbeam-utils 0.8.16", ] [[package]] @@ -1421,19 +1421,19 @@ checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" dependencies = [ "cfg-if 1.0.0", "crossbeam-epoch", - "crossbeam-utils 0.8.15", + "crossbeam-utils 0.8.16", ] [[package]] name = "crossbeam-epoch" -version = "0.9.14" +version = "0.9.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46bd5f3f85273295a9d14aedfb86f6aadbff6d8f5295c4a9edb08e819dcf5695" +checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7" dependencies = [ "autocfg 1.1.0", "cfg-if 1.0.0", - "crossbeam-utils 0.8.15", - "memoffset 0.8.0", + "crossbeam-utils 0.8.16", + "memoffset 0.9.0", "scopeguard", ] @@ -1450,9 +1450,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.15" +version = "0.8.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b" +checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" dependencies = [ "cfg-if 1.0.0", ] @@ -2761,9 +2761,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.63" +version = "0.3.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f37a4a5928311ac501dee68b3c7613a1037d0edb30c8e5427bd832d55d1b790" +checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a" dependencies = [ "wasm-bindgen", ] @@ -3226,6 +3226,7 @@ dependencies = [ "solana-version", "thiserror", "tokio", + "yellowstone-grpc-proto", ] [[package]] @@ -3270,15 +3271,6 @@ dependencies = [ "autocfg 1.1.0", ] -[[package]] -name = "memoffset" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d61c719bcfbcf5d62b3a09efa6088de8c54bc0bfcd3ea7ae39fcc186108b8de1" -dependencies = [ - "autocfg 1.1.0", -] - [[package]] name = "memoffset" version = "0.9.0" @@ -4558,7 +4550,7 @@ checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" dependencies = [ "crossbeam-channel", "crossbeam-deque", - "crossbeam-utils 0.8.15", + "crossbeam-utils 0.8.16", "num_cpus", ] @@ -5785,7 +5777,6 @@ dependencies = [ [[package]] name = "solana-lite-rpc-core" version = "0.2.0" -source = "git+https://github.com/blockworks-foundation/lite-rpc.git?branch=mango_simulation_test#7621852fa6b09ce419fcc500054e97d3ba55b6d6" dependencies = [ "anyhow", "async-trait", @@ -5817,7 +5808,6 @@ dependencies = [ [[package]] name = "solana-lite-rpc-services" version = "0.2.0" -source = "git+https://github.com/blockworks-foundation/lite-rpc.git?branch=mango_simulation_test#7621852fa6b09ce419fcc500054e97d3ba55b6d6" dependencies = [ "anyhow", "async-channel", @@ -7678,9 +7668,9 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" [[package]] name = "uuid" -version = "1.3.3" +version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "345444e32442451b267fc254ae85a209c64be56d2890e601a0c37ff0c3c5ecd2" +checksum = "0fa2982af2eec27de306107c027578ff7f423d65f7250e40ce0fea8f45248b81" [[package]] name = "value-bag" @@ -7795,9 +7785,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bba0e8cb82ba49ff4e229459ff22a191bbe9a1cb3a341610c9c33efc27ddf73" +checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342" dependencies = [ "cfg-if 1.0.0", "wasm-bindgen-macro", @@ -7805,9 +7795,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19b04bc93f9d6bdee709f6bd2118f57dd6679cf1176a1af464fca3ab0d66d8fb" +checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd" dependencies = [ "bumpalo", "log 0.4.19", @@ -7820,9 +7810,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.36" +version = "0.4.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d1985d03709c53167ce907ff394f5316aa22cb4e12761295c5dc57dacb6297e" +checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" dependencies = [ "cfg-if 1.0.0", "js-sys", @@ -7832,9 +7822,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14d6b024f1a526bb0234f52840389927257beb670610081360e5a03c5df9c258" +checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d" dependencies = [ "quote 1.0.28", "wasm-bindgen-macro-support", @@ -7842,9 +7832,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e128beba882dd1eb6200e1dc92ae6c5dbaa4311aa7bb211ca035779e5efc39f8" +checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2 1.0.60", "quote 1.0.28", @@ -7855,15 +7845,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed9d5b4305409d1fc9482fee2d7f9bcbf24b3972bf59817ef757e23982242a93" +checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" [[package]] name = "web-sys" -version = "0.3.63" +version = "0.3.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bdd9ef4e984da1187bf8110c5cf5b845fbc87a23602cdf912386a76fcd3a7c2" +checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" dependencies = [ "js-sys", "wasm-bindgen", @@ -8233,9 +8223,9 @@ dependencies = [ [[package]] name = "yellowstone-grpc-proto" -version = "1.2.0+solana.1.15.2" +version = "1.1.0+solana.1.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2510246c32ce979707ee8ab8f2040fca29fc4d1be0d86e0109c3627fc7acd49" +checksum = "8c2688c6f1d930f21f580829f6f896d7d38f90d5b2272e53a8f69a82e72b656f" dependencies = [ "anyhow", "prost", diff --git a/Cargo.toml b/Cargo.toml index f689808..be3a3ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,8 +49,8 @@ solana-logger = "1.15.2" solana-transaction-status = "1.15.2" solana-account-decoder = "1.15.2" -solana-lite-rpc-core = { git = "https://github.com/blockworks-foundation/lite-rpc.git", branch="mango_simulation_test" } -solana-lite-rpc-services = { git = "https://github.com/blockworks-foundation/lite-rpc.git", branch="mango_simulation_test" } +solana-lite-rpc-core = { path = "/home/galactus/mangolana/lite-rpc/core" } +solana-lite-rpc-services = { path = "/home/galactus/mangolana/lite-rpc/services" } # pin program to mango-v3 version of solana sdk @@ -62,6 +62,7 @@ mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = " mango-common = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0" } mango-feeds-connector = { git = "https://github.com/blockworks-foundation/mango-feeds.git", branch = "ckamm/solana-versions2", default-features = false, features = ["solana-1-15"] } bincode = "1.3.3" +"yellowstone-grpc-proto" = "=1.1.0" [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/README.md b/README.md index 0d5c3fd..e285ef0 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,8 @@ The code then will create transaction request (q) requests per seconds for (n) s For the best results to avoid limits by quic it is better to fill the argument "identity" of a valid staked validator for the cluster you are testing with. +Do not use localhost use http://127.0.0.1:8899 instead. + ## Build Install configure-mango @@ -69,7 +71,7 @@ OPTIONS: -i, --identity Identity used in the QUIC connection. Identity with a lot of stake has a better chance to send transaction to the leader -u, --url URL for Solana's JSON RPC or moniker (or their first letter): [mainnet- - beta, testnet, devnet, localhost] + beta, testnet, devnet, 127.0.0.1:8899] -k, --keeper-authority If specified, authority keypair would be used to pay for keeper transactions -c, --mango-cluster Name of mango cluster from ids.json diff --git a/src/cli.rs b/src/cli.rs index e91e49c..8e6b352 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -11,7 +11,7 @@ pub struct Config { pub entrypoint_addr: SocketAddr, pub json_rpc_url: String, pub websocket_url: String, - pub id: Keypair, + pub identity: Keypair, pub duration: Duration, pub quotes_per_second: u64, pub account_keys: String, @@ -32,7 +32,7 @@ impl Default for Config { entrypoint_addr: SocketAddr::from(([127, 0, 0, 1], 8001)), json_rpc_url: ConfigInput::default().json_rpc_url, websocket_url: ConfigInput::default().websocket_url, - id: Keypair::new(), + identity: Keypair::new(), duration: Duration::new(std::u64::MAX, 0), quotes_per_second: 1, account_keys: String::new(), @@ -249,7 +249,7 @@ pub fn extract_args(matches: &ArgMatches) -> Config { &config.keypair_path, ); if let Ok(id) = read_keypair_file(id_path) { - args.id = id; + args.identity = id; } else if matches.is_present("identity") { panic!("could not parse identity path"); } diff --git a/src/confirmation_strategies.rs b/src/confirmation_strategies.rs index 36e5532..8d91e31 100644 --- a/src/confirmation_strategies.rs +++ b/src/confirmation_strategies.rs @@ -10,6 +10,7 @@ use chrono::Utc; use dashmap::DashMap; use log::{debug, warn}; use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig}; +use solana_lite_rpc_core::notifications::NotificationMsg; use solana_sdk::{ commitment_config::{CommitmentConfig, CommitmentLevel}, signature::Signature, @@ -153,6 +154,149 @@ async fn get_blocks_with_retry( Err(()) } +pub fn confirmation_by_lite_rpc_notification_stream( + tx_record_rx: UnboundedReceiver, + notification_stream: UnboundedReceiver, + tx_confirm_records: tokio::sync::broadcast::Sender, + tx_block_data: tokio::sync::broadcast::Sender, + exit_signal: Arc, +) -> Vec> { + let transaction_map: Arc> = Arc::new(DashMap::new()); + + let confirming_task = { + let transaction_map = transaction_map.clone(); + let tx_confirm_records = tx_confirm_records.clone(); + tokio::spawn(async move { + let mut started_getting_mm_transactions = false; + let mut tx_record_rx = tx_record_rx; + let mut notification_stream = notification_stream; + + while !transaction_map.is_empty() || !started_getting_mm_transactions { + tokio::select! { + transaction_record = tx_record_rx.recv() => { + if let Some(transaction_record) = transaction_record{ + started_getting_mm_transactions = true; + transaction_map + .insert(transaction_record.signature.to_string(), (transaction_record, Instant::now())); + } + + }, + notification = notification_stream.recv() => { + if let Some(notification) = notification { + match notification { + NotificationMsg::BlockNotificationMsg(block_notification) => { + let _ = tx_block_data.send(BlockData { + block_hash: block_notification.blockhash.to_string(), + block_leader: block_notification.block_leader, + block_slot: block_notification.slot, + block_time: block_notification.block_time, + number_of_mm_transactions: block_notification.transaction_found, + total_transactions: block_notification.total_transactions, + cu_consumed: block_notification.cu_consumed_by_txs, + }); + } + NotificationMsg::UpdateTransactionMsg(tx_update_notifications) => { + + for tx_notification in tx_update_notifications { + if let Some(value) = transaction_map.get(&tx_notification.signature) { + let (tx_sent_record, _) = value.clone(); + let error = match &tx_notification.transaction_status { + Err(e) => { + Some(e.to_string()) + }, + _ => None + }; + + transaction_map.remove(&tx_notification.signature); + let _ = tx_confirm_records.send(TransactionConfirmRecord { + signature: tx_notification.signature, + confirmed_slot: Some(tx_notification.slot), + confirmed_at: Some(Utc::now().to_string()), + sent_at: tx_sent_record.sent_at.to_string(), + sent_slot: tx_sent_record.sent_slot, + successful: tx_notification.transaction_status.is_ok(), + error, + block_hash: Some(tx_notification.blockhash), + market: tx_sent_record.market.map(|x| x.to_string()), + market_maker: tx_sent_record.market_maker.map(|x| x.to_string()), + keeper_instruction: tx_sent_record.keeper_instruction.clone(), + slot_processed: Some(tx_notification.slot), + slot_leader: Some(tx_notification.leader.to_string()), + timed_out: false, + priority_fees: tx_sent_record.priority_fees, + }); + } + } + }, + _ => { + // others do nothing + } + } + } + }, + _ = tokio::time::sleep(Duration::from_secs(1)) => { + // timeout + continue; + } + } + } + }) + }; + + let cleaner_jh = { + let transaction_map = transaction_map.clone(); + let exit_signal = exit_signal.clone(); + let tx_confirm_records = tx_confirm_records.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(60)).await; + { + let mut to_remove = vec![]; + + for tx_data in transaction_map.iter() { + let sent_record = &tx_data.0; + let instant = tx_data.1; + let signature = tx_data.key(); + let remove = instant.elapsed() > Duration::from_secs(120); + + // add to timeout if not retaining + if remove { + let _ = tx_confirm_records.send(TransactionConfirmRecord { + signature: signature.to_string(), + confirmed_slot: None, + confirmed_at: None, + sent_at: sent_record.sent_at.to_string(), + sent_slot: sent_record.sent_slot, + successful: false, + error: Some("timeout".to_string()), + block_hash: None, + market: sent_record.market.map(|x| x.to_string()), + market_maker: sent_record.market_maker.map(|x| x.to_string()), + keeper_instruction: sent_record.keeper_instruction.clone(), + slot_processed: None, + slot_leader: None, + timed_out: true, + priority_fees: sent_record.priority_fees, + }); + to_remove.push(signature.clone()); + } + } + + for signature in to_remove { + transaction_map.remove(&signature); + } + + // if exit and all the transactions are processed + if exit_signal.load(Ordering::Relaxed) && transaction_map.len() == 0 { + break; + } + } + } + }) + }; + vec![confirming_task, cleaner_jh] +} + pub fn confirmations_by_blocks( client: Arc, mut tx_record_rx: UnboundedReceiver, diff --git a/src/crank.rs b/src/crank.rs index 7a702e1..8113124 100644 --- a/src/crank.rs +++ b/src/crank.rs @@ -107,10 +107,7 @@ pub fn start( }; let tpu_manager = tpu_manager.clone(); - tokio::spawn(async move { - let ok = tpu_manager.send_transaction(&tx, tx_send_record).await; - trace!("send tx={:?} ok={ok}", tx.signatures[0]); - }); + tpu_manager.send_transaction(&tx, tx_send_record).await; } } }); diff --git a/src/helpers.rs b/src/helpers.rs index fce88f9..41b58f2 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -13,7 +13,7 @@ use fixed::types::I80F48; use log::{debug, info}; use mango::state::{MangoCache, MangoGroup, PerpMarket}; use mango_common::Loadable; -use solana_client::rpc_client::RpcClient; +use solana_client::nonblocking::rpc_client::RpcClient; use solana_program::{clock::DEFAULT_MS_PER_SLOT, pubkey::Pubkey}; use solana_sdk::hash::Hash; use tokio::{sync::RwLock, task::JoinHandle}; @@ -53,14 +53,14 @@ pub fn to_sdk_instruction( } } -pub fn load_from_rpc(rpc_client: &RpcClient, pk: &Pubkey) -> T { - let acc = rpc_client.get_account(&to_sdk_pk(pk)).unwrap(); +pub async fn load_from_rpc(rpc_client: &RpcClient, pk: &Pubkey) -> T { + let acc = rpc_client.get_account(&to_sdk_pk(pk)).await.unwrap(); return T::load_from_bytes(acc.data.as_slice()).unwrap().clone(); } pub async fn get_latest_blockhash(rpc_client: &RpcClient) -> Hash { loop { - match rpc_client.get_latest_blockhash() { + match rpc_client.get_latest_blockhash().await { Ok(blockhash) => return blockhash, Err(err) => { info!("Couldn't get last blockhash: {:?}", err); @@ -73,7 +73,7 @@ pub async fn get_latest_blockhash(rpc_client: &RpcClient) -> Hash { pub async fn get_new_latest_blockhash(client: Arc, blockhash: &Hash) -> Option { let start = Instant::now(); while start.elapsed().as_secs() < 5 { - if let Ok(new_blockhash) = client.get_latest_blockhash() { + if let Ok(new_blockhash) = client.get_latest_blockhash().await { if new_blockhash != *blockhash { debug!("Got new blockhash ({:?})", blockhash); return Some(new_blockhash); @@ -103,7 +103,7 @@ pub async fn poll_blockhash_and_slot( break; } - match client.get_slot() { + match client.get_slot().await { Ok(new_slot) => slot.store(new_slot, Ordering::Release), Err(e) => { info!("Failed to download slot: {}, skip", e); @@ -147,72 +147,68 @@ pub fn start_blockhash_polling_service( }) } -pub fn get_mango_market_perps_cache( +pub async fn get_mango_market_perps_cache( rpc_client: Arc, mango_group_config: &GroupConfig, mango_program_pk: &Pubkey, ) -> Vec { // fetch group let mango_group_pk = Pubkey::from_str(mango_group_config.public_key.as_str()).unwrap(); - let mango_group = load_from_rpc::(&rpc_client, &mango_group_pk); + let mango_group = load_from_rpc::(&rpc_client, &mango_group_pk).await; let mango_cache_pk = Pubkey::from_str(mango_group.mango_cache.to_string().as_str()).unwrap(); - let mango_cache = load_from_rpc::(&rpc_client, &mango_cache_pk); + let mango_cache = load_from_rpc::(&rpc_client, &mango_cache_pk).await; + let mut ret = vec![]; + for market_index in 0..mango_group_config.perp_markets.len() { + let perp_maket_config = &mango_group_config.perp_markets[market_index]; + let perp_market_pk = Pubkey::from_str(perp_maket_config.public_key.as_str()).unwrap(); + let perp_market = load_from_rpc::(&rpc_client, &perp_market_pk).await; - mango_group_config - .perp_markets - .iter() - .enumerate() - .map(|(market_index, perp_maket_config)| { - let perp_market_pk = Pubkey::from_str(perp_maket_config.public_key.as_str()).unwrap(); - let perp_market = load_from_rpc::(&rpc_client, &perp_market_pk); + // fetch price + let base_decimals = mango_group_config.tokens[market_index].decimals; + let quote_decimals = mango_group_config.tokens[0].decimals; - // fetch price - let base_decimals = mango_group_config.tokens[market_index].decimals; - let quote_decimals = mango_group_config.tokens[0].decimals; + let base_unit = I80F48::from_num(10u64.pow(base_decimals as u32)); + let quote_unit = I80F48::from_num(10u64.pow(quote_decimals as u32)); + let price = mango_cache.price_cache[market_index].price; + println!( + "market index {} price of : {}", + market_index, mango_cache.price_cache[market_index].price + ); - let base_unit = I80F48::from_num(10u64.pow(base_decimals as u32)); - let quote_unit = I80F48::from_num(10u64.pow(quote_decimals as u32)); - let price = mango_cache.price_cache[market_index].price; - println!( - "market index {} price of : {}", - market_index, mango_cache.price_cache[market_index].price - ); + let price_quote_lots: i64 = price + .mul(quote_unit) + .mul(I80F48::from_num(perp_market.base_lot_size)) + .div(I80F48::from_num(perp_market.quote_lot_size)) + .div(base_unit) + .to_num(); + let order_base_lots: i64 = base_unit + .div(I80F48::from_num(perp_market.base_lot_size)) + .to_num(); - let price_quote_lots: i64 = price - .mul(quote_unit) - .mul(I80F48::from_num(perp_market.base_lot_size)) - .div(I80F48::from_num(perp_market.quote_lot_size)) - .div(base_unit) - .to_num(); - let order_base_lots: i64 = base_unit - .div(I80F48::from_num(perp_market.base_lot_size)) - .to_num(); - - let root_bank = &mango_group_config.tokens[market_index].root_key; - let root_bank = Pubkey::from_str(root_bank.as_str()).unwrap(); - let node_banks = mango_group_config.tokens[market_index] - .node_keys - .iter() - .map(|x| Pubkey::from_str(x.as_str()).unwrap()) - .collect(); - let price_oracle = - Pubkey::from_str(mango_group_config.oracles[market_index].public_key.as_str()) - .unwrap(); - PerpMarketCache { - order_base_lots, - price, - price_quote_lots, - mango_program_pk: mango_program_pk.clone(), - mango_group_pk, - mango_cache_pk, - perp_market_pk, - perp_market, - root_bank, - node_banks, - price_oracle, - bids: perp_market.bids, - asks: perp_market.asks, - } - }) - .collect() + let root_bank = &mango_group_config.tokens[market_index].root_key; + let root_bank = Pubkey::from_str(root_bank.as_str()).unwrap(); + let node_banks = mango_group_config.tokens[market_index] + .node_keys + .iter() + .map(|x| Pubkey::from_str(x.as_str()).unwrap()) + .collect(); + let price_oracle = + Pubkey::from_str(mango_group_config.oracles[market_index].public_key.as_str()).unwrap(); + ret.push(PerpMarketCache { + order_base_lots, + price, + price_quote_lots, + mango_program_pk: mango_program_pk.clone(), + mango_group_pk, + mango_cache_pk, + perp_market_pk, + perp_market, + root_bank, + node_banks, + price_oracle, + bids: perp_market.bids, + asks: perp_market.asks, + }); + } + ret } diff --git a/src/main.rs b/src/main.rs index d49d6ff..08ce542 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,12 @@ +use mango_simulation::confirmation_strategies::confirmation_by_lite_rpc_notification_stream; +use solana_lite_rpc_core::{block_store::BlockStore, tx_store::{TxStore, empty_tx_store}, notifications::NotificationMsg}; +use solana_lite_rpc_services::{transaction_service::{TransactionService, TransactionServiceBuilder}, tx_sender::TxSender, tpu_utils::tpu_service::TpuService, block_listenser::{BlockListener}, transaction_replayer::TransactionReplayer}; +use tokio::sync::mpsc::{UnboundedSender, unbounded_channel}; + use { log::info, mango_simulation::{ cli, - confirmation_strategies::confirmations_by_blocks, crank::{self, KeeperConfig}, helpers::{ get_latest_blockhash, get_mango_market_perps_cache, start_blockhash_polling_service, @@ -17,7 +21,7 @@ use { tpu_manager::TpuManager, }, serde_json, - solana_client::{nonblocking::rpc_client::RpcClient as NbRpcClient, rpc_client::RpcClient}, + solana_client::nonblocking::rpc_client::RpcClient as NbRpcClient, solana_program::pubkey::Pubkey, solana_sdk::{commitment_config::CommitmentConfig, signer::keypair::Keypair}, std::{ @@ -32,6 +36,16 @@ use { const METRICS_NAME: &str = "mango-bencher"; +async fn configure_transaction_service(rpc_client: Arc, ws_address: String, identity: Keypair, block_store: BlockStore, tx_store: TxStore, notifier: UnboundedSender ) -> (TransactionService, JoinHandle) { + let slot = rpc_client.get_slot().await.expect("GetSlot should work"); + let tpu_service = TpuService::new(slot, 8, Arc::new(identity), rpc_client.clone(), ws_address, tx_store.clone()).await.expect("Should be able to create TPU"); + let tx_sender = TxSender::new(tx_store.clone(), tpu_service.clone()); + let block_listenser = BlockListener::new(rpc_client.clone(), tx_store.clone(), block_store.clone()); + let replayer = TransactionReplayer::new(tpu_service.clone(), tx_store.clone(), Duration::from_secs(2)); + let builder = TransactionServiceBuilder::new(tx_sender, replayer, block_listenser, tpu_service, 1_000_000); + builder.start(Some(notifier), block_store, 10, Duration::from_secs(300)).await +} + #[tokio::main(flavor = "multi_thread", worker_threads = 10)] pub async fn main() -> anyhow::Result<()> { solana_logger::setup_with_default("info"); @@ -43,7 +57,7 @@ pub async fn main() -> anyhow::Result<()> { let cli::Config { json_rpc_url, websocket_url, - id, + identity, account_keys, mango_keys, duration, @@ -81,13 +95,18 @@ pub async fn main() -> anyhow::Result<()> { .groups .iter() .find(|g| g.name == *mango_group_id) - .unwrap(); + .expect("Mango group config should exist"); let nb_rpc_client = Arc::new(NbRpcClient::new_with_commitment( json_rpc_url.to_string(), CommitmentConfig::confirmed(), )); + let tx_store = empty_tx_store(); + let block_store = BlockStore::new(&nb_rpc_client).await.expect("Blockstore should be created"); + let (notif_sx, notif_rx) = unbounded_channel(); + let (transaction_service, tx_service_jh) = configure_transaction_service(nb_rpc_client.clone(), websocket_url.clone(), Keypair::from_bytes(identity.to_bytes().as_slice()).unwrap(), block_store, tx_store, notif_sx).await; + let nb_users = account_keys_parsed.len(); let mut mango_sim_stats = MangoSimulationStats::new( @@ -99,17 +118,24 @@ pub async fn main() -> anyhow::Result<()> { let (tx_record_sx, tx_record_rx) = tokio::sync::mpsc::unbounded_channel(); - let tpu_manager = TpuManager::new( + // continuosly fetch blockhash + let exit_signal = Arc::new(AtomicBool::new(false)); + let latest_blockhash = get_latest_blockhash(&nb_rpc_client.clone()).await; + let blockhash = Arc::new(RwLock::new(latest_blockhash)); + let current_slot = Arc::new(AtomicU64::new(0)); + let blockhash_thread = start_blockhash_polling_service( + exit_signal.clone(), + blockhash.clone(), + current_slot.clone(), nb_rpc_client.clone(), - websocket_url.clone(), - solana_client::tpu_client::TpuClientConfig::default().fanout_slots, - Keypair::from_bytes(id.to_bytes().as_slice()).unwrap(), + ); + + let tpu_manager = TpuManager::new( + transaction_service, mango_sim_stats.clone(), tx_record_sx.clone(), ) - .await; - - tpu_manager.force_reset_after_every(Duration::from_secs(300)); + .await?; info!( "accounts:{:?} markets:{:?} quotes_per_second:{:?} expected_tps:{:?} duration:{:?}", @@ -122,34 +148,20 @@ pub async fn main() -> anyhow::Result<()> { duration ); - // continuosly fetch blockhash - let rpc_client = Arc::new(RpcClient::new_with_commitment( - json_rpc_url.to_string(), - CommitmentConfig::finalized(), - )); - let exit_signal = Arc::new(AtomicBool::new(false)); - let latest_blockhash = get_latest_blockhash(&rpc_client.clone()).await; - let blockhash = Arc::new(RwLock::new(latest_blockhash)); - let current_slot = Arc::new(AtomicU64::new(0)); - let blockhash_thread = start_blockhash_polling_service( - exit_signal.clone(), - blockhash.clone(), - current_slot.clone(), - rpc_client.clone(), - ); - let mango_program_pk = Pubkey::from_str(mango_group_config.mango_program_id.as_str()).unwrap(); + let mango_program_pk = Pubkey::from_str(mango_group_config.mango_program_id.as_str()).expect("Mango program should be able to convert into pubkey"); let perp_market_caches: Vec = - get_mango_market_perps_cache(rpc_client.clone(), mango_group_config, &mango_program_pk); + get_mango_market_perps_cache(nb_rpc_client.clone(), mango_group_config, &mango_program_pk) + .await; let quote_root_bank = - Pubkey::from_str(mango_group_config.tokens.last().unwrap().root_key.as_str()).unwrap(); + Pubkey::from_str(mango_group_config.tokens.last().unwrap().root_key.as_str()).expect("Quote root bank should be able to convert into pubkey");(); let quote_node_banks = mango_group_config .tokens .last() .unwrap() .node_keys .iter() - .map(|x| Pubkey::from_str(x.as_str()).unwrap()) + .map(|x| Pubkey::from_str(x.as_str()).expect("Token mint should be able to convert into pubkey")) .collect(); clean_market_makers( @@ -177,7 +189,7 @@ pub async fn main() -> anyhow::Result<()> { } else { None }; - let from_slot = current_slot.load(Ordering::Relaxed); + let keeper_config = KeeperConfig { program_id: to_sdk_pk(&mango_program_pk), rpc_url: json_rpc_url.clone(), @@ -191,7 +203,7 @@ pub async fn main() -> anyhow::Result<()> { current_slot.clone(), tpu_manager.clone(), mango_group_config, - id, + identity, keeper_prioritization, ); @@ -205,7 +217,7 @@ pub async fn main() -> anyhow::Result<()> { exit_signal.clone(), blockhash.clone(), current_slot.clone(), - tpu_manager, + tpu_manager.clone(), &duration, *quotes_per_second, *priority_fees_proba, @@ -214,8 +226,7 @@ pub async fn main() -> anyhow::Result<()> { info!("Number of MM threads {}", mm_tasks.len()); drop(tx_record_sx); - let mut tasks = vec![]; - tasks.push(blockhash_thread); + let mut tasks = vec![blockhash_thread]; let (tx_status_sx, tx_status_rx) = tokio::sync::broadcast::channel(1000000); let (block_status_sx, block_status_rx) = tokio::sync::broadcast::channel(1000000); @@ -231,13 +242,12 @@ pub async fn main() -> anyhow::Result<()> { ); tasks.append(&mut writers_jh); - let mut confirmation_threads = confirmations_by_blocks( - nb_rpc_client, + let mut confirmation_threads = confirmation_by_lite_rpc_notification_stream( tx_record_rx, + notif_rx, tx_status_sx, block_status_sx, - from_slot, - exit_signal.clone(), + exit_signal.clone() ); tasks.append(&mut confirmation_threads); @@ -264,12 +274,30 @@ pub async fn main() -> anyhow::Result<()> { // we start stopping all other process // some processes like confirmation of transactions will take some time and will get additional 2 minutes // to confirm remaining transactions - futures::future::join_all(mm_tasks).await; - info!("finished market making, joining all other services"); - println!("finished market making, joining all other services"); - exit_signal.store(true, Ordering::Relaxed); + let market_makers_wait_task = { + let exit_signal = exit_signal.clone(); + tokio::spawn(async move { + futures::future::join_all(mm_tasks).await; + info!("finished market making, joining all other services"); + exit_signal.store(true, Ordering::Relaxed); + }) + }; - futures::future::join_all(tasks).await; + let other_tasks_wait_task = { + let tpu_manager = tpu_manager.clone(); + tokio::spawn(async move { + futures::future::join_all(tasks).await; + info!("finished joining all other services, joining TransactionService"); + tpu_manager.stop(); + }) + }; + + let transaction_service = tokio::spawn(async move { + let _ = tx_service_jh.await; + info!("Transaction service joined"); + }); + + futures::future::join_all([market_makers_wait_task, other_tasks_wait_task, transaction_service]).await; mango_sim_stats.report(true, METRICS_NAME).await; Ok(()) } diff --git a/src/mango_v3_perp_crank_sink.rs b/src/mango_v3_perp_crank_sink.rs index b5bd938..41e10f0 100644 --- a/src/mango_v3_perp_crank_sink.rs +++ b/src/mango_v3_perp_crank_sink.rs @@ -1,4 +1,9 @@ -use std::{cell::RefCell, collections::{BTreeMap, HashSet}, convert::TryFrom, mem::size_of}; +use std::{ + cell::RefCell, + collections::{BTreeMap, HashSet}, + convert::TryFrom, + mem::size_of, +}; use arrayref::array_ref; use async_channel::Sender; @@ -95,27 +100,22 @@ impl AccountWriteSink for MangoV3PerpCrankSink { trace!("evq {pk:?} seq_num={seq_num} len={len} contains_fill_events={contains_fill_events} has_backlog={has_backlog}"); let mut mango_accounts = HashSet::new(); - event_queue - .iter() - .take(MAX_EVENTS_PER_TX) - .for_each(|e| - if mango_accounts.len() < MAX_ACCS_PER_TX { - match EventType::try_from(e.event_type).expect("mango v4 event") { - EventType::Fill => { - let fill: &FillEvent = cast_ref(e); - mango_accounts.insert(fill.maker); - mango_accounts.insert(fill.taker); - } - EventType::Out => { - let out: &OutEvent = cast_ref(e); - mango_accounts.insert(out.owner); - } - EventType::Liquidate => { - - } + event_queue.iter().take(MAX_EVENTS_PER_TX).for_each(|e| { + if mango_accounts.len() < MAX_ACCS_PER_TX { + match EventType::try_from(e.event_type).expect("mango v4 event") { + EventType::Fill => { + let fill: &FillEvent = cast_ref(e); + mango_accounts.insert(fill.maker); + mango_accounts.insert(fill.taker); } + EventType::Out => { + let out: &OutEvent = cast_ref(e); + mango_accounts.insert(out.owner); + } + EventType::Liquidate => {} } - ); + } + }); let pk = solana_sdk::pubkey::Pubkey::new_from_array(pk.to_bytes()); let mkt_pk = self @@ -130,7 +130,10 @@ impl AccountWriteSink for MangoV3PerpCrankSink { &to_sp_pk(&self.cache_pk), &to_sp_pk(mkt_pk), &to_sp_pk(&pk), - &mut mango_accounts.iter().map(|pk| pk.clone()).collect::>(), + &mut mango_accounts + .iter() + .map(|pk| pk.clone()) + .collect::>(), MAX_EVENTS_PER_TX, ) .unwrap(), diff --git a/src/market_markers.rs b/src/market_markers.rs index ff95d4d..4605a91 100644 --- a/src/market_markers.rs +++ b/src/market_markers.rs @@ -317,49 +317,52 @@ pub async fn clean_market_makers( blockhash: Arc>, ) { info!("Cleaning previous transactions by market makers"); - let mut tasks = vec![]; - for market_maker in account_keys_parsed { - let mango_account_pk = - Pubkey::from_str(market_maker.mango_account_pks[0].as_str()).unwrap(); - for perp_market in perp_market_caches { - let market_maker = market_maker.clone(); - let perp_market = perp_market.clone(); - let rpc_client = rpc_client.clone(); - let mango_account_pk = mango_account_pk.clone(); - let blockhash = blockhash.clone(); + for account_keys_parsed in account_keys_parsed.chunks(10) { + let mut tasks = vec![]; + for market_maker in account_keys_parsed { + let mango_account_pk = + Pubkey::from_str(market_maker.mango_account_pks[0].as_str()).unwrap(); + for perp_market in perp_market_caches { + let market_maker = market_maker.clone(); + let perp_market = perp_market.clone(); + let rpc_client = rpc_client.clone(); + let mango_account_pk = mango_account_pk.clone(); + let blockhash = blockhash.clone(); - let task = tokio::spawn(async move { - let mango_account_signer = - Keypair::from_bytes(market_maker.secret_key.as_slice()).unwrap(); + let task = tokio::spawn(async move { + let mango_account_signer = + Keypair::from_bytes(market_maker.secret_key.as_slice()).unwrap(); - for _ in 0..10 { - let mut tx = create_cancel_all_orders( - &perp_market, - mango_account_pk, - &mango_account_signer, - ); + for _ in 0..10 { + let mut tx = create_cancel_all_orders( + &perp_market, + mango_account_pk, + &mango_account_signer, + ); - let recent_blockhash = *blockhash.read().await; - tx.sign(&[&mango_account_signer], recent_blockhash); - // send and confirm the transaction with an RPC - if let Ok(res) = tokio::time::timeout( - Duration::from_secs(10), - rpc_client.send_and_confirm_transaction(&tx), - ) - .await - { - match res { - Ok(_) => break, - Err(e) => info!("Error occured while doing cancel all for ma : {} and perp market : {} error : {}", mango_account_pk, perp_market.perp_market_pk, e), + let recent_blockhash = *blockhash.read().await; + tx.sign(&[&mango_account_signer], recent_blockhash); + let sig = tx.signatures[0]; + // send and confirm the transaction with an RPC + if let Ok(res) = tokio::time::timeout( + Duration::from_secs(10), + rpc_client.send_and_confirm_transaction(&tx), + ) + .await + { + match res { + Ok(_) => break, + Err(e) => info!("Error occured while doing cancel all for ma : {}, sig : {} perp market : {} error : {}", mango_account_pk, sig, perp_market.perp_market_pk, e), + } } } - } - }); - tasks.push(task); + }); + tasks.push(task); + } } - } - futures::future::join_all(tasks).await; + futures::future::join_all(tasks).await; + } info!("finished cleaning market makers"); } diff --git a/src/noop.rs b/src/noop.rs index 9a89ddc..7b3f917 100644 --- a/src/noop.rs +++ b/src/noop.rs @@ -1,6 +1,6 @@ use chrono::Utc; -use std::str::FromStr; use solana_sdk::{instruction::Instruction, pubkey::Pubkey}; +use std::str::FromStr; pub fn instruction(data: Vec) -> Instruction { Instruction { diff --git a/src/stats.rs b/src/stats.rs index 47b15ef..af2136c 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,16 +1,17 @@ use std::{ + collections::HashMap, sync::Mutex, sync::{ atomic::{AtomicU64, Ordering}, Arc, }, - time::Instant, collections::HashMap, + time::Instant, }; use crate::states::{KeeperInstruction, TransactionConfirmRecord}; use iter_tools::Itertools; use solana_metrics::datapoint_info; -use tokio::{task::JoinHandle, sync::RwLock}; +use tokio::{sync::RwLock, task::JoinHandle}; // Non atomic version of counters #[derive(Clone, Default, Debug)] @@ -49,7 +50,7 @@ impl NACounters { pub fn diff(&self, other: &NACounters) -> NACounters { let mut new_error_count = HashMap::new(); for (error, count) in &self.errors { - if let Some(v) = other.errors.get( error ) { + if let Some(v) = other.errors.get(error) { new_error_count.insert(error.clone(), *count - *v); } else { new_error_count.insert(error.clone(), *count); @@ -383,7 +384,13 @@ impl MangoSimulationStats { .checked_div(counters.num_sent) .unwrap_or(0) ); - let top_5_errors = counters.errors.iter().sorted_by(|x,y| {(*y.1).cmp(x.1)}).take(5).enumerate().collect_vec(); + let top_5_errors = counters + .errors + .iter() + .sorted_by(|x, y| (*y.1).cmp(x.1)) + .take(5) + .enumerate() + .collect_vec(); let mut errors_to_print: String = String::new(); for (idx, (error, count)) in top_5_errors { println!("Error #{idx} : {error} ({count})"); @@ -489,11 +496,7 @@ impl MangoSimulationStats { diff.succ_update_funding_txs, i64 ), - ( - "top_5_errors", - errors_to_print, - String - ) + ("top_5_errors", errors_to_print, String) ); } } diff --git a/src/tpu_manager.rs b/src/tpu_manager.rs index 8ced98a..9ab2072 100644 --- a/src/tpu_manager.rs +++ b/src/tpu_manager.rs @@ -1,138 +1,33 @@ -use bincode::serialize; -use log::{info, warn, error}; -use solana_client::nonblocking::rpc_client::RpcClient; -use solana_client::{connection_cache::ConnectionCache, nonblocking::tpu_client::TpuClient}; -use solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}; -use solana_sdk::signature::Keypair; +use log::warn; +use solana_client::connection_cache::ConnectionCache; +use solana_lite_rpc_services::{transaction_service::{TransactionService}}; use solana_sdk::transaction::Transaction; -use std::time::Duration; -use std::{ - net::{IpAddr, Ipv4Addr}, - sync::{ - atomic::{AtomicU32, Ordering}, - Arc, - }, -}; -use tokio::sync::{mpsc::UnboundedSender, RwLock}; + +use tokio::sync::mpsc::UnboundedSender; use crate::{states::TransactionSendRecord, stats::MangoSimulationStats}; - -pub type QuicTpuClient = TpuClient; pub type QuicConnectionCache = ConnectionCache; #[derive(Clone)] pub struct TpuManager { - error_count: Arc, - rpc_client: Arc, // why arc twice / one is so that we clone rwlock and other so that we can clone tpu client - tpu_client: Arc>>, - pub ws_addr: String, - fanout_slots: u64, - identity: Arc, + transaction_service: TransactionService, stats: MangoSimulationStats, tx_send_record: UnboundedSender, } impl TpuManager { pub async fn new( - rpc_client: Arc, - ws_addr: String, - fanout_slots: u64, - identity: Keypair, + transaction_service: TransactionService, stats: MangoSimulationStats, tx_send_record: UnboundedSender, - ) -> Self { - let connection_cache = ConnectionCache::new_with_client_options( - 4, - None, - Some((&identity, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))), - None, - ); - let quic_connection_cache = - if let ConnectionCache::Quic(connection_cache) = connection_cache { - Some(connection_cache) - } else { - None - }; + ) -> anyhow::Result { - let tpu_client = Arc::new( - TpuClient::new_with_connection_cache( - rpc_client.clone(), - &ws_addr, - solana_client::tpu_client::TpuClientConfig { fanout_slots }, - quic_connection_cache.unwrap(), - ) - .await - .unwrap(), - ); - - Self { - rpc_client, - tpu_client: Arc::new(RwLock::new(tpu_client)), - ws_addr, - fanout_slots, - error_count: Default::default(), - identity: Arc::new(identity), + Ok(Self { + transaction_service, stats, tx_send_record, - } - } - - pub async fn reset_tpu_client(&self) -> anyhow::Result<()> { - let identity = Keypair::from_bytes(&self.identity.to_bytes()).unwrap(); - let connection_cache = ConnectionCache::new_with_client_options( - 4, - None, - Some((&identity, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))), - None, - ); - let quic_connection_cache = - if let ConnectionCache::Quic(connection_cache) = connection_cache { - Some(connection_cache) - } else { - None - }; - - let tpu_client = Arc::new( - TpuClient::new_with_connection_cache( - self.rpc_client.clone(), - &self.ws_addr, - solana_client::tpu_client::TpuClientConfig { - fanout_slots: self.fanout_slots, - }, - quic_connection_cache.unwrap(), - ) - .await - .unwrap(), - ); - self.error_count.store(0, Ordering::Relaxed); - *self.tpu_client.write().await = tpu_client; - Ok(()) - } - - pub async fn reset(&self) -> anyhow::Result<()> { - self.error_count.fetch_add(1, Ordering::Relaxed); - - if self.error_count.load(Ordering::Relaxed) > 5 { - self.reset_tpu_client().await?; - info!("TPU Reset after 5 errors"); - } - - Ok(()) - } - - pub fn force_reset_after_every(&self, duration: Duration) { - let this = self.clone(); - tokio::spawn(async move { - tokio::time::sleep(duration).await; - if let Err(e) = this.reset_tpu_client().await { - error!("timely restart of tpu client failed {}", e); - } - }); - } - - async fn get_tpu_client(&self) -> Arc { - self.tpu_client.read().await.clone() + }) } pub async fn send_transaction( @@ -140,7 +35,6 @@ impl TpuManager { transaction: &solana_sdk::transaction::Transaction, transaction_sent_record: TransactionSendRecord, ) -> bool { - let tpu_client = self.get_tpu_client().await; self.stats .inc_send(&transaction_sent_record.keeper_instruction); @@ -151,41 +45,26 @@ impl TpuManager { "sending error on channel : {}", sent.err().unwrap().to_string() ); - if let Err(e) = self.reset().await { - error!("error while reseting tpu client {}", e); - } } + let transaction = bincode::serialize(transaction).unwrap(); - tpu_client.send_transaction(transaction).await + self.transaction_service + .send_transaction(transaction, None).await + .is_ok() } pub async fn send_transaction_batch( &self, batch: &Vec<(Transaction, TransactionSendRecord)>, ) -> bool { - let tpu_client = self.get_tpu_client().await; - - for (_tx, record) in batch { - self.stats.inc_send(&record.keeper_instruction); - - let tx_sent_record = self.tx_send_record.clone(); - let sent = tx_sent_record.send(record.clone()); - if sent.is_err() { - warn!( - "sending error on channel : {}", - sent.err().unwrap().to_string() - ); - } + let mut value = true; + for (tx, record) in batch { + value &= self.send_transaction(tx, record.clone()).await; } + value + } - tpu_client - .try_send_wire_transaction_batch( - batch - .iter() - .map(|(tx, _)| serialize(tx).expect("serialization should succeed")) - .collect(), - ) - .await - .is_ok() + pub fn stop(&self) { + self.transaction_service.stop(); } }