client: Add retry logic on Pubsub 429s (#19990)

This commit is contained in:
Jon Cinque 2021-09-18 12:40:43 +02:00 committed by GitHub
parent 36f46e1c31
commit e9b066d497
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 40 additions and 5 deletions

View File

@ -23,7 +23,8 @@ use {
mpsc::{channel, Receiver},
Arc, RwLock,
},
thread::JoinHandle,
thread::{sleep, JoinHandle},
time::Duration,
},
thiserror::Error,
tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
@ -165,6 +166,40 @@ pub type SignatureSubscription = (
pub struct PubsubClient {}
fn connect_with_retry(
url: Url,
) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Error> {
let mut connection_retries = 5;
loop {
let result = connect(url.clone()).map(|(socket, _)| socket);
if let Err(tungstenite::Error::Http(response)) = &result {
if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0
{
let mut duration = Duration::from_millis(500);
if let Some(retry_after) = response.headers().get(reqwest::header::RETRY_AFTER) {
if let Ok(retry_after) = retry_after.to_str() {
if let Ok(retry_after) = retry_after.parse::<u64>() {
if retry_after < 120 {
duration = Duration::from_secs(retry_after);
}
}
}
}
connection_retries -= 1;
debug!(
"Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
response, connection_retries, duration
);
sleep(duration);
continue;
}
}
return result;
}
}
impl PubsubClient {
pub fn logs_subscribe(
url: &str,
@ -172,7 +207,7 @@ impl PubsubClient {
config: RpcTransactionLogsConfig,
) -> Result<LogsSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel();
let socket = Arc::new(RwLock::new(socket));
@ -227,7 +262,7 @@ impl PubsubClient {
pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel::<SlotInfo>();
let socket = Arc::new(RwLock::new(socket));
@ -283,7 +318,7 @@ impl PubsubClient {
config: Option<RpcSignatureSubscribeConfig>,
) -> Result<SignatureSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel();
let socket = Arc::new(RwLock::new(socket));
@ -349,7 +384,7 @@ impl PubsubClient {
handler: impl Fn(SlotUpdate) + Send + 'static,
) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let socket = connect_with_retry(url)?;
let socket = Arc::new(RwLock::new(socket));
let exit = Arc::new(AtomicBool::new(false));