rpc-sts: dedupe before initial send

This commit is contained in:
Trent Nelson 2022-01-30 12:27:42 -07:00 committed by Trent Nelson
parent 29bf1e2529
commit 9f1f7aff2b
1 changed files with 30 additions and 24 deletions

View File

@ -6,7 +6,7 @@ use {
solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{hash::Hash, nonce_account, pubkey::Pubkey, signature::Signature}, solana_sdk::{hash::Hash, nonce_account, pubkey::Pubkey, signature::Signature},
std::{ std::{
collections::HashMap, collections::hash_map::{Entry, HashMap},
net::{SocketAddr, UdpSocket}, net::{SocketAddr, UdpSocket},
sync::{Arc, RwLock}, sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
@ -142,30 +142,36 @@ impl SendTransactionService {
Err(RecvTimeoutError::Timeout) => {} Err(RecvTimeoutError::Timeout) => {}
Ok(transaction_info) => { Ok(transaction_info) => {
inc_new_counter_info!("send_transaction_service-recv-tx", 1); inc_new_counter_info!("send_transaction_service-recv-tx", 1);
let addresses = leader_info.as_ref().map(|leader_info| { let transactions_len = transactions.len();
leader_info.get_leader_tpus(config.leader_forward_count) let entry = transactions.entry(transaction_info.signature);
}); if let Entry::Vacant(_) = entry {
let addresses = addresses let addresses = leader_info.as_ref().map(|leader_info| {
.map(|address_list| { leader_info.get_leader_tpus(config.leader_forward_count)
if address_list.is_empty() { });
vec![&tpu_address] let addresses = addresses
} else { .map(|address_list| {
address_list if address_list.is_empty() {
} vec![&tpu_address]
}) } else {
.unwrap_or_else(|| vec![&tpu_address]); address_list
for address in addresses { }
Self::send_transaction( })
&send_socket, .unwrap_or_else(|| vec![&tpu_address]);
address, for address in addresses {
&transaction_info.wire_transaction, Self::send_transaction(
); &send_socket,
} address,
if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE { &transaction_info.wire_transaction,
inc_new_counter_info!("send_transaction_service-insert-tx", 1); );
transactions.insert(transaction_info.signature, transaction_info); }
if transactions_len < MAX_TRANSACTION_QUEUE_SIZE {
inc_new_counter_info!("send_transaction_service-insert-tx", 1);
entry.or_insert(transaction_info);
} else {
datapoint_warn!("send_transaction_service-queue-overflow");
}
} else { } else {
datapoint_warn!("send_transaction_service-queue-overflow"); inc_new_counter_info!("send_transaction_service-recv-duplicate", 1);
} }
} }
} }