diff --git a/Cargo.lock b/Cargo.lock index 0b3439c..5fd4559 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5690,7 +5690,7 @@ dependencies = [ [[package]] name = "solana-lite-rpc-core" version = "0.2.1" -source = "git+https://github.com/blockworks-foundation/lite-rpc.git?tag=v0.2.1#c1eed987f29417f8a3b8d147f43a112388f02e4f" +source = "git+https://github.com/blockworks-foundation/lite-rpc.git?tag=v0.2.2#b2a15ad913128e2266deed8b41a4c0c1c640aacc" dependencies = [ "anyhow", "async-trait", @@ -5722,7 +5722,7 @@ dependencies = [ [[package]] name = "solana-lite-rpc-services" version = "0.2.1" -source = "git+https://github.com/blockworks-foundation/lite-rpc.git?tag=v0.2.1#c1eed987f29417f8a3b8d147f43a112388f02e4f" +source = "git+https://github.com/blockworks-foundation/lite-rpc.git?tag=v0.2.2#b2a15ad913128e2266deed8b41a4c0c1c640aacc" dependencies = [ "anyhow", "async-channel", diff --git a/Cargo.toml b/Cargo.toml index d455012..7620605 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,8 +47,8 @@ solana-logger = "1.15.2" solana-transaction-status = "1.15.2" solana-account-decoder = "1.15.2" -solana-lite-rpc-core = { git = "https://github.com/blockworks-foundation/lite-rpc.git", tag = "v0.2.1" } -solana-lite-rpc-services = { git = "https://github.com/blockworks-foundation/lite-rpc.git", tag = "v0.2.1" } +solana-lite-rpc-core = { git = "https://github.com/blockworks-foundation/lite-rpc.git", tag = "v0.2.2" } +solana-lite-rpc-services = { git = "https://github.com/blockworks-foundation/lite-rpc.git", tag = "v0.2.2" } # pin program to mango-v3 version of solana sdk diff --git a/src/main.rs b/src/main.rs index 08ce542..72bf1e4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,3 @@ -use mango_simulation::confirmation_strategies::confirmation_by_lite_rpc_notification_stream; -use solana_lite_rpc_core::{block_store::BlockStore, tx_store::{TxStore, empty_tx_store}, notifications::NotificationMsg}; -use solana_lite_rpc_services::{transaction_service::{TransactionService, TransactionServiceBuilder}, tx_sender::TxSender, tpu_utils::tpu_service::TpuService, block_listenser::{BlockListener}, transaction_replayer::TransactionReplayer}; -use tokio::sync::mpsc::{UnboundedSender, unbounded_channel}; - use { log::info, mango_simulation::{ @@ -19,6 +14,7 @@ use { states::PerpMarketCache, stats::MangoSimulationStats, tpu_manager::TpuManager, + confirmation_strategies::confirmation_by_lite_rpc_notification_stream, }, serde_json, solana_client::nonblocking::rpc_client::RpcClient as NbRpcClient, @@ -32,18 +28,47 @@ use { time::Duration, }, tokio::{sync::RwLock, task::JoinHandle}, + solana_lite_rpc_core::{block_store::BlockStore, tx_store::{TxStore, empty_tx_store}, notifications::NotificationMsg, quic_connection_utils::QuicConnectionParameters}, + solana_lite_rpc_services::{transaction_service::{TransactionService, TransactionServiceBuilder}, tx_sender::TxSender, tpu_utils::tpu_service::{TpuService, TpuServiceConfig}, block_listenser::{BlockListener}, transaction_replayer::TransactionReplayer}, + tokio::sync::mpsc::{UnboundedSender, unbounded_channel}, }; const METRICS_NAME: &str = "mango-bencher"; -async fn configure_transaction_service(rpc_client: Arc, ws_address: String, identity: Keypair, block_store: BlockStore, tx_store: TxStore, notifier: UnboundedSender ) -> (TransactionService, JoinHandle) { +async fn configure_transaction_service(rpc_client: Arc, identity: Keypair, block_store: BlockStore, tx_store: TxStore, notifier: UnboundedSender ) -> (TransactionService, JoinHandle>) { let slot = rpc_client.get_slot().await.expect("GetSlot should work"); - let tpu_service = TpuService::new(slot, 8, Arc::new(identity), rpc_client.clone(), ws_address, tx_store.clone()).await.expect("Should be able to create TPU"); + let tpu_config = TpuServiceConfig { + fanout_slots: 12, + number_of_leaders_to_cache: 1024, + clusterinfo_refresh_time: Duration::from_secs(60*60), + leader_schedule_update_frequency: Duration::from_secs(10), + maximum_transaction_in_queue: 200_000, + maximum_number_of_errors: 10, + quic_connection_params : QuicConnectionParameters { + connection_timeout: Duration::from_secs(1), + connection_retry_count: 10, + finalize_timeout: Duration::from_millis(200), + max_number_of_connections: 10, + unistream_timeout: Duration::from_millis(500), + write_timeout: Duration::from_secs(1), + number_of_transactions_per_unistream: 10, + } + }; + + let tpu_service = TpuService::new( + tpu_config, + Arc::new(identity), + slot, + rpc_client.clone(), + tx_store.clone(), + ) + .await.expect("Should be able to create TPU"); + let tx_sender = TxSender::new(tx_store.clone(), tpu_service.clone()); let block_listenser = BlockListener::new(rpc_client.clone(), tx_store.clone(), block_store.clone()); let replayer = TransactionReplayer::new(tpu_service.clone(), tx_store.clone(), Duration::from_secs(2)); let builder = TransactionServiceBuilder::new(tx_sender, replayer, block_listenser, tpu_service, 1_000_000); - builder.start(Some(notifier), block_store, 10, Duration::from_secs(300)).await + builder.start(Some(notifier), block_store, 10, Duration::from_secs(90)) } #[tokio::main(flavor = "multi_thread", worker_threads = 10)] @@ -105,7 +130,7 @@ pub async fn main() -> anyhow::Result<()> { let tx_store = empty_tx_store(); let block_store = BlockStore::new(&nb_rpc_client).await.expect("Blockstore should be created"); let (notif_sx, notif_rx) = unbounded_channel(); - let (transaction_service, tx_service_jh) = configure_transaction_service(nb_rpc_client.clone(), websocket_url.clone(), Keypair::from_bytes(identity.to_bytes().as_slice()).unwrap(), block_store, tx_store, notif_sx).await; + let (transaction_service, tx_service_jh) = configure_transaction_service(nb_rpc_client.clone(), Keypair::from_bytes(identity.to_bytes().as_slice()).unwrap(), block_store, tx_store, notif_sx).await; let nb_users = account_keys_parsed.len(); @@ -283,21 +308,18 @@ pub async fn main() -> anyhow::Result<()> { }) }; - let other_tasks_wait_task = { - let tpu_manager = tpu_manager.clone(); - tokio::spawn(async move { - futures::future::join_all(tasks).await; - info!("finished joining all other services, joining TransactionService"); - tpu_manager.stop(); - }) - }; + tasks.push(market_makers_wait_task); let transaction_service = tokio::spawn(async move { let _ = tx_service_jh.await; info!("Transaction service joined"); }); - futures::future::join_all([market_makers_wait_task, other_tasks_wait_task, transaction_service]).await; + tokio::select!{ + _ = futures::future::join_all(tasks) => {}, + _ = transaction_service => {}, + }; + mango_sim_stats.report(true, METRICS_NAME).await; Ok(()) } diff --git a/src/tpu_manager.rs b/src/tpu_manager.rs index 9ab2072..eda1b29 100644 --- a/src/tpu_manager.rs +++ b/src/tpu_manager.rs @@ -63,8 +63,4 @@ impl TpuManager { } value } - - pub fn stop(&self) { - self.transaction_service.stop(); - } }