From dadea873a915f686fd481d1ffb553f4a06af6fc6 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 11 Nov 2020 18:56:26 -0700 Subject: [PATCH] Send BanksClient RPC requests before returning futures (#13539) * Send RPC requests before returning futures * Add process_transactions() --- Cargo.lock | 1 + banks-client/src/lib.rs | 148 +++++++++++++++++++++---------------- banks-interface/Cargo.toml | 3 + 3 files changed, 87 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a134fd7ec..bc4af20edf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3632,6 +3632,7 @@ dependencies = [ "serde", "solana-sdk", "tarpc", + "tokio 0.3.2", ] [[package]] diff --git a/banks-client/src/lib.rs b/banks-client/src/lib.rs index 298637ef98..2a98e311ab 100644 --- a/banks-client/src/lib.rs +++ b/banks-client/src/lib.rs @@ -5,7 +5,7 @@ //! but they are undocumented, may change over time, and are generally more //! cumbersome to use. -use futures::future::join_all; +use futures::{future::join_all, Future, FutureExt}; pub use solana_banks_interface::{BanksClient as TarpcClient, TransactionStatus}; use solana_banks_interface::{BanksRequest, BanksResponse}; use solana_sdk::{ @@ -52,168 +52,187 @@ impl BanksClient { TarpcClient::new(config, transport) } - pub async fn send_transaction_with_context( + pub fn send_transaction_with_context( &mut self, ctx: Context, transaction: Transaction, - ) -> io::Result<()> { - self.inner - .send_transaction_with_context(ctx, transaction) - .await + ) -> impl Future> + '_ { + self.inner.send_transaction_with_context(ctx, transaction) } - pub async fn get_fees_with_commitment_and_context( + pub fn get_fees_with_commitment_and_context( &mut self, ctx: Context, commitment: CommitmentLevel, - ) -> io::Result<(FeeCalculator, Hash, Slot)> { + ) -> impl Future> + '_ { self.inner .get_fees_with_commitment_and_context(ctx, commitment) - .await } - pub async fn get_transaction_status_with_context( + pub fn get_transaction_status_with_context( &mut self, ctx: Context, signature: Signature, - ) -> io::Result> { + ) -> impl Future>> + '_ { self.inner .get_transaction_status_with_context(ctx, signature) - .await } - pub async fn get_slot_with_context( + pub fn get_slot_with_context( &mut self, ctx: Context, commitment: CommitmentLevel, - ) -> io::Result { - self.inner.get_slot_with_context(ctx, commitment).await + ) -> impl Future> + '_ { + self.inner.get_slot_with_context(ctx, commitment) } - pub async fn process_transaction_with_commitment_and_context( + pub fn process_transaction_with_commitment_and_context( &mut self, ctx: Context, transaction: Transaction, commitment: CommitmentLevel, - ) -> io::Result>> { + ) -> impl Future>>> + '_ { self.inner .process_transaction_with_commitment_and_context(ctx, transaction, commitment) - .await } - pub async fn get_account_with_commitment_and_context( + pub fn get_account_with_commitment_and_context( &mut self, ctx: Context, address: Pubkey, commitment: CommitmentLevel, - ) -> io::Result> { + ) -> impl Future>> + '_ { self.inner .get_account_with_commitment_and_context(ctx, address, commitment) - .await } /// Send a transaction and return immediately. The server will resend the /// transaction until either it is accepted by the cluster or the transaction's /// blockhash expires. - pub async fn send_transaction(&mut self, transaction: Transaction) -> io::Result<()> { + pub fn send_transaction( + &mut self, + transaction: Transaction, + ) -> impl Future> + '_ { self.send_transaction_with_context(context::current(), transaction) - .await } /// Return the fee parameters associated with a recent, rooted blockhash. The cluster /// will use the transaction's blockhash to look up these same fee parameters and /// use them to calculate the transaction fee. - pub async fn get_fees(&mut self) -> io::Result<(FeeCalculator, Hash, Slot)> { + pub fn get_fees( + &mut self, + ) -> impl Future> + '_ { self.get_fees_with_commitment_and_context(context::current(), CommitmentLevel::Root) - .await } /// Return the cluster rent - pub async fn get_rent(&mut self) -> io::Result { - let rent_sysvar = self - .get_account(sysvar::rent::id()) - .await? - .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Rent sysvar not present"))?; - - from_account::(&rent_sysvar).ok_or_else(|| { - io::Error::new(io::ErrorKind::Other, "Failed to deserialize Rent sysvar") + pub fn get_rent(&mut self) -> impl Future> + '_ { + self.get_account(sysvar::rent::id()).map(|result| { + let rent_sysvar = result? + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Rent sysvar not present"))?; + from_account::(&rent_sysvar).ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, "Failed to deserialize Rent sysvar") + }) }) } /// Return a recent, rooted blockhash from the server. The cluster will only accept /// transactions with a blockhash that has not yet expired. Use the `get_fees` /// method to get both a blockhash and the blockhash's last valid slot. - pub async fn get_recent_blockhash(&mut self) -> io::Result { - Ok(self.get_fees().await?.1) + pub fn get_recent_blockhash(&mut self) -> impl Future> + '_ { + self.get_fees().map(|result| Ok(result?.1)) } /// Send a transaction and return after the transaction has been rejected or /// reached the given level of commitment. - pub async fn process_transaction_with_commitment( + pub fn process_transaction_with_commitment( &mut self, transaction: Transaction, commitment: CommitmentLevel, - ) -> transport::Result<()> { + ) -> impl Future> + '_ { let mut ctx = context::current(); ctx.deadline += Duration::from_secs(50); - let result = self - .process_transaction_with_commitment_and_context(ctx, transaction, commitment) - .await?; - match result { - None => Err(Error::new(ErrorKind::TimedOut, "invalid blockhash or fee-payer").into()), - Some(transaction_result) => Ok(transaction_result?), - } + self.process_transaction_with_commitment_and_context(ctx, transaction, commitment) + .map(|result| match result? { + None => { + Err(Error::new(ErrorKind::TimedOut, "invalid blockhash or fee-payer").into()) + } + Some(transaction_result) => Ok(transaction_result?), + }) } - /// Send a transaction and return after the transaction has been finalized or rejected. - pub async fn process_transaction(&mut self, transaction: Transaction) -> transport::Result<()> { + /// Send a transaction and return until the transaction has been finalized or rejected. + pub fn process_transaction( + &mut self, + transaction: Transaction, + ) -> impl Future> + '_ { self.process_transaction_with_commitment(transaction, CommitmentLevel::default()) - .await + } + + pub async fn process_transactions_with_commitment( + &mut self, + transactions: Vec, + commitment: CommitmentLevel, + ) -> transport::Result<()> { + let mut clients: Vec<_> = transactions.iter().map(|_| self.clone()).collect(); + let futures = clients + .iter_mut() + .zip(transactions) + .map(|(client, transaction)| { + client.process_transaction_with_commitment(transaction, commitment) + }); + let statuses = join_all(futures).await; + statuses.into_iter().collect() // Convert Vec> to Result> + } + + /// Send transactions and return until the transaction has been finalized or rejected. + pub fn process_transactions( + &mut self, + transactions: Vec, + ) -> impl Future> + '_ { + self.process_transactions_with_commitment(transactions, CommitmentLevel::default()) } /// Return the most recent rooted slot height. All transactions at or below this height /// are said to be finalized. The cluster will not fork to a higher slot height. - pub async fn get_root_slot(&mut self) -> io::Result { + pub fn get_root_slot(&mut self) -> impl Future> + '_ { self.get_slot_with_context(context::current(), CommitmentLevel::Root) - .await } /// Return the account at the given address at the slot corresponding to the given /// commitment level. If the account is not found, None is returned. - pub async fn get_account_with_commitment( + pub fn get_account_with_commitment( &mut self, address: Pubkey, commitment: CommitmentLevel, - ) -> io::Result> { + ) -> impl Future>> + '_ { self.get_account_with_commitment_and_context(context::current(), address, commitment) - .await } /// Return the account at the given address at the time of the most recent root slot. /// If the account is not found, None is returned. - pub async fn get_account(&mut self, address: Pubkey) -> io::Result> { + pub fn get_account( + &mut self, + address: Pubkey, + ) -> impl Future>> + '_ { self.get_account_with_commitment(address, CommitmentLevel::default()) - .await } /// Return the balance in lamports of an account at the given address at the slot /// corresponding to the given commitment level. - pub async fn get_balance_with_commitment( + pub fn get_balance_with_commitment( &mut self, address: Pubkey, commitment: CommitmentLevel, - ) -> io::Result { - let account = self - .get_account_with_commitment_and_context(context::current(), address, commitment) - .await?; - Ok(account.map(|x| x.lamports).unwrap_or(0)) + ) -> impl Future> + '_ { + self.get_account_with_commitment_and_context(context::current(), address, commitment) + .map(|result| Ok(result?.map(|x| x.lamports).unwrap_or(0))) } /// Return the balance in lamports of an account at the given address at the time /// of the most recent root slot. - pub async fn get_balance(&mut self, address: Pubkey) -> io::Result { + pub fn get_balance(&mut self, address: Pubkey) -> impl Future> + '_ { self.get_balance_with_commitment(address, CommitmentLevel::default()) - .await } /// Return the status of a transaction with a signature matching the transaction's first @@ -221,12 +240,11 @@ impl BanksClient { /// blockhash was expired or the fee-paying account had insufficient funds to pay the /// transaction fee. Note that servers rarely store the full transaction history. This /// method may return None if the transaction status has been discarded. - pub async fn get_transaction_status( + pub fn get_transaction_status( &mut self, signature: Signature, - ) -> io::Result> { + ) -> impl Future>> + '_ { self.get_transaction_status_with_context(context::current(), signature) - .await } /// Same as get_transaction_status, but for multiple transactions. diff --git a/banks-interface/Cargo.toml b/banks-interface/Cargo.toml index 9ca906a219..ba70884340 100644 --- a/banks-interface/Cargo.toml +++ b/banks-interface/Cargo.toml @@ -13,6 +13,9 @@ serde = { version = "1.0.112", features = ["derive"] } solana-sdk = { path = "../sdk", version = "1.5.0" } tarpc = { version = "0.23.0", features = ["full"] } +[dev-dependencies] +tokio = { version = "0.3", features = ["full"] } + [lib] crate-type = ["lib"] name = "solana_banks_interface"