Add signature polling to SyncClient (#3996)

automerge
This commit is contained in:
Sagar Dhawan 2019-04-25 12:46:40 -07:00 committed by Grimes
parent d12705f9b0
commit a3c302c36a
6 changed files with 102 additions and 21 deletions

View File

@ -519,7 +519,14 @@ pub fn airdrop_lamports<T: Client>(
match request_airdrop_transaction(&drone_addr, &id.pubkey(), airdrop_amount, blockhash) {
Ok(transaction) => {
let signature = client.async_send_transaction(transaction).unwrap();
client.get_signature_status(&signature).unwrap();
client
.poll_for_signature_confirmation(&signature, 1)
.unwrap_or_else(|_| {
panic!(
"Error requesting airdrop: to addr: {:?} amount: {}",
drone_addr, airdrop_amount
)
})
}
Err(err) => {
panic!(

View File

@ -137,19 +137,6 @@ impl ThinClient {
self.rpc_client.wait_for_balance(pubkey, expected_balance)
}
pub fn poll_for_signature(&self, signature: &Signature) -> io::Result<()> {
self.rpc_client.poll_for_signature(signature)
}
/// Poll the server until the signature has been confirmed by at least `min_confirmed_blocks`
pub fn poll_for_signature_confirmation(
&self,
signature: &Signature,
min_confirmed_blocks: usize,
) -> io::Result<()> {
self.rpc_client
.poll_for_signature_confirmation(signature, min_confirmed_blocks)
}
/// Check a signature in the bank. This method blocks
/// until the server sends a response.
pub fn check_signature(&self, signature: &Signature) -> bool {
@ -236,6 +223,21 @@ impl SyncClient for ThinClient {
let transaction_count = self.rpc_client.get_transaction_count()?;
Ok(transaction_count)
}
/// Poll the server until the signature has been confirmed by at least `min_confirmed_blocks`
fn poll_for_signature_confirmation(
&self,
signature: &Signature,
min_confirmed_blocks: usize,
) -> TransportResult<()> {
Ok(self
.rpc_client
.poll_for_signature_confirmation(signature, min_confirmed_blocks)?)
}
fn poll_for_signature(&self, signature: &Signature) -> TransportResult<()> {
Ok(self.rpc_client.poll_for_signature(signature)?)
}
}
impl AsyncClient for ThinClient {

View File

@ -17,7 +17,7 @@ use solana_sdk::system_transaction;
use solana_sdk::timing::{
duration_as_ms, DEFAULT_TICKS_PER_SLOT, NUM_CONSECUTIVE_LEADER_SLOTS, NUM_TICKS_PER_SECOND,
};
use std::io;
use solana_sdk::transport::TransportError;
use std::thread::sleep;
use std::time::Duration;
@ -202,7 +202,7 @@ pub fn kill_entry_and_spend_and_verify_rest(
);
match sig {
Err(e) => {
result = Err(e);
result = Err(TransportError::IoError(e));
continue;
}
@ -227,7 +227,7 @@ fn poll_all_nodes_for_signature(
cluster_nodes: &[ContactInfo],
sig: &Signature,
confs: usize,
) -> io::Result<()> {
) -> Result<(), TransportError> {
for validator in cluster_nodes {
if validator.id == entry_point_info.id {
continue;

View File

@ -25,6 +25,7 @@ use solana_sdk::hash::{Hash, Hasher};
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::system_transaction;
use solana_sdk::transaction::Transaction;
use solana_sdk::transport::TransportError;
use solana_storage_api::storage_instruction;
use std::fs::File;
use std::io;
@ -405,7 +406,7 @@ impl Replicator {
client: &ThinClient,
keypair: &Keypair,
storage_keypair: &Keypair,
) -> io::Result<()> {
) -> Result<()> {
// make sure replicator has some balance
if client.poll_get_balance(&keypair.pubkey())? == 0 {
Err(io::Error::new(
@ -429,7 +430,14 @@ impl Replicator {
0,
);
let signature = client.async_send_transaction(tx)?;
client.poll_for_signature(&signature)?;
client
.poll_for_signature(&signature)
.map_err(|err| match err {
TransportError::IoError(e) => e,
TransportError::TransactionError(_) => {
io::Error::new(ErrorKind::Other, "signature not found")
}
})?;
}
Ok(())
}

View File

@ -8,12 +8,13 @@ use solana_sdk::signature::Signature;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_instruction;
use solana_sdk::transaction::{self, Transaction};
use solana_sdk::transport::Result;
use solana_sdk::transport::{Result, TransportError};
use std::io;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::sync::Mutex;
use std::thread::Builder;
use std::thread::{sleep, Builder};
use std::time::{Duration, Instant};
pub struct BankClient {
bank: Arc<Bank>,
@ -112,6 +113,59 @@ impl SyncClient for BankClient {
fn get_transaction_count(&self) -> Result<u64> {
Ok(self.bank.transaction_count())
}
fn poll_for_signature_confirmation(
&self,
signature: &Signature,
min_confirmed_blocks: usize,
) -> Result<()> {
let mut now = Instant::now();
let mut confirmed_blocks = 0;
loop {
let response = self.bank.get_signature_confirmation_status(signature);
if let Some((confirmations, res)) = response {
if res.is_ok() {
if confirmed_blocks != confirmations {
now = Instant::now();
confirmed_blocks = confirmations;
}
if confirmations >= min_confirmed_blocks {
break;
}
}
};
if now.elapsed().as_secs() > 15 {
// TODO: Return a better error.
return Err(TransportError::IoError(io::Error::new(
io::ErrorKind::Other,
"signature not found",
)));
}
sleep(Duration::from_millis(250));
}
Ok(())
}
fn poll_for_signature(&self, signature: &Signature) -> Result<()> {
let now = Instant::now();
loop {
let response = self.bank.get_signature_status(signature);
if let Some(res) = response {
if res.is_ok() {
break;
}
}
if now.elapsed().as_secs() > 15 {
// TODO: Return a better error.
return Err(TransportError::IoError(io::Error::new(
io::ErrorKind::Other,
"signature not found",
)));
}
sleep(Duration::from_millis(250));
}
Ok(())
}
}
impl BankClient {

View File

@ -50,6 +50,16 @@ pub trait SyncClient {
/// Get transaction count
fn get_transaction_count(&self) -> Result<u64>;
/// Poll until the signature has been confirmed by at least `min_confirmed_blocks`
fn poll_for_signature_confirmation(
&self,
signature: &Signature,
min_confirmed_blocks: usize,
) -> Result<()>;
/// Poll to confirm a transaction.
fn poll_for_signature(&self, signature: &Signature) -> Result<()>;
}
pub trait AsyncClient {