From 9fabdab32bff8e3ca59705266abab67227732880 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Wed, 20 Mar 2024 11:28:09 +0100 Subject: [PATCH] Bugfix number of active connections never dropping --- bench/src/helpers.rs | 14 +++-- bench/src/main.rs | 2 +- .../src/tpu_utils/tpu_connection_manager.rs | 54 +++++++++---------- 3 files changed, 37 insertions(+), 33 deletions(-) diff --git a/bench/src/helpers.rs b/bench/src/helpers.rs index e3469b19..377ca214 100644 --- a/bench/src/helpers.rs +++ b/bench/src/helpers.rs @@ -3,6 +3,7 @@ use itertools::Itertools; use lazy_static::lazy_static; use rand::{distributions::Alphanumeric, prelude::Distribution, SeedableRng}; use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::compute_budget; use solana_sdk::instruction::AccountMeta; use solana_sdk::{ commitment_config::CommitmentConfig, @@ -111,7 +112,10 @@ impl BenchHelper { let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap(); let instruction = Instruction::new_with_bytes(memo, msg, vec![]); - let message = Message::new(&[instruction], Some(&payer.pubkey())); + let cu = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(10000); + let price: Instruction = + compute_budget::ComputeBudgetInstruction::set_compute_unit_price(1000000); + let message = Message::new(&[cu, price, instruction], Some(&payer.pubkey())); Transaction::new(&[payer], message, blockhash) } @@ -120,6 +124,10 @@ impl BenchHelper { let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap(); + let cu = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(10000); + let price: Instruction = + compute_budget::ComputeBudgetInstruction::set_compute_unit_price(1000000); + let instruction = Instruction::new_with_bytes( memo, msg, @@ -128,7 +136,7 @@ impl BenchHelper { .map(|keypair| AccountMeta::new_readonly(keypair.pubkey(), true)) .collect_vec(), ); - let message = Message::new(&[instruction], Some(&payer.pubkey())); + let message = Message::new(&[cu, price, instruction], Some(&payer.pubkey())); let mut signers = vec![payer]; signers.extend(accounts.iter()); @@ -160,7 +168,7 @@ fn transaction_size_large() { ); let seed = 42; - let random_strings = BenchHelper::generate_random_strings(1, Some(seed), 240); + let random_strings = BenchHelper::generate_random_strings(1, Some(seed), 232); let rand_string = random_strings.first().unwrap(); let tx = BenchHelper::create_memo_tx_large(rand_string, &payer_keypair, blockhash); diff --git a/bench/src/main.rs b/bench/src/main.rs index 04fdbf1f..8762d79a 100644 --- a/bench/src/main.rs +++ b/bench/src/main.rs @@ -180,7 +180,7 @@ async fn bench( let map_of_txs = map_of_txs.clone(); let n_chars = match transaction_size { TransactionSize::Small => 10, - TransactionSize::Large => 240, // 565 is max but we need to lower that to not burn the CUs + TransactionSize::Large => 232, // 565 is max but we need to lower that to not burn the CUs }; let rand_strings = BenchHelper::generate_random_strings(tx_count, Some(seed), n_chars); diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index be3a56de..8f92cb30 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -82,7 +82,7 @@ impl ActiveConnection { async fn listen( &self, mut transaction_reciever: Receiver, - mut exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>, + exit_notifier: Arc, addr: SocketAddr, identity_stakes: IdentityStakesData, ) { @@ -220,8 +220,8 @@ impl ActiveConnection { }); } }, - _ = exit_oneshot_channel.recv() => { - break; + _ = exit_notifier.notified() => { + break 'main_loop; } } } @@ -235,31 +235,26 @@ impl ActiveConnection { pub fn start_listening( &self, transaction_reciever: Receiver, - exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>, + exit_notifier: Arc, identity_stakes: IdentityStakesData, ) { let addr = self.tpu_address; let this = self.clone(); tokio::spawn(async move { - this.listen( - transaction_reciever, - exit_oneshot_channel, - addr, - identity_stakes, - ) - .await; + this.listen(transaction_reciever, exit_notifier, addr, identity_stakes) + .await; }); } } -struct ActiveConnectionWithExitChannel { +struct ActiveConnectionWithExitNotifier { pub active_connection: ActiveConnection, - pub exit_stream: tokio::sync::mpsc::Sender<()>, + pub exit_notifier: Arc, } pub struct TpuConnectionManager { endpoints: RotatingQueue, - identity_to_active_connection: Arc>>, + identity_to_active_connection: Arc>>, } impl TpuConnectionManager { @@ -297,37 +292,38 @@ impl TpuConnectionManager { connection_parameters, ); // using mpsc as a oneshot channel/ because with one shot channel we cannot reuse the reciever - let (sx, rx) = tokio::sync::mpsc::channel(1); + let exit_notifier = Arc::new(Notify::new()); let broadcast_receiver = broadcast_sender.subscribe(); - active_connection.start_listening(broadcast_receiver, rx, identity_stakes); + active_connection.start_listening( + broadcast_receiver, + exit_notifier.clone(), + identity_stakes, + ); self.identity_to_active_connection.insert( *identity, - Arc::new(ActiveConnectionWithExitChannel { + Arc::new(ActiveConnectionWithExitNotifier { active_connection, - exit_stream: sx, + exit_notifier, }), ); } } // remove connections which are no longer needed - let collect_current_active_connections = self - .identity_to_active_connection - .iter() - .map(|x| (*x.key(), x.value().clone())) - .collect::>(); - for (identity, value) in collect_current_active_connections.iter() { - if !connections_to_keep.contains_key(identity) { - trace!("removing a connection for {}", identity); + self.identity_to_active_connection.retain(|key, value| { + if !connections_to_keep.contains_key(key) { + trace!("removing a connection for {}", key.to_string()); // ignore error for exit channel value .active_connection .exit_signal .store(true, Ordering::Relaxed); - let _ = value.exit_stream.send(()).await; - self.identity_to_active_connection.remove(identity); + value.exit_notifier.notify_one(); + false + } else { + true } - } + }); } }