From 9f1f7aff2b102bab715c1f9bba5876ebd56223f3 Mon Sep 17 00:00:00 2001 From: Trent Nelson Date: Sun, 30 Jan 2022 12:27:42 -0700 Subject: [PATCH] rpc-sts: dedupe before initial send --- .../src/send_transaction_service.rs | 54 ++++++++++--------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 86508297d9..2ac8b617a0 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -6,7 +6,7 @@ use { solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{hash::Hash, nonce_account, pubkey::Pubkey, signature::Signature}, std::{ - collections::HashMap, + collections::hash_map::{Entry, HashMap}, net::{SocketAddr, UdpSocket}, sync::{Arc, RwLock}, thread::{self, Builder, JoinHandle}, @@ -142,30 +142,36 @@ impl SendTransactionService { Err(RecvTimeoutError::Timeout) => {} Ok(transaction_info) => { inc_new_counter_info!("send_transaction_service-recv-tx", 1); - let addresses = leader_info.as_ref().map(|leader_info| { - leader_info.get_leader_tpus(config.leader_forward_count) - }); - let addresses = addresses - .map(|address_list| { - if address_list.is_empty() { - vec![&tpu_address] - } else { - address_list - } - }) - .unwrap_or_else(|| vec![&tpu_address]); - for address in addresses { - Self::send_transaction( - &send_socket, - address, - &transaction_info.wire_transaction, - ); - } - if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE { - inc_new_counter_info!("send_transaction_service-insert-tx", 1); - transactions.insert(transaction_info.signature, transaction_info); + let transactions_len = transactions.len(); + let entry = transactions.entry(transaction_info.signature); + if let Entry::Vacant(_) = entry { + let addresses = leader_info.as_ref().map(|leader_info| { + leader_info.get_leader_tpus(config.leader_forward_count) + }); + let addresses = addresses + .map(|address_list| { + if address_list.is_empty() { + vec![&tpu_address] + } else { + address_list + } + }) + .unwrap_or_else(|| vec![&tpu_address]); + for address in addresses { + Self::send_transaction( + &send_socket, + address, + &transaction_info.wire_transaction, + ); + } + if transactions_len < MAX_TRANSACTION_QUEUE_SIZE { + inc_new_counter_info!("send_transaction_service-insert-tx", 1); + entry.or_insert(transaction_info); + } else { + datapoint_warn!("send_transaction_service-queue-overflow"); + } } else { - datapoint_warn!("send_transaction_service-queue-overflow"); + inc_new_counter_info!("send_transaction_service-recv-duplicate", 1); } } }