From 7853684abf24d4d82b9984b39a11862692ff41a2 Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Fri, 23 Jun 2023 15:30:02 +0200 Subject: [PATCH] moving lite-rpc libraries to version v0.2.2 --- src/confirmation_strategies.rs | 7 +- src/main.rs | 113 +++++++++++++++++++++++---------- src/tpu_manager.rs | 6 +- 3 files changed, 85 insertions(+), 41 deletions(-) diff --git a/src/confirmation_strategies.rs b/src/confirmation_strategies.rs index ec45686..f1d5f36 100644 --- a/src/confirmation_strategies.rs +++ b/src/confirmation_strategies.rs @@ -161,7 +161,8 @@ pub fn confirmation_by_lite_rpc_notification_stream( tx_block_data: tokio::sync::broadcast::Sender, exit_signal: Arc, ) -> Vec> { - let transaction_map: Arc> = Arc::new(DashMap::new()); + let transaction_map: Arc> = + Arc::new(DashMap::new()); let confirming_task = { let transaction_map = transaction_map.clone(); @@ -170,7 +171,7 @@ pub fn confirmation_by_lite_rpc_notification_stream( tokio::spawn(async move { let mut tx_record_rx = tx_record_rx; let mut notification_stream = notification_stream; - + while !transaction_map.is_empty() || !exit_signal.load(Ordering::Relaxed) { tokio::select! { transaction_record = tx_record_rx.recv() => { @@ -182,7 +183,7 @@ pub fn confirmation_by_lite_rpc_notification_stream( }, notification = notification_stream.recv() => { if let Some(notification) = notification { - + match notification { NotificationMsg::BlockNotificationMsg(block_notification) => { let _ = tx_block_data.send(BlockData { diff --git a/src/main.rs b/src/main.rs index 72bf1e4..3338dc5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ use { log::info, mango_simulation::{ cli, + confirmation_strategies::confirmation_by_lite_rpc_notification_stream, crank::{self, KeeperConfig}, helpers::{ get_latest_blockhash, get_mango_market_perps_cache, start_blockhash_polling_service, @@ -14,10 +15,22 @@ use { states::PerpMarketCache, stats::MangoSimulationStats, tpu_manager::TpuManager, - confirmation_strategies::confirmation_by_lite_rpc_notification_stream, }, serde_json, solana_client::nonblocking::rpc_client::RpcClient as NbRpcClient, + solana_lite_rpc_core::{ + block_store::BlockStore, + notifications::NotificationMsg, + quic_connection_utils::QuicConnectionParameters, + tx_store::{empty_tx_store, TxStore}, + }, + solana_lite_rpc_services::{ + block_listenser::BlockListener, + tpu_utils::tpu_service::{TpuService, TpuServiceConfig}, + transaction_replayer::TransactionReplayer, + transaction_service::{TransactionService, TransactionServiceBuilder}, + tx_sender::TxSender, + }, solana_program::pubkey::Pubkey, solana_sdk::{commitment_config::CommitmentConfig, signer::keypair::Keypair}, std::{ @@ -27,34 +40,38 @@ use { sync::Arc, time::Duration, }, + tokio::sync::mpsc::{unbounded_channel, UnboundedSender}, tokio::{sync::RwLock, task::JoinHandle}, - solana_lite_rpc_core::{block_store::BlockStore, tx_store::{TxStore, empty_tx_store}, notifications::NotificationMsg, quic_connection_utils::QuicConnectionParameters}, - solana_lite_rpc_services::{transaction_service::{TransactionService, TransactionServiceBuilder}, tx_sender::TxSender, tpu_utils::tpu_service::{TpuService, TpuServiceConfig}, block_listenser::{BlockListener}, transaction_replayer::TransactionReplayer}, - tokio::sync::mpsc::{UnboundedSender, unbounded_channel}, }; const METRICS_NAME: &str = "mango-bencher"; -async fn configure_transaction_service(rpc_client: Arc, identity: Keypair, block_store: BlockStore, tx_store: TxStore, notifier: UnboundedSender ) -> (TransactionService, JoinHandle>) { +async fn configure_transaction_service( + rpc_client: Arc, + identity: Keypair, + block_store: BlockStore, + tx_store: TxStore, + notifier: UnboundedSender, +) -> (TransactionService, JoinHandle>) { let slot = rpc_client.get_slot().await.expect("GetSlot should work"); let tpu_config = TpuServiceConfig { - fanout_slots: 12, - number_of_leaders_to_cache: 1024, - clusterinfo_refresh_time: Duration::from_secs(60*60), - leader_schedule_update_frequency: Duration::from_secs(10), - maximum_transaction_in_queue: 200_000, - maximum_number_of_errors: 10, - quic_connection_params : QuicConnectionParameters { - connection_timeout: Duration::from_secs(1), - connection_retry_count: 10, - finalize_timeout: Duration::from_millis(200), - max_number_of_connections: 10, - unistream_timeout: Duration::from_millis(500), - write_timeout: Duration::from_secs(1), - number_of_transactions_per_unistream: 10, - } - }; - + fanout_slots: 12, + number_of_leaders_to_cache: 1024, + clusterinfo_refresh_time: Duration::from_secs(60 * 60), + leader_schedule_update_frequency: Duration::from_secs(10), + maximum_transaction_in_queue: 200_000, + maximum_number_of_errors: 10, + quic_connection_params: QuicConnectionParameters { + connection_timeout: Duration::from_secs(1), + connection_retry_count: 10, + finalize_timeout: Duration::from_millis(200), + max_number_of_connections: 10, + unistream_timeout: Duration::from_millis(500), + write_timeout: Duration::from_secs(1), + number_of_transactions_per_unistream: 10, + }, + }; + let tpu_service = TpuService::new( tpu_config, Arc::new(identity), @@ -62,12 +79,24 @@ async fn configure_transaction_service(rpc_client: Arc, identity: K rpc_client.clone(), tx_store.clone(), ) - .await.expect("Should be able to create TPU"); - + .await + .expect("Should be able to create TPU"); + let tx_sender = TxSender::new(tx_store.clone(), tpu_service.clone()); - let block_listenser = BlockListener::new(rpc_client.clone(), tx_store.clone(), block_store.clone()); - let replayer = TransactionReplayer::new(tpu_service.clone(), tx_store.clone(), Duration::from_secs(2)); - let builder = TransactionServiceBuilder::new(tx_sender, replayer, block_listenser, tpu_service, 1_000_000); + let block_listenser = + BlockListener::new(rpc_client.clone(), tx_store.clone(), block_store.clone()); + let replayer = TransactionReplayer::new( + tpu_service.clone(), + tx_store.clone(), + Duration::from_secs(2), + ); + let builder = TransactionServiceBuilder::new( + tx_sender, + replayer, + block_listenser, + tpu_service, + 1_000_000, + ); builder.start(Some(notifier), block_store, 10, Duration::from_secs(90)) } @@ -128,9 +157,18 @@ pub async fn main() -> anyhow::Result<()> { )); let tx_store = empty_tx_store(); - let block_store = BlockStore::new(&nb_rpc_client).await.expect("Blockstore should be created"); + let block_store = BlockStore::new(&nb_rpc_client) + .await + .expect("Blockstore should be created"); let (notif_sx, notif_rx) = unbounded_channel(); - let (transaction_service, tx_service_jh) = configure_transaction_service(nb_rpc_client.clone(), Keypair::from_bytes(identity.to_bytes().as_slice()).unwrap(), block_store, tx_store, notif_sx).await; + let (transaction_service, tx_service_jh) = configure_transaction_service( + nb_rpc_client.clone(), + Keypair::from_bytes(identity.to_bytes().as_slice()).unwrap(), + block_store, + tx_store, + notif_sx, + ) + .await; let nb_users = account_keys_parsed.len(); @@ -173,20 +211,25 @@ pub async fn main() -> anyhow::Result<()> { duration ); - let mango_program_pk = Pubkey::from_str(mango_group_config.mango_program_id.as_str()).expect("Mango program should be able to convert into pubkey"); + let mango_program_pk = Pubkey::from_str(mango_group_config.mango_program_id.as_str()) + .expect("Mango program should be able to convert into pubkey"); let perp_market_caches: Vec = get_mango_market_perps_cache(nb_rpc_client.clone(), mango_group_config, &mango_program_pk) .await; let quote_root_bank = - Pubkey::from_str(mango_group_config.tokens.last().unwrap().root_key.as_str()).expect("Quote root bank should be able to convert into pubkey");(); + Pubkey::from_str(mango_group_config.tokens.last().unwrap().root_key.as_str()) + .expect("Quote root bank should be able to convert into pubkey"); + (); let quote_node_banks = mango_group_config .tokens .last() .unwrap() .node_keys .iter() - .map(|x| Pubkey::from_str(x.as_str()).expect("Token mint should be able to convert into pubkey")) + .map(|x| { + Pubkey::from_str(x.as_str()).expect("Token mint should be able to convert into pubkey") + }) .collect(); clean_market_makers( @@ -272,7 +315,7 @@ pub async fn main() -> anyhow::Result<()> { notif_rx, tx_status_sx, block_status_sx, - exit_signal.clone() + exit_signal.clone(), ); tasks.append(&mut confirmation_threads); @@ -315,11 +358,11 @@ pub async fn main() -> anyhow::Result<()> { info!("Transaction service joined"); }); - tokio::select!{ + tokio::select! { _ = futures::future::join_all(tasks) => {}, _ = transaction_service => {}, }; - + mango_sim_stats.report(true, METRICS_NAME).await; Ok(()) } diff --git a/src/tpu_manager.rs b/src/tpu_manager.rs index eda1b29..fdbb872 100644 --- a/src/tpu_manager.rs +++ b/src/tpu_manager.rs @@ -1,6 +1,6 @@ use log::warn; use solana_client::connection_cache::ConnectionCache; -use solana_lite_rpc_services::{transaction_service::{TransactionService}}; +use solana_lite_rpc_services::transaction_service::TransactionService; use solana_sdk::transaction::Transaction; use tokio::sync::mpsc::UnboundedSender; @@ -22,7 +22,6 @@ impl TpuManager { stats: MangoSimulationStats, tx_send_record: UnboundedSender, ) -> anyhow::Result { - Ok(Self { transaction_service, stats, @@ -49,7 +48,8 @@ impl TpuManager { let transaction = bincode::serialize(transaction).unwrap(); self.transaction_service - .send_transaction(transaction, None).await + .send_transaction(transaction, None) + .await .is_ok() }