From 6349a6dc979aafd8b8b63780ca9575b33ad54006 Mon Sep 17 00:00:00 2001
From: Jon Cinque
Date: Wed, 1 Jun 2022 23:41:35 +0200
Subject: [PATCH] tpu-client: Add "fake" async version with sync sending (#25499)

* tpu-client: Add "fake" async for sending

* Add Drop implementation

* Rebase
---
 client/src/nonblocking/mod.rs        |   1 +
 client/src/nonblocking/tpu_client.rs | 479 +++++++++++++++++++++++++++
 rpc-test/tests/nonblocking.rs        |  71 ++++
 3 files changed, 551 insertions(+)
 create mode 100644 client/src/nonblocking/tpu_client.rs
 create mode 100644 rpc-test/tests/nonblocking.rs

diff --git a/client/src/nonblocking/mod.rs b/client/src/nonblocking/mod.rs
index d1e64a97af..fa40c0db2c 100644
--- a/client/src/nonblocking/mod.rs
+++ b/client/src/nonblocking/mod.rs
@@ -1,2 +1,3 @@
 pub mod pubsub_client;
 pub mod rpc_client;
+pub mod tpu_client;
diff --git a/client/src/nonblocking/tpu_client.rs b/client/src/nonblocking/tpu_client.rs
new file mode 100644
index 0000000000..91ed573824
--- /dev/null
+++ b/client/src/nonblocking/tpu_client.rs
@@ -0,0 +1,479 @@
+use {
+    crate::{
+        client_error::ClientError,
+        connection_cache::get_connection,
+        nonblocking::{
+            pubsub_client::{PubsubClient, PubsubClientError},
+            rpc_client::RpcClient,
+        },
+        rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
+        rpc_response::SlotUpdate,
+        spinner,
+        tpu_client::{
+            LeaderTpuCache, LeaderTpuCacheUpdateInfo, RecentLeaderSlots, TpuClientConfig,
+            MAX_FANOUT_SLOTS, SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL,
+        },
+        tpu_connection::TpuConnection,
+    },
+    bincode::serialize,
+    futures_util::stream::StreamExt,
+    log::*,
+    solana_sdk::{
+        clock::Slot,
+        commitment_config::CommitmentConfig,
+        message::Message,
+        signature::SignerError,
+        signers::Signers,
+        transaction::{Transaction, TransactionError},
+        transport::{Result as TransportResult, TransportError},
+    },
+    std::{
+        collections::HashMap,
+        net::SocketAddr,
+        sync::{
+            atomic::{AtomicBool, Ordering},
+            Arc, RwLock,
+        },
+    },
+    thiserror::Error,
+    tokio::{
+        task::JoinHandle,
+        time::{sleep, timeout, Duration, Instant},
+    },
+};
+
+#[derive(Error, Debug)]
+pub enum TpuSenderError {
+    #[error("Pubsub error: {0:?}")]
+    PubsubError(#[from] PubsubClientError),
+    #[error("RPC error: {0:?}")]
+    RpcError(#[from] ClientError),
+    #[error("IO error: {0:?}")]
+    IoError(#[from] std::io::Error),
+    #[error("Signer error: {0:?}")]
+    SignerError(#[from] SignerError),
+    #[error("Custom error: {0}")]
+    Custom(String),
+}
+
+type Result<T> = std::result::Result<T, TpuSenderError>;
+
+/// Client which sends transactions directly to the current leader's TPU port over UDP.
+/// The client uses RPC to determine the current leader and fetch node contact info
+pub struct TpuClient {
+    fanout_slots: u64,
+    leader_tpu_service: LeaderTpuService,
+    exit: Arc<AtomicBool>,
+    rpc_client: Arc<RpcClient>,
+}
+
+impl TpuClient {
+    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
+    /// size
+    pub async fn send_transaction(&self, transaction: &Transaction) -> bool {
+        let wire_transaction = serialize(transaction).expect("serialization should succeed");
+        self.send_wire_transaction(wire_transaction).await
+    }
+
+    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
+    pub async fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
+        self.try_send_wire_transaction(wire_transaction)
+            .await
+            .is_ok()
+    }
+
+    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
+    /// size
+    /// Returns the last error if all sends fail
+    pub async fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
+        let wire_transaction = serialize(transaction).expect("serialization should succeed");
+        self.try_send_wire_transaction(wire_transaction).await
+    }
+
+    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
+    /// Returns the last error if all sends fail
+    async fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
+        let mut last_error: Option<TransportError> = None;
+        let mut some_success = false;
+        for tpu_address in self
+            .leader_tpu_service
+            .leader_tpu_sockets(self.fanout_slots)
+        {
+            let conn = get_connection(&tpu_address);
+            // Fake async
+            let result = conn.send_wire_transaction_async(wire_transaction.clone());
+            if let Err(err) = result {
+                last_error = Some(err);
+            } else {
+                some_success = true;
+            }
+        }
+        if !some_success {
+            Err(if let Some(err) = last_error {
+                err
+            } else {
+                std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
+            })
+        } else {
+            Ok(())
+        }
+    }
+
+    /// Create a new client that disconnects when dropped
+    pub async fn new(
+        rpc_client: Arc<RpcClient>,
+        websocket_url: &str,
+        config: TpuClientConfig,
+    ) -> Result<Self> {
+        let exit = Arc::new(AtomicBool::new(false));
+        let leader_tpu_service =
+            LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone()).await?;
+
+        Ok(Self {
+            fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1),
+            leader_tpu_service,
+            exit,
+            rpc_client,
+        })
+    }
+
+    pub async fn send_and_confirm_messages_with_spinner<T: Signers>(
+        &self,
+        messages: &[Message],
+        signers: &T,
+    ) -> Result<Vec<Option<TransactionError>>> {
+        let mut expired_blockhash_retries = 5;
+        let progress_bar = spinner::new_progress_bar();
+        progress_bar.set_message("Setting up...");
+
+        let mut transactions = messages
+            .iter()
+            .enumerate()
+            .map(|(i, message)| (i, Transaction::new_unsigned(message.clone())))
+            .collect::<Vec<_>>();
+        let total_transactions = transactions.len();
+        let mut transaction_errors = vec![None; transactions.len()];
+        let mut confirmed_transactions = 0;
+        let mut block_height = self.rpc_client.get_block_height().await?;
+        while expired_blockhash_retries > 0 {
+            let (blockhash, last_valid_block_height) = self
+                .rpc_client
+                .get_latest_blockhash_with_commitment(self.rpc_client.commitment())
+                .await?;
+
+            let mut pending_transactions = HashMap::new();
+            for (i, mut transaction) in transactions {
+                transaction.try_sign(signers, blockhash)?;
+                pending_transactions.insert(transaction.signatures[0], (i, transaction));
+            }
+
+            let mut last_resend = Instant::now() - TRANSACTION_RESEND_INTERVAL;
+            while block_height <= last_valid_block_height {
+                let num_transactions = pending_transactions.len();
+
+                // Periodically re-send all pending transactions
+                if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL {
+                    for (index, (_i, transaction)) in pending_transactions.values().enumerate() {
+                        if !self.send_transaction(transaction).await {
+                            let _result = self.rpc_client.send_transaction(transaction).await.ok();
+                        }
+                        spinner::set_message_for_confirmed_transactions(
+                            &progress_bar,
+                            confirmed_transactions,
+                            total_transactions,
+                            None, //block_height,
+                            last_valid_block_height,
+                            &format!("Sending {}/{} transactions", index + 1, num_transactions,),
+                        );
+                        sleep(SEND_TRANSACTION_INTERVAL).await;
+                    }
+                    last_resend = Instant::now();
+                }
+
+                // Wait for the next block before checking for transaction statuses
+                let mut block_height_refreshes = 10;
+                spinner::set_message_for_confirmed_transactions(
+                    &progress_bar,
+                    confirmed_transactions,
+                    total_transactions,
+                    Some(block_height),
+                    last_valid_block_height,
+                    &format!(
+                        "Waiting for next block, {} transactions pending...",
+                        num_transactions
+                    ),
+                );
+                let mut new_block_height = block_height;
+                while block_height == new_block_height && block_height_refreshes > 0 {
+                    sleep(Duration::from_millis(500)).await;
+                    new_block_height = self.rpc_client.get_block_height().await?;
+                    block_height_refreshes -= 1;
+                }
+                block_height = new_block_height;
+
+                // Collect statuses for the transactions, drop those that are confirmed
+                let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
+                for pending_signatures_chunk in
+                    pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
+                {
+                    if let Ok(result) = self
+                        .rpc_client
+                        .get_signature_statuses(pending_signatures_chunk)
+                        .await
+                    {
+                        let statuses = result.value;
+                        for (signature, status) in
+                            pending_signatures_chunk.iter().zip(statuses.into_iter())
+                        {
+                            if let Some(status) = status {
+                                if status.satisfies_commitment(self.rpc_client.commitment()) {
+                                    if let Some((i, _)) = pending_transactions.remove(signature) {
+                                        confirmed_transactions += 1;
+                                        if status.err.is_some() {
+                                            progress_bar.println(format!(
+                                                "Failed transaction: {:?}",
+                                                status
+                                            ));
+                                        }
+                                        transaction_errors[i] = status.err;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                    spinner::set_message_for_confirmed_transactions(
+                        &progress_bar,
+                        confirmed_transactions,
+                        total_transactions,
+                        Some(block_height),
+                        last_valid_block_height,
+                        "Checking transaction status...",
+                    );
+                }
+
+                if pending_transactions.is_empty() {
+                    return Ok(transaction_errors);
+                }
+            }
+
+            transactions = pending_transactions.into_iter().map(|(_k, v)| v).collect();
+            progress_bar.println(format!(
+                "Blockhash expired. {} retries remaining",
+                expired_blockhash_retries
+            ));
+            expired_blockhash_retries -= 1;
+        }
+        Err(TpuSenderError::Custom("Max retries exceeded".into()))
+    }
+
+    pub fn rpc_client(&self) -> &RpcClient {
+        &self.rpc_client
+    }
+
+    pub async fn shutdown(&mut self) {
+        self.exit.store(true, Ordering::Relaxed);
+        self.leader_tpu_service.join().await;
+    }
+}
+impl Drop for TpuClient {
+    fn drop(&mut self) {
+        self.exit.store(true, Ordering::Relaxed);
+    }
+}
+
+/// Service that tracks upcoming leaders and maintains an up-to-date mapping
+/// of leader id to TPU socket address.
+pub struct LeaderTpuService {
+    recent_slots: RecentLeaderSlots,
+    leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
+    t_leader_tpu_service: Option<JoinHandle<Result<()>>>,
+}
+
+impl LeaderTpuService {
+    pub async fn new(
+        rpc_client: Arc<RpcClient>,
+        websocket_url: &str,
+        exit: Arc<AtomicBool>,
+    ) -> Result<Self> {
+        let start_slot = rpc_client
+            .get_slot_with_commitment(CommitmentConfig::processed())
+            .await?;
+
+        let recent_slots = RecentLeaderSlots::new(start_slot);
+        let slots_in_epoch = rpc_client.get_epoch_info().await?.slots_in_epoch;
+        let leaders = rpc_client
+            .get_slot_leaders(start_slot, LeaderTpuCache::fanout(slots_in_epoch))
+            .await?;
+        let cluster_nodes = rpc_client.get_cluster_nodes().await?;
+        let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(
+            start_slot,
+            slots_in_epoch,
+            leaders,
+            cluster_nodes,
+        )));
+
+        let pubsub_client = if !websocket_url.is_empty() {
+            Some(PubsubClient::new(websocket_url).await?)
+        } else {
+            None
+        };
+
+        let t_leader_tpu_service = Some({
+            let recent_slots = recent_slots.clone();
+            let leader_tpu_cache = leader_tpu_cache.clone();
+            tokio::spawn(async move {
+                Self::run(
+                    rpc_client,
+                    recent_slots,
+                    leader_tpu_cache,
+                    pubsub_client,
+                    exit,
+                )
+                .await
+            })
+        });
+
+        Ok(LeaderTpuService {
+            recent_slots,
+            leader_tpu_cache,
+            t_leader_tpu_service,
+        })
+    }
+
+    pub async fn join(&mut self) {
+        if let Some(t_handle) = self.t_leader_tpu_service.take() {
+            t_handle.await.unwrap().unwrap();
+        }
+    }
+
+    pub fn estimated_current_slot(&self) -> Slot {
+        self.recent_slots.estimated_current_slot()
+    }
+
+    fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
+        let current_slot = self.recent_slots.estimated_current_slot();
+        self.leader_tpu_cache
+            .read()
+            .unwrap()
+            .get_leader_sockets(current_slot, fanout_slots)
+    }
+
+    async fn run(
+        rpc_client: Arc<RpcClient>,
+        recent_slots: RecentLeaderSlots,
+        leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
+        pubsub_client: Option<PubsubClient>,
+        exit: Arc<AtomicBool>,
+    ) -> Result<()> {
+        let (mut notifications, unsubscribe) = if let Some(pubsub_client) = &pubsub_client {
+            let (notifications, unsubscribe) = pubsub_client.slot_updates_subscribe().await?;
+            (Some(notifications), Some(unsubscribe))
+        } else {
+            (None, None)
+        };
+        let mut last_cluster_refresh = Instant::now();
+        let mut sleep_ms = 1000;
+        loop {
+            if exit.load(Ordering::Relaxed) {
+                if let Some(unsubscribe) = unsubscribe {
+                    (unsubscribe)().await;
+                }
+                // `notifications` requires a valid reference to `pubsub_client`
+                // so `notifications` must be dropped before moving `pubsub_client`
+                drop(notifications);
+                if let Some(pubsub_client) = pubsub_client {
+                    pubsub_client.shutdown().await.unwrap();
+                };
+                break;
+            }
+
+            // Sleep a slot before checking if leader cache needs to be refreshed again
+            sleep(Duration::from_millis(sleep_ms)).await;
+            sleep_ms = 1000;
+
+            if let Some(notifications) = &mut notifications {
+                while let Ok(Some(update)) =
+                    timeout(Duration::from_millis(10), notifications.next()).await
+                {
+                    let current_slot = match update {
+                        // This update indicates that a full slot was received by the connected
+                        // node so we can stop sending transactions to the leader for that slot
+                        SlotUpdate::Completed { slot, .. } => slot.saturating_add(1),
+                        // This update indicates that we have just received the first shred from
+                        // the leader for this slot and they are probably still accepting transactions.
+                        SlotUpdate::FirstShredReceived { slot, .. } => slot,
+                        _ => continue,
+                    };
+                    recent_slots.record_slot(current_slot);
+                }
+            }
+
+            let cache_update_info = maybe_fetch_cache_info(
+                &leader_tpu_cache,
+                last_cluster_refresh,
+                &rpc_client,
+                &recent_slots,
+            )
+            .await;
+
+            if cache_update_info.has_some() {
+                let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
+                let (has_error, cluster_refreshed) = leader_tpu_cache
+                    .update_all(recent_slots.estimated_current_slot(), cache_update_info);
+                if has_error {
+                    sleep_ms = 100;
+                }
+                if cluster_refreshed {
+                    last_cluster_refresh = Instant::now();
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+async fn maybe_fetch_cache_info(
+    leader_tpu_cache: &Arc<RwLock<LeaderTpuCache>>,
+    last_cluster_refresh: Instant,
+    rpc_client: &RpcClient,
+    recent_slots: &RecentLeaderSlots,
+) -> LeaderTpuCacheUpdateInfo {
+    // Refresh cluster TPU ports every 5min in case validators restart with new port configuration
+    // or new validators come online
+    let maybe_cluster_nodes = if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) {
+        Some(rpc_client.get_cluster_nodes().await)
+    } else {
+        None
+    };
+
+    let estimated_current_slot = recent_slots.estimated_current_slot();
+    let (last_slot, last_epoch_info_slot, slots_in_epoch) = {
+        let leader_tpu_cache = leader_tpu_cache.read().unwrap();
+        leader_tpu_cache.slot_info()
+    };
+    let maybe_epoch_info =
+        if estimated_current_slot >= last_epoch_info_slot.saturating_sub(slots_in_epoch) {
+            Some(rpc_client.get_epoch_info().await)
+        } else {
+            None
+        };
+
+    let maybe_slot_leaders = if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS)
+    {
+        Some(
+            rpc_client
+                .get_slot_leaders(
+                    estimated_current_slot,
+                    LeaderTpuCache::fanout(slots_in_epoch),
+                )
+                .await,
+        )
+    } else {
+        None
+    };
+    LeaderTpuCacheUpdateInfo {
+        maybe_cluster_nodes,
+        maybe_epoch_info,
+        maybe_slot_leaders,
+    }
+}
diff --git a/rpc-test/tests/nonblocking.rs b/rpc-test/tests/nonblocking.rs
new file mode 100644
index 0000000000..ecb9ee5199
--- /dev/null
+++ b/rpc-test/tests/nonblocking.rs
@@ -0,0 +1,71 @@
+use {
+    solana_client::{
+        nonblocking::tpu_client::{LeaderTpuService, TpuClient},
+        tpu_client::TpuClientConfig,
+    },
+    solana_sdk::{clock::DEFAULT_MS_PER_SLOT, pubkey::Pubkey, system_transaction},
+    solana_test_validator::TestValidatorGenesis,
+    std::sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc,
+    },
+    tokio::time::{sleep, Duration, Instant},
+};
+
+#[tokio::test]
+async fn test_tpu_send_transaction() {
+    let (test_validator, mint_keypair) = TestValidatorGenesis::default().start_async().await;
+    let rpc_client = Arc::new(test_validator.get_async_rpc_client());
+    let mut tpu_client = TpuClient::new(
+        rpc_client.clone(),
+        &test_validator.rpc_pubsub_url(),
+        TpuClientConfig::default(),
+    )
+    .await
+    .unwrap();
+
+    let recent_blockhash = rpc_client.get_latest_blockhash().await.unwrap();
+    let tx =
+        system_transaction::transfer(&mint_keypair, &Pubkey::new_unique(), 42, recent_blockhash);
+    assert!(tpu_client.send_transaction(&tx).await);
+
+    let timeout = Duration::from_secs(5);
+    let now = Instant::now();
+    let signatures = vec![tx.signatures[0]];
+    loop {
+        assert!(now.elapsed() < timeout);
+        let statuses = rpc_client
+            .get_signature_statuses(&signatures)
+            .await
+            .unwrap();
+        if statuses.value.get(0).is_some() {
+            break;
+        }
+    }
+    tpu_client.shutdown().await;
+}
+
+#[tokio::test]
+async fn test_tpu_cache_slot_updates() {
+    let (test_validator, _) = TestValidatorGenesis::default().start_async().await;
+    let rpc_client = Arc::new(test_validator.get_async_rpc_client());
+    let exit = Arc::new(AtomicBool::new(false));
+    let mut leader_tpu_service =
+        LeaderTpuService::new(rpc_client, &test_validator.rpc_pubsub_url(), exit.clone())
+            .await
+            .unwrap();
+    let start_slot = leader_tpu_service.estimated_current_slot();
+    let timeout = Duration::from_secs(5);
+    let sleep_time = Duration::from_millis(DEFAULT_MS_PER_SLOT);
+    let now = Instant::now();
+    loop {
+        assert!(now.elapsed() < timeout);
+        let current_slot = leader_tpu_service.estimated_current_slot();
+        if current_slot != start_slot {
+            break;
+        }
+        sleep(sleep_time).await;
+    }
+    exit.store(true, Ordering::Relaxed);
+    leader_tpu_service.join().await;
+}
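
Reviewer note: the snippet below is not part of the patch; it is a minimal usage sketch of the new nonblocking TpuClient, mirroring the calls exercised in rpc-test/tests/nonblocking.rs. The endpoint URLs and the payer keypair are placeholders; a real caller would point at an actual cluster (or TestValidator) and use a funded signer.

use {
    solana_client::{
        nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient},
        tpu_client::TpuClientConfig,
    },
    solana_sdk::{pubkey::Pubkey, signature::Keypair, system_transaction},
    std::sync::Arc,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder endpoints: point these at a test validator or a real cluster.
    let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));
    let websocket_url = "ws://127.0.0.1:8900";

    // Placeholder payer: a real payer must hold enough lamports for the transfer.
    let payer = Keypair::new();

    // TpuClient::new spawns the background LeaderTpuService that tracks upcoming leaders.
    let mut tpu_client =
        TpuClient::new(rpc_client.clone(), websocket_url, TpuClientConfig::default()).await?;

    let blockhash = rpc_client.get_latest_blockhash().await?;
    let tx = system_transaction::transfer(&payer, &Pubkey::new_unique(), 1, blockhash);

    // send_transaction() returns a bool; try_send_transaction() surfaces the last send error.
    if tpu_client.send_transaction(&tx).await {
        println!("sent {}", tx.signatures[0]);
    }

    // Drop only sets the exit flag; shutdown() also waits for the background task to finish.
    tpu_client.shutdown().await;
    Ok(())
}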