Clean up RPCClient retry handling: only retry on 429, after a little sleep (#10182)

This commit is contained in:
Michael Vines 2020-05-22 08:53:53 -07:00 committed by GitHub
parent c7cdbc98e5
commit 4779858dd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 48 additions and 122 deletions

View File

@ -4,8 +4,7 @@ use crate::{
rpc_sender::RpcSender,
};
use log::*;
use reqwest::{self, header::CONTENT_TYPE};
use solana_sdk::clock::{DEFAULT_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT};
use reqwest::{self, header::CONTENT_TYPE, StatusCode};
use std::{thread::sleep, time::Duration};
pub struct HttpSender {
@ -29,17 +28,13 @@ impl HttpSender {
}
impl RpcSender for HttpSender {
fn send(
&self,
request: RpcRequest,
params: serde_json::Value,
mut retries: usize,
) -> Result<serde_json::Value> {
fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value> {
// Concurrent requests are not supported so reuse the same request id for all requests
let request_id = 1;
let request_json = request.build_request_json(request_id, params);
let mut too_many_requests_retries = 5;
loop {
match self
.client
@ -50,6 +45,19 @@ impl RpcSender for HttpSender {
{
Ok(response) => {
if !response.status().is_success() {
if response.status() == StatusCode::TOO_MANY_REQUESTS
&& too_many_requests_retries > 0
{
too_many_requests_retries -= 1;
debug!(
"Server responded with {:?}, {} retries left",
response, too_many_requests_retries
);
// Sleep for 500ms to give the server a break
sleep(Duration::from_millis(500));
continue;
}
return Err(response.error_for_status().unwrap_err().into());
}
@ -63,17 +71,8 @@ impl RpcSender for HttpSender {
}
return Ok(json["result"].clone());
}
Err(e) => {
info!("{:?} failed, {} retries left: {:?}", request, retries, e);
if retries == 0 {
return Err(e.into());
}
retries -= 1;
// Sleep for approximately half a slot
sleep(Duration::from_millis(
500 * DEFAULT_TICKS_PER_SLOT / DEFAULT_TICKS_PER_SECOND,
));
Err(err) => {
return Err(err.into());
}
}
}

View File

@ -38,12 +38,7 @@ impl MockSender {
}
impl RpcSender for MockSender {
fn send(
&self,
request: RpcRequest,
params: serde_json::Value,
_retries: usize,
) -> Result<serde_json::Value> {
fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value> {
if let Some(value) = self.mocks.write().unwrap().remove(&request) {
return Ok(value);
}

View File

@ -98,7 +98,7 @@ impl RpcClient {
let serialized_encoded = bs58::encode(serialize(transaction).unwrap()).into_string();
let signature_base58_str: String =
self.send(RpcRequest::SendTransaction, json!([serialized_encoded]), 5)?;
self.send(RpcRequest::SendTransaction, json!([serialized_encoded]))?;
let signature = signature_base58_str
.parse::<Signature>()
@ -127,7 +127,6 @@ impl RpcClient {
self.send(
RpcRequest::SimulateTransaction,
json!([serialized_encoded, { "sigVerify": sig_verify }]),
0,
)
}
@ -143,7 +142,7 @@ impl RpcClient {
signatures: &[Signature],
) -> RpcResult<Vec<Option<TransactionStatus>>> {
let signatures: Vec<_> = signatures.iter().map(|s| s.to_string()).collect();
self.send(RpcRequest::GetSignatureStatuses, json!([signatures]), 5)
self.send(RpcRequest::GetSignatureStatuses, json!([signatures]))
}
pub fn get_signature_status_with_commitment(
@ -154,7 +153,6 @@ impl RpcClient {
let result: Response<Vec<Option<TransactionStatus>>> = self.send(
RpcRequest::GetSignatureStatuses,
json!([[signature.to_string()]]),
5,
)?;
Ok(result.value[0]
.clone()
@ -173,7 +171,6 @@ impl RpcClient {
json!([[signature.to_string()], {
"searchTransactionHistory": search_transaction_history
}]),
5,
)?;
Ok(result.value[0]
.clone()
@ -189,14 +186,14 @@ impl RpcClient {
&self,
commitment_config: CommitmentConfig,
) -> ClientResult<Slot> {
self.send(RpcRequest::GetSlot, json!([commitment_config]), 0)
self.send(RpcRequest::GetSlot, json!([commitment_config]))
}
pub fn supply_with_commitment(
&self,
commitment_config: CommitmentConfig,
) -> RpcResult<RpcSupply> {
self.send(RpcRequest::GetSupply, json!([commitment_config]), 0)
self.send(RpcRequest::GetSupply, json!([commitment_config]))
}
pub fn total_supply(&self) -> ClientResult<u64> {
@ -207,14 +204,14 @@ impl RpcClient {
&self,
commitment_config: CommitmentConfig,
) -> ClientResult<u64> {
self.send(RpcRequest::GetTotalSupply, json!([commitment_config]), 0)
self.send(RpcRequest::GetTotalSupply, json!([commitment_config]))
}
pub fn get_largest_accounts_with_config(
&self,
config: RpcLargestAccountsConfig,
) -> RpcResult<Vec<RpcAccountBalance>> {
self.send(RpcRequest::GetLargestAccounts, json!([config]), 0)
self.send(RpcRequest::GetLargestAccounts, json!([config]))
}
pub fn get_vote_accounts(&self) -> ClientResult<RpcVoteAccountStatus> {
@ -225,11 +222,11 @@ impl RpcClient {
&self,
commitment_config: CommitmentConfig,
) -> ClientResult<RpcVoteAccountStatus> {
self.send(RpcRequest::GetVoteAccounts, json!([commitment_config]), 0)
self.send(RpcRequest::GetVoteAccounts, json!([commitment_config]))
}
pub fn get_cluster_nodes(&self) -> ClientResult<Vec<RpcContactInfo>> {
self.send(RpcRequest::GetClusterNodes, Value::Null, 0)
self.send(RpcRequest::GetClusterNodes, Value::Null)
}
pub fn get_confirmed_block(&self, slot: Slot) -> ClientResult<ConfirmedBlock> {
@ -241,7 +238,7 @@ impl RpcClient {
slot: Slot,
encoding: TransactionEncoding,
) -> ClientResult<ConfirmedBlock> {
self.send(RpcRequest::GetConfirmedBlock, json!([slot, encoding]), 0)
self.send(RpcRequest::GetConfirmedBlock, json!([slot, encoding]))
}
pub fn get_confirmed_blocks(
@ -252,7 +249,6 @@ impl RpcClient {
self.send(
RpcRequest::GetConfirmedBlocks,
json!([start_slot, end_slot]),
0,
)
}
@ -265,7 +261,6 @@ impl RpcClient {
let signatures_base58_str: Vec<String> = self.send(
RpcRequest::GetConfirmedSignaturesForAddress,
json!([address.to_string(), start_slot, end_slot]),
0,
)?;
let mut signatures = vec![];
@ -287,13 +282,12 @@ impl RpcClient {
self.send(
RpcRequest::GetConfirmedTransaction,
json!([signature.to_string(), encoding]),
0,
)
}
pub fn get_block_time(&self, slot: Slot) -> ClientResult<UnixTimestamp> {
let request = RpcRequest::GetBlockTime;
let response = self.sender.send(request, json!([slot]), 0);
let response = self.sender.send(request, json!([slot]));
response
.map(|result_json| {
@ -316,7 +310,7 @@ impl RpcClient {
&self,
commitment_config: CommitmentConfig,
) -> ClientResult<EpochInfo> {
self.send(RpcRequest::GetEpochInfo, json!([commitment_config]), 0)
self.send(RpcRequest::GetEpochInfo, json!([commitment_config]))
}
pub fn get_leader_schedule(
@ -334,16 +328,15 @@ impl RpcClient {
self.send(
RpcRequest::GetLeaderSchedule,
json!([slot, commitment_config]),
0,
)
}
pub fn get_epoch_schedule(&self) -> ClientResult<EpochSchedule> {
self.send(RpcRequest::GetEpochSchedule, Value::Null, 0)
self.send(RpcRequest::GetEpochSchedule, Value::Null)
}
pub fn get_identity(&self) -> ClientResult<Pubkey> {
let rpc_identity: RpcIdentity = self.send(RpcRequest::GetIdentity, Value::Null, 0)?;
let rpc_identity: RpcIdentity = self.send(RpcRequest::GetIdentity, Value::Null)?;
rpc_identity.identity.parse::<Pubkey>().map_err(|_| {
ClientError::new_with_request(
@ -354,15 +347,15 @@ impl RpcClient {
}
pub fn get_inflation(&self) -> ClientResult<Inflation> {
self.send(RpcRequest::GetInflation, Value::Null, 0)
self.send(RpcRequest::GetInflation, Value::Null)
}
pub fn get_version(&self) -> ClientResult<RpcVersionInfo> {
self.send(RpcRequest::GetVersion, Value::Null, 0)
self.send(RpcRequest::GetVersion, Value::Null)
}
pub fn minimum_ledger_slot(&self) -> ClientResult<Slot> {
self.send(RpcRequest::MinimumLedgerSlot, Value::Null, 0)
self.send(RpcRequest::MinimumLedgerSlot, Value::Null)
}
pub fn send_and_confirm_transaction(
@ -525,7 +518,6 @@ impl RpcClient {
let response = self.sender.send(
RpcRequest::GetAccountInfo,
json!([pubkey.to_string(), commitment_config]),
0,
);
response
@ -562,7 +554,7 @@ impl RpcClient {
let request = RpcRequest::GetMinimumBalanceForRentExemption;
let minimum_balance_json = self
.sender
.send(request, json!([data_len]), 0)
.send(request, json!([data_len]))
.map_err(|err| err.into_with_request(request))?;
let minimum_balance: u64 = serde_json::from_value(minimum_balance_json)
@ -590,16 +582,12 @@ impl RpcClient {
self.send(
RpcRequest::GetBalance,
json!([pubkey.to_string(), commitment_config]),
0,
)
}
pub fn get_program_accounts(&self, pubkey: &Pubkey) -> ClientResult<Vec<(Pubkey, Account)>> {
let accounts: Vec<RpcKeyedAccount> = self.send(
RpcRequest::GetProgramAccounts,
json!([pubkey.to_string()]),
0,
)?;
let accounts: Vec<RpcKeyedAccount> =
self.send(RpcRequest::GetProgramAccounts, json!([pubkey.to_string()]))?;
let mut pubkey_accounts: Vec<(Pubkey, Account)> = Vec::new();
for RpcKeyedAccount { pubkey, account } in accounts.into_iter() {
let pubkey = pubkey.parse().map_err(|_| {
@ -622,11 +610,7 @@ impl RpcClient {
&self,
commitment_config: CommitmentConfig,
) -> ClientResult<u64> {
self.send(
RpcRequest::GetTransactionCount,
json!([commitment_config]),
0,
)
self.send(RpcRequest::GetTransactionCount, json!([commitment_config]))
}
pub fn get_recent_blockhash(&self) -> ClientResult<(Hash, FeeCalculator)> {
@ -649,7 +633,6 @@ impl RpcClient {
} = self.send::<Response<RpcBlockhashFeeCalculator>>(
RpcRequest::GetRecentBlockhash,
json!([commitment_config]),
0,
)?;
let blockhash = blockhash.parse().map_err(|_| {
@ -671,7 +654,6 @@ impl RpcClient {
let Response { value, .. } = self.send::<Response<Option<RpcFeeCalculator>>>(
RpcRequest::GetFeeCalculatorForBlockhash,
json!([blockhash.to_string()]),
0,
)?;
Ok(value.map(|rf| rf.fee_calculator))
@ -681,11 +663,8 @@ impl RpcClient {
let Response {
context,
value: RpcFeeRateGovernor { fee_rate_governor },
} = self.send::<Response<RpcFeeRateGovernor>>(
RpcRequest::GetFeeRateGovernor,
Value::Null,
0,
)?;
} =
self.send::<Response<RpcFeeRateGovernor>>(RpcRequest::GetFeeRateGovernor, Value::Null)?;
Ok(Response {
context,
@ -720,7 +699,7 @@ impl RpcClient {
}
pub fn get_genesis_hash(&self) -> ClientResult<Hash> {
let hash_str: String = self.send(RpcRequest::GetGenesisHash, Value::Null, 0)?;
let hash_str: String = self.send(RpcRequest::GetGenesisHash, Value::Null)?;
let hash = hash_str.parse().map_err(|_| {
ClientError::new_with_request(
RpcError::ParseError("Hash".to_string()).into(),
@ -916,7 +895,6 @@ impl RpcClient {
let result: Response<Vec<Option<TransactionStatus>>> = self.send(
RpcRequest::GetSignatureStatuses,
json!([[signature.to_string()]]),
5,
)?;
let confirmations = result.value[0]
@ -1028,17 +1006,17 @@ impl RpcClient {
}
pub fn validator_exit(&self) -> ClientResult<bool> {
self.send(RpcRequest::ValidatorExit, Value::Null, 0)
self.send(RpcRequest::ValidatorExit, Value::Null)
}
pub fn send<T>(&self, request: RpcRequest, params: Value, retries: usize) -> ClientResult<T>
pub fn send<T>(&self, request: RpcRequest, params: Value) -> ClientResult<T>
where
T: serde::de::DeserializeOwned,
{
assert!(params.is_array() || params.is_null());
let response = self
.sender
.send(request, params, retries)
.send(request, params)
.map_err(|err| err.into_with_request(request))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_request(err.into(), request))
@ -1114,62 +1092,21 @@ mod tests {
.send(
RpcRequest::GetBalance,
json!(["deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx"]),
0,
)
.unwrap();
assert_eq!(balance, 50);
let blockhash: String = rpc_client
.send(RpcRequest::GetRecentBlockhash, Value::Null, 0)
.send(RpcRequest::GetRecentBlockhash, Value::Null)
.unwrap();
assert_eq!(blockhash, "deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx");
// Send erroneous parameter
let blockhash: ClientResult<String> =
rpc_client.send(RpcRequest::GetRecentBlockhash, json!(["parameter"]), 0);
rpc_client.send(RpcRequest::GetRecentBlockhash, json!(["parameter"]));
assert_eq!(blockhash.is_err(), true);
}
#[test]
fn test_retry_send() {
solana_logger::setup();
let (sender, receiver) = channel();
thread::spawn(move || {
// 1. Pick a random port
// 2. Tell the sender to start using it
// 3. Delay for 1.5 seconds before starting the server to ensure the sender will fail
// and need to retry
let rpc_addr: SocketAddr = "0.0.0.0:4242".parse().unwrap();
sender.send(rpc_addr.clone()).unwrap();
sleep(Duration::from_millis(1500));
let mut io = IoHandler::default();
io.add_method("getBalance", move |_params: Params| {
Ok(Value::Number(Number::from(5)))
});
let server = ServerBuilder::new(io)
.threads(1)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Any,
]))
.start_http(&rpc_addr)
.expect("Unable to start RPC server");
server.wait();
});
let rpc_addr = receiver.recv().unwrap();
let rpc_client = RpcClient::new_socket(rpc_addr);
let balance: u64 = rpc_client
.send(
RpcRequest::GetBalance,
json!(["deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhw"]),
10,
)
.unwrap();
assert_eq!(balance, 5);
}
#[test]
fn test_send_transaction() {
let rpc_client = RpcClient::new_mock("succeeds".to_string());

View File

@ -1,10 +1,5 @@
use crate::{client_error::Result, rpc_request::RpcRequest};
pub trait RpcSender {
fn send(
&self,
request: RpcRequest,
params: serde_json::Value,
retries: usize,
) -> Result<serde_json::Value>;
fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value>;
}