diff --git a/client/src/http_sender.rs b/client/src/http_sender.rs index e5eff5d20..ea79d7333 100644 --- a/client/src/http_sender.rs +++ b/client/src/http_sender.rs @@ -4,8 +4,7 @@ use crate::{ rpc_sender::RpcSender, }; use log::*; -use reqwest::{self, header::CONTENT_TYPE}; -use solana_sdk::clock::{DEFAULT_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT}; +use reqwest::{self, header::CONTENT_TYPE, StatusCode}; use std::{thread::sleep, time::Duration}; pub struct HttpSender { @@ -29,17 +28,13 @@ impl HttpSender { } impl RpcSender for HttpSender { - fn send( - &self, - request: RpcRequest, - params: serde_json::Value, - mut retries: usize, - ) -> Result { + fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result { // Concurrent requests are not supported so reuse the same request id for all requests let request_id = 1; let request_json = request.build_request_json(request_id, params); + let mut too_many_requests_retries = 5; loop { match self .client @@ -50,6 +45,19 @@ impl RpcSender for HttpSender { { Ok(response) => { if !response.status().is_success() { + if response.status() == StatusCode::TOO_MANY_REQUESTS + && too_many_requests_retries > 0 + { + too_many_requests_retries -= 1; + debug!( + "Server responded with {:?}, {} retries left", + response, too_many_requests_retries + ); + + // Sleep for 500ms to give the server a break + sleep(Duration::from_millis(500)); + continue; + } return Err(response.error_for_status().unwrap_err().into()); } @@ -63,17 +71,8 @@ impl RpcSender for HttpSender { } return Ok(json["result"].clone()); } - Err(e) => { - info!("{:?} failed, {} retries left: {:?}", request, retries, e); - if retries == 0 { - return Err(e.into()); - } - retries -= 1; - - // Sleep for approximately half a slot - sleep(Duration::from_millis( - 500 * DEFAULT_TICKS_PER_SLOT / DEFAULT_TICKS_PER_SECOND, - )); + Err(err) => { + return Err(err.into()); } } } diff --git a/client/src/mock_sender.rs b/client/src/mock_sender.rs index a3c33b156..aa6e570bf 100644 --- a/client/src/mock_sender.rs +++ b/client/src/mock_sender.rs @@ -38,12 +38,7 @@ impl MockSender { } impl RpcSender for MockSender { - fn send( - &self, - request: RpcRequest, - params: serde_json::Value, - _retries: usize, - ) -> Result { + fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result { if let Some(value) = self.mocks.write().unwrap().remove(&request) { return Ok(value); } diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index 9fc267d26..a93a81d61 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -98,7 +98,7 @@ impl RpcClient { let serialized_encoded = bs58::encode(serialize(transaction).unwrap()).into_string(); let signature_base58_str: String = - self.send(RpcRequest::SendTransaction, json!([serialized_encoded]), 5)?; + self.send(RpcRequest::SendTransaction, json!([serialized_encoded]))?; let signature = signature_base58_str .parse::() @@ -127,7 +127,6 @@ impl RpcClient { self.send( RpcRequest::SimulateTransaction, json!([serialized_encoded, { "sigVerify": sig_verify }]), - 0, ) } @@ -143,7 +142,7 @@ impl RpcClient { signatures: &[Signature], ) -> RpcResult>> { let signatures: Vec<_> = signatures.iter().map(|s| s.to_string()).collect(); - self.send(RpcRequest::GetSignatureStatuses, json!([signatures]), 5) + self.send(RpcRequest::GetSignatureStatuses, json!([signatures])) } pub fn get_signature_status_with_commitment( @@ -154,7 +153,6 @@ impl RpcClient { let result: Response>> = self.send( RpcRequest::GetSignatureStatuses, json!([[signature.to_string()]]), - 5, )?; Ok(result.value[0] .clone() @@ -173,7 +171,6 @@ impl RpcClient { json!([[signature.to_string()], { "searchTransactionHistory": search_transaction_history }]), - 5, )?; Ok(result.value[0] .clone() @@ -189,14 +186,14 @@ impl RpcClient { &self, commitment_config: CommitmentConfig, ) -> ClientResult { - self.send(RpcRequest::GetSlot, json!([commitment_config]), 0) + self.send(RpcRequest::GetSlot, json!([commitment_config])) } pub fn supply_with_commitment( &self, commitment_config: CommitmentConfig, ) -> RpcResult { - self.send(RpcRequest::GetSupply, json!([commitment_config]), 0) + self.send(RpcRequest::GetSupply, json!([commitment_config])) } pub fn total_supply(&self) -> ClientResult { @@ -207,14 +204,14 @@ impl RpcClient { &self, commitment_config: CommitmentConfig, ) -> ClientResult { - self.send(RpcRequest::GetTotalSupply, json!([commitment_config]), 0) + self.send(RpcRequest::GetTotalSupply, json!([commitment_config])) } pub fn get_largest_accounts_with_config( &self, config: RpcLargestAccountsConfig, ) -> RpcResult> { - self.send(RpcRequest::GetLargestAccounts, json!([config]), 0) + self.send(RpcRequest::GetLargestAccounts, json!([config])) } pub fn get_vote_accounts(&self) -> ClientResult { @@ -225,11 +222,11 @@ impl RpcClient { &self, commitment_config: CommitmentConfig, ) -> ClientResult { - self.send(RpcRequest::GetVoteAccounts, json!([commitment_config]), 0) + self.send(RpcRequest::GetVoteAccounts, json!([commitment_config])) } pub fn get_cluster_nodes(&self) -> ClientResult> { - self.send(RpcRequest::GetClusterNodes, Value::Null, 0) + self.send(RpcRequest::GetClusterNodes, Value::Null) } pub fn get_confirmed_block(&self, slot: Slot) -> ClientResult { @@ -241,7 +238,7 @@ impl RpcClient { slot: Slot, encoding: TransactionEncoding, ) -> ClientResult { - self.send(RpcRequest::GetConfirmedBlock, json!([slot, encoding]), 0) + self.send(RpcRequest::GetConfirmedBlock, json!([slot, encoding])) } pub fn get_confirmed_blocks( @@ -252,7 +249,6 @@ impl RpcClient { self.send( RpcRequest::GetConfirmedBlocks, json!([start_slot, end_slot]), - 0, ) } @@ -265,7 +261,6 @@ impl RpcClient { let signatures_base58_str: Vec = self.send( RpcRequest::GetConfirmedSignaturesForAddress, json!([address.to_string(), start_slot, end_slot]), - 0, )?; let mut signatures = vec![]; @@ -287,13 +282,12 @@ impl RpcClient { self.send( RpcRequest::GetConfirmedTransaction, json!([signature.to_string(), encoding]), - 0, ) } pub fn get_block_time(&self, slot: Slot) -> ClientResult { let request = RpcRequest::GetBlockTime; - let response = self.sender.send(request, json!([slot]), 0); + let response = self.sender.send(request, json!([slot])); response .map(|result_json| { @@ -316,7 +310,7 @@ impl RpcClient { &self, commitment_config: CommitmentConfig, ) -> ClientResult { - self.send(RpcRequest::GetEpochInfo, json!([commitment_config]), 0) + self.send(RpcRequest::GetEpochInfo, json!([commitment_config])) } pub fn get_leader_schedule( @@ -334,16 +328,15 @@ impl RpcClient { self.send( RpcRequest::GetLeaderSchedule, json!([slot, commitment_config]), - 0, ) } pub fn get_epoch_schedule(&self) -> ClientResult { - self.send(RpcRequest::GetEpochSchedule, Value::Null, 0) + self.send(RpcRequest::GetEpochSchedule, Value::Null) } pub fn get_identity(&self) -> ClientResult { - let rpc_identity: RpcIdentity = self.send(RpcRequest::GetIdentity, Value::Null, 0)?; + let rpc_identity: RpcIdentity = self.send(RpcRequest::GetIdentity, Value::Null)?; rpc_identity.identity.parse::().map_err(|_| { ClientError::new_with_request( @@ -354,15 +347,15 @@ impl RpcClient { } pub fn get_inflation(&self) -> ClientResult { - self.send(RpcRequest::GetInflation, Value::Null, 0) + self.send(RpcRequest::GetInflation, Value::Null) } pub fn get_version(&self) -> ClientResult { - self.send(RpcRequest::GetVersion, Value::Null, 0) + self.send(RpcRequest::GetVersion, Value::Null) } pub fn minimum_ledger_slot(&self) -> ClientResult { - self.send(RpcRequest::MinimumLedgerSlot, Value::Null, 0) + self.send(RpcRequest::MinimumLedgerSlot, Value::Null) } pub fn send_and_confirm_transaction( @@ -525,7 +518,6 @@ impl RpcClient { let response = self.sender.send( RpcRequest::GetAccountInfo, json!([pubkey.to_string(), commitment_config]), - 0, ); response @@ -562,7 +554,7 @@ impl RpcClient { let request = RpcRequest::GetMinimumBalanceForRentExemption; let minimum_balance_json = self .sender - .send(request, json!([data_len]), 0) + .send(request, json!([data_len])) .map_err(|err| err.into_with_request(request))?; let minimum_balance: u64 = serde_json::from_value(minimum_balance_json) @@ -590,16 +582,12 @@ impl RpcClient { self.send( RpcRequest::GetBalance, json!([pubkey.to_string(), commitment_config]), - 0, ) } pub fn get_program_accounts(&self, pubkey: &Pubkey) -> ClientResult> { - let accounts: Vec = self.send( - RpcRequest::GetProgramAccounts, - json!([pubkey.to_string()]), - 0, - )?; + let accounts: Vec = + self.send(RpcRequest::GetProgramAccounts, json!([pubkey.to_string()]))?; let mut pubkey_accounts: Vec<(Pubkey, Account)> = Vec::new(); for RpcKeyedAccount { pubkey, account } in accounts.into_iter() { let pubkey = pubkey.parse().map_err(|_| { @@ -622,11 +610,7 @@ impl RpcClient { &self, commitment_config: CommitmentConfig, ) -> ClientResult { - self.send( - RpcRequest::GetTransactionCount, - json!([commitment_config]), - 0, - ) + self.send(RpcRequest::GetTransactionCount, json!([commitment_config])) } pub fn get_recent_blockhash(&self) -> ClientResult<(Hash, FeeCalculator)> { @@ -649,7 +633,6 @@ impl RpcClient { } = self.send::>( RpcRequest::GetRecentBlockhash, json!([commitment_config]), - 0, )?; let blockhash = blockhash.parse().map_err(|_| { @@ -671,7 +654,6 @@ impl RpcClient { let Response { value, .. } = self.send::>>( RpcRequest::GetFeeCalculatorForBlockhash, json!([blockhash.to_string()]), - 0, )?; Ok(value.map(|rf| rf.fee_calculator)) @@ -681,11 +663,8 @@ impl RpcClient { let Response { context, value: RpcFeeRateGovernor { fee_rate_governor }, - } = self.send::>( - RpcRequest::GetFeeRateGovernor, - Value::Null, - 0, - )?; + } = + self.send::>(RpcRequest::GetFeeRateGovernor, Value::Null)?; Ok(Response { context, @@ -720,7 +699,7 @@ impl RpcClient { } pub fn get_genesis_hash(&self) -> ClientResult { - let hash_str: String = self.send(RpcRequest::GetGenesisHash, Value::Null, 0)?; + let hash_str: String = self.send(RpcRequest::GetGenesisHash, Value::Null)?; let hash = hash_str.parse().map_err(|_| { ClientError::new_with_request( RpcError::ParseError("Hash".to_string()).into(), @@ -916,7 +895,6 @@ impl RpcClient { let result: Response>> = self.send( RpcRequest::GetSignatureStatuses, json!([[signature.to_string()]]), - 5, )?; let confirmations = result.value[0] @@ -1028,17 +1006,17 @@ impl RpcClient { } pub fn validator_exit(&self) -> ClientResult { - self.send(RpcRequest::ValidatorExit, Value::Null, 0) + self.send(RpcRequest::ValidatorExit, Value::Null) } - pub fn send(&self, request: RpcRequest, params: Value, retries: usize) -> ClientResult + pub fn send(&self, request: RpcRequest, params: Value) -> ClientResult where T: serde::de::DeserializeOwned, { assert!(params.is_array() || params.is_null()); let response = self .sender - .send(request, params, retries) + .send(request, params) .map_err(|err| err.into_with_request(request))?; serde_json::from_value(response) .map_err(|err| ClientError::new_with_request(err.into(), request)) @@ -1114,62 +1092,21 @@ mod tests { .send( RpcRequest::GetBalance, json!(["deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx"]), - 0, ) .unwrap(); assert_eq!(balance, 50); let blockhash: String = rpc_client - .send(RpcRequest::GetRecentBlockhash, Value::Null, 0) + .send(RpcRequest::GetRecentBlockhash, Value::Null) .unwrap(); assert_eq!(blockhash, "deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx"); // Send erroneous parameter let blockhash: ClientResult = - rpc_client.send(RpcRequest::GetRecentBlockhash, json!(["parameter"]), 0); + rpc_client.send(RpcRequest::GetRecentBlockhash, json!(["parameter"])); assert_eq!(blockhash.is_err(), true); } - #[test] - fn test_retry_send() { - solana_logger::setup(); - let (sender, receiver) = channel(); - thread::spawn(move || { - // 1. Pick a random port - // 2. Tell the sender to start using it - // 3. Delay for 1.5 seconds before starting the server to ensure the sender will fail - // and need to retry - let rpc_addr: SocketAddr = "0.0.0.0:4242".parse().unwrap(); - sender.send(rpc_addr.clone()).unwrap(); - sleep(Duration::from_millis(1500)); - - let mut io = IoHandler::default(); - io.add_method("getBalance", move |_params: Params| { - Ok(Value::Number(Number::from(5))) - }); - let server = ServerBuilder::new(io) - .threads(1) - .cors(DomainsValidation::AllowOnly(vec![ - AccessControlAllowOrigin::Any, - ])) - .start_http(&rpc_addr) - .expect("Unable to start RPC server"); - server.wait(); - }); - - let rpc_addr = receiver.recv().unwrap(); - let rpc_client = RpcClient::new_socket(rpc_addr); - - let balance: u64 = rpc_client - .send( - RpcRequest::GetBalance, - json!(["deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhw"]), - 10, - ) - .unwrap(); - assert_eq!(balance, 5); - } - #[test] fn test_send_transaction() { let rpc_client = RpcClient::new_mock("succeeds".to_string()); diff --git a/client/src/rpc_sender.rs b/client/src/rpc_sender.rs index 1f87663a6..6574637b0 100644 --- a/client/src/rpc_sender.rs +++ b/client/src/rpc_sender.rs @@ -1,10 +1,5 @@ use crate::{client_error::Result, rpc_request::RpcRequest}; pub trait RpcSender { - fn send( - &self, - request: RpcRequest, - params: serde_json::Value, - retries: usize, - ) -> Result; + fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result; }