Leader info refresher (#24597)

In PR review https://github.com/solana-labs/solana/pull/24083/files#r852661162, we raised the concern that the leader info might be outdated if the retry queue is long, causing a large number of transactions to be sent to outdated leaders and increasing the load on the network.

A leader info refresher is used to ensure the leader info is up to date before it is used to send transactions. The refresher can update the new leader with updated endpoints.
This commit is contained in:
Lijun Wang 2022-04-28 08:35:42 -07:00 committed by GitHub
parent 7b6880f652
commit 431c8412ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 96 additions and 55 deletions

View File

@ -128,8 +128,61 @@ impl Default for Config {
}
}
/// Upper bound, in milliseconds, on how long the retry thread sleeps between
/// passes over the transactions that need to be retried; a larger configured
/// retry rate is clamped down to this value.
pub const MAX_RETRY_SLEEP_MS: u64 = 1000;
/// Interval, in milliseconds, after which cached leader info is considered
/// stale and is refreshed on its next use.
pub const LEADER_INFO_REFRESH_RATE_MS: u64 = 1000;
/// Holds the leader information used for sending transactions together with
/// the bookkeeping needed to keep that information up to date.
pub struct CurrentLeaderInfo<T: TpuInfo + Send + 'static> {
    /// When the leader info was last refreshed; `None` if it has not been
    /// refreshed yet
    last_leader_refresh: Option<Instant>,
    /// The leader info itself, if any was provided
    leader_info: Option<T>,
    /// Time that must pass after a refresh before the next one is performed
    refresh_rate: Duration,
}
impl<T> CurrentLeaderInfo<T>
where
    T: TpuInfo + Send + 'static,
{
    /// Wraps `leader_info` in a provider that has never been refreshed and
    /// whose refresh interval is `LEADER_INFO_REFRESH_RATE_MS` milliseconds.
    pub fn new(leader_info: Option<T>) -> Self {
        let refresh_rate = Duration::from_millis(LEADER_INFO_REFRESH_RATE_MS);
        Self {
            last_leader_refresh: None,
            leader_info,
            refresh_rate,
        }
    }

    /// Returns the held leader info, refreshing it first when it is stale.
    ///
    /// The info counts as stale when it has never been refreshed or when at
    /// least `refresh_rate` has elapsed since the previous refresh. Returns
    /// `None` (and does nothing else) when no leader info is held.
    pub fn get_leader_info(&mut self) -> Option<&T> {
        let info = self.leader_info.as_mut()?;

        let now = Instant::now();
        let is_stale = match self.last_leader_refresh {
            Some(previous) => now.duration_since(previous) >= self.refresh_rate,
            // Never refreshed so far: always refresh on first use.
            None => true,
        };
        if is_stale {
            info.refresh_recent_peers();
            self.last_leader_refresh = Some(now);
        }

        Some(&*info)
    }
}
impl SendTransactionService {
pub fn new<T: TpuInfo + std::marker::Send + Clone + 'static>(
pub fn new<T: TpuInfo + std::marker::Send + 'static>(
tpu_address: SocketAddr,
bank_forks: &Arc<RwLock<BankForks>>,
leader_info: Option<T>,
@ -147,7 +200,7 @@ impl SendTransactionService {
Self::new_with_config(tpu_address, bank_forks, leader_info, receiver, config)
}
pub fn new_with_config<T: TpuInfo + std::marker::Send + Clone + 'static>(
pub fn new_with_config<T: TpuInfo + std::marker::Send + 'static>(
tpu_address: SocketAddr,
bank_forks: &Arc<RwLock<BankForks>>,
leader_info: Option<T>,
@ -155,11 +208,14 @@ impl SendTransactionService {
config: Config,
) -> Self {
let retry_transactions = Arc::new(Mutex::new(HashMap::new()));
let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(leader_info)));
let exit = Arc::new(AtomicBool::new(false));
let receive_txn_thread = Self::receive_txn_thread(
tpu_address,
receiver,
leader_info.clone(),
leader_info_provider.clone(),
config.clone(),
retry_transactions.clone(),
exit.clone(),
@ -168,7 +224,7 @@ impl SendTransactionService {
let retry_thread = Self::retry_thread(
tpu_address,
bank_forks.clone(),
leader_info,
leader_info_provider,
config,
retry_transactions,
exit.clone(),
@ -184,28 +240,24 @@ impl SendTransactionService {
fn receive_txn_thread<T: TpuInfo + std::marker::Send + 'static>(
tpu_address: SocketAddr,
receiver: Receiver<TransactionInfo>,
mut leader_info: Option<T>,
leader_info_provider: Arc<Mutex<CurrentLeaderInfo<T>>>,
config: Config,
retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let mut last_batch_sent = Instant::now();
let mut last_leader_refresh = Instant::now();
let mut transactions = HashMap::new();
info!(
"Starting send-transaction-service::receive_txn_thread with config {:?}",
config
);
if let Some(leader_info) = leader_info.as_mut() {
leader_info.refresh_recent_peers();
}
connection_cache::set_use_quic(config.use_quic);
Builder::new()
.name("send-tx-receive".to_string())
.spawn(move || loop {
let recv_timeout_ms = config.batch_send_rate_ms;
match receiver.recv_timeout(Duration::from_millis(1000.min(recv_timeout_ms))) {
match receiver.recv_timeout(Duration::from_millis(recv_timeout_ms)) {
Err(RecvTimeoutError::Disconnected) => {
info!("Terminating send-transaction-service.");
exit.store(true, Ordering::Relaxed);
@ -243,7 +295,7 @@ impl SendTransactionService {
let _result = Self::send_transactions_in_batch(
&tpu_address,
&mut transactions,
&leader_info,
leader_info_provider.lock().unwrap().get_leader_info(),
&config,
);
let last_sent_time = Instant::now();
@ -266,12 +318,6 @@ impl SendTransactionService {
}
last_batch_sent = Instant::now();
if last_leader_refresh.elapsed().as_millis() > 1000 {
if let Some(leader_info) = leader_info.as_mut() {
leader_info.refresh_recent_peers();
}
last_leader_refresh = Instant::now();
}
}
})
.unwrap()
@ -281,26 +327,23 @@ impl SendTransactionService {
fn retry_thread<T: TpuInfo + std::marker::Send + 'static>(
tpu_address: SocketAddr,
bank_forks: Arc<RwLock<BankForks>>,
mut leader_info: Option<T>,
leader_info_provider: Arc<Mutex<CurrentLeaderInfo<T>>>,
config: Config,
retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let mut last_leader_refresh = Instant::now();
info!(
"Starting send-transaction-service::retry_thread with config {:?}",
config
);
if let Some(leader_info) = leader_info.as_mut() {
leader_info.refresh_recent_peers();
}
connection_cache::set_use_quic(config.use_quic);
Builder::new()
.name("send-tx-retry".to_string())
.spawn(move || loop {
let retry_interval_ms = config.retry_rate_ms;
sleep(Duration::from_millis(1000.min(retry_interval_ms)));
sleep(Duration::from_millis(
MAX_RETRY_SLEEP_MS.min(retry_interval_ms),
));
if exit.load(Ordering::Relaxed) {
break;
}
@ -323,16 +366,10 @@ impl SendTransactionService {
&root_bank,
&tpu_address,
&mut transactions,
&leader_info,
&leader_info_provider,
&config,
);
}
if last_leader_refresh.elapsed().as_millis() > 1000 {
if let Some(leader_info) = leader_info.as_mut() {
leader_info.refresh_recent_peers();
}
last_leader_refresh = Instant::now();
}
})
.unwrap()
}
@ -341,7 +378,7 @@ impl SendTransactionService {
fn send_transactions_in_batch<T: TpuInfo>(
tpu_address: &SocketAddr,
transactions: &mut HashMap<Signature, TransactionInfo>,
leader_info: &Option<T>,
leader_info: Option<&T>,
config: &Config,
) {
let mut measure = Measure::start("send_transactions_in_batch-us");
@ -367,12 +404,12 @@ impl SendTransactionService {
}
/// Retry transactions sent before.
fn process_transactions<T: TpuInfo>(
fn process_transactions<T: TpuInfo + std::marker::Send + 'static>(
working_bank: &Arc<Bank>,
root_bank: &Arc<Bank>,
tpu_address: &SocketAddr,
transactions: &mut HashMap<Signature, TransactionInfo>,
leader_info: &Option<T>,
leader_info_provider: &Arc<Mutex<CurrentLeaderInfo<T>>>,
config: &Config,
) -> ProcessTransactionsResult {
let mut result = ProcessTransactionsResult::default();
@ -469,17 +506,19 @@ impl SendTransactionService {
if !batched_transactions.is_empty() {
// Processing the transactions in batch
let addresses = Self::get_tpu_addresses(tpu_address, leader_info, config);
let wire_transactions = transactions
.iter()
.filter(|(signature, _)| batched_transactions.contains(signature))
.map(|(_, transaction_info)| transaction_info.wire_transaction.as_ref())
.collect::<Vec<&[u8]>>();
for address in &addresses {
let iter = wire_transactions.chunks(config.batch_size);
for chunk in iter {
let iter = wire_transactions.chunks(config.batch_size);
for chunk in iter {
let mut leader_info_provider = leader_info_provider.lock().unwrap();
let leader_info = leader_info_provider.get_leader_info();
let addresses = Self::get_tpu_addresses(tpu_address, leader_info, config);
for address in &addresses {
Self::send_transactions(address, chunk);
}
}
@ -532,7 +571,7 @@ impl SendTransactionService {
fn get_tpu_addresses<'a, T: TpuInfo>(
tpu_address: &'a SocketAddr,
leader_info: &'a Option<T>,
leader_info: Option<&'a T>,
config: &'a Config,
) -> Vec<&'a SocketAddr> {
let addresses = leader_info
@ -630,6 +669,7 @@ mod test {
let mut transactions = HashMap::new();
info!("Expired transactions are dropped...");
let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None)));
transactions.insert(
Signature::default(),
TransactionInfo::new(
@ -646,7 +686,7 @@ mod test {
&root_bank,
&tpu_address,
&mut transactions,
&None,
&leader_info_provider,
&config,
);
assert!(transactions.is_empty());
@ -675,7 +715,7 @@ mod test {
&root_bank,
&tpu_address,
&mut transactions,
&None,
&leader_info_provider,
&config,
);
assert!(transactions.is_empty());
@ -704,7 +744,7 @@ mod test {
&root_bank,
&tpu_address,
&mut transactions,
&None,
&leader_info_provider,
&config,
);
assert!(transactions.is_empty());
@ -733,7 +773,7 @@ mod test {
&root_bank,
&tpu_address,
&mut transactions,
&None,
&leader_info_provider,
&config,
);
assert_eq!(transactions.len(), 1);
@ -764,7 +804,7 @@ mod test {
&root_bank,
&tpu_address,
&mut transactions,
&None,
&leader_info_provider,
&config,
);
assert_eq!(transactions.len(), 1);
@ -805,7 +845,7 @@ mod test {
&root_bank,
&tpu_address,
&mut transactions,
&None,
&leader_info_provider,
&config,
);
assert_eq!(transactions.len(), 1);
@ -822,7 +862,7 @@ mod test {
&root_bank,
&tpu_address,
&mut transactions,
&None,
&leader_info_provider,
&config,
);
assert!(transactions.is_empty());
@ -896,12 +936,13 @@ mod test {
Some(Instant::now()),
),
);
let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None)));
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank,
&root_bank,
&tpu_address,
&mut transactions,
&None,
&leader_info_provider,
&config,
);
assert!(transactions.is_empty());
@ -929,7 +970,7 @@ mod test {
&root_bank,
&tpu_address,
&mut transactions,
&None,
&leader_info_provider,
&config,
);
assert!(transactions.is_empty());
@ -959,7 +1000,7 @@ mod test {
&root_bank,
&tpu_address,
&mut transactions,
&None,
&leader_info_provider,
&config,
);
assert!(transactions.is_empty());
@ -987,7 +1028,7 @@ mod test {
&root_bank,
&tpu_address,
&mut transactions,
&None,
&leader_info_provider,
&config,
);
assert!(transactions.is_empty());
@ -1016,7 +1057,7 @@ mod test {
&root_bank,
&tpu_address,
&mut transactions,
&None,
&leader_info_provider,
&config,
);
assert!(transactions.is_empty());
@ -1045,7 +1086,7 @@ mod test {
&root_bank,
&tpu_address,
&mut transactions,
&None,
&leader_info_provider,
&config,
);
assert_eq!(transactions.len(), 1);
@ -1076,7 +1117,7 @@ mod test {
&root_bank,
&tpu_address,
&mut transactions,
&None,
&leader_info_provider,
&config,
);
assert_eq!(transactions.len(), 1);
@ -1104,7 +1145,7 @@ mod test {
&root_bank,
&tpu_address,
&mut transactions,
&None,
&leader_info_provider,
&config,
);
assert_eq!(transactions.len(), 0);