From 09ea899a2df5ab3298a29fe9876dc23b52d6f1dc Mon Sep 17 00:00:00 2001
From: Jon Cinque
Date: Sat, 21 May 2022 20:22:30 +0200
Subject: [PATCH] tpu-client: Refactor to prep for async client (#25432)

---
 client/src/spinner.rs    |  23 ++++
 client/src/tpu_client.rs | 285 ++++++++++++++++++++++++---------------
 2 files changed, 203 insertions(+), 105 deletions(-)

diff --git a/client/src/spinner.rs b/client/src/spinner.rs
index 97ae0bee21..19b788b4a5 100644
--- a/client/src/spinner.rs
+++ b/client/src/spinner.rs
@@ -9,3 +9,26 @@ pub(crate) fn new_progress_bar() -> ProgressBar {
     progress_bar.enable_steady_tick(100);
     progress_bar
 }
+
+pub(crate) fn set_message_for_confirmed_transactions(
+    progress_bar: &ProgressBar,
+    confirmed_transactions: u32,
+    total_transactions: usize,
+    block_height: Option<u64>,
+    last_valid_block_height: u64,
+    status: &str,
+) {
+    progress_bar.set_message(format!(
+        "{:>5.1}% | {:<40}{}",
+        confirmed_transactions as f64 * 100. / total_transactions as f64,
+        status,
+        match block_height {
+            Some(block_height) => format!(
+                " [block height {}; re-sign in {} blocks]",
+                block_height,
+                last_valid_block_height.saturating_sub(block_height),
+            ),
+            None => String::new(),
+        },
+    ));
+}
diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs
index abb87244c1..64f03f2b39 100644
--- a/client/src/tpu_client.rs
+++ b/client/src/tpu_client.rs
@@ -1,11 +1,11 @@
 use {
     crate::{
-        client_error::ClientError,
+        client_error::{ClientError, Result as ClientResult},
         connection_cache::send_wire_transaction_async,
         pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription},
         rpc_client::RpcClient,
         rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
-        rpc_response::SlotUpdate,
+        rpc_response::{RpcContactInfo, SlotUpdate},
         spinner,
     },
     bincode::serialize,
@@ -13,6 +13,7 @@ use {
     solana_sdk::{
         clock::Slot,
         commitment_config::CommitmentConfig,
+        epoch_info::EpochInfo,
         message::Message,
         pubkey::Pubkey,
         signature::SignerError,
@@ -29,9 +30,9 @@ use {
             Arc, RwLock,
         },
         thread::{sleep, JoinHandle},
-        time::{Duration, Instant},
     },
     thiserror::Error,
+    tokio::time::{Duration, Instant},
 };
 
 #[derive(Error, Debug)]
@@ -56,6 +57,11 @@ pub const DEFAULT_FANOUT_SLOTS: u64 = 12;
 /// Maximum number of slots used to build TPU socket fanout set
 pub const MAX_FANOUT_SLOTS: u64 = 100;
 
+/// Send at ~100 TPS
+pub(crate) const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
+/// Retry batch send after 4 seconds
+pub(crate) const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);
+
 /// Config params for `TpuClient`
 #[derive(Clone, Debug)]
 pub struct TpuClientConfig {
@@ -156,10 +162,6 @@ impl TpuClient {
         signers: &T,
     ) -> Result<Vec<Option<TransactionError>>> {
         let mut expired_blockhash_retries = 5;
-        /* Send at ~100 TPS */
-        const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
-        /* Retry batch send after 4 seconds */
-        const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);
 
         let progress_bar = spinner::new_progress_bar();
         progress_bar.set_message("Setting up...");
@@ -169,29 +171,11 @@ impl TpuClient {
             .enumerate()
             .map(|(i, message)| (i, Transaction::new_unsigned(message.clone())))
             .collect::<Vec<_>>();
-        let num_transactions = transactions.len() as f64;
+        let total_transactions = transactions.len();
         let mut transaction_errors = vec![None; transactions.len()];
-        let set_message = |confirmed_transactions,
-                           block_height: Option<u64>,
-                           last_valid_block_height: u64,
-                           status: &str| {
-            progress_bar.set_message(format!(
-                "{:>5.1}% | {:<40}{}",
-                confirmed_transactions as f64 * 100. / num_transactions,
-                status,
-                match block_height {
-                    Some(block_height) => format!(
-                        " [block height {}; re-sign in {} blocks]",
-                        block_height,
-                        last_valid_block_height.saturating_sub(block_height),
-                    ),
-                    None => String::new(),
-                },
-            ));
-        };
-
         let mut confirmed_transactions = 0;
         let mut block_height = self.rpc_client.get_block_height()?;
+
         while expired_blockhash_retries > 0 {
             let (blockhash, last_valid_block_height) = self
                 .rpc_client
@@ -213,8 +197,10 @@ impl TpuClient {
                 if !self.send_transaction(transaction) {
                     let _result = self.rpc_client.send_transaction(transaction).ok();
                 }
-                set_message(
+                spinner::set_message_for_confirmed_transactions(
+                    &progress_bar,
                     confirmed_transactions,
+                    total_transactions,
                     None, //block_height,
                     last_valid_block_height,
                     &format!("Sending {}/{} transactions", index + 1, num_transactions,),
@@ -226,8 +212,10 @@ impl TpuClient {
 
                 // Wait for the next block before checking for transaction statuses
                 let mut block_height_refreshes = 10;
-                set_message(
+                spinner::set_message_for_confirmed_transactions(
+                    &progress_bar,
                     confirmed_transactions,
+                    total_transactions,
                     Some(block_height),
                     last_valid_block_height,
                     &format!("Waiting for next block, {} pending...", num_transactions),
@@ -269,8 +257,10 @@ impl TpuClient {
                         }
                     }
                 }
-                set_message(
+                spinner::set_message_for_confirmed_transactions(
+                    &progress_bar,
                     confirmed_transactions,
+                    total_transactions,
                     Some(block_height),
                     last_valid_block_height,
                     "Checking transaction status...",
@@ -304,7 +294,20 @@ impl Drop for TpuClient {
     }
 }
 
-struct LeaderTpuCache {
+pub(crate) struct LeaderTpuCacheUpdateInfo {
+    pub(crate) maybe_cluster_nodes: Option<ClientResult<Vec<RpcContactInfo>>>,
+    pub(crate) maybe_epoch_info: Option<ClientResult<EpochInfo>>,
+    pub(crate) maybe_slot_leaders: Option<ClientResult<Vec<Pubkey>>>,
+}
+impl LeaderTpuCacheUpdateInfo {
+    pub(crate) fn has_some(&self) -> bool {
+        self.maybe_cluster_nodes.is_some()
+            || self.maybe_epoch_info.is_some()
+            || self.maybe_slot_leaders.is_some()
+    }
+}
+
+pub(crate) struct LeaderTpuCache {
     first_slot: Slot,
     leaders: Vec<Pubkey>,
     leader_tpu_map: HashMap<Pubkey, SocketAddr>,
@@ -313,26 +316,41 @@ struct LeaderTpuCache {
 }
 
 impl LeaderTpuCache {
-    fn new(rpc_client: &RpcClient, first_slot: Slot) -> Result<Self> {
-        let slots_in_epoch = rpc_client.get_epoch_info()?.slots_in_epoch;
-        let leaders = Self::fetch_slot_leaders(rpc_client, first_slot, slots_in_epoch)?;
-        let leader_tpu_map = Self::fetch_cluster_tpu_sockets(rpc_client)?;
-        Ok(Self {
+    pub(crate) fn new(
+        first_slot: Slot,
+        slots_in_epoch: Slot,
+        leaders: Vec<Pubkey>,
+        cluster_nodes: Vec<RpcContactInfo>,
+    ) -> Self {
+        let leader_tpu_map = Self::extract_cluster_tpu_sockets(cluster_nodes);
+        Self {
             first_slot,
             leaders,
             leader_tpu_map,
             slots_in_epoch,
             last_epoch_info_slot: first_slot,
-        })
+        }
     }
 
     // Last slot that has a cached leader pubkey
-    fn last_slot(&self) -> Slot {
+    pub(crate) fn last_slot(&self) -> Slot {
        self.first_slot + self.leaders.len().saturating_sub(1) as u64
    }
 
+    pub(crate) fn slot_info(&self) -> (Slot, Slot, Slot) {
+        (
+            self.last_slot(),
+            self.last_epoch_info_slot,
+            self.slots_in_epoch,
+        )
+    }
+
     // Get the TPU sockets for the current leader and upcoming leaders according to fanout size
-    fn get_leader_sockets(&self, current_slot: Slot, fanout_slots: u64) -> Vec<SocketAddr> {
+    pub(crate) fn get_leader_sockets(
+        &self,
+        current_slot: Slot,
+        fanout_slots: u64,
+    ) -> Vec<SocketAddr> {
         let mut leader_set = HashSet::new();
         let mut leader_sockets = Vec::new();
         for leader_slot in current_slot..current_slot + fanout_slots {
@@ -358,7 +376,7 @@ impl LeaderTpuCache {
         leader_sockets
     }
 
-    fn get_slot_leader(&self, slot: Slot) -> Option<&Pubkey> {
+    pub(crate) fn get_slot_leader(&self, slot: Slot) -> Option<&Pubkey> {
         if slot >= self.first_slot {
             let index = slot - self.first_slot;
             self.leaders.get(index as usize)
@@ -367,9 +385,10 @@ impl LeaderTpuCache {
         }
     }
 
-    fn fetch_cluster_tpu_sockets(rpc_client: &RpcClient) -> Result<HashMap<Pubkey, SocketAddr>> {
-        let cluster_contact_info = rpc_client.get_cluster_nodes()?;
-        Ok(cluster_contact_info
+    pub(crate) fn extract_cluster_tpu_sockets(
+        cluster_contact_info: Vec<RpcContactInfo>,
+    ) -> HashMap<Pubkey, SocketAddr> {
+        cluster_contact_info
             .into_iter()
             .filter_map(|contact_info| {
                 Some((
@@ -377,16 +396,97 @@ impl LeaderTpuCache {
                     contact_info.tpu?,
                 ))
             })
-            .collect())
+            .collect()
     }
 
-    fn fetch_slot_leaders(
-        rpc_client: &RpcClient,
-        start_slot: Slot,
-        slots_in_epoch: Slot,
-    ) -> Result<Vec<Pubkey>> {
-        let fanout = (2 * MAX_FANOUT_SLOTS).min(slots_in_epoch);
-        Ok(rpc_client.get_slot_leaders(start_slot, fanout)?)
+    pub(crate) fn fanout(slots_in_epoch: Slot) -> Slot {
+        (2 * MAX_FANOUT_SLOTS).min(slots_in_epoch)
+    }
+
+    pub(crate) fn update_all(
+        &mut self,
+        estimated_current_slot: Slot,
+        cache_update_info: LeaderTpuCacheUpdateInfo,
+    ) -> (bool, bool) {
+        let mut has_error = false;
+        let mut cluster_refreshed = false;
+        if let Some(cluster_nodes) = cache_update_info.maybe_cluster_nodes {
+            match cluster_nodes {
+                Ok(cluster_nodes) => {
+                    let leader_tpu_map = LeaderTpuCache::extract_cluster_tpu_sockets(cluster_nodes);
+                    self.leader_tpu_map = leader_tpu_map;
+                    cluster_refreshed = true;
+                }
+                Err(err) => {
+                    warn!("Failed to fetch cluster tpu sockets: {}", err);
+                    has_error = true;
+                }
+            }
+        }
+
+        if let Some(Ok(epoch_info)) = cache_update_info.maybe_epoch_info {
+            self.slots_in_epoch = epoch_info.slots_in_epoch;
+            self.last_epoch_info_slot = estimated_current_slot;
+        }
+
+        if let Some(slot_leaders) = cache_update_info.maybe_slot_leaders {
+            match slot_leaders {
+                Ok(slot_leaders) => {
+                    self.first_slot = estimated_current_slot;
+                    self.leaders = slot_leaders;
+                }
+                Err(err) => {
+                    warn!(
+                        "Failed to fetch slot leaders (current estimated slot: {}): {}",
+                        estimated_current_slot, err
+                    );
+                    has_error = true;
+                }
+            }
+        }
+        (has_error, cluster_refreshed)
+    }
+}
+
+fn maybe_fetch_cache_info(
+    leader_tpu_cache: &Arc<RwLock<LeaderTpuCache>>,
+    last_cluster_refresh: Instant,
+    rpc_client: &RpcClient,
+    recent_slots: &RecentLeaderSlots,
+) -> LeaderTpuCacheUpdateInfo {
+    // Refresh cluster TPU ports every 5min in case validators restart with new port configuration
+    // or new validators come online
+    let maybe_cluster_nodes = if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) {
+        Some(rpc_client.get_cluster_nodes())
+    } else {
+        None
+    };
+
+    let estimated_current_slot = recent_slots.estimated_current_slot();
+    let (last_slot, last_epoch_info_slot, slots_in_epoch) = {
+        let leader_tpu_cache = leader_tpu_cache.read().unwrap();
+        leader_tpu_cache.slot_info()
+    };
+    let maybe_epoch_info =
+        if estimated_current_slot >= last_epoch_info_slot.saturating_sub(slots_in_epoch) {
+            Some(rpc_client.get_epoch_info())
+        } else {
+            None
+        };
+
+    let maybe_slot_leaders = if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS)
+    {
+        Some(rpc_client.get_slot_leaders(
+            estimated_current_slot,
+            LeaderTpuCache::fanout(slots_in_epoch),
+        ))
+    } else {
+        None
+    };
+    LeaderTpuCacheUpdateInfo {
+        maybe_cluster_nodes,
+        maybe_epoch_info,
+        maybe_slot_leaders,
     }
 }
 
@@ -394,15 +494,15 @@ impl LeaderTpuCache {
 const MAX_SLOT_SKIP_DISTANCE: u64 = 48;
 
 #[derive(Clone, Debug)]
-struct RecentLeaderSlots(Arc<RwLock<VecDeque<Slot>>>);
+pub(crate) struct RecentLeaderSlots(Arc<RwLock<VecDeque<Slot>>>);
 impl RecentLeaderSlots {
-    fn new(current_slot: Slot) -> Self {
+    pub(crate) fn new(current_slot: Slot) -> Self {
         let mut recent_slots = VecDeque::new();
         recent_slots.push_back(current_slot);
         Self(Arc::new(RwLock::new(recent_slots)))
     }
 
-    fn record_slot(&self, current_slot: Slot) {
+    pub(crate) fn record_slot(&self, current_slot: Slot) {
         let mut recent_slots = self.0.write().unwrap();
         recent_slots.push_back(current_slot);
         // 12 recent slots should be large enough to avoid a misbehaving
@@ -413,7 +513,7 @@ impl RecentLeaderSlots {
     }
 
     // Estimate the current slot from recent slot notifications.
-    fn estimated_current_slot(&self) -> Slot {
+    pub(crate) fn estimated_current_slot(&self) -> Slot {
         let mut recent_slots: Vec<Slot> = self.0.read().unwrap().iter().cloned().collect();
         assert!(!recent_slots.is_empty());
         recent_slots.sort_unstable();
@@ -458,7 +558,16 @@ impl LeaderTpuService {
         let start_slot = rpc_client.get_slot_with_commitment(CommitmentConfig::processed())?;
 
         let recent_slots = RecentLeaderSlots::new(start_slot);
-        let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(&rpc_client, start_slot)?));
+        let slots_in_epoch = rpc_client.get_epoch_info()?.slots_in_epoch;
+        let leaders =
+            rpc_client.get_slot_leaders(start_slot, LeaderTpuCache::fanout(slots_in_epoch))?;
+        let cluster_nodes = rpc_client.get_cluster_nodes()?;
+        let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(
+            start_slot,
+            slots_in_epoch,
+            leaders,
+            cluster_nodes,
+        )));
 
         let subscription = if !websocket_url.is_empty() {
             let recent_slots = recent_slots.clone();
@@ -530,59 +639,25 @@ impl LeaderTpuService {
             }
 
             // Sleep a few slots before checking if leader cache needs to be refreshed again
-            std::thread::sleep(Duration::from_millis(sleep_ms));
+            sleep(Duration::from_millis(sleep_ms));
             sleep_ms = 1000;
 
-            // Refresh cluster TPU ports every 5min in case validators restart with new port configuration
-            // or new validators come online
-            if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) {
-                match LeaderTpuCache::fetch_cluster_tpu_sockets(&rpc_client) {
-                    Ok(leader_tpu_map) => {
-                        leader_tpu_cache.write().unwrap().leader_tpu_map = leader_tpu_map;
-                        last_cluster_refresh = Instant::now();
-                    }
-                    Err(err) => {
-                        warn!("Failed to fetch cluster tpu sockets: {}", err);
-                        sleep_ms = 100;
-                    }
-                }
-            }
+            let cache_update_info = maybe_fetch_cache_info(
+                &leader_tpu_cache,
+                last_cluster_refresh,
+                &rpc_client,
+                &recent_slots,
+            );
 
-            let estimated_current_slot = recent_slots.estimated_current_slot();
-            let (last_slot, last_epoch_info_slot, mut slots_in_epoch) = {
-                let leader_tpu_cache = leader_tpu_cache.read().unwrap();
-                (
-                    leader_tpu_cache.last_slot(),
-                    leader_tpu_cache.last_epoch_info_slot,
-                    leader_tpu_cache.slots_in_epoch,
-                )
-            };
-            if estimated_current_slot >= last_epoch_info_slot.saturating_sub(slots_in_epoch) {
-                if let Ok(epoch_info) = rpc_client.get_epoch_info() {
-                    slots_in_epoch = epoch_info.slots_in_epoch;
-                    let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
-                    leader_tpu_cache.slots_in_epoch = slots_in_epoch;
-                    leader_tpu_cache.last_epoch_info_slot = estimated_current_slot;
+            if cache_update_info.has_some() {
+                let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
+                let (has_error, cluster_refreshed) = leader_tpu_cache
+                    .update_all(recent_slots.estimated_current_slot(), cache_update_info);
+                if has_error {
+                    sleep_ms = 100;
                 }
-            }
-
-            if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS) {
-                match LeaderTpuCache::fetch_slot_leaders(
-                    &rpc_client,
-                    estimated_current_slot,
-                    slots_in_epoch,
-                ) {
-                    Ok(slot_leaders) => {
-                        let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
-                        leader_tpu_cache.first_slot = estimated_current_slot;
-                        leader_tpu_cache.leaders = slot_leaders;
-                    }
-                    Err(err) => {
-                        warn!(
-                            "Failed to fetch slot leaders (current estimated slot: {}): {}",
-                            estimated_current_slot, err
-                        );
-                        sleep_ms = 100;
-                    }
+                if cluster_refreshed {
+                    last_cluster_refresh = Instant::now();
                 }
             }
         }