Use connection cache in send transaction (#23712)

Use connection cache in send transaction (#23712)
This commit is contained in:
Lijun Wang 2022-03-21 23:24:21 -07:00 committed by GitHub
parent eb3df4c20e
commit 49228573f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 25 additions and 38 deletions

2
Cargo.lock generated
View File

@ -5617,7 +5617,9 @@ version = "1.11.0"
dependencies = [ dependencies = [
"crossbeam-channel", "crossbeam-channel",
"log", "log",
"solana-client",
"solana-logger", "solana-logger",
"solana-measure",
"solana-metrics", "solana-metrics",
"solana-runtime", "solana-runtime",
"solana-sdk", "solana-sdk",

View File

@ -3595,6 +3595,8 @@ version = "1.11.0"
dependencies = [ dependencies = [
"crossbeam-channel", "crossbeam-channel",
"log", "log",
"solana-client",
"solana-measure",
"solana-metrics", "solana-metrics",
"solana-runtime", "solana-runtime",
"solana-sdk", "solana-sdk",

View File

@ -12,10 +12,13 @@ edition = "2021"
[dependencies] [dependencies]
crossbeam-channel = "0.5" crossbeam-channel = "0.5"
log = "0.4.14" log = "0.4.14"
solana-client = { path = "../client", version = "=1.11.0" }
solana-measure = { path = "../measure", version = "=1.11.0" }
solana-metrics = { path = "../metrics", version = "=1.11.0" } solana-metrics = { path = "../metrics", version = "=1.11.0" }
solana-runtime = { path = "../runtime", version = "=1.11.0" } solana-runtime = { path = "../runtime", version = "=1.11.0" }
solana-sdk = { path = "../sdk", version = "=1.11.0" } solana-sdk = { path = "../sdk", version = "=1.11.0" }
[dev-dependencies] [dev-dependencies]
solana-logger = { path = "../logger", version = "=1.11.0" } solana-logger = { path = "../logger", version = "=1.11.0" }

View File

@ -2,12 +2,14 @@ use {
crate::tpu_info::TpuInfo, crate::tpu_info::TpuInfo,
crossbeam_channel::{Receiver, RecvTimeoutError}, crossbeam_channel::{Receiver, RecvTimeoutError},
log::*, log::*,
solana_client::connection_cache,
solana_measure::measure::Measure,
solana_metrics::{datapoint_warn, inc_new_counter_info}, solana_metrics::{datapoint_warn, inc_new_counter_info},
solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{hash::Hash, nonce_account, pubkey::Pubkey, signature::Signature}, solana_sdk::{hash::Hash, nonce_account, pubkey::Pubkey, signature::Signature},
std::{ std::{
collections::hash_map::{Entry, HashMap}, collections::hash_map::{Entry, HashMap},
net::{SocketAddr, UdpSocket}, net::SocketAddr,
sync::{Arc, RwLock}, sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
time::{Duration, Instant}, time::{Duration, Instant},
@ -134,12 +136,11 @@ impl SendTransactionService {
let mut last_status_check = Instant::now(); let mut last_status_check = Instant::now();
let mut last_leader_refresh = Instant::now(); let mut last_leader_refresh = Instant::now();
let mut transactions = HashMap::new(); let mut transactions = HashMap::new();
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
if let Some(leader_info) = leader_info.as_mut() { if let Some(leader_info) = leader_info.as_mut() {
leader_info.refresh_recent_peers(); leader_info.refresh_recent_peers();
} }
connection_cache::set_use_quic(config.use_quic);
Builder::new() Builder::new()
.name("send-tx-sv2".to_string()) .name("send-tx-sv2".to_string())
.spawn(move || loop { .spawn(move || loop {
@ -164,11 +165,7 @@ impl SendTransactionService {
}) })
.unwrap_or_else(|| vec![&tpu_address]); .unwrap_or_else(|| vec![&tpu_address]);
for address in addresses { for address in addresses {
Self::send_transaction( Self::send_transaction(address, &transaction_info.wire_transaction);
&send_socket,
address,
&transaction_info.wire_transaction,
);
} }
if transactions_len < MAX_TRANSACTION_QUEUE_SIZE { if transactions_len < MAX_TRANSACTION_QUEUE_SIZE {
inc_new_counter_info!("send_transaction_service-insert-tx", 1); inc_new_counter_info!("send_transaction_service-insert-tx", 1);
@ -199,7 +196,6 @@ impl SendTransactionService {
let _result = Self::process_transactions( let _result = Self::process_transactions(
&working_bank, &working_bank,
&root_bank, &root_bank,
&send_socket,
&tpu_address, &tpu_address,
&mut transactions, &mut transactions,
&leader_info, &leader_info,
@ -221,7 +217,6 @@ impl SendTransactionService {
fn process_transactions<T: TpuInfo>( fn process_transactions<T: TpuInfo>(
working_bank: &Arc<Bank>, working_bank: &Arc<Bank>,
root_bank: &Arc<Bank>, root_bank: &Arc<Bank>,
send_socket: &UdpSocket,
tpu_address: &SocketAddr, tpu_address: &SocketAddr,
transactions: &mut HashMap<Signature, TransactionInfo>, transactions: &mut HashMap<Signature, TransactionInfo>,
leader_info: &Option<T>, leader_info: &Option<T>,
@ -292,11 +287,7 @@ impl SendTransactionService {
}) })
.unwrap_or_else(|| vec![tpu_address]); .unwrap_or_else(|| vec![tpu_address]);
for address in addresses { for address in addresses {
Self::send_transaction( Self::send_transaction(address, &transaction_info.wire_transaction);
send_socket,
address,
&transaction_info.wire_transaction,
);
} }
true true
} }
@ -317,14 +308,20 @@ impl SendTransactionService {
result result
} }
fn send_transaction( fn send_transaction(tpu_address: &SocketAddr, wire_transaction: &[u8]) {
send_socket: &UdpSocket, let mut measure = Measure::start("send_transaction_service-us");
tpu_address: &SocketAddr, let connection = connection_cache::get_connection(tpu_address);
wire_transaction: &[u8],
) { if let Err(err) = connection.send_wire_transaction(wire_transaction) {
if let Err(err) = send_socket.send_to(wire_transaction, tpu_address) {
warn!("Failed to send transaction to {}: {:?}", tpu_address, err); warn!("Failed to send transaction to {}: {:?}", tpu_address, err);
} }
measure.stop();
inc_new_counter_info!(
"send_transaction_service-us",
measure.as_us() as usize,
1000,
1000
);
} }
pub fn join(self) -> thread::Result<()> { pub fn join(self) -> thread::Result<()> {
@ -372,7 +369,6 @@ mod test {
let (genesis_config, mint_keypair) = create_genesis_config(4); let (genesis_config, mint_keypair) = create_genesis_config(4);
let bank = Bank::new_for_tests(&genesis_config); let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let tpu_address = "127.0.0.1:0".parse().unwrap(); let tpu_address = "127.0.0.1:0".parse().unwrap();
let config = Config { let config = Config {
leader_forward_count: 1, leader_forward_count: 1,
@ -419,7 +415,6 @@ mod test {
let result = SendTransactionService::process_transactions::<NullTpuInfo>( let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank, &working_bank,
&root_bank, &root_bank,
&send_socket,
&tpu_address, &tpu_address,
&mut transactions, &mut transactions,
&None, &None,
@ -448,7 +443,6 @@ mod test {
let result = SendTransactionService::process_transactions::<NullTpuInfo>( let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank, &working_bank,
&root_bank, &root_bank,
&send_socket,
&tpu_address, &tpu_address,
&mut transactions, &mut transactions,
&None, &None,
@ -477,7 +471,6 @@ mod test {
let result = SendTransactionService::process_transactions::<NullTpuInfo>( let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank, &working_bank,
&root_bank, &root_bank,
&send_socket,
&tpu_address, &tpu_address,
&mut transactions, &mut transactions,
&None, &None,
@ -506,7 +499,6 @@ mod test {
let result = SendTransactionService::process_transactions::<NullTpuInfo>( let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank, &working_bank,
&root_bank, &root_bank,
&send_socket,
&tpu_address, &tpu_address,
&mut transactions, &mut transactions,
&None, &None,
@ -536,7 +528,6 @@ mod test {
let result = SendTransactionService::process_transactions::<NullTpuInfo>( let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank, &working_bank,
&root_bank, &root_bank,
&send_socket,
&tpu_address, &tpu_address,
&mut transactions, &mut transactions,
&None, &None,
@ -576,7 +567,6 @@ mod test {
let result = SendTransactionService::process_transactions::<NullTpuInfo>( let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank, &working_bank,
&root_bank, &root_bank,
&send_socket,
&tpu_address, &tpu_address,
&mut transactions, &mut transactions,
&None, &None,
@ -594,7 +584,6 @@ mod test {
let result = SendTransactionService::process_transactions::<NullTpuInfo>( let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank, &working_bank,
&root_bank, &root_bank,
&send_socket,
&tpu_address, &tpu_address,
&mut transactions, &mut transactions,
&None, &None,
@ -617,7 +606,6 @@ mod test {
let (genesis_config, mint_keypair) = create_genesis_config(4); let (genesis_config, mint_keypair) = create_genesis_config(4);
let bank = Bank::new_for_tests(&genesis_config); let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let tpu_address = "127.0.0.1:0".parse().unwrap(); let tpu_address = "127.0.0.1:0".parse().unwrap();
let config = Config { let config = Config {
leader_forward_count: 1, leader_forward_count: 1,
@ -674,7 +662,6 @@ mod test {
let result = SendTransactionService::process_transactions::<NullTpuInfo>( let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank, &working_bank,
&root_bank, &root_bank,
&send_socket,
&tpu_address, &tpu_address,
&mut transactions, &mut transactions,
&None, &None,
@ -702,7 +689,6 @@ mod test {
let result = SendTransactionService::process_transactions::<NullTpuInfo>( let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank, &working_bank,
&root_bank, &root_bank,
&send_socket,
&tpu_address, &tpu_address,
&mut transactions, &mut transactions,
&None, &None,
@ -732,7 +718,6 @@ mod test {
let result = SendTransactionService::process_transactions::<NullTpuInfo>( let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank, &working_bank,
&root_bank, &root_bank,
&send_socket,
&tpu_address, &tpu_address,
&mut transactions, &mut transactions,
&None, &None,
@ -760,7 +745,6 @@ mod test {
let result = SendTransactionService::process_transactions::<NullTpuInfo>( let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank, &working_bank,
&root_bank, &root_bank,
&send_socket,
&tpu_address, &tpu_address,
&mut transactions, &mut transactions,
&None, &None,
@ -789,7 +773,6 @@ mod test {
let result = SendTransactionService::process_transactions::<NullTpuInfo>( let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank, &working_bank,
&root_bank, &root_bank,
&send_socket,
&tpu_address, &tpu_address,
&mut transactions, &mut transactions,
&None, &None,
@ -818,7 +801,6 @@ mod test {
let result = SendTransactionService::process_transactions::<NullTpuInfo>( let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank, &working_bank,
&root_bank, &root_bank,
&send_socket,
&tpu_address, &tpu_address,
&mut transactions, &mut transactions,
&None, &None,
@ -848,7 +830,6 @@ mod test {
let result = SendTransactionService::process_transactions::<NullTpuInfo>( let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank, &working_bank,
&root_bank, &root_bank,
&send_socket,
&tpu_address, &tpu_address,
&mut transactions, &mut transactions,
&None, &None,
@ -873,7 +854,6 @@ mod test {
let result = SendTransactionService::process_transactions::<NullTpuInfo>( let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank, &working_bank,
&root_bank, &root_bank,
&send_socket,
&tpu_address, &tpu_address,
&mut transactions, &mut transactions,
&None, &None,