Merge branch 'production'

This commit is contained in:
Godmode Galactus 2024-03-20 15:28:35 +01:00
commit d917a3a5c9
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
3 changed files with 25 additions and 21 deletions

View File

@ -3,6 +3,7 @@ use itertools::Itertools;
use lazy_static::lazy_static;
use rand::{distributions::Alphanumeric, prelude::Distribution, SeedableRng};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::compute_budget;
use solana_sdk::instruction::AccountMeta;
use solana_sdk::{
commitment_config::CommitmentConfig,
@ -111,7 +112,10 @@ impl BenchHelper {
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();
let instruction = Instruction::new_with_bytes(memo, msg, vec![]);
let message = Message::new(&[instruction], Some(&payer.pubkey()));
let cu = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(10000);
let price: Instruction =
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(1000000);
let message = Message::new(&[cu, price, instruction], Some(&payer.pubkey()));
Transaction::new(&[payer], message, blockhash)
}
@ -120,6 +124,10 @@ impl BenchHelper {
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();
let cu = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(10000);
let price: Instruction =
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(1000000);
let instruction = Instruction::new_with_bytes(
memo,
msg,
@ -128,7 +136,7 @@ impl BenchHelper {
.map(|keypair| AccountMeta::new_readonly(keypair.pubkey(), true))
.collect_vec(),
);
let message = Message::new(&[instruction], Some(&payer.pubkey()));
let message = Message::new(&[cu, price, instruction], Some(&payer.pubkey()));
let mut signers = vec![payer];
signers.extend(accounts.iter());

View File

@ -180,7 +180,7 @@ async fn bench(
let map_of_txs = map_of_txs.clone();
let n_chars = match transaction_size {
TransactionSize::Small => 10,
TransactionSize::Large => 240, // 565 is max but we need to lower that to not burn the CUs
TransactionSize::Large => 232, // 565 is max but we need to lower that to not burn the CUs
};
let rand_strings = BenchHelper::generate_random_strings(tx_count, Some(seed), n_chars);

View File

@ -220,8 +220,7 @@ impl ActiveConnection {
}
},
_ = exit_notifier.notified() => {
// notified to exit
break;
break 'main_loop;
}
}
}
@ -235,7 +234,7 @@ impl ActiveConnection {
pub fn start_listening(
&self,
transaction_reciever: Receiver<SentTransactionInfo>,
exit_notifier: Arc<tokio::sync::Notify>,
exit_notifier: Arc<Notify>,
identity_stakes: IdentityStakesData,
) {
let addr = self.tpu_address;
@ -247,14 +246,14 @@ impl ActiveConnection {
}
}
struct ActiveConnectionWithExitChannel {
struct ActiveConnectionWithExitNotifier {
pub active_connection: ActiveConnection,
pub exit_notifier: Arc<tokio::sync::Notify>,
pub exit_notifier: Arc<Notify>,
}
pub struct TpuConnectionManager {
endpoints: RotatingQueue<Endpoint>,
identity_to_active_connection: Arc<DashMap<Pubkey, Arc<ActiveConnectionWithExitChannel>>>,
identity_to_active_connection: Arc<DashMap<Pubkey, Arc<ActiveConnectionWithExitNotifier>>>,
}
impl TpuConnectionManager {
@ -292,7 +291,7 @@ impl TpuConnectionManager {
connection_parameters,
);
// using mpsc as a oneshot channel/ because with one shot channel we cannot reuse the reciever
let exit_notifier = Arc::new(tokio::sync::Notify::new());
let exit_notifier = Arc::new(Notify::new());
let broadcast_receiver = broadcast_sender.subscribe();
active_connection.start_listening(
@ -302,7 +301,7 @@ impl TpuConnectionManager {
);
self.identity_to_active_connection.insert(
*identity,
Arc::new(ActiveConnectionWithExitChannel {
Arc::new(ActiveConnectionWithExitNotifier {
active_connection,
exit_notifier,
}),
@ -311,22 +310,19 @@ impl TpuConnectionManager {
}
// remove connections which are no longer needed
let collect_current_active_connections = self
.identity_to_active_connection
.iter()
.map(|x| (*x.key(), x.value().clone()))
.collect::<Vec<_>>();
for (identity, value) in collect_current_active_connections.iter() {
if !connections_to_keep.contains_key(identity) {
trace!("removing a connection for {}", identity);
self.identity_to_active_connection.retain(|key, value| {
if !connections_to_keep.contains_key(key) {
trace!("removing a connection for {}", key.to_string());
// ignore error for exit channel
value
.active_connection
.exit_signal
.store(true, Ordering::Relaxed);
value.exit_notifier.notify_one();
self.identity_to_active_connection.remove(identity);
}
false
} else {
true
}
});
}
}