Bugfix number of active connections never dropping
This commit is contained in:
parent
1754e92ff3
commit
9fabdab32b
|
@ -3,6 +3,7 @@ use itertools::Itertools;
|
|||
use lazy_static::lazy_static;
|
||||
use rand::{distributions::Alphanumeric, prelude::Distribution, SeedableRng};
|
||||
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
||||
use solana_sdk::compute_budget;
|
||||
use solana_sdk::instruction::AccountMeta;
|
||||
use solana_sdk::{
|
||||
commitment_config::CommitmentConfig,
|
||||
|
@ -111,7 +112,10 @@ impl BenchHelper {
|
|||
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();
|
||||
|
||||
let instruction = Instruction::new_with_bytes(memo, msg, vec![]);
|
||||
let message = Message::new(&[instruction], Some(&payer.pubkey()));
|
||||
let cu = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(10000);
|
||||
let price: Instruction =
|
||||
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(1000000);
|
||||
let message = Message::new(&[cu, price, instruction], Some(&payer.pubkey()));
|
||||
Transaction::new(&[payer], message, blockhash)
|
||||
}
|
||||
|
||||
|
@ -120,6 +124,10 @@ impl BenchHelper {
|
|||
|
||||
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();
|
||||
|
||||
let cu = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(10000);
|
||||
let price: Instruction =
|
||||
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(1000000);
|
||||
|
||||
let instruction = Instruction::new_with_bytes(
|
||||
memo,
|
||||
msg,
|
||||
|
@ -128,7 +136,7 @@ impl BenchHelper {
|
|||
.map(|keypair| AccountMeta::new_readonly(keypair.pubkey(), true))
|
||||
.collect_vec(),
|
||||
);
|
||||
let message = Message::new(&[instruction], Some(&payer.pubkey()));
|
||||
let message = Message::new(&[cu, price, instruction], Some(&payer.pubkey()));
|
||||
|
||||
let mut signers = vec![payer];
|
||||
signers.extend(accounts.iter());
|
||||
|
@ -160,7 +168,7 @@ fn transaction_size_large() {
|
|||
);
|
||||
|
||||
let seed = 42;
|
||||
let random_strings = BenchHelper::generate_random_strings(1, Some(seed), 240);
|
||||
let random_strings = BenchHelper::generate_random_strings(1, Some(seed), 232);
|
||||
let rand_string = random_strings.first().unwrap();
|
||||
let tx = BenchHelper::create_memo_tx_large(rand_string, &payer_keypair, blockhash);
|
||||
|
||||
|
|
|
@ -180,7 +180,7 @@ async fn bench(
|
|||
let map_of_txs = map_of_txs.clone();
|
||||
let n_chars = match transaction_size {
|
||||
TransactionSize::Small => 10,
|
||||
TransactionSize::Large => 240, // 565 is max but we need to lower that to not burn the CUs
|
||||
TransactionSize::Large => 232, // 565 is max but we need to lower that to not burn the CUs
|
||||
};
|
||||
let rand_strings = BenchHelper::generate_random_strings(tx_count, Some(seed), n_chars);
|
||||
|
||||
|
|
|
@ -82,7 +82,7 @@ impl ActiveConnection {
|
|||
async fn listen(
|
||||
&self,
|
||||
mut transaction_reciever: Receiver<SentTransactionInfo>,
|
||||
mut exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
|
||||
exit_notifier: Arc<Notify>,
|
||||
addr: SocketAddr,
|
||||
identity_stakes: IdentityStakesData,
|
||||
) {
|
||||
|
@ -220,8 +220,8 @@ impl ActiveConnection {
|
|||
});
|
||||
}
|
||||
},
|
||||
_ = exit_oneshot_channel.recv() => {
|
||||
break;
|
||||
_ = exit_notifier.notified() => {
|
||||
break 'main_loop;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -235,31 +235,26 @@ impl ActiveConnection {
|
|||
pub fn start_listening(
|
||||
&self,
|
||||
transaction_reciever: Receiver<SentTransactionInfo>,
|
||||
exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
|
||||
exit_notifier: Arc<Notify>,
|
||||
identity_stakes: IdentityStakesData,
|
||||
) {
|
||||
let addr = self.tpu_address;
|
||||
let this = self.clone();
|
||||
tokio::spawn(async move {
|
||||
this.listen(
|
||||
transaction_reciever,
|
||||
exit_oneshot_channel,
|
||||
addr,
|
||||
identity_stakes,
|
||||
)
|
||||
this.listen(transaction_reciever, exit_notifier, addr, identity_stakes)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
struct ActiveConnectionWithExitChannel {
|
||||
struct ActiveConnectionWithExitNotifier {
|
||||
pub active_connection: ActiveConnection,
|
||||
pub exit_stream: tokio::sync::mpsc::Sender<()>,
|
||||
pub exit_notifier: Arc<Notify>,
|
||||
}
|
||||
|
||||
pub struct TpuConnectionManager {
|
||||
endpoints: RotatingQueue<Endpoint>,
|
||||
identity_to_active_connection: Arc<DashMap<Pubkey, Arc<ActiveConnectionWithExitChannel>>>,
|
||||
identity_to_active_connection: Arc<DashMap<Pubkey, Arc<ActiveConnectionWithExitNotifier>>>,
|
||||
}
|
||||
|
||||
impl TpuConnectionManager {
|
||||
|
@ -297,37 +292,38 @@ impl TpuConnectionManager {
|
|||
connection_parameters,
|
||||
);
|
||||
// using mpsc as a oneshot channel/ because with one shot channel we cannot reuse the reciever
|
||||
let (sx, rx) = tokio::sync::mpsc::channel(1);
|
||||
let exit_notifier = Arc::new(Notify::new());
|
||||
|
||||
let broadcast_receiver = broadcast_sender.subscribe();
|
||||
active_connection.start_listening(broadcast_receiver, rx, identity_stakes);
|
||||
active_connection.start_listening(
|
||||
broadcast_receiver,
|
||||
exit_notifier.clone(),
|
||||
identity_stakes,
|
||||
);
|
||||
self.identity_to_active_connection.insert(
|
||||
*identity,
|
||||
Arc::new(ActiveConnectionWithExitChannel {
|
||||
Arc::new(ActiveConnectionWithExitNotifier {
|
||||
active_connection,
|
||||
exit_stream: sx,
|
||||
exit_notifier,
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// remove connections which are no longer needed
|
||||
let collect_current_active_connections = self
|
||||
.identity_to_active_connection
|
||||
.iter()
|
||||
.map(|x| (*x.key(), x.value().clone()))
|
||||
.collect::<Vec<_>>();
|
||||
for (identity, value) in collect_current_active_connections.iter() {
|
||||
if !connections_to_keep.contains_key(identity) {
|
||||
trace!("removing a connection for {}", identity);
|
||||
self.identity_to_active_connection.retain(|key, value| {
|
||||
if !connections_to_keep.contains_key(key) {
|
||||
trace!("removing a connection for {}", key.to_string());
|
||||
// ignore error for exit channel
|
||||
value
|
||||
.active_connection
|
||||
.exit_signal
|
||||
.store(true, Ordering::Relaxed);
|
||||
let _ = value.exit_stream.send(()).await;
|
||||
self.identity_to_active_connection.remove(identity);
|
||||
}
|
||||
value.exit_notifier.notify_one();
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue