From 04cac610cc87c59cb42eb0a6a3fa1931af5119f0 Mon Sep 17 00:00:00 2001 From: Jon Cinque Date: Tue, 9 Aug 2022 14:33:14 +0200 Subject: [PATCH] client: Use async TPU client in sync TPU client by sharing tokio runtime (#26996) * Make the sync tpu client use the async tpu client * Try to fix CI errors * Fix formatting * Make rpc_client::get_nonblocking_client public only in the crate * Save work * Temporary hack to test sharing runtime between tpu_client and rpc_client * [WIP] Copy rpc client * Fix build * Small refactoring * Remove copies * Refactor access to RPC client fields * Change `clone_inner_client` to `get_inner_client` Co-authored-by: Ryan Leung --- client/src/nonblocking/pubsub_client.rs | 6 + client/src/nonblocking/tpu_client.rs | 168 ++++++- client/src/pubsub_client.rs | 22 +- client/src/rpc_client.rs | 400 ++++++++--------- client/src/tpu_client.rs | 563 ++---------------------- 5 files changed, 385 insertions(+), 774 deletions(-) diff --git a/client/src/nonblocking/pubsub_client.rs b/client/src/nonblocking/pubsub_client.rs index 4f252e4fd..1aa6210d0 100644 --- a/client/src/nonblocking/pubsub_client.rs +++ b/client/src/nonblocking/pubsub_client.rs @@ -64,8 +64,14 @@ pub enum PubsubClientError { #[error("subscribe failed: {reason}")] SubscribeFailed { reason: String, message: String }, + #[error("unexpected message format: {0}")] + UnexpectedMessageError(String), + #[error("request failed: {reason}")] RequestFailed { reason: String, message: String }, + + #[error("request error: {0}")] + RequestError(String), } type UnsubscribeFn = Box BoxFuture<'static, ()> + Send>; diff --git a/client/src/nonblocking/tpu_client.rs b/client/src/nonblocking/tpu_client.rs index 4dd39c768..a231b787c 100644 --- a/client/src/nonblocking/tpu_client.rs +++ b/client/src/nonblocking/tpu_client.rs @@ -1,6 +1,6 @@ use { crate::{ - client_error::ClientError, + client_error::{ClientError, Result as ClientResult}, connection_cache::ConnectionCache, nonblocking::{ pubsub_client::{PubsubClient, PubsubClientError}, @@ -8,11 +8,11 @@ use { tpu_connection::TpuConnection, }, rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, - rpc_response::SlotUpdate, + rpc_response::{RpcContactInfo, SlotUpdate}, spinner, tpu_client::{ - LeaderTpuCache, LeaderTpuCacheUpdateInfo, RecentLeaderSlots, TpuClientConfig, - MAX_FANOUT_SLOTS, SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL, + RecentLeaderSlots, TpuClientConfig, MAX_FANOUT_SLOTS, SEND_TRANSACTION_INTERVAL, + TRANSACTION_RESEND_INTERVAL, }, }, bincode::serialize, @@ -21,15 +21,18 @@ use { solana_sdk::{ clock::Slot, commitment_config::CommitmentConfig, + epoch_info::EpochInfo, message::Message, + pubkey::Pubkey, signature::SignerError, signers::Signers, transaction::{Transaction, TransactionError}, transport::{Result as TransportResult, TransportError}, }, std::{ - collections::HashMap, + collections::{HashMap, HashSet}, net::SocketAddr, + str::FromStr, sync::{ atomic::{AtomicBool, Ordering}, Arc, RwLock, @@ -56,6 +59,156 @@ pub enum TpuSenderError { Custom(String), } +struct LeaderTpuCacheUpdateInfo { + maybe_cluster_nodes: Option>>, + maybe_epoch_info: Option>, + maybe_slot_leaders: Option>>, +} +impl LeaderTpuCacheUpdateInfo { + pub fn has_some(&self) -> bool { + self.maybe_cluster_nodes.is_some() + || self.maybe_epoch_info.is_some() + || self.maybe_slot_leaders.is_some() + } +} + +struct LeaderTpuCache { + first_slot: Slot, + leaders: Vec, + leader_tpu_map: HashMap, + slots_in_epoch: Slot, + last_epoch_info_slot: Slot, +} + +impl LeaderTpuCache { + pub fn new( + first_slot: Slot, + slots_in_epoch: Slot, + leaders: Vec, + cluster_nodes: Vec, + ) -> Self { + let leader_tpu_map = Self::extract_cluster_tpu_sockets(cluster_nodes); + Self { + first_slot, + leaders, + leader_tpu_map, + slots_in_epoch, + last_epoch_info_slot: first_slot, + } + } + + // Last slot that has a cached leader pubkey + pub fn last_slot(&self) -> Slot { + self.first_slot + self.leaders.len().saturating_sub(1) as u64 + } + + pub fn slot_info(&self) -> (Slot, Slot, Slot) { + ( + self.last_slot(), + self.last_epoch_info_slot, + self.slots_in_epoch, + ) + } + + // Get the TPU sockets for the current leader and upcoming leaders according to fanout size + pub fn get_leader_sockets(&self, current_slot: Slot, fanout_slots: u64) -> Vec { + let mut leader_set = HashSet::new(); + let mut leader_sockets = Vec::new(); + for leader_slot in current_slot..current_slot + fanout_slots { + if let Some(leader) = self.get_slot_leader(leader_slot) { + if let Some(tpu_socket) = self.leader_tpu_map.get(leader) { + if leader_set.insert(*leader) { + leader_sockets.push(*tpu_socket); + } + } else { + // The leader is probably delinquent + trace!("TPU not available for leader {}", leader); + } + } else { + // Overran the local leader schedule cache + warn!( + "Leader not known for slot {}; cache holds slots [{},{}]", + leader_slot, + self.first_slot, + self.last_slot() + ); + } + } + leader_sockets + } + + pub fn get_slot_leader(&self, slot: Slot) -> Option<&Pubkey> { + if slot >= self.first_slot { + let index = slot - self.first_slot; + self.leaders.get(index as usize) + } else { + None + } + } + + pub fn extract_cluster_tpu_sockets( + cluster_contact_info: Vec, + ) -> HashMap { + cluster_contact_info + .into_iter() + .filter_map(|contact_info| { + Some(( + Pubkey::from_str(&contact_info.pubkey).ok()?, + contact_info.tpu?, + )) + }) + .collect() + } + + pub fn fanout(slots_in_epoch: Slot) -> Slot { + (2 * MAX_FANOUT_SLOTS).min(slots_in_epoch) + } + + pub fn update_all( + &mut self, + estimated_current_slot: Slot, + cache_update_info: LeaderTpuCacheUpdateInfo, + ) -> (bool, bool) { + let mut has_error = false; + let mut cluster_refreshed = false; + if let Some(cluster_nodes) = cache_update_info.maybe_cluster_nodes { + match cluster_nodes { + Ok(cluster_nodes) => { + let leader_tpu_map = LeaderTpuCache::extract_cluster_tpu_sockets(cluster_nodes); + self.leader_tpu_map = leader_tpu_map; + cluster_refreshed = true; + } + Err(err) => { + warn!("Failed to fetch cluster tpu sockets: {}", err); + has_error = true; + } + } + } + + if let Some(Ok(epoch_info)) = cache_update_info.maybe_epoch_info { + self.slots_in_epoch = epoch_info.slots_in_epoch; + self.last_epoch_info_slot = estimated_current_slot; + } + + if let Some(slot_leaders) = cache_update_info.maybe_slot_leaders { + match slot_leaders { + Ok(slot_leaders) => { + self.first_slot = estimated_current_slot; + self.leaders = slot_leaders; + } + Err(err) => { + warn!( + "Failed to fetch slot leaders (current estimated slot: {}): {}", + estimated_current_slot, err + ); + has_error = true; + } + } + } + (has_error, cluster_refreshed) + } +} + type Result = std::result::Result; /// Client which sends transactions directly to the current leader's TPU port over UDP. @@ -102,7 +255,10 @@ impl TpuClient { /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size /// Returns the last error if all sends fail - async fn try_send_wire_transaction(&self, wire_transaction: Vec) -> TransportResult<()> { + pub async fn try_send_wire_transaction( + &self, + wire_transaction: Vec, + ) -> TransportResult<()> { let leaders = self .leader_tpu_service .leader_tpu_sockets(self.fanout_slots); diff --git a/client/src/pubsub_client.rs b/client/src/pubsub_client.rs index 22bf49b6e..344082ed5 100644 --- a/client/src/pubsub_client.rs +++ b/client/src/pubsub_client.rs @@ -1,3 +1,4 @@ +pub use crate::nonblocking::pubsub_client::PubsubClientError; use { crate::{ rpc_config::{ @@ -31,29 +32,10 @@ use { thread::{sleep, JoinHandle}, time::Duration, }, - thiserror::Error, tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket}, - url::{ParseError, Url}, + url::Url, }; -#[derive(Debug, Error)] -pub enum PubsubClientError { - #[error("url parse error")] - UrlParseError(#[from] ParseError), - - #[error("unable to connect to server")] - ConnectionError(#[from] tungstenite::Error), - - #[error("json parse error")] - JsonParseError(#[from] serde_json::error::Error), - - #[error("unexpected message format: {0}")] - UnexpectedMessageError(String), - - #[error("request error: {0}")] - RequestError(String), -} - pub struct PubsubClientSubscription where T: DeserializeOwned, diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index 1aba5a0c5..9946dfa5c 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -42,7 +42,7 @@ use { EncodedConfirmedBlock, EncodedConfirmedTransactionWithStatusMeta, TransactionStatus, UiConfirmedBlock, UiTransactionEncoding, }, - std::{net::SocketAddr, str::FromStr, time::Duration}, + std::{net::SocketAddr, str::FromStr, sync::Arc, time::Duration}, }; #[derive(Default)] @@ -151,7 +151,7 @@ pub struct GetConfirmedSignaturesForAddress2Config { /// [`ClientErrorKind`]: crate::client_error::ClientErrorKind /// [`ClientErrorKind::Reqwest`]: crate::client_error::ClientErrorKind::Reqwest pub struct RpcClient { - rpc_client: nonblocking::rpc_client::RpcClient, + rpc_client: Arc, runtime: Option, } @@ -173,7 +173,9 @@ impl RpcClient { config: RpcClientConfig, ) -> Self { Self { - rpc_client: nonblocking::rpc_client::RpcClient::new_sender(sender, config), + rpc_client: Arc::new(nonblocking::rpc_client::RpcClient::new_sender( + sender, config, + )), runtime: Some( tokio::runtime::Builder::new_current_thread() .thread_name("rpc-client") @@ -537,7 +539,7 @@ impl RpcClient { /// Get the configured url of the client's sender pub fn url(&self) -> String { - self.rpc_client.url() + (self.rpc_client.as_ref()).url() } /// Get the configured default [commitment level][cl]. @@ -556,7 +558,7 @@ impl RpcClient { /// explicitly provide a [`CommitmentConfig`], like /// [`RpcClient::confirm_transaction_with_commitment`]. pub fn commitment(&self) -> CommitmentConfig { - self.rpc_client.commitment() + (self.rpc_client.as_ref()).commitment() } /// Submit a transaction and wait for confirmation. @@ -624,7 +626,7 @@ impl RpcClient { &self, transaction: &Transaction, ) -> ClientResult { - self.invoke(self.rpc_client.send_and_confirm_transaction(transaction)) + self.invoke((self.rpc_client.as_ref()).send_and_confirm_transaction(transaction)) } pub fn send_and_confirm_transaction_with_spinner( @@ -632,8 +634,7 @@ impl RpcClient { transaction: &Transaction, ) -> ClientResult { self.invoke( - self.rpc_client - .send_and_confirm_transaction_with_spinner(transaction), + (self.rpc_client.as_ref()).send_and_confirm_transaction_with_spinner(transaction), ) } @@ -643,7 +644,7 @@ impl RpcClient { commitment: CommitmentConfig, ) -> ClientResult { self.invoke( - self.rpc_client + (self.rpc_client.as_ref()) .send_and_confirm_transaction_with_spinner_and_commitment(transaction, commitment), ) } @@ -655,12 +656,11 @@ impl RpcClient { config: RpcSendTransactionConfig, ) -> ClientResult { self.invoke( - self.rpc_client - .send_and_confirm_transaction_with_spinner_and_config( - transaction, - commitment, - config, - ), + (self.rpc_client.as_ref()).send_and_confirm_transaction_with_spinner_and_config( + transaction, + commitment, + config, + ), ) } @@ -735,7 +735,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn send_transaction(&self, transaction: &Transaction) -> ClientResult { - self.invoke(self.rpc_client.send_transaction(transaction)) + self.invoke((self.rpc_client.as_ref()).send_transaction(transaction)) } /// Submits a signed transaction to the network. @@ -822,17 +822,14 @@ impl RpcClient { transaction: &Transaction, config: RpcSendTransactionConfig, ) -> ClientResult { - self.invoke( - self.rpc_client - .send_transaction_with_config(transaction, config), - ) + self.invoke((self.rpc_client.as_ref()).send_transaction_with_config(transaction, config)) } pub fn send(&self, request: RpcRequest, params: Value) -> ClientResult where T: serde::de::DeserializeOwned, { - self.invoke(self.rpc_client.send(request, params)) + self.invoke((self.rpc_client.as_ref()).send(request, params)) } /// Check the confirmation status of a transaction. @@ -888,7 +885,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn confirm_transaction(&self, signature: &Signature) -> ClientResult { - self.invoke(self.rpc_client.confirm_transaction(signature)) + self.invoke((self.rpc_client.as_ref()).confirm_transaction(signature)) } /// Check the confirmation status of a transaction. @@ -951,7 +948,7 @@ impl RpcClient { commitment_config: CommitmentConfig, ) -> RpcResult { self.invoke( - self.rpc_client + (self.rpc_client.as_ref()) .confirm_transaction_with_commitment(signature, commitment_config), ) } @@ -962,7 +959,7 @@ impl RpcClient { recent_blockhash: &Hash, commitment_config: CommitmentConfig, ) -> ClientResult<()> { - self.invoke(self.rpc_client.confirm_transaction_with_spinner( + self.invoke((self.rpc_client.as_ref()).confirm_transaction_with_spinner( signature, recent_blockhash, commitment_config, @@ -1027,7 +1024,7 @@ impl RpcClient { &self, transaction: &Transaction, ) -> RpcResult { - self.invoke(self.rpc_client.simulate_transaction(transaction)) + self.invoke((self.rpc_client.as_ref()).simulate_transaction(transaction)) } /// Simulates sending a transaction. @@ -1106,8 +1103,7 @@ impl RpcClient { config: RpcSimulateTransactionConfig, ) -> RpcResult { self.invoke( - self.rpc_client - .simulate_transaction_with_config(transaction, config), + (self.rpc_client.as_ref()).simulate_transaction_with_config(transaction, config), ) } @@ -1134,7 +1130,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_highest_snapshot_slot(&self) -> ClientResult { - self.invoke(self.rpc_client.get_highest_snapshot_slot()) + self.invoke((self.rpc_client.as_ref()).get_highest_snapshot_slot()) } #[deprecated( @@ -1143,7 +1139,7 @@ impl RpcClient { )] #[allow(deprecated)] pub fn get_snapshot_slot(&self) -> ClientResult { - self.invoke(self.rpc_client.get_snapshot_slot()) + self.invoke((self.rpc_client.as_ref()).get_snapshot_slot()) } /// Check if a transaction has been processed with the default [commitment level][cl]. @@ -1204,7 +1200,7 @@ impl RpcClient { &self, signature: &Signature, ) -> ClientResult>> { - self.invoke(self.rpc_client.get_signature_status(signature)) + self.invoke((self.rpc_client.as_ref()).get_signature_status(signature)) } /// Gets the statuses of a list of transaction signatures. @@ -1282,7 +1278,7 @@ impl RpcClient { &self, signatures: &[Signature], ) -> RpcResult>> { - self.invoke(self.rpc_client.get_signature_statuses(signatures)) + self.invoke((self.rpc_client.as_ref()).get_signature_statuses(signatures)) } /// Gets the statuses of a list of transaction signatures. @@ -1350,10 +1346,7 @@ impl RpcClient { &self, signatures: &[Signature], ) -> RpcResult>> { - self.invoke( - self.rpc_client - .get_signature_statuses_with_history(signatures), - ) + self.invoke((self.rpc_client.as_ref()).get_signature_statuses_with_history(signatures)) } /// Check if a transaction has been processed with the given [commitment level][cl]. @@ -1420,7 +1413,7 @@ impl RpcClient { commitment_config: CommitmentConfig, ) -> ClientResult>> { self.invoke( - self.rpc_client + (self.rpc_client.as_ref()) .get_signature_status_with_commitment(signature, commitment_config), ) } @@ -1488,12 +1481,11 @@ impl RpcClient { search_transaction_history: bool, ) -> ClientResult>> { self.invoke( - self.rpc_client - .get_signature_status_with_commitment_and_history( - signature, - commitment_config, - search_transaction_history, - ), + (self.rpc_client.as_ref()).get_signature_status_with_commitment_and_history( + signature, + commitment_config, + search_transaction_history, + ), ) } @@ -1519,7 +1511,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_slot(&self) -> ClientResult { - self.invoke(self.rpc_client.get_slot()) + self.invoke((self.rpc_client.as_ref()).get_slot()) } /// Returns the slot that has reached the given [commitment level][cl]. @@ -1549,7 +1541,7 @@ impl RpcClient { &self, commitment_config: CommitmentConfig, ) -> ClientResult { - self.invoke(self.rpc_client.get_slot_with_commitment(commitment_config)) + self.invoke((self.rpc_client.as_ref()).get_slot_with_commitment(commitment_config)) } /// Returns the block height that has reached the configured [commitment level][cl]. @@ -1574,7 +1566,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_block_height(&self) -> ClientResult { - self.invoke(self.rpc_client.get_block_height()) + self.invoke((self.rpc_client.as_ref()).get_block_height()) } /// Returns the block height that has reached the given [commitment level][cl]. @@ -1606,10 +1598,7 @@ impl RpcClient { &self, commitment_config: CommitmentConfig, ) -> ClientResult { - self.invoke( - self.rpc_client - .get_block_height_with_commitment(commitment_config), - ) + self.invoke((self.rpc_client.as_ref()).get_block_height_with_commitment(commitment_config)) } /// Returns the slot leaders for a given slot range. @@ -1635,7 +1624,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_slot_leaders(&self, start_slot: Slot, limit: u64) -> ClientResult> { - self.invoke(self.rpc_client.get_slot_leaders(start_slot, limit)) + self.invoke((self.rpc_client.as_ref()).get_slot_leaders(start_slot, limit)) } /// Get block production for the current epoch. @@ -1658,7 +1647,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_block_production(&self) -> RpcResult { - self.invoke(self.rpc_client.get_block_production()) + self.invoke((self.rpc_client.as_ref()).get_block_production()) } /// Get block production for the current or previous epoch. @@ -1706,7 +1695,7 @@ impl RpcClient { &self, config: RpcBlockProductionConfig, ) -> RpcResult { - self.invoke(self.rpc_client.get_block_production_with_config(config)) + self.invoke((self.rpc_client.as_ref()).get_block_production_with_config(config)) } /// Returns epoch activation information for a stake account. @@ -1785,7 +1774,7 @@ impl RpcClient { stake_account: Pubkey, epoch: Option, ) -> ClientResult { - self.invoke(self.rpc_client.get_stake_activation(stake_account, epoch)) + self.invoke((self.rpc_client.as_ref()).get_stake_activation(stake_account, epoch)) } /// Returns information about the current supply. @@ -1812,7 +1801,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn supply(&self) -> RpcResult { - self.invoke(self.rpc_client.supply()) + self.invoke((self.rpc_client.as_ref()).supply()) } /// Returns information about the current supply. @@ -1842,7 +1831,7 @@ impl RpcClient { &self, commitment_config: CommitmentConfig, ) -> RpcResult { - self.invoke(self.rpc_client.supply_with_commitment(commitment_config)) + self.invoke((self.rpc_client.as_ref()).supply_with_commitment(commitment_config)) } /// Returns the 20 largest accounts, by lamport balance. @@ -1879,7 +1868,7 @@ impl RpcClient { &self, config: RpcLargestAccountsConfig, ) -> RpcResult> { - self.invoke(self.rpc_client.get_largest_accounts_with_config(config)) + self.invoke((self.rpc_client.as_ref()).get_largest_accounts_with_config(config)) } /// Returns the account info and associated stake for all the voting accounts @@ -1906,7 +1895,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_vote_accounts(&self) -> ClientResult { - self.invoke(self.rpc_client.get_vote_accounts()) + self.invoke((self.rpc_client.as_ref()).get_vote_accounts()) } /// Returns the account info and associated stake for all the voting accounts @@ -1939,10 +1928,7 @@ impl RpcClient { &self, commitment_config: CommitmentConfig, ) -> ClientResult { - self.invoke( - self.rpc_client - .get_vote_accounts_with_commitment(commitment_config), - ) + self.invoke((self.rpc_client.as_ref()).get_vote_accounts_with_commitment(commitment_config)) } /// Returns the account info and associated stake for all the voting accounts @@ -1988,7 +1974,7 @@ impl RpcClient { &self, config: RpcGetVoteAccountsConfig, ) -> ClientResult { - self.invoke(self.rpc_client.get_vote_accounts_with_config(config)) + self.invoke((self.rpc_client.as_ref()).get_vote_accounts_with_config(config)) } pub fn wait_for_max_stake( @@ -1996,10 +1982,7 @@ impl RpcClient { commitment: CommitmentConfig, max_stake_percent: f32, ) -> ClientResult<()> { - self.invoke( - self.rpc_client - .wait_for_max_stake(commitment, max_stake_percent), - ) + self.invoke((self.rpc_client.as_ref()).wait_for_max_stake(commitment, max_stake_percent)) } /// Returns information about all the nodes participating in the cluster. @@ -2023,7 +2006,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_cluster_nodes(&self) -> ClientResult> { - self.invoke(self.rpc_client.get_cluster_nodes()) + self.invoke((self.rpc_client.as_ref()).get_cluster_nodes()) } /// Returns identity and transaction information about a confirmed block in the ledger. @@ -2055,7 +2038,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_block(&self, slot: Slot) -> ClientResult { - self.invoke(self.rpc_client.get_block(slot)) + self.invoke((self.rpc_client.as_ref()).get_block(slot)) } /// Returns identity and transaction information about a confirmed block in the ledger. @@ -2088,7 +2071,7 @@ impl RpcClient { slot: Slot, encoding: UiTransactionEncoding, ) -> ClientResult { - self.invoke(self.rpc_client.get_block_with_encoding(slot, encoding)) + self.invoke((self.rpc_client.as_ref()).get_block_with_encoding(slot, encoding)) } /// Returns identity and transaction information about a confirmed block in the ledger. @@ -2131,13 +2114,13 @@ impl RpcClient { slot: Slot, config: RpcBlockConfig, ) -> ClientResult { - self.invoke(self.rpc_client.get_block_with_config(slot, config)) + self.invoke((self.rpc_client.as_ref()).get_block_with_config(slot, config)) } #[deprecated(since = "1.7.0", note = "Please use RpcClient::get_block() instead")] #[allow(deprecated)] pub fn get_confirmed_block(&self, slot: Slot) -> ClientResult { - self.invoke(self.rpc_client.get_confirmed_block(slot)) + self.invoke((self.rpc_client.as_ref()).get_confirmed_block(slot)) } #[deprecated( @@ -2150,10 +2133,7 @@ impl RpcClient { slot: Slot, encoding: UiTransactionEncoding, ) -> ClientResult { - self.invoke( - self.rpc_client - .get_confirmed_block_with_encoding(slot, encoding), - ) + self.invoke((self.rpc_client.as_ref()).get_confirmed_block_with_encoding(slot, encoding)) } #[deprecated( @@ -2166,10 +2146,7 @@ impl RpcClient { slot: Slot, config: RpcConfirmedBlockConfig, ) -> ClientResult { - self.invoke( - self.rpc_client - .get_confirmed_block_with_config(slot, config), - ) + self.invoke((self.rpc_client.as_ref()).get_confirmed_block_with_config(slot, config)) } /// Returns a list of finalized blocks between two slots. @@ -2218,7 +2195,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_blocks(&self, start_slot: Slot, end_slot: Option) -> ClientResult> { - self.invoke(self.rpc_client.get_blocks(start_slot, end_slot)) + self.invoke((self.rpc_client.as_ref()).get_blocks(start_slot, end_slot)) } /// Returns a list of confirmed blocks between two slots. @@ -2283,7 +2260,7 @@ impl RpcClient { end_slot: Option, commitment_config: CommitmentConfig, ) -> ClientResult> { - self.invoke(self.rpc_client.get_blocks_with_commitment( + self.invoke((self.rpc_client.as_ref()).get_blocks_with_commitment( start_slot, end_slot, commitment_config, @@ -2325,7 +2302,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_blocks_with_limit(&self, start_slot: Slot, limit: usize) -> ClientResult> { - self.invoke(self.rpc_client.get_blocks_with_limit(start_slot, limit)) + self.invoke((self.rpc_client.as_ref()).get_blocks_with_limit(start_slot, limit)) } /// Returns a list of confirmed blocks starting at the given slot. @@ -2375,11 +2352,13 @@ impl RpcClient { limit: usize, commitment_config: CommitmentConfig, ) -> ClientResult> { - self.invoke(self.rpc_client.get_blocks_with_limit_and_commitment( - start_slot, - limit, - commitment_config, - )) + self.invoke( + (self.rpc_client.as_ref()).get_blocks_with_limit_and_commitment( + start_slot, + limit, + commitment_config, + ), + ) } #[deprecated(since = "1.7.0", note = "Please use RpcClient::get_blocks() instead")] @@ -2389,7 +2368,7 @@ impl RpcClient { start_slot: Slot, end_slot: Option, ) -> ClientResult> { - self.invoke(self.rpc_client.get_confirmed_blocks(start_slot, end_slot)) + self.invoke((self.rpc_client.as_ref()).get_confirmed_blocks(start_slot, end_slot)) } #[deprecated( @@ -2403,11 +2382,13 @@ impl RpcClient { end_slot: Option, commitment_config: CommitmentConfig, ) -> ClientResult> { - self.invoke(self.rpc_client.get_confirmed_blocks_with_commitment( - start_slot, - end_slot, - commitment_config, - )) + self.invoke( + (self.rpc_client.as_ref()).get_confirmed_blocks_with_commitment( + start_slot, + end_slot, + commitment_config, + ), + ) } #[deprecated( @@ -2420,10 +2401,7 @@ impl RpcClient { start_slot: Slot, limit: usize, ) -> ClientResult> { - self.invoke( - self.rpc_client - .get_confirmed_blocks_with_limit(start_slot, limit), - ) + self.invoke((self.rpc_client.as_ref()).get_confirmed_blocks_with_limit(start_slot, limit)) } #[deprecated( @@ -2438,12 +2416,11 @@ impl RpcClient { commitment_config: CommitmentConfig, ) -> ClientResult> { self.invoke( - self.rpc_client - .get_confirmed_blocks_with_limit_and_commitment( - start_slot, - limit, - commitment_config, - ), + (self.rpc_client.as_ref()).get_confirmed_blocks_with_limit_and_commitment( + start_slot, + limit, + commitment_config, + ), ) } @@ -2488,7 +2465,7 @@ impl RpcClient { &self, address: &Pubkey, ) -> ClientResult> { - self.invoke(self.rpc_client.get_signatures_for_address(address)) + self.invoke((self.rpc_client.as_ref()).get_signatures_for_address(address)) } /// Get confirmed signatures for transactions involving an address. @@ -2549,8 +2526,7 @@ impl RpcClient { config: GetConfirmedSignaturesForAddress2Config, ) -> ClientResult> { self.invoke( - self.rpc_client - .get_signatures_for_address_with_config(address, config), + (self.rpc_client.as_ref()).get_signatures_for_address_with_config(address, config), ) } @@ -2563,10 +2539,7 @@ impl RpcClient { &self, address: &Pubkey, ) -> ClientResult> { - self.invoke( - self.rpc_client - .get_confirmed_signatures_for_address2(address), - ) + self.invoke((self.rpc_client.as_ref()).get_confirmed_signatures_for_address2(address)) } #[deprecated( @@ -2580,7 +2553,7 @@ impl RpcClient { config: GetConfirmedSignaturesForAddress2Config, ) -> ClientResult> { self.invoke( - self.rpc_client + (self.rpc_client.as_ref()) .get_confirmed_signatures_for_address2_with_config(address, config), ) } @@ -2633,7 +2606,7 @@ impl RpcClient { signature: &Signature, encoding: UiTransactionEncoding, ) -> ClientResult { - self.invoke(self.rpc_client.get_transaction(signature, encoding)) + self.invoke((self.rpc_client.as_ref()).get_transaction(signature, encoding)) } /// Returns transaction details for a confirmed transaction. @@ -2694,10 +2667,7 @@ impl RpcClient { signature: &Signature, config: RpcTransactionConfig, ) -> ClientResult { - self.invoke( - self.rpc_client - .get_transaction_with_config(signature, config), - ) + self.invoke((self.rpc_client.as_ref()).get_transaction_with_config(signature, config)) } #[deprecated( @@ -2710,10 +2680,7 @@ impl RpcClient { signature: &Signature, encoding: UiTransactionEncoding, ) -> ClientResult { - self.invoke( - self.rpc_client - .get_confirmed_transaction(signature, encoding), - ) + self.invoke((self.rpc_client.as_ref()).get_confirmed_transaction(signature, encoding)) } #[deprecated( @@ -2727,8 +2694,7 @@ impl RpcClient { config: RpcConfirmedTransactionConfig, ) -> ClientResult { self.invoke( - self.rpc_client - .get_confirmed_transaction_with_config(signature, config), + (self.rpc_client.as_ref()).get_confirmed_transaction_with_config(signature, config), ) } @@ -2754,7 +2720,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_block_time(&self, slot: Slot) -> ClientResult { - self.invoke(self.rpc_client.get_block_time(slot)) + self.invoke((self.rpc_client.as_ref()).get_block_time(slot)) } /// Returns information about the current epoch. @@ -2781,7 +2747,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_epoch_info(&self) -> ClientResult { - self.invoke(self.rpc_client.get_epoch_info()) + self.invoke((self.rpc_client.as_ref()).get_epoch_info()) } /// Returns information about the current epoch. @@ -2811,10 +2777,7 @@ impl RpcClient { &self, commitment_config: CommitmentConfig, ) -> ClientResult { - self.invoke( - self.rpc_client - .get_epoch_info_with_commitment(commitment_config), - ) + self.invoke((self.rpc_client.as_ref()).get_epoch_info_with_commitment(commitment_config)) } /// Returns the leader schedule for an epoch. @@ -2848,7 +2811,7 @@ impl RpcClient { &self, slot: Option, ) -> ClientResult> { - self.invoke(self.rpc_client.get_leader_schedule(slot)) + self.invoke((self.rpc_client.as_ref()).get_leader_schedule(slot)) } /// Returns the leader schedule for an epoch. @@ -2882,8 +2845,7 @@ impl RpcClient { commitment_config: CommitmentConfig, ) -> ClientResult> { self.invoke( - self.rpc_client - .get_leader_schedule_with_commitment(slot, commitment_config), + (self.rpc_client.as_ref()).get_leader_schedule_with_commitment(slot, commitment_config), ) } @@ -2922,10 +2884,7 @@ impl RpcClient { slot: Option, config: RpcLeaderScheduleConfig, ) -> ClientResult> { - self.invoke( - self.rpc_client - .get_leader_schedule_with_config(slot, config), - ) + self.invoke((self.rpc_client.as_ref()).get_leader_schedule_with_config(slot, config)) } /// Returns epoch schedule information from this cluster's genesis config. @@ -2948,7 +2907,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_epoch_schedule(&self) -> ClientResult { - self.invoke(self.rpc_client.get_epoch_schedule()) + self.invoke((self.rpc_client.as_ref()).get_epoch_schedule()) } /// Returns a list of recent performance samples, in reverse slot order. @@ -2980,7 +2939,7 @@ impl RpcClient { &self, limit: Option, ) -> ClientResult> { - self.invoke(self.rpc_client.get_recent_performance_samples(limit)) + self.invoke((self.rpc_client.as_ref()).get_recent_performance_samples(limit)) } /// Returns the identity pubkey for the current node. @@ -3003,7 +2962,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_identity(&self) -> ClientResult { - self.invoke(self.rpc_client.get_identity()) + self.invoke((self.rpc_client.as_ref()).get_identity()) } /// Returns the current inflation governor. @@ -3032,7 +2991,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_inflation_governor(&self) -> ClientResult { - self.invoke(self.rpc_client.get_inflation_governor()) + self.invoke((self.rpc_client.as_ref()).get_inflation_governor()) } /// Returns the specific inflation values for the current epoch. @@ -3055,7 +3014,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_inflation_rate(&self) -> ClientResult { - self.invoke(self.rpc_client.get_inflation_rate()) + self.invoke((self.rpc_client.as_ref()).get_inflation_rate()) } /// Returns the inflation reward for a list of addresses for an epoch. @@ -3095,7 +3054,7 @@ impl RpcClient { addresses: &[Pubkey], epoch: Option, ) -> ClientResult>> { - self.invoke(self.rpc_client.get_inflation_reward(addresses, epoch)) + self.invoke((self.rpc_client.as_ref()).get_inflation_reward(addresses, epoch)) } /// Returns the current solana version running on the node. @@ -3122,7 +3081,7 @@ impl RpcClient { /// # Ok::<(), Box>(()) /// ``` pub fn get_version(&self) -> ClientResult { - self.invoke(self.rpc_client.get_version()) + self.invoke((self.rpc_client.as_ref()).get_version()) } /// Returns the lowest slot that the node has information about in its ledger. @@ -3149,7 +3108,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn minimum_ledger_slot(&self) -> ClientResult { - self.invoke(self.rpc_client.minimum_ledger_slot()) + self.invoke((self.rpc_client.as_ref()).minimum_ledger_slot()) } /// Returns all information associated with the account of the provided pubkey. @@ -3197,7 +3156,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_account(&self, pubkey: &Pubkey) -> ClientResult { - self.invoke(self.rpc_client.get_account(pubkey)) + self.invoke((self.rpc_client.as_ref()).get_account(pubkey)) } /// Returns all information associated with the account of the provided pubkey. @@ -3245,8 +3204,7 @@ impl RpcClient { commitment_config: CommitmentConfig, ) -> RpcResult> { self.invoke( - self.rpc_client - .get_account_with_commitment(pubkey, commitment_config), + (self.rpc_client.as_ref()).get_account_with_commitment(pubkey, commitment_config), ) } @@ -3301,7 +3259,7 @@ impl RpcClient { pubkey: &Pubkey, config: RpcAccountInfoConfig, ) -> RpcResult> { - self.invoke(self.rpc_client.get_account_with_config(pubkey, config)) + self.invoke((self.rpc_client.as_ref()).get_account_with_config(pubkey, config)) } /// Get the max slot seen from retransmit stage. @@ -3324,7 +3282,7 @@ impl RpcClient { /// let slot = rpc_client.get_max_retransmit_slot()?; /// # Ok::<(), ClientError>(()) pub fn get_max_retransmit_slot(&self) -> ClientResult { - self.invoke(self.rpc_client.get_max_retransmit_slot()) + self.invoke((self.rpc_client.as_ref()).get_max_retransmit_slot()) } /// Get the max slot seen from after [shred](https://docs.solana.com/terminology#shred) insert. @@ -3347,7 +3305,7 @@ impl RpcClient { /// let slot = rpc_client.get_max_shred_insert_slot()?; /// # Ok::<(), ClientError>(()) pub fn get_max_shred_insert_slot(&self) -> ClientResult { - self.invoke(self.rpc_client.get_max_shred_insert_slot()) + self.invoke((self.rpc_client.as_ref()).get_max_shred_insert_slot()) } /// Returns the account information for a list of pubkeys. @@ -3381,7 +3339,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_multiple_accounts(&self, pubkeys: &[Pubkey]) -> ClientResult>> { - self.invoke(self.rpc_client.get_multiple_accounts(pubkeys)) + self.invoke((self.rpc_client.as_ref()).get_multiple_accounts(pubkeys)) } /// Returns the account information for a list of pubkeys. @@ -3421,7 +3379,7 @@ impl RpcClient { commitment_config: CommitmentConfig, ) -> RpcResult>> { self.invoke( - self.rpc_client + (self.rpc_client.as_ref()) .get_multiple_accounts_with_commitment(pubkeys, commitment_config), ) } @@ -3469,10 +3427,7 @@ impl RpcClient { pubkeys: &[Pubkey], config: RpcAccountInfoConfig, ) -> RpcResult>> { - self.invoke( - self.rpc_client - .get_multiple_accounts_with_config(pubkeys, config), - ) + self.invoke((self.rpc_client.as_ref()).get_multiple_accounts_with_config(pubkeys, config)) } /// Gets the raw data associated with an account. @@ -3509,7 +3464,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_account_data(&self, pubkey: &Pubkey) -> ClientResult> { - self.invoke(self.rpc_client.get_account_data(pubkey)) + self.invoke((self.rpc_client.as_ref()).get_account_data(pubkey)) } /// Returns minimum balance required to make an account with specified data length rent exempt. @@ -3534,10 +3489,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> ClientResult { - self.invoke( - self.rpc_client - .get_minimum_balance_for_rent_exemption(data_len), - ) + self.invoke((self.rpc_client.as_ref()).get_minimum_balance_for_rent_exemption(data_len)) } /// Request the balance of the provided account pubkey. @@ -3569,7 +3521,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_balance(&self, pubkey: &Pubkey) -> ClientResult { - self.invoke(self.rpc_client.get_balance(pubkey)) + self.invoke((self.rpc_client.as_ref()).get_balance(pubkey)) } /// Request the balance of the provided account pubkey. @@ -3607,8 +3559,7 @@ impl RpcClient { commitment_config: CommitmentConfig, ) -> RpcResult { self.invoke( - self.rpc_client - .get_balance_with_commitment(pubkey, commitment_config), + (self.rpc_client.as_ref()).get_balance_with_commitment(pubkey, commitment_config), ) } @@ -3642,7 +3593,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_program_accounts(&self, pubkey: &Pubkey) -> ClientResult> { - self.invoke(self.rpc_client.get_program_accounts(pubkey)) + self.invoke((self.rpc_client.as_ref()).get_program_accounts(pubkey)) } /// Returns all accounts owned by the provided program pubkey. @@ -3706,10 +3657,7 @@ impl RpcClient { pubkey: &Pubkey, config: RpcProgramAccountsConfig, ) -> ClientResult> { - self.invoke( - self.rpc_client - .get_program_accounts_with_config(pubkey, config), - ) + self.invoke((self.rpc_client.as_ref()).get_program_accounts_with_config(pubkey, config)) } /// Returns the stake minimum delegation, in lamports. @@ -3732,7 +3680,7 @@ impl RpcClient { /// # Ok::<(), ClientError>(()) /// ``` pub fn get_stake_minimum_delegation(&self) -> ClientResult { - self.invoke(self.rpc_client.get_stake_minimum_delegation()) + self.invoke((self.rpc_client.as_ref()).get_stake_minimum_delegation()) } /// Returns the stake minimum delegation, in lamports, based on the commitment level. @@ -3768,7 +3716,7 @@ impl RpcClient { /// Request the transaction count. pub fn get_transaction_count(&self) -> ClientResult { - self.invoke(self.rpc_client.get_transaction_count()) + self.invoke((self.rpc_client.as_ref()).get_transaction_count()) } pub fn get_transaction_count_with_commitment( @@ -3776,8 +3724,7 @@ impl RpcClient { commitment_config: CommitmentConfig, ) -> ClientResult { self.invoke( - self.rpc_client - .get_transaction_count_with_commitment(commitment_config), + (self.rpc_client.as_ref()).get_transaction_count_with_commitment(commitment_config), ) } @@ -3787,7 +3734,7 @@ impl RpcClient { )] #[allow(deprecated)] pub fn get_fees(&self) -> ClientResult { - self.invoke(self.rpc_client.get_fees()) + self.invoke((self.rpc_client.as_ref()).get_fees()) } #[deprecated( @@ -3796,13 +3743,13 @@ impl RpcClient { )] #[allow(deprecated)] pub fn get_fees_with_commitment(&self, commitment_config: CommitmentConfig) -> RpcResult { - self.invoke(self.rpc_client.get_fees_with_commitment(commitment_config)) + self.invoke((self.rpc_client.as_ref()).get_fees_with_commitment(commitment_config)) } #[deprecated(since = "1.9.0", note = "Please use `get_latest_blockhash` instead")] #[allow(deprecated)] pub fn get_recent_blockhash(&self) -> ClientResult<(Hash, FeeCalculator)> { - self.invoke(self.rpc_client.get_recent_blockhash()) + self.invoke((self.rpc_client.as_ref()).get_recent_blockhash()) } #[deprecated( @@ -3815,8 +3762,7 @@ impl RpcClient { commitment_config: CommitmentConfig, ) -> RpcResult<(Hash, FeeCalculator, Slot)> { self.invoke( - self.rpc_client - .get_recent_blockhash_with_commitment(commitment_config), + (self.rpc_client.as_ref()).get_recent_blockhash_with_commitment(commitment_config), ) } @@ -3826,7 +3772,7 @@ impl RpcClient { &self, blockhash: &Hash, ) -> ClientResult> { - self.invoke(self.rpc_client.get_fee_calculator_for_blockhash(blockhash)) + self.invoke((self.rpc_client.as_ref()).get_fee_calculator_for_blockhash(blockhash)) } #[deprecated( @@ -3840,7 +3786,7 @@ impl RpcClient { commitment_config: CommitmentConfig, ) -> RpcResult> { self.invoke( - self.rpc_client + (self.rpc_client.as_ref()) .get_fee_calculator_for_blockhash_with_commitment(blockhash, commitment_config), ) } @@ -3851,7 +3797,7 @@ impl RpcClient { )] #[allow(deprecated)] pub fn get_fee_rate_governor(&self) -> RpcResult { - self.invoke(self.rpc_client.get_fee_rate_governor()) + self.invoke((self.rpc_client.as_ref()).get_fee_rate_governor()) } #[deprecated( @@ -3860,23 +3806,23 @@ impl RpcClient { )] #[allow(deprecated)] pub fn get_new_blockhash(&self, blockhash: &Hash) -> ClientResult<(Hash, FeeCalculator)> { - self.invoke(self.rpc_client.get_new_blockhash(blockhash)) + self.invoke((self.rpc_client.as_ref()).get_new_blockhash(blockhash)) } pub fn get_first_available_block(&self) -> ClientResult { - self.invoke(self.rpc_client.get_first_available_block()) + self.invoke((self.rpc_client.as_ref()).get_first_available_block()) } pub fn get_genesis_hash(&self) -> ClientResult { - self.invoke(self.rpc_client.get_genesis_hash()) + self.invoke((self.rpc_client.as_ref()).get_genesis_hash()) } pub fn get_health(&self) -> ClientResult<()> { - self.invoke(self.rpc_client.get_health()) + self.invoke((self.rpc_client.as_ref()).get_health()) } pub fn get_token_account(&self, pubkey: &Pubkey) -> ClientResult> { - self.invoke(self.rpc_client.get_token_account(pubkey)) + self.invoke((self.rpc_client.as_ref()).get_token_account(pubkey)) } pub fn get_token_account_with_commitment( @@ -3885,13 +3831,12 @@ impl RpcClient { commitment_config: CommitmentConfig, ) -> RpcResult> { self.invoke( - self.rpc_client - .get_token_account_with_commitment(pubkey, commitment_config), + (self.rpc_client.as_ref()).get_token_account_with_commitment(pubkey, commitment_config), ) } pub fn get_token_account_balance(&self, pubkey: &Pubkey) -> ClientResult { - self.invoke(self.rpc_client.get_token_account_balance(pubkey)) + self.invoke((self.rpc_client.as_ref()).get_token_account_balance(pubkey)) } pub fn get_token_account_balance_with_commitment( @@ -3900,7 +3845,7 @@ impl RpcClient { commitment_config: CommitmentConfig, ) -> RpcResult { self.invoke( - self.rpc_client + (self.rpc_client.as_ref()) .get_token_account_balance_with_commitment(pubkey, commitment_config), ) } @@ -3911,7 +3856,7 @@ impl RpcClient { token_account_filter: TokenAccountsFilter, ) -> ClientResult> { self.invoke( - self.rpc_client + (self.rpc_client.as_ref()) .get_token_accounts_by_delegate(delegate, token_account_filter), ) } @@ -3923,12 +3868,11 @@ impl RpcClient { commitment_config: CommitmentConfig, ) -> RpcResult> { self.invoke( - self.rpc_client - .get_token_accounts_by_delegate_with_commitment( - delegate, - token_account_filter, - commitment_config, - ), + (self.rpc_client.as_ref()).get_token_accounts_by_delegate_with_commitment( + delegate, + token_account_filter, + commitment_config, + ), ) } @@ -3938,8 +3882,7 @@ impl RpcClient { token_account_filter: TokenAccountsFilter, ) -> ClientResult> { self.invoke( - self.rpc_client - .get_token_accounts_by_owner(owner, token_account_filter), + (self.rpc_client.as_ref()).get_token_accounts_by_owner(owner, token_account_filter), ) } @@ -3949,15 +3892,17 @@ impl RpcClient { token_account_filter: TokenAccountsFilter, commitment_config: CommitmentConfig, ) -> RpcResult> { - self.invoke(self.rpc_client.get_token_accounts_by_owner_with_commitment( - owner, - token_account_filter, - commitment_config, - )) + self.invoke( + (self.rpc_client.as_ref()).get_token_accounts_by_owner_with_commitment( + owner, + token_account_filter, + commitment_config, + ), + ) } pub fn get_token_supply(&self, mint: &Pubkey) -> ClientResult { - self.invoke(self.rpc_client.get_token_supply(mint)) + self.invoke((self.rpc_client.as_ref()).get_token_supply(mint)) } pub fn get_token_supply_with_commitment( @@ -3966,13 +3911,12 @@ impl RpcClient { commitment_config: CommitmentConfig, ) -> RpcResult { self.invoke( - self.rpc_client - .get_token_supply_with_commitment(mint, commitment_config), + (self.rpc_client.as_ref()).get_token_supply_with_commitment(mint, commitment_config), ) } pub fn request_airdrop(&self, pubkey: &Pubkey, lamports: u64) -> ClientResult { - self.invoke(self.rpc_client.request_airdrop(pubkey, lamports)) + self.invoke((self.rpc_client.as_ref()).request_airdrop(pubkey, lamports)) } pub fn request_airdrop_with_blockhash( @@ -3981,7 +3925,7 @@ impl RpcClient { lamports: u64, recent_blockhash: &Hash, ) -> ClientResult { - self.invoke(self.rpc_client.request_airdrop_with_blockhash( + self.invoke((self.rpc_client.as_ref()).request_airdrop_with_blockhash( pubkey, lamports, recent_blockhash, @@ -3995,8 +3939,7 @@ impl RpcClient { config: RpcRequestAirdropConfig, ) -> ClientResult { self.invoke( - self.rpc_client - .request_airdrop_with_config(pubkey, lamports, config), + (self.rpc_client.as_ref()).request_airdrop_with_config(pubkey, lamports, config), ) } @@ -4006,8 +3949,7 @@ impl RpcClient { commitment_config: CommitmentConfig, ) -> ClientResult { self.invoke( - self.rpc_client - .poll_get_balance_with_commitment(pubkey, commitment_config), + (self.rpc_client.as_ref()).poll_get_balance_with_commitment(pubkey, commitment_config), ) } @@ -4017,7 +3959,7 @@ impl RpcClient { expected_balance: Option, commitment_config: CommitmentConfig, ) -> Option { - self.invoke(self.rpc_client.wait_for_balance_with_commitment( + self.invoke((self.rpc_client.as_ref()).wait_for_balance_with_commitment( pubkey, expected_balance, commitment_config, @@ -4027,7 +3969,7 @@ impl RpcClient { /// Poll the server to confirm a transaction. pub fn poll_for_signature(&self, signature: &Signature) -> ClientResult<()> { - self.invoke(self.rpc_client.poll_for_signature(signature)) + self.invoke((self.rpc_client.as_ref()).poll_for_signature(signature)) } /// Poll the server to confirm a transaction. @@ -4037,7 +3979,7 @@ impl RpcClient { commitment_config: CommitmentConfig, ) -> ClientResult<()> { self.invoke( - self.rpc_client + (self.rpc_client.as_ref()) .poll_for_signature_with_commitment(signature, commitment_config), ) } @@ -4049,7 +3991,7 @@ impl RpcClient { min_confirmed_blocks: usize, ) -> ClientResult { self.invoke( - self.rpc_client + (self.rpc_client.as_ref()) .poll_for_signature_confirmation(signature, min_confirmed_blocks), ) } @@ -4059,13 +4001,12 @@ impl RpcClient { signature: &Signature, ) -> ClientResult { self.invoke( - self.rpc_client - .get_num_blocks_since_signature_confirmation(signature), + (self.rpc_client.as_ref()).get_num_blocks_since_signature_confirmation(signature), ) } pub fn get_latest_blockhash(&self) -> ClientResult { - self.invoke(self.rpc_client.get_latest_blockhash()) + self.invoke((self.rpc_client.as_ref()).get_latest_blockhash()) } #[allow(deprecated)] @@ -4073,10 +4014,7 @@ impl RpcClient { &self, commitment: CommitmentConfig, ) -> ClientResult<(Hash, u64)> { - self.invoke( - self.rpc_client - .get_latest_blockhash_with_commitment(commitment), - ) + self.invoke((self.rpc_client.as_ref()).get_latest_blockhash_with_commitment(commitment)) } #[allow(deprecated)] @@ -4085,20 +4023,20 @@ impl RpcClient { blockhash: &Hash, commitment: CommitmentConfig, ) -> ClientResult { - self.invoke(self.rpc_client.is_blockhash_valid(blockhash, commitment)) + self.invoke((self.rpc_client.as_ref()).is_blockhash_valid(blockhash, commitment)) } #[allow(deprecated)] pub fn get_fee_for_message(&self, message: &Message) -> ClientResult { - self.invoke(self.rpc_client.get_fee_for_message(message)) + self.invoke((self.rpc_client.as_ref()).get_fee_for_message(message)) } pub fn get_new_latest_blockhash(&self, blockhash: &Hash) -> ClientResult { - self.invoke(self.rpc_client.get_new_latest_blockhash(blockhash)) + self.invoke((self.rpc_client.as_ref()).get_new_latest_blockhash(blockhash)) } pub fn get_transport_stats(&self) -> RpcTransportStats { - self.rpc_client.get_transport_stats() + (self.rpc_client.as_ref()).get_transport_stats() } fn invoke>>(&self, f: F) -> ClientResult { @@ -4107,6 +4045,14 @@ impl RpcClient { // lesser evil. tokio::task::block_in_place(move || self.runtime.as_ref().expect("runtime").block_on(f)) } + + pub(crate) fn get_inner_client(&self) -> &Arc { + &self.rpc_client + } + + pub(crate) fn runtime(&self) -> &tokio::runtime::Runtime { + self.runtime.as_ref().expect("runtime") + } } /// Mocks for documentation examples diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 5e94098e2..3f3c8d2dc 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -1,55 +1,24 @@ +pub use crate::nonblocking::tpu_client::TpuSenderError; use { crate::{ - client_error::{ClientError, Result as ClientResult}, connection_cache::ConnectionCache, - pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription}, - rpc_client::RpcClient, - rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, - rpc_response::{RpcContactInfo, SlotUpdate}, - spinner, - tpu_connection::TpuConnection, + nonblocking::tpu_client::TpuClient as NonblockingTpuClient, rpc_client::RpcClient, }, - bincode::serialize, - log::*, solana_sdk::{ clock::Slot, - commitment_config::CommitmentConfig, - epoch_info::EpochInfo, message::Message, - pubkey::Pubkey, - signature::SignerError, signers::Signers, transaction::{Transaction, TransactionError}, - transport::{Result as TransportResult, TransportError}, + transport::Result as TransportResult, }, std::{ - collections::{HashMap, HashSet, VecDeque}, - net::{SocketAddr, UdpSocket}, - str::FromStr, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, RwLock, - }, - thread::{sleep, JoinHandle}, + collections::VecDeque, + net::UdpSocket, + sync::{Arc, RwLock}, }, - thiserror::Error, - tokio::time::{Duration, Instant}, + tokio::time::Duration, }; -#[derive(Error, Debug)] -pub enum TpuSenderError { - #[error("Pubsub error: {0:?}")] - PubsubError(#[from] PubsubClientError), - #[error("RPC error: {0:?}")] - RpcError(#[from] ClientError), - #[error("IO error: {0:?}")] - IoError(#[from] std::io::Error), - #[error("Signer error: {0:?}")] - SignerError(#[from] SignerError), - #[error("Custom error: {0}")] - Custom(String), -} - type Result = std::result::Result; /// Default number of slots used to build TPU socket fanout set @@ -83,61 +52,34 @@ impl Default for TpuClientConfig { /// The client uses RPC to determine the current leader and fetch node contact info pub struct TpuClient { _deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket - fanout_slots: u64, - leader_tpu_service: LeaderTpuService, - exit: Arc, + //todo: get rid of this field rpc_client: Arc, - connection_cache: Arc, + tpu_client: Arc, } impl TpuClient { /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size pub fn send_transaction(&self, transaction: &Transaction) -> bool { - let wire_transaction = serialize(transaction).expect("serialization should succeed"); - self.send_wire_transaction(wire_transaction) + self.invoke(self.tpu_client.send_transaction(transaction)) } /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size pub fn send_wire_transaction(&self, wire_transaction: Vec) -> bool { - self.try_send_wire_transaction(wire_transaction).is_ok() + self.invoke(self.tpu_client.send_wire_transaction(wire_transaction)) } /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size /// Returns the last error if all sends fail pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> { - let wire_transaction = serialize(transaction).expect("serialization should succeed"); - self.try_send_wire_transaction(wire_transaction) + self.invoke(self.tpu_client.try_send_transaction(transaction)) } /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size /// Returns the last error if all sends fail - fn try_send_wire_transaction(&self, wire_transaction: Vec) -> TransportResult<()> { - let mut last_error: Option = None; - let mut some_success = false; - - for tpu_address in self - .leader_tpu_service - .leader_tpu_sockets(self.fanout_slots) - { - let conn = self.connection_cache.get_connection(&tpu_address); - let result = conn.send_wire_transaction_async(wire_transaction.clone()); - if let Err(err) = result { - last_error = Some(err); - } else { - some_success = true; - } - } - if !some_success { - Err(if let Some(err) = last_error { - err - } else { - std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into() - }) - } else { - Ok(()) - } + pub fn try_send_wire_transaction(&self, wire_transaction: Vec) -> TransportResult<()> { + self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction)) } /// Create a new client that disconnects when dropped @@ -146,8 +88,16 @@ impl TpuClient { websocket_url: &str, config: TpuClientConfig, ) -> Result { - let connection_cache = Arc::new(ConnectionCache::default()); - Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache) + let create_tpu_client = + NonblockingTpuClient::new(rpc_client.get_inner_client().clone(), websocket_url, config); + let tpu_client = + tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?; + + Ok(Self { + _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(), + rpc_client, + tpu_client: Arc::new(tpu_client), + }) } /// Create a new client that disconnects when dropped @@ -157,17 +107,19 @@ impl TpuClient { config: TpuClientConfig, connection_cache: Arc, ) -> Result { - let exit = Arc::new(AtomicBool::new(false)); - let leader_tpu_service = - LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone())?; + let create_tpu_client = NonblockingTpuClient::new_with_connection_cache( + rpc_client.get_inner_client().clone(), + websocket_url, + config, + connection_cache, + ); + let tpu_client = + tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?; Ok(Self { _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(), - fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1), - leader_tpu_service, - exit, rpc_client, - connection_cache, + tpu_client: Arc::new(tpu_client), }) } @@ -176,332 +128,21 @@ impl TpuClient { messages: &[Message], signers: &T, ) -> Result>> { - let mut expired_blockhash_retries = 5; - - let progress_bar = spinner::new_progress_bar(); - progress_bar.set_message("Setting up..."); - - let mut transactions = messages - .iter() - .enumerate() - .map(|(i, message)| (i, Transaction::new_unsigned(message.clone()))) - .collect::>(); - let total_transactions = transactions.len(); - let mut transaction_errors = vec![None; transactions.len()]; - let mut confirmed_transactions = 0; - let mut block_height = self.rpc_client.get_block_height()?; - - while expired_blockhash_retries > 0 { - let (blockhash, last_valid_block_height) = self - .rpc_client - .get_latest_blockhash_with_commitment(self.rpc_client.commitment())?; - - let mut pending_transactions = HashMap::new(); - for (i, mut transaction) in transactions { - transaction.try_sign(signers, blockhash)?; - pending_transactions.insert(transaction.signatures[0], (i, transaction)); - } - - let mut last_resend = Instant::now() - TRANSACTION_RESEND_INTERVAL; - while block_height <= last_valid_block_height { - let num_transactions = pending_transactions.len(); - - // Periodically re-send all pending transactions - if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL { - for (index, (_i, transaction)) in pending_transactions.values().enumerate() { - if !self.send_transaction(transaction) { - let _result = self.rpc_client.send_transaction(transaction).ok(); - } - spinner::set_message_for_confirmed_transactions( - &progress_bar, - confirmed_transactions, - total_transactions, - None, //block_height, - last_valid_block_height, - &format!("Sending {}/{} transactions", index + 1, num_transactions,), - ); - sleep(SEND_TRANSACTION_INTERVAL); - } - last_resend = Instant::now(); - } - - // Wait for the next block before checking for transaction statuses - let mut block_height_refreshes = 10; - spinner::set_message_for_confirmed_transactions( - &progress_bar, - confirmed_transactions, - total_transactions, - Some(block_height), - last_valid_block_height, - &format!("Waiting for next block, {} pending...", num_transactions), - ); - let mut new_block_height = block_height; - while block_height == new_block_height && block_height_refreshes > 0 { - sleep(Duration::from_millis(500)); - new_block_height = self.rpc_client.get_block_height()?; - block_height_refreshes -= 1; - } - block_height = new_block_height; - - // Collect statuses for the transactions, drop those that are confirmed - let pending_signatures = pending_transactions.keys().cloned().collect::>(); - for pending_signatures_chunk in - pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS) - { - if let Ok(result) = self - .rpc_client - .get_signature_statuses(pending_signatures_chunk) - { - let statuses = result.value; - for (signature, status) in - pending_signatures_chunk.iter().zip(statuses.into_iter()) - { - if let Some(status) = status { - if status.satisfies_commitment(self.rpc_client.commitment()) { - if let Some((i, _)) = pending_transactions.remove(signature) { - confirmed_transactions += 1; - if status.err.is_some() { - progress_bar.println(format!( - "Failed transaction: {:?}", - status - )); - } - transaction_errors[i] = status.err; - } - } - } - } - } - spinner::set_message_for_confirmed_transactions( - &progress_bar, - confirmed_transactions, - total_transactions, - Some(block_height), - last_valid_block_height, - "Checking transaction status...", - ); - } - - if pending_transactions.is_empty() { - return Ok(transaction_errors); - } - } - - transactions = pending_transactions.into_iter().map(|(_k, v)| v).collect(); - progress_bar.println(format!( - "Blockhash expired. {} retries remaining", - expired_blockhash_retries - )); - expired_blockhash_retries -= 1; - } - Err(TpuSenderError::Custom("Max retries exceeded".into())) + self.invoke( + self.tpu_client + .send_and_confirm_messages_with_spinner(messages, signers), + ) } pub fn rpc_client(&self) -> &RpcClient { &self.rpc_client } -} -impl Drop for TpuClient { - fn drop(&mut self) { - self.exit.store(true, Ordering::Relaxed); - self.leader_tpu_service.join(); - } -} - -pub(crate) struct LeaderTpuCacheUpdateInfo { - pub(crate) maybe_cluster_nodes: Option>>, - pub(crate) maybe_epoch_info: Option>, - pub(crate) maybe_slot_leaders: Option>>, -} -impl LeaderTpuCacheUpdateInfo { - pub(crate) fn has_some(&self) -> bool { - self.maybe_cluster_nodes.is_some() - || self.maybe_epoch_info.is_some() - || self.maybe_slot_leaders.is_some() - } -} - -pub(crate) struct LeaderTpuCache { - first_slot: Slot, - leaders: Vec, - leader_tpu_map: HashMap, - slots_in_epoch: Slot, - last_epoch_info_slot: Slot, -} - -impl LeaderTpuCache { - pub(crate) fn new( - first_slot: Slot, - slots_in_epoch: Slot, - leaders: Vec, - cluster_nodes: Vec, - ) -> Self { - let leader_tpu_map = Self::extract_cluster_tpu_sockets(cluster_nodes); - Self { - first_slot, - leaders, - leader_tpu_map, - slots_in_epoch, - last_epoch_info_slot: first_slot, - } - } - - // Last slot that has a cached leader pubkey - pub(crate) fn last_slot(&self) -> Slot { - self.first_slot + self.leaders.len().saturating_sub(1) as u64 - } - - pub(crate) fn slot_info(&self) -> (Slot, Slot, Slot) { - ( - self.last_slot(), - self.last_epoch_info_slot, - self.slots_in_epoch, - ) - } - - // Get the TPU sockets for the current leader and upcoming leaders according to fanout size - pub(crate) fn get_leader_sockets( - &self, - current_slot: Slot, - fanout_slots: u64, - ) -> Vec { - let mut leader_set = HashSet::new(); - let mut leader_sockets = Vec::new(); - for leader_slot in current_slot..current_slot + fanout_slots { - if let Some(leader) = self.get_slot_leader(leader_slot) { - if let Some(tpu_socket) = self.leader_tpu_map.get(leader) { - if leader_set.insert(*leader) { - leader_sockets.push(*tpu_socket); - } - } else { - // The leader is probably delinquent - trace!("TPU not available for leader {}", leader); - } - } else { - // Overran the local leader schedule cache - warn!( - "Leader not known for slot {}; cache holds slots [{},{}]", - leader_slot, - self.first_slot, - self.last_slot() - ); - } - } - leader_sockets - } - - pub(crate) fn get_slot_leader(&self, slot: Slot) -> Option<&Pubkey> { - if slot >= self.first_slot { - let index = slot - self.first_slot; - self.leaders.get(index as usize) - } else { - None - } - } - - pub(crate) fn extract_cluster_tpu_sockets( - cluster_contact_info: Vec, - ) -> HashMap { - cluster_contact_info - .into_iter() - .filter_map(|contact_info| { - Some(( - Pubkey::from_str(&contact_info.pubkey).ok()?, - contact_info.tpu?, - )) - }) - .collect() - } - - pub(crate) fn fanout(slots_in_epoch: Slot) -> Slot { - (2 * MAX_FANOUT_SLOTS).min(slots_in_epoch) - } - - pub(crate) fn update_all( - &mut self, - estimated_current_slot: Slot, - cache_update_info: LeaderTpuCacheUpdateInfo, - ) -> (bool, bool) { - let mut has_error = false; - let mut cluster_refreshed = false; - if let Some(cluster_nodes) = cache_update_info.maybe_cluster_nodes { - match cluster_nodes { - Ok(cluster_nodes) => { - let leader_tpu_map = LeaderTpuCache::extract_cluster_tpu_sockets(cluster_nodes); - self.leader_tpu_map = leader_tpu_map; - cluster_refreshed = true; - } - Err(err) => { - warn!("Failed to fetch cluster tpu sockets: {}", err); - has_error = true; - } - } - } - - if let Some(Ok(epoch_info)) = cache_update_info.maybe_epoch_info { - self.slots_in_epoch = epoch_info.slots_in_epoch; - self.last_epoch_info_slot = estimated_current_slot; - } - - if let Some(slot_leaders) = cache_update_info.maybe_slot_leaders { - match slot_leaders { - Ok(slot_leaders) => { - self.first_slot = estimated_current_slot; - self.leaders = slot_leaders; - } - Err(err) => { - warn!( - "Failed to fetch slot leaders (current estimated slot: {}): {}", - estimated_current_slot, err - ); - has_error = true; - } - } - } - (has_error, cluster_refreshed) - } -} - -fn maybe_fetch_cache_info( - leader_tpu_cache: &Arc>, - last_cluster_refresh: Instant, - rpc_client: &RpcClient, - recent_slots: &RecentLeaderSlots, -) -> LeaderTpuCacheUpdateInfo { - // Refresh cluster TPU ports every 5min in case validators restart with new port configuration - // or new validators come online - let maybe_cluster_nodes = if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) { - Some(rpc_client.get_cluster_nodes()) - } else { - None - }; - - let estimated_current_slot = recent_slots.estimated_current_slot(); - let (last_slot, last_epoch_info_slot, slots_in_epoch) = { - let leader_tpu_cache = leader_tpu_cache.read().unwrap(); - leader_tpu_cache.slot_info() - }; - let maybe_epoch_info = - if estimated_current_slot >= last_epoch_info_slot.saturating_sub(slots_in_epoch) { - Some(rpc_client.get_epoch_info()) - } else { - None - }; - - let maybe_slot_leaders = if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS) - { - Some(rpc_client.get_slot_leaders( - estimated_current_slot, - LeaderTpuCache::fanout(slots_in_epoch), - )) - } else { - None - }; - LeaderTpuCacheUpdateInfo { - maybe_cluster_nodes, - maybe_epoch_info, - maybe_slot_leaders, + fn invoke>(&self, f: F) -> T { + // `block_on()` panics if called within an asynchronous execution context. Whereas + // `block_in_place()` only panics if called from a current_thread runtime, which is the + // lesser evil. + tokio::task::block_in_place(move || self.rpc_client.runtime().block_on(f)) } } @@ -559,126 +200,6 @@ impl From> for RecentLeaderSlots { } } -/// Service that tracks upcoming leaders and maintains an up-to-date mapping -/// of leader id to TPU socket address. -struct LeaderTpuService { - recent_slots: RecentLeaderSlots, - leader_tpu_cache: Arc>, - subscription: Option>, - t_leader_tpu_service: Option>, -} - -impl LeaderTpuService { - fn new(rpc_client: Arc, websocket_url: &str, exit: Arc) -> Result { - let start_slot = rpc_client.get_slot_with_commitment(CommitmentConfig::processed())?; - - let recent_slots = RecentLeaderSlots::new(start_slot); - let slots_in_epoch = rpc_client.get_epoch_info()?.slots_in_epoch; - let leaders = - rpc_client.get_slot_leaders(start_slot, LeaderTpuCache::fanout(slots_in_epoch))?; - let cluster_nodes = rpc_client.get_cluster_nodes()?; - let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new( - start_slot, - slots_in_epoch, - leaders, - cluster_nodes, - ))); - - let subscription = if !websocket_url.is_empty() { - let recent_slots = recent_slots.clone(); - Some(PubsubClient::slot_updates_subscribe( - websocket_url, - move |update| { - let current_slot = match update { - // This update indicates that a full slot was received by the connected - // node so we can stop sending transactions to the leader for that slot - SlotUpdate::Completed { slot, .. } => slot.saturating_add(1), - // This update indicates that we have just received the first shred from - // the leader for this slot and they are probably still accepting transactions. - SlotUpdate::FirstShredReceived { slot, .. } => slot, - _ => return, - }; - recent_slots.record_slot(current_slot); - }, - )?) - } else { - None - }; - - let t_leader_tpu_service = Some({ - let recent_slots = recent_slots.clone(); - let leader_tpu_cache = leader_tpu_cache.clone(); - std::thread::Builder::new() - .name("ldr-tpu-srv".to_string()) - .spawn(move || Self::run(rpc_client, recent_slots, leader_tpu_cache, exit)) - .unwrap() - }); - - Ok(LeaderTpuService { - recent_slots, - leader_tpu_cache, - subscription, - t_leader_tpu_service, - }) - } - - fn join(&mut self) { - if let Some(mut subscription) = self.subscription.take() { - let _ = subscription.send_unsubscribe(); - let _ = subscription.shutdown(); - } - if let Some(t_handle) = self.t_leader_tpu_service.take() { - t_handle.join().unwrap(); - } - } - - fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec { - let current_slot = self.recent_slots.estimated_current_slot(); - self.leader_tpu_cache - .read() - .unwrap() - .get_leader_sockets(current_slot, fanout_slots) - } - - fn run( - rpc_client: Arc, - recent_slots: RecentLeaderSlots, - leader_tpu_cache: Arc>, - exit: Arc, - ) { - let mut last_cluster_refresh = Instant::now(); - let mut sleep_ms = 1000; - loop { - if exit.load(Ordering::Relaxed) { - break; - } - - // Sleep a few slots before checking if leader cache needs to be refreshed again - sleep(Duration::from_millis(sleep_ms)); - sleep_ms = 1000; - - let cache_update_info = maybe_fetch_cache_info( - &leader_tpu_cache, - last_cluster_refresh, - &rpc_client, - &recent_slots, - ); - - if cache_update_info.has_some() { - let mut leader_tpu_cache = leader_tpu_cache.write().unwrap(); - let (has_error, cluster_refreshed) = leader_tpu_cache - .update_all(recent_slots.estimated_current_slot(), cache_update_info); - if has_error { - sleep_ms = 100; - } - if cluster_refreshed { - last_cluster_refresh = Instant::now(); - } - } - } - } -} - #[cfg(test)] mod tests { use super::*;