From a3c302c36aa0c8213e1ba5a3ebf3e5309a86c894 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Thu, 25 Apr 2019 12:46:40 -0700 Subject: [PATCH] Add signature polling to SyncClient (#3996) automerge --- bench-tps/src/bench.rs | 9 +++++- client/src/thin_client.rs | 28 +++++++++--------- core/src/cluster_tests.rs | 6 ++-- core/src/replicator.rs | 12 ++++++-- runtime/src/bank_client.rs | 58 ++++++++++++++++++++++++++++++++++++-- sdk/src/client.rs | 10 +++++++ 6 files changed, 102 insertions(+), 21 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 412d716d0..7b8aae9f9 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -519,7 +519,14 @@ pub fn airdrop_lamports( match request_airdrop_transaction(&drone_addr, &id.pubkey(), airdrop_amount, blockhash) { Ok(transaction) => { let signature = client.async_send_transaction(transaction).unwrap(); - client.get_signature_status(&signature).unwrap(); + client + .poll_for_signature_confirmation(&signature, 1) + .unwrap_or_else(|_| { + panic!( + "Error requesting airdrop: to addr: {:?} amount: {}", + drone_addr, airdrop_amount + ) + }) } Err(err) => { panic!( diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index 5aebedb62..b94fa583a 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -137,19 +137,6 @@ impl ThinClient { self.rpc_client.wait_for_balance(pubkey, expected_balance) } - pub fn poll_for_signature(&self, signature: &Signature) -> io::Result<()> { - self.rpc_client.poll_for_signature(signature) - } - /// Poll the server until the signature has been confirmed by at least `min_confirmed_blocks` - pub fn poll_for_signature_confirmation( - &self, - signature: &Signature, - min_confirmed_blocks: usize, - ) -> io::Result<()> { - self.rpc_client - .poll_for_signature_confirmation(signature, min_confirmed_blocks) - } - /// Check a signature in the bank. This method blocks /// until the server sends a response. pub fn check_signature(&self, signature: &Signature) -> bool { @@ -236,6 +223,21 @@ impl SyncClient for ThinClient { let transaction_count = self.rpc_client.get_transaction_count()?; Ok(transaction_count) } + + /// Poll the server until the signature has been confirmed by at least `min_confirmed_blocks` + fn poll_for_signature_confirmation( + &self, + signature: &Signature, + min_confirmed_blocks: usize, + ) -> TransportResult<()> { + Ok(self + .rpc_client + .poll_for_signature_confirmation(signature, min_confirmed_blocks)?) + } + + fn poll_for_signature(&self, signature: &Signature) -> TransportResult<()> { + Ok(self.rpc_client.poll_for_signature(signature)?) + } } impl AsyncClient for ThinClient { diff --git a/core/src/cluster_tests.rs b/core/src/cluster_tests.rs index 2b3b8f6cb..2fbad6208 100644 --- a/core/src/cluster_tests.rs +++ b/core/src/cluster_tests.rs @@ -17,7 +17,7 @@ use solana_sdk::system_transaction; use solana_sdk::timing::{ duration_as_ms, DEFAULT_TICKS_PER_SLOT, NUM_CONSECUTIVE_LEADER_SLOTS, NUM_TICKS_PER_SECOND, }; -use std::io; +use solana_sdk::transport::TransportError; use std::thread::sleep; use std::time::Duration; @@ -202,7 +202,7 @@ pub fn kill_entry_and_spend_and_verify_rest( ); match sig { Err(e) => { - result = Err(e); + result = Err(TransportError::IoError(e)); continue; } @@ -227,7 +227,7 @@ fn poll_all_nodes_for_signature( cluster_nodes: &[ContactInfo], sig: &Signature, confs: usize, -) -> io::Result<()> { +) -> Result<(), TransportError> { for validator in cluster_nodes { if validator.id == entry_point_info.id { continue; diff --git a/core/src/replicator.rs b/core/src/replicator.rs index f6985edeb..248e84d4e 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -25,6 +25,7 @@ use solana_sdk::hash::{Hash, Hasher}; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::system_transaction; use solana_sdk::transaction::Transaction; +use solana_sdk::transport::TransportError; use solana_storage_api::storage_instruction; use std::fs::File; use std::io; @@ -405,7 +406,7 @@ impl Replicator { client: &ThinClient, keypair: &Keypair, storage_keypair: &Keypair, - ) -> io::Result<()> { + ) -> Result<()> { // make sure replicator has some balance if client.poll_get_balance(&keypair.pubkey())? == 0 { Err(io::Error::new( @@ -429,7 +430,14 @@ impl Replicator { 0, ); let signature = client.async_send_transaction(tx)?; - client.poll_for_signature(&signature)?; + client + .poll_for_signature(&signature) + .map_err(|err| match err { + TransportError::IoError(e) => e, + TransportError::TransactionError(_) => { + io::Error::new(ErrorKind::Other, "signature not found") + } + })?; } Ok(()) } diff --git a/runtime/src/bank_client.rs b/runtime/src/bank_client.rs index 22b731de8..7a5f5e97e 100644 --- a/runtime/src/bank_client.rs +++ b/runtime/src/bank_client.rs @@ -8,12 +8,13 @@ use solana_sdk::signature::Signature; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_instruction; use solana_sdk::transaction::{self, Transaction}; -use solana_sdk::transport::Result; +use solana_sdk::transport::{Result, TransportError}; use std::io; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; use std::sync::Mutex; -use std::thread::Builder; +use std::thread::{sleep, Builder}; +use std::time::{Duration, Instant}; pub struct BankClient { bank: Arc, @@ -112,6 +113,59 @@ impl SyncClient for BankClient { fn get_transaction_count(&self) -> Result { Ok(self.bank.transaction_count()) } + + fn poll_for_signature_confirmation( + &self, + signature: &Signature, + min_confirmed_blocks: usize, + ) -> Result<()> { + let mut now = Instant::now(); + let mut confirmed_blocks = 0; + loop { + let response = self.bank.get_signature_confirmation_status(signature); + if let Some((confirmations, res)) = response { + if res.is_ok() { + if confirmed_blocks != confirmations { + now = Instant::now(); + confirmed_blocks = confirmations; + } + if confirmations >= min_confirmed_blocks { + break; + } + } + }; + if now.elapsed().as_secs() > 15 { + // TODO: Return a better error. + return Err(TransportError::IoError(io::Error::new( + io::ErrorKind::Other, + "signature not found", + ))); + } + sleep(Duration::from_millis(250)); + } + Ok(()) + } + + fn poll_for_signature(&self, signature: &Signature) -> Result<()> { + let now = Instant::now(); + loop { + let response = self.bank.get_signature_status(signature); + if let Some(res) = response { + if res.is_ok() { + break; + } + } + if now.elapsed().as_secs() > 15 { + // TODO: Return a better error. + return Err(TransportError::IoError(io::Error::new( + io::ErrorKind::Other, + "signature not found", + ))); + } + sleep(Duration::from_millis(250)); + } + Ok(()) + } } impl BankClient { diff --git a/sdk/src/client.rs b/sdk/src/client.rs index acfc1e403..0dd03cf17 100644 --- a/sdk/src/client.rs +++ b/sdk/src/client.rs @@ -50,6 +50,16 @@ pub trait SyncClient { /// Get transaction count fn get_transaction_count(&self) -> Result; + + /// Poll until the signature has been confirmed by at least `min_confirmed_blocks` + fn poll_for_signature_confirmation( + &self, + signature: &Signature, + min_confirmed_blocks: usize, + ) -> Result<()>; + + /// Poll to confirm a transaction. + fn poll_for_signature(&self, signature: &Signature) -> Result<()>; } pub trait AsyncClient {