From c498775a3d3bc3b5a6499f866126bc4bb77a6517 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Sat, 16 Mar 2019 17:17:44 -0700 Subject: [PATCH] Move generic rpc_client functions from wallet/ to client/ --- bench-tps/src/bench.rs | 10 +- bench-tps/src/main.rs | 2 +- client/src/rpc_client.rs | 349 ++++++++++++++++++++++++++++++---- client/src/thin_client.rs | 12 +- core/src/cluster_tests.rs | 6 +- core/src/local_cluster.rs | 6 +- core/src/replicator.rs | 4 +- core/src/storage_stage.rs | 2 +- tests/thin_client.rs | 12 +- wallet/src/wallet.rs | 383 +++----------------------------------- 10 files changed, 363 insertions(+), 423 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index e2bc56e3c..2fa775bf1 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -119,7 +119,7 @@ pub fn send_barrier_transaction( ); } - *blockhash = barrier_client.get_recent_blockhash(); + *blockhash = barrier_client.get_recent_blockhash().unwrap(); let transaction = SystemTransaction::new_account(&source_keypair, dest_id, 0, *blockhash, 0); @@ -164,7 +164,7 @@ pub fn send_barrier_transaction( exit(1); } - let new_blockhash = barrier_client.get_recent_blockhash(); + let new_blockhash = barrier_client.get_recent_blockhash().unwrap(); if new_blockhash == *blockhash { if poll_count > 0 && poll_count % 8 == 0 { println!("blockhash is not advancing, still at {:?}", *blockhash); @@ -186,7 +186,7 @@ pub fn generate_txs( contact_info: &ContactInfo, ) { let client = create_client(contact_info.client_facing_addr(), FULLNODE_PORT_RANGE); - let blockhash = client.get_recent_blockhash(); + let blockhash = client.get_recent_blockhash().unwrap(); let tx_count = source.len(); println!("Signing transactions... {} (reclaim={})", tx_count, reclaim); let signing_start = Instant::now(); @@ -376,7 +376,7 @@ pub fn fund_keys(client: &ThinClient, source: &Keypair, dests: &[Keypair], lampo to_fund_txs.len(), ); - let blockhash = client.get_recent_blockhash(); + let blockhash = client.get_recent_blockhash().unwrap(); // re-sign retained to_fund_txes with updated blockhash to_fund_txs.par_iter_mut().for_each(|(k, tx)| { @@ -415,7 +415,7 @@ pub fn airdrop_lamports(client: &ThinClient, drone_addr: &SocketAddr, id: &Keypa id.pubkey(), ); - let blockhash = client.get_recent_blockhash(); + let blockhash = client.get_recent_blockhash().unwrap(); match request_airdrop_transaction(&drone_addr, &id.pubkey(), airdrop_amount, blockhash) { Ok(transaction) => { let signature = client.transfer_signed(&transaction).unwrap(); diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index 4207ac4f5..bfed3bf56 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -103,7 +103,7 @@ fn main() { airdrop_lamports(&barrier_client, &drone_addr, &barrier_source_keypair, 1); println!("Get last ID..."); - let mut blockhash = client.get_recent_blockhash(); + let mut blockhash = client.get_recent_blockhash().unwrap(); println!("Got last ID {:?}", blockhash); let first_tx_count = client.get_transaction_count().expect("transaction count"); diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index 14cb34d72..9677dfb9f 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -2,16 +2,21 @@ use crate::generic_rpc_client_request::GenericRpcClientRequest; use crate::mock_rpc_client_request::MockRpcClientRequest; use crate::rpc_client_request::RpcClientRequest; use crate::rpc_request::RpcRequest; +use crate::rpc_signature_status::RpcSignatureStatus; +use bincode::serialize; use bs58; use log::*; use serde_json::{json, Value}; use solana_sdk::account::Account; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::Signature; +use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; +use solana_sdk::timing::{DEFAULT_TICKS_PER_SLOT, NUM_TICKS_PER_SECOND}; +use solana_sdk::transaction::Transaction; use std::error; use std::io; use std::net::SocketAddr; +use std::str::FromStr; use std::thread::sleep; use std::time::{Duration, Instant}; @@ -43,6 +48,175 @@ impl RpcClient { } } + pub fn send_transaction( + &self, + transaction: &Transaction, + ) -> Result> { + let serialized = serialize(transaction).unwrap(); + let params = json!([serialized]); + let signature = self + .client + .send(&RpcRequest::SendTransaction, Some(params), 5)?; + if signature.as_str().is_none() { + Err(io::Error::new( + io::ErrorKind::Other, + "Received result of an unexpected type", + ))?; + } + Ok(signature.as_str().unwrap().to_string()) + } + + pub fn get_signature_status( + &self, + signature: &str, + ) -> Result> { + let params = json!([signature.to_string()]); + let signature_status = + self.client + .send(&RpcRequest::GetSignatureStatus, Some(params), 5)?; + if let Some(status) = signature_status.as_str() { + let rpc_status = RpcSignatureStatus::from_str(status).map_err(|err| { + io::Error::new( + io::ErrorKind::Other, + format!("Unable to parse signature status: {:?}", err), + ) + })?; + Ok(rpc_status) + } else { + Err(io::Error::new( + io::ErrorKind::Other, + "Received result of an unexpected type", + ))? + } + } + + pub fn send_and_confirm_transaction( + &self, + transaction: &mut Transaction, + signer: &T, + ) -> Result> { + let mut send_retries = 5; + loop { + let mut status_retries = 4; + let signature_str = self.send_transaction(transaction)?; + let status = loop { + let status = self.get_signature_status(&signature_str)?; + if status == RpcSignatureStatus::SignatureNotFound { + status_retries -= 1; + if status_retries == 0 { + break status; + } + } else { + break status; + } + if cfg!(not(test)) { + // Retry ~twice during a slot + sleep(Duration::from_millis( + 500 * DEFAULT_TICKS_PER_SLOT / NUM_TICKS_PER_SECOND, + )); + } + }; + match status { + RpcSignatureStatus::AccountInUse | RpcSignatureStatus::SignatureNotFound => { + // Fetch a new blockhash and re-sign the transaction before sending it again + self.resign_transaction(transaction, signer)?; + send_retries -= 1; + } + RpcSignatureStatus::Confirmed => { + return Ok(signature_str); + } + _ => { + send_retries = 0; + } + } + if send_retries == 0 { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Transaction {:?} failed: {:?}", signature_str, status), + ))?; + } + } + } + + pub fn send_and_confirm_transactions( + &self, + mut transactions: Vec, + signer: &Keypair, + ) -> Result<(), Box> { + let mut send_retries = 5; + loop { + let mut status_retries = 4; + + // Send all transactions + let mut transactions_signatures = vec![]; + for transaction in transactions { + if cfg!(not(test)) { + // Delay ~1 tick between write transactions in an attempt to reduce AccountInUse errors + // when all the write transactions modify the same program account (eg, deploying a + // new program) + sleep(Duration::from_millis(1000 / NUM_TICKS_PER_SECOND)); + } + + let signature = self.send_transaction(&transaction).ok(); + transactions_signatures.push((transaction, signature)) + } + + // Collect statuses for all the transactions, drop those that are confirmed + while status_retries > 0 { + status_retries -= 1; + + if cfg!(not(test)) { + // Retry ~twice during a slot + sleep(Duration::from_millis( + 500 * DEFAULT_TICKS_PER_SLOT / NUM_TICKS_PER_SECOND, + )); + } + + transactions_signatures = transactions_signatures + .into_iter() + .filter(|(_transaction, signature)| { + if let Some(signature) = signature { + if let Ok(status) = self.get_signature_status(&signature) { + return status != RpcSignatureStatus::Confirmed; + } + } + true + }) + .collect(); + + if transactions_signatures.is_empty() { + return Ok(()); + } + } + + if send_retries == 0 { + Err(io::Error::new(io::ErrorKind::Other, "Transactions failed"))?; + } + send_retries -= 1; + + // Re-sign any failed transactions with a new blockhash and retry + let blockhash = + self.get_new_blockhash(&transactions_signatures[0].0.recent_blockhash)?; + transactions = transactions_signatures + .into_iter() + .map(|(mut transaction, _)| { + transaction.sign(&[signer], blockhash); + transaction + }) + .collect(); + } + } + + pub fn resign_transaction( + &self, + tx: &mut Transaction, + signer_key: &T, + ) -> Result<(), Box> { + let blockhash = self.get_new_blockhash(&tx.recent_blockhash)?; + tx.sign(&[signer_key], blockhash); + Ok(()) + } + pub fn retry_get_balance( &self, pubkey: &Pubkey, @@ -126,56 +300,50 @@ impl RpcClient { } } - /// Request the last Entry ID from the server without blocking. - /// Returns the blockhash Hash or None if there was no response from the server. - pub fn try_get_recent_blockhash(&self, mut num_retries: u64) -> Option { - loop { - let response = self.client.send(&RpcRequest::GetRecentBlockhash, None, 0); - - match response { + pub fn get_recent_blockhash(&self) -> io::Result { + let mut num_retries = 5; + while num_retries > 0 { + match self.client.send(&RpcRequest::GetRecentBlockhash, None, 0) { Ok(value) => { - let blockhash_str = value.as_str().unwrap(); - let blockhash_vec = bs58::decode(blockhash_str).into_vec().unwrap(); - return Some(Hash::new(&blockhash_vec)); - } - Err(error) => { - debug!("thin_client get_recent_blockhash error: {:?}", error); - num_retries -= 1; - if num_retries == 0 { - return None; + if let Some(blockhash_str) = value.as_str() { + let blockhash_vec = bs58::decode(blockhash_str) + .into_vec() + .expect("bs58::decode"); + return Ok(Hash::new(&blockhash_vec)); } } + Err(err) => { + debug!("retry_get_recent_blockhash failed: {:?}", err); + } } + num_retries -= 1; } + Err(io::Error::new( + io::ErrorKind::Other, + "Unable to get recent blockhash, too many retries", + )) } - /// Request the last Entry ID from the server. This method blocks - /// until the server sends a response. - pub fn get_recent_blockhash(&self) -> Hash { - loop { - if let Some(hash) = self.try_get_recent_blockhash(10) { - return hash; - } - } - } - - /// Request a new last Entry ID from the server. This method blocks - /// until the server sends a response. - pub fn get_next_blockhash(&self, previous_blockhash: &Hash) -> Hash { - self.get_next_blockhash_ext(previous_blockhash, &|| { - sleep(Duration::from_millis(100)); - }) - } - - fn get_next_blockhash_ext(&self, previous_blockhash: &Hash, func: &Fn()) -> Hash { - loop { - let blockhash = self.get_recent_blockhash(); - if blockhash != *previous_blockhash { - break blockhash; + pub fn get_new_blockhash(&self, blockhash: &Hash) -> io::Result { + let mut num_retries = 5; + while num_retries > 0 { + if let Ok(new_blockhash) = self.get_recent_blockhash() { + if new_blockhash != *blockhash { + return Ok(new_blockhash); + } } debug!("Got same blockhash ({:?}), will retry...", blockhash); - func() + + // Retry ~twice during a slot + sleep(Duration::from_millis( + 500 * DEFAULT_TICKS_PER_SLOT / NUM_TICKS_PER_SECOND, + )); + num_retries -= 1; } + Err(io::Error::new( + io::ErrorKind::Other, + "Unable to get next blockhash, too many retries", + )) } pub fn poll_balance_with_timeout( @@ -308,10 +476,13 @@ pub fn get_rpc_request_str(rpc_addr: SocketAddr, tls: bool) -> String { #[cfg(test)] mod tests { use super::*; + use crate::mock_rpc_client_request::{PUBKEY, SIGNATURE}; use jsonrpc_core::{Error, IoHandler, Params}; use jsonrpc_http_server::{AccessControlAllowOrigin, DomainsValidation, ServerBuilder}; use serde_json::Number; use solana_logger; + use solana_sdk::signature::{Keypair, KeypairUtil}; + use solana_sdk::system_transaction::SystemTransaction; use std::sync::mpsc::channel; use std::thread; @@ -409,4 +580,100 @@ mod tests { ); assert_eq!(balance.unwrap().as_u64().unwrap(), 5); } + + #[test] + fn test_send_transaction() { + let rpc_client = RpcClient::new_mock("succeeds".to_string()); + + let key = Keypair::new(); + let to = Keypair::new().pubkey(); + let blockhash = Hash::default(); + let tx = SystemTransaction::new_account(&key, &to, 50, blockhash, 0); + + let signature = rpc_client.send_transaction(&tx); + assert_eq!(signature.unwrap(), SIGNATURE.to_string()); + + let rpc_client = RpcClient::new_mock("fails".to_string()); + + let signature = rpc_client.send_transaction(&tx); + assert!(signature.is_err()); + } + #[test] + fn test_get_recent_blockhash() { + let rpc_client = RpcClient::new_mock("succeeds".to_string()); + + let vec = bs58::decode(PUBKEY).into_vec().unwrap(); + let expected_blockhash = Hash::new(&vec); + + let blockhash = dbg!(rpc_client.get_recent_blockhash()).expect("blockhash ok"); + assert_eq!(blockhash, expected_blockhash); + + let rpc_client = RpcClient::new_mock("fails".to_string()); + + let blockhash = dbg!(rpc_client.get_recent_blockhash()); + assert!(blockhash.is_err()); + } + + #[test] + fn test_get_signature_status() { + let rpc_client = RpcClient::new_mock("succeeds".to_string()); + let signature = "good_signature"; + let status = rpc_client.get_signature_status(&signature); + assert_eq!(status.unwrap(), RpcSignatureStatus::Confirmed); + + let rpc_client = RpcClient::new_mock("bad_sig_status".to_string()); + let signature = "bad_status"; + let status = rpc_client.get_signature_status(&signature); + assert!(status.is_err()); + + let rpc_client = RpcClient::new_mock("fails".to_string()); + let signature = "bad_status_fmt"; + let status = rpc_client.get_signature_status(&signature); + assert!(status.is_err()); + } + + #[test] + fn test_send_and_confirm_transaction() { + let rpc_client = RpcClient::new_mock("succeeds".to_string()); + + let key = Keypair::new(); + let to = Keypair::new().pubkey(); + let blockhash = Hash::default(); + let mut tx = SystemTransaction::new_account(&key, &to, 50, blockhash, 0); + + let result = rpc_client.send_and_confirm_transaction(&mut tx, &key); + result.unwrap(); + + let rpc_client = RpcClient::new_mock("account_in_use".to_string()); + let result = rpc_client.send_and_confirm_transaction(&mut tx, &key); + assert!(result.is_err()); + + let rpc_client = RpcClient::new_mock("fails".to_string()); + let result = rpc_client.send_and_confirm_transaction(&mut tx, &key); + assert!(result.is_err()); + } + + #[test] + fn test_resign_transaction() { + let rpc_client = RpcClient::new_mock("succeeds".to_string()); + + let key = Keypair::new(); + let to = Keypair::new().pubkey(); + let vec = bs58::decode("HUu3LwEzGRsUkuJS121jzkPJW39Kq62pXCTmTa1F9jDL") + .into_vec() + .unwrap(); + let blockhash = Hash::new(&vec); + let prev_tx = SystemTransaction::new_account(&key, &to, 50, blockhash, 0); + let mut tx = SystemTransaction::new_account(&key, &to, 50, blockhash, 0); + + rpc_client.resign_transaction(&mut tx, &key).unwrap(); + + assert_ne!(prev_tx, tx); + assert_ne!(prev_tx.signatures, tx.signatures); + assert_ne!(prev_tx.recent_blockhash, tx.recent_blockhash); + assert_eq!(prev_tx.fee, tx.fee); + assert_eq!(prev_tx.account_keys, tx.account_keys); + assert_eq!(prev_tx.instructions, tx.instructions); + } + } diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index 5d49eaaf7..a80d8908c 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -81,7 +81,7 @@ impl ThinClient { tries: usize, ) -> io::Result { for x in 0..tries { - transaction.sign(&[keypair], self.get_recent_blockhash()); + transaction.sign(&[keypair], self.get_recent_blockhash()?); let mut buf = vec![0; transaction.serialized_size().unwrap() as usize]; let mut wr = std::io::Cursor::new(&mut buf[..]); serialize_into(&mut wr, &transaction) @@ -111,16 +111,12 @@ impl ThinClient { self.rpc_client.get_transaction_count() } - pub fn try_get_recent_blockhash(&self, num_retries: u64) -> Option { - self.rpc_client.try_get_recent_blockhash(num_retries) - } - - pub fn get_recent_blockhash(&self) -> Hash { + pub fn get_recent_blockhash(&self) -> io::Result { self.rpc_client.get_recent_blockhash() } - pub fn get_next_blockhash(&self, previous_blockhash: &Hash) -> Hash { - self.rpc_client.get_next_blockhash(previous_blockhash) + pub fn get_new_blockhash(&self, blockhash: &Hash) -> io::Result { + self.rpc_client.get_new_blockhash(blockhash) } pub fn poll_balance_with_timeout( diff --git a/core/src/cluster_tests.rs b/core/src/cluster_tests.rs index 817ec8e2f..8b362a174 100644 --- a/core/src/cluster_tests.rs +++ b/core/src/cluster_tests.rs @@ -37,7 +37,7 @@ pub fn spend_and_verify_all_nodes( &funding_keypair, &random_keypair.pubkey(), 1, - client.get_recent_blockhash(), + client.get_recent_blockhash().unwrap(), 0, ); let sig = client @@ -62,7 +62,7 @@ pub fn send_many_transactions(node: &ContactInfo, funding_keypair: &Keypair, num &funding_keypair, &random_keypair.pubkey(), 1, - client.get_recent_blockhash(), + client.get_recent_blockhash().unwrap(), 0, ); client @@ -159,7 +159,7 @@ pub fn kill_entry_and_spend_and_verify_rest( &funding_keypair, &random_keypair.pubkey(), 1, - client.get_recent_blockhash(), + client.get_recent_blockhash().unwrap(), 0, ); diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs index b11d75ce0..d24075d13 100644 --- a/core/src/local_cluster.rs +++ b/core/src/local_cluster.rs @@ -197,7 +197,7 @@ impl LocalCluster { lamports: u64, ) -> u64 { trace!("getting leader blockhash"); - let blockhash = client.get_recent_blockhash(); + let blockhash = client.get_recent_blockhash().unwrap(); let mut tx = SystemTransaction::new_account(&source_keypair, dest_pubkey, lamports, blockhash, 0); info!( @@ -228,7 +228,7 @@ impl LocalCluster { let mut transaction = VoteTransaction::new_account( from_account, &vote_account_pubkey, - client.get_recent_blockhash(), + client.get_recent_blockhash().unwrap(), amount, 1, ); @@ -243,7 +243,7 @@ impl LocalCluster { // 2) Set delegate for new vote account let mut transaction = VoteTransaction::delegate_vote_account( vote_account, - client.get_recent_blockhash(), + client.get_recent_blockhash().unwrap(), &delegate_id, 0, ); diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 32db48b74..3b36ac36d 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -316,7 +316,7 @@ impl Replicator { ); Self::get_airdrop_lamports(&client, &self.keypair, &self.cluster_entrypoint); - let blockhash = client.get_recent_blockhash(); + let blockhash = client.get_recent_blockhash().expect("blockhash"); let mut tx = StorageTransaction::new_mining_proof( &self.keypair, self.hash, @@ -388,7 +388,7 @@ impl Replicator { let airdrop_amount = 1; - let blockhash = client.get_recent_blockhash(); + let blockhash = client.get_recent_blockhash().expect("blockhash"); match request_airdrop_transaction( &drone_addr, &keypair.pubkey(), diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 948ce1c17..4e99c8dfa 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -242,7 +242,7 @@ impl StorageStage { let mut blockhash = None; for _ in 0..10 { - if let Some(new_blockhash) = client.try_get_recent_blockhash(1) { + if let Ok(new_blockhash) = client.get_recent_blockhash() { blockhash = Some(new_blockhash); break; } diff --git a/tests/thin_client.rs b/tests/thin_client.rs index 711751b47..cf3317b1a 100644 --- a/tests/thin_client.rs +++ b/tests/thin_client.rs @@ -48,7 +48,7 @@ fn test_thin_client_basic() { let transaction_count = client.get_transaction_count().unwrap(); assert_eq!(transaction_count, 0); - let blockhash = client.get_recent_blockhash(); + let blockhash = client.get_recent_blockhash().unwrap(); info!("test_thin_client blockhash: {:?}", blockhash); let signature = transfer(&client, 500, &alice, &bob_pubkey, &blockhash).unwrap(); @@ -74,13 +74,13 @@ fn test_bad_sig() { let client = create_client(leader_data.client_facing_addr(), FULLNODE_PORT_RANGE); - let blockhash = client.get_recent_blockhash(); + let blockhash = client.get_recent_blockhash().unwrap(); let tx = SystemTransaction::new_account(&alice, &bob_pubkey, 500, blockhash, 0); let _sig = client.transfer_signed(&tx).unwrap(); - let blockhash = client.get_recent_blockhash(); + let blockhash = client.get_recent_blockhash().unwrap(); let mut tr2 = SystemTransaction::new_account(&alice, &bob_pubkey, 501, blockhash, 0); let mut instruction2 = deserialize(tr2.data(0)).unwrap(); @@ -107,7 +107,7 @@ fn test_register_vote_account() { // Create the validator account, transfer some lamports to that account let validator_keypair = Keypair::new(); - let blockhash = client.get_recent_blockhash(); + let blockhash = client.get_recent_blockhash().unwrap(); let signature = transfer( &client, 500, @@ -122,7 +122,7 @@ fn test_register_vote_account() { // Create and register the vote account let validator_vote_account_keypair = Keypair::new(); let vote_account_id = validator_vote_account_keypair.pubkey(); - let blockhash = client.get_recent_blockhash(); + let blockhash = client.get_recent_blockhash().unwrap(); let transaction = VoteTransaction::new_account(&validator_keypair, &vote_account_id, blockhash, 1, 1); @@ -179,7 +179,7 @@ fn test_zero_balance_after_nonzero() { discover(&leader_data.gossip, 1).unwrap(); let client = create_client(leader_data.client_facing_addr(), FULLNODE_PORT_RANGE); - let blockhash = client.get_recent_blockhash(); + let blockhash = client.get_recent_blockhash().unwrap(); info!("test_thin_client blockhash: {:?}", blockhash); let starting_alice_balance = client.poll_get_balance(&alice.pubkey()).unwrap(); diff --git a/wallet/src/wallet.rs b/wallet/src/wallet.rs index 44487631f..8cfa2bc92 100644 --- a/wallet/src/wallet.rs +++ b/wallet/src/wallet.rs @@ -1,4 +1,3 @@ -use bincode::serialize; use bs58; use chrono::prelude::*; use clap::ArgMatches; @@ -9,7 +8,6 @@ use solana_budget_api; use solana_budget_api::budget_transaction::BudgetTransaction; use solana_client::rpc_client::{get_rpc_request_str, RpcClient}; use solana_client::rpc_request::RpcRequest; -use solana_client::rpc_signature_status::RpcSignatureStatus; #[cfg(not(test))] use solana_drone::drone::request_airdrop_transaction; use solana_drone::drone::DRONE_PORT; @@ -22,16 +20,12 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::rpc_port::DEFAULT_RPC_PORT; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::system_transaction::SystemTransaction; -use solana_sdk::timing::{DEFAULT_TICKS_PER_SLOT, NUM_TICKS_PER_SECOND}; use solana_sdk::transaction::Transaction; use solana_vote_api::vote_instruction::VoteInstruction; use solana_vote_api::vote_transaction::VoteTransaction; use std::fs::File; use std::io::Read; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::str::FromStr; -use std::thread::sleep; -use std::time::Duration; use std::{error, fmt, mem}; const USERDATA_CHUNK_SIZE: usize = 256; @@ -439,7 +433,7 @@ fn process_configure_staking( delegate_option: Option, authorized_voter_option: Option, ) -> ProcessResult { - let recent_blockhash = get_recent_blockhash(&rpc_client)?; + let recent_blockhash = rpc_client.get_recent_blockhash()?; let mut ixs = vec![]; if let Some(delegate_id) = delegate_option { ixs.push(VoteInstruction::new_delegate_stake( @@ -455,7 +449,7 @@ fn process_configure_staking( } let mut tx = Transaction::new(ixs); tx.sign(&[&config.id], recent_blockhash); - let signature_str = send_and_confirm_transaction(&rpc_client, &mut tx, &config.id)?; + let signature_str = rpc_client.send_and_confirm_transaction(&mut tx, &config.id)?; Ok(signature_str.to_string()) } @@ -465,10 +459,10 @@ fn process_create_staking( voting_account_id: &Pubkey, lamports: u64, ) -> ProcessResult { - let recent_blockhash = get_recent_blockhash(&rpc_client)?; + let recent_blockhash = rpc_client.get_recent_blockhash()?; let mut tx = VoteTransaction::new_account(&config.id, voting_account_id, recent_blockhash, lamports, 0); - let signature_str = send_and_confirm_transaction(&rpc_client, &mut tx, &config.id)?; + let signature_str = rpc_client.send_and_confirm_transaction(&mut tx, &config.id)?; Ok(signature_str.to_string()) } @@ -486,7 +480,7 @@ fn process_deploy( } } - let blockhash = get_recent_blockhash(&rpc_client)?; + let blockhash = rpc_client.get_recent_blockhash()?; let program_id = Keypair::new(); let mut file = File::open(program_location).map_err(|err| { WalletError::DynamicProgramError( @@ -510,9 +504,11 @@ fn process_deploy( 0, ); trace!("Creating program account"); - send_and_confirm_transaction(&rpc_client, &mut tx, &config.id).map_err(|_| { - WalletError::DynamicProgramError("Program allocate space failed".to_string()) - })?; + rpc_client + .send_and_confirm_transaction(&mut tx, &config.id) + .map_err(|_| { + WalletError::DynamicProgramError("Program allocate space failed".to_string()) + })?; trace!("Writing program data"); let write_transactions: Vec<_> = program_data @@ -529,13 +525,15 @@ fn process_deploy( ) }) .collect(); - send_and_confirm_transactions(&rpc_client, write_transactions, &program_id)?; + rpc_client.send_and_confirm_transactions(write_transactions, &program_id)?; trace!("Finalizing program account"); let mut tx = LoaderTransaction::new_finalize(&program_id, &bpf_loader::id(), blockhash, 0); - send_and_confirm_transaction(&rpc_client, &mut tx, &program_id).map_err(|_| { - WalletError::DynamicProgramError("Program finalize transaction failed".to_string()) - })?; + rpc_client + .send_and_confirm_transaction(&mut tx, &program_id) + .map_err(|_| { + WalletError::DynamicProgramError("Program finalize transaction failed".to_string()) + })?; Ok(json!({ "programId": format!("{}", program_id.pubkey()), @@ -553,11 +551,11 @@ fn process_pay( witnesses: &Option>, cancelable: Option, ) -> ProcessResult { - let blockhash = get_recent_blockhash(&rpc_client)?; + let blockhash = rpc_client.get_recent_blockhash()?; if timestamp == None && *witnesses == None { let mut tx = SystemTransaction::new_move(&config.id, to, lamports, blockhash, 0); - let signature_str = send_and_confirm_transaction(&rpc_client, &mut tx, &config.id)?; + let signature_str = rpc_client.send_and_confirm_transaction(&mut tx, &config.id)?; Ok(signature_str.to_string()) } else if *witnesses == None { let dt = timestamp.unwrap(); @@ -579,7 +577,7 @@ fn process_pay( lamports, blockhash, ); - let signature_str = send_and_confirm_transaction(&rpc_client, &mut tx, &config.id)?; + let signature_str = rpc_client.send_and_confirm_transaction(&mut tx, &config.id)?; Ok(json!({ "signature": signature_str, @@ -587,7 +585,7 @@ fn process_pay( }) .to_string()) } else if timestamp == None { - let blockhash = get_recent_blockhash(&rpc_client)?; + let blockhash = rpc_client.get_recent_blockhash()?; let witness = if let Some(ref witness_vec) = *witnesses { witness_vec[0] @@ -609,7 +607,7 @@ fn process_pay( lamports, blockhash, ); - let signature_str = send_and_confirm_transaction(&rpc_client, &mut tx, &config.id)?; + let signature_str = rpc_client.send_and_confirm_transaction(&mut tx, &config.id)?; Ok(json!({ "signature": signature_str, @@ -622,10 +620,10 @@ fn process_pay( } fn process_cancel(rpc_client: &RpcClient, config: &WalletConfig, pubkey: &Pubkey) -> ProcessResult { - let blockhash = get_recent_blockhash(&rpc_client)?; + let blockhash = rpc_client.get_recent_blockhash()?; let mut tx = BudgetTransaction::new_signature(&config.id, pubkey, &config.id.pubkey(), blockhash); - let signature_str = send_and_confirm_transaction(&rpc_client, &mut tx, &config.id)?; + let signature_str = rpc_client.send_and_confirm_transaction(&mut tx, &config.id)?; Ok(signature_str.to_string()) } @@ -655,10 +653,10 @@ fn process_time_elapsed( request_and_confirm_airdrop(&rpc_client, &drone_addr, &config.id.pubkey(), 1)?; } - let blockhash = get_recent_blockhash(&rpc_client)?; + let blockhash = rpc_client.get_recent_blockhash()?; let mut tx = BudgetTransaction::new_timestamp(&config.id, pubkey, to, dt, blockhash); - let signature_str = send_and_confirm_transaction(&rpc_client, &mut tx, &config.id)?; + let signature_str = rpc_client.send_and_confirm_transaction(&mut tx, &config.id)?; Ok(signature_str.to_string()) } @@ -676,9 +674,9 @@ fn process_witness( request_and_confirm_airdrop(&rpc_client, &drone_addr, &config.id.pubkey(), 1)?; } - let blockhash = get_recent_blockhash(&rpc_client)?; + let blockhash = rpc_client.get_recent_blockhash()?; let mut tx = BudgetTransaction::new_signature(&config.id, pubkey, to, blockhash); - let signature_str = send_and_confirm_transaction(&rpc_client, &mut tx, &config.id)?; + let signature_str = rpc_client.send_and_confirm_transaction(&mut tx, &config.id)?; Ok(signature_str.to_string()) } @@ -772,215 +770,6 @@ pub fn process_command(config: &WalletConfig) -> ProcessResult { } } -fn get_recent_blockhash(rpc_client: &RpcClient) -> Result> { - let result = rpc_client.retry_make_rpc_request(&RpcRequest::GetRecentBlockhash, None, 5)?; - if result.as_str().is_none() { - Err(WalletError::RpcRequestError( - "Received bad blockhash".to_string(), - ))? - } - let blockhash_str = result.as_str().unwrap(); - let blockhash_vec = bs58::decode(blockhash_str) - .into_vec() - .map_err(|_| WalletError::RpcRequestError("Received bad blockhash".to_string()))?; - Ok(Hash::new(&blockhash_vec)) -} - -fn get_next_blockhash( - rpc_client: &RpcClient, - previous_blockhash: &Hash, -) -> Result> { - let mut next_blockhash_retries = 3; - loop { - let next_blockhash = get_recent_blockhash(rpc_client)?; - if cfg!(not(test)) { - if next_blockhash != *previous_blockhash { - return Ok(next_blockhash); - } - } else { - // When using MockRpcClient, get_recent_blockhash() returns a constant value - return Ok(next_blockhash); - } - if next_blockhash_retries == 0 { - Err(WalletError::RpcRequestError( - format!( - "Unable to fetch new blockhash, blockhash stuck at {:?}", - next_blockhash - ) - .to_string(), - ))?; - } - next_blockhash_retries -= 1; - // Retry ~twice during a slot - sleep(Duration::from_millis( - 500 * DEFAULT_TICKS_PER_SLOT / NUM_TICKS_PER_SECOND, - )); - } -} - -fn send_transaction( - rpc_client: &RpcClient, - transaction: &Transaction, -) -> Result> { - let serialized = serialize(transaction).unwrap(); - let params = json!([serialized]); - let signature = - rpc_client.retry_make_rpc_request(&RpcRequest::SendTransaction, Some(params), 5)?; - if signature.as_str().is_none() { - Err(WalletError::RpcRequestError( - "Received result of an unexpected type".to_string(), - ))? - } - Ok(signature.as_str().unwrap().to_string()) -} - -fn confirm_transaction( - rpc_client: &RpcClient, - signature: &str, -) -> Result> { - let params = json!([signature.to_string()]); - let signature_status = - rpc_client.retry_make_rpc_request(&RpcRequest::GetSignatureStatus, Some(params), 5)?; - if let Some(status) = signature_status.as_str() { - let rpc_status = RpcSignatureStatus::from_str(status).map_err(|_| { - WalletError::RpcRequestError("Unable to parse signature status".to_string()) - })?; - Ok(rpc_status) - } else { - Err(WalletError::RpcRequestError( - "Received result of an unexpected type".to_string(), - ))? - } -} - -fn send_and_confirm_transaction( - rpc_client: &RpcClient, - transaction: &mut Transaction, - signer: &T, -) -> Result> { - let mut send_retries = 5; - loop { - let mut status_retries = 4; - let signature_str = send_transaction(rpc_client, transaction)?; - let status = loop { - let status = confirm_transaction(rpc_client, &signature_str)?; - if status == RpcSignatureStatus::SignatureNotFound { - status_retries -= 1; - if status_retries == 0 { - break status; - } - } else { - break status; - } - if cfg!(not(test)) { - // Retry ~twice during a slot - sleep(Duration::from_millis( - 500 * DEFAULT_TICKS_PER_SLOT / NUM_TICKS_PER_SECOND, - )); - } - }; - match status { - RpcSignatureStatus::AccountInUse | RpcSignatureStatus::SignatureNotFound => { - // Fetch a new blockhash and re-sign the transaction before sending it again - resign_transaction(rpc_client, transaction, signer)?; - send_retries -= 1; - } - RpcSignatureStatus::Confirmed => { - return Ok(signature_str); - } - _ => { - send_retries = 0; - } - } - if send_retries == 0 { - Err(WalletError::RpcRequestError(format!( - "Transaction {:?} failed: {:?}", - signature_str, status - )))?; - } - } -} - -fn send_and_confirm_transactions( - rpc_client: &RpcClient, - mut transactions: Vec, - signer: &Keypair, -) -> Result<(), Box> { - let mut send_retries = 5; - loop { - let mut status_retries = 4; - - // Send all transactions - let mut transactions_signatures = vec![]; - for transaction in transactions { - if cfg!(not(test)) { - // Delay ~1 tick between write transactions in an attempt to reduce AccountInUse errors - // since all the write transactions modify the same program account - sleep(Duration::from_millis(1000 / NUM_TICKS_PER_SECOND)); - } - - let signature = send_transaction(&rpc_client, &transaction).ok(); - transactions_signatures.push((transaction, signature)) - } - - // Collect statuses for all the transactions, drop those that are confirmed - while status_retries > 0 { - status_retries -= 1; - - if cfg!(not(test)) { - // Retry ~twice during a slot - sleep(Duration::from_millis( - 500 * DEFAULT_TICKS_PER_SLOT / NUM_TICKS_PER_SECOND, - )); - } - - transactions_signatures = transactions_signatures - .into_iter() - .filter(|(_transaction, signature)| { - if let Some(signature) = signature { - if let Ok(status) = confirm_transaction(rpc_client, &signature) { - return status != RpcSignatureStatus::Confirmed; - } - } - true - }) - .collect(); - - if transactions_signatures.is_empty() { - return Ok(()); - } - } - - if send_retries == 0 { - Err(WalletError::RpcRequestError( - "Transactions failed".to_string(), - ))?; - } - send_retries -= 1; - - // Re-sign any failed transactions with a new blockhash and retry - let blockhash = - get_next_blockhash(rpc_client, &transactions_signatures[0].0.recent_blockhash)?; - transactions = transactions_signatures - .into_iter() - .map(|(mut transaction, _)| { - transaction.sign(&[signer], blockhash); - transaction - }) - .collect(); - } -} - -fn resign_transaction( - rpc_client: &RpcClient, - tx: &mut Transaction, - signer_key: &T, -) -> Result<(), Box> { - let blockhash = get_next_blockhash(rpc_client, &tx.recent_blockhash)?; - tx.sign(&[signer_key], blockhash); - Ok(()) -} - // Quick and dirty Keypair that assumes the client will do retries but not update the // blockhash. If the client updates the blockhash, the signature will be invalid. // TODO: Parse `msg` and use that data to make a new airdrop request. @@ -1025,10 +814,10 @@ pub fn request_and_confirm_airdrop( to_pubkey: &Pubkey, lamports: u64, ) -> Result<(), Box> { - let blockhash = get_recent_blockhash(rpc_client)?; + let blockhash = rpc_client.get_recent_blockhash()?; let keypair = DroneKeypair::new_keypair(drone_addr, to_pubkey, lamports, blockhash)?; let mut tx = keypair.airdrop_transaction(); - send_and_confirm_transaction(rpc_client, &mut tx, &keypair)?; + rpc_client.send_and_confirm_transaction(&mut tx, &keypair)?; Ok(()) } @@ -1037,8 +826,7 @@ mod tests { use super::*; use clap::{App, Arg, ArgGroup, SubCommand}; use serde_json::Value; - use solana::socketaddr; - use solana_client::mock_rpc_client_request::{PUBKEY, SIGNATURE}; + use solana_client::mock_rpc_client_request::SIGNATURE; use solana_sdk::signature::{gen_keypair_file, read_keypair, read_pkcs8, Keypair, KeypairUtil}; use std::fs; use std::net::{Ipv4Addr, SocketAddr}; @@ -1705,115 +1493,4 @@ mod tests { fs::remove_file(&outfile).unwrap(); assert!(!Path::new(&outfile).exists()); } - - #[test] - fn test_wallet_get_recent_blockhash() { - let rpc_client = RpcClient::new_mock("succeeds".to_string()); - - let vec = bs58::decode(PUBKEY).into_vec().unwrap(); - let expected_blockhash = Hash::new(&vec); - - let blockhash = get_recent_blockhash(&rpc_client); - assert_eq!(blockhash.unwrap(), expected_blockhash); - - let rpc_client = RpcClient::new_mock("fails".to_string()); - - let blockhash = get_recent_blockhash(&rpc_client); - assert!(blockhash.is_err()); - } - - #[test] - fn test_wallet_send_transaction() { - let rpc_client = RpcClient::new_mock("succeeds".to_string()); - - let key = Keypair::new(); - let to = Keypair::new().pubkey(); - let blockhash = Hash::default(); - let tx = SystemTransaction::new_account(&key, &to, 50, blockhash, 0); - - let signature = send_transaction(&rpc_client, &tx); - assert_eq!(signature.unwrap(), SIGNATURE.to_string()); - - let rpc_client = RpcClient::new_mock("fails".to_string()); - - let signature = send_transaction(&rpc_client, &tx); - assert!(signature.is_err()); - } - - #[test] - fn test_wallet_confirm_transaction() { - let rpc_client = RpcClient::new_mock("succeeds".to_string()); - let signature = "good_signature"; - let status = confirm_transaction(&rpc_client, &signature); - assert_eq!(status.unwrap(), RpcSignatureStatus::Confirmed); - - let rpc_client = RpcClient::new_mock("bad_sig_status".to_string()); - let signature = "bad_status"; - let status = confirm_transaction(&rpc_client, &signature); - assert!(status.is_err()); - - let rpc_client = RpcClient::new_mock("fails".to_string()); - let signature = "bad_status_fmt"; - let status = confirm_transaction(&rpc_client, &signature); - assert!(status.is_err()); - } - - #[test] - fn test_wallet_send_and_confirm_transaction() { - let rpc_client = RpcClient::new_mock("succeeds".to_string()); - - let key = Keypair::new(); - let to = Keypair::new().pubkey(); - let blockhash = Hash::default(); - let mut tx = SystemTransaction::new_account(&key, &to, 50, blockhash, 0); - - let result = send_and_confirm_transaction(&rpc_client, &mut tx, &key); - result.unwrap(); - - let rpc_client = RpcClient::new_mock("account_in_use".to_string()); - let result = send_and_confirm_transaction(&rpc_client, &mut tx, &key); - assert!(result.is_err()); - - let rpc_client = RpcClient::new_mock("fails".to_string()); - let result = send_and_confirm_transaction(&rpc_client, &mut tx, &key); - assert!(result.is_err()); - } - - #[test] - fn test_wallet_resign_transaction() { - let rpc_client = RpcClient::new_mock("succeeds".to_string()); - - let key = Keypair::new(); - let to = Keypair::new().pubkey(); - let vec = bs58::decode("HUu3LwEzGRsUkuJS121jzkPJW39Kq62pXCTmTa1F9jDL") - .into_vec() - .unwrap(); - let blockhash = Hash::new(&vec); - let prev_tx = SystemTransaction::new_account(&key, &to, 50, blockhash, 0); - let mut tx = SystemTransaction::new_account(&key, &to, 50, blockhash, 0); - - resign_transaction(&rpc_client, &mut tx, &key).unwrap(); - - assert_ne!(prev_tx, tx); - assert_ne!(prev_tx.signatures, tx.signatures); - assert_ne!(prev_tx.recent_blockhash, tx.recent_blockhash); - assert_eq!(prev_tx.fee, tx.fee); - assert_eq!(prev_tx.account_keys, tx.account_keys); - assert_eq!(prev_tx.instructions, tx.instructions); - } - - #[test] - fn test_request_and_confirm_airdrop() { - let rpc_client = RpcClient::new_mock("succeeds".to_string()); - let drone_addr = socketaddr!(0, 0); - let pubkey = Keypair::new().pubkey(); - let lamports = 50; - assert_eq!( - request_and_confirm_airdrop(&rpc_client, &drone_addr, &pubkey, lamports).unwrap(), - () - ); - - let lamports = 0; - assert!(request_and_confirm_airdrop(&rpc_client, &drone_addr, &pubkey, lamports).is_err()); - } }