//! Nonblocking [`RpcSender`] over HTTP. use { crate::rpc_sender::*, async_trait::async_trait, log::*, reqwest::{ self, header::{self, CONTENT_TYPE, RETRY_AFTER}, StatusCode, }, solana_rpc_client_api::{ client_error::Result, custom_error, error_object::RpcErrorObject, request::{RpcError, RpcRequest, RpcResponseErrorData}, response::RpcSimulateTransactionResult, }, std::{ sync::{ atomic::{AtomicU64, Ordering}, Arc, RwLock, }, time::{Duration, Instant}, }, tokio::time::sleep, }; pub struct HttpSender { client: Arc, url: String, request_id: AtomicU64, stats: RwLock, } /// Nonblocking [`RpcSender`] over HTTP. impl HttpSender { /// Create an HTTP RPC sender. /// /// The URL is an HTTP URL, usually for port 8899, as in /// "http://localhost:8899". The sender has a default timeout of 30 seconds. pub fn new(url: U) -> Self { Self::new_with_timeout(url, Duration::from_secs(30)) } /// Create an HTTP RPC sender. /// /// The URL is an HTTP URL, usually for port 8899. pub fn new_with_timeout(url: U, timeout: Duration) -> Self { Self::new_with_client( url, reqwest::Client::builder() .default_headers(Self::default_headers()) .timeout(timeout) .pool_idle_timeout(timeout) .build() .expect("build rpc client"), ) } /// Create an HTTP RPC sender. /// /// Most flexible way to create a sender. Pass a created `reqwest::Client`. pub fn new_with_client(url: U, client: reqwest::Client) -> Self { Self { client: Arc::new(client), url: url.to_string(), request_id: AtomicU64::new(0), stats: RwLock::new(RpcTransportStats::default()), } } /// Create default headers used by HTTP Sender. pub fn default_headers() -> header::HeaderMap { let mut default_headers = header::HeaderMap::new(); default_headers.append( header::HeaderName::from_static("solana-client"), header::HeaderValue::from_str( format!("rust/{}", solana_version::Version::default()).as_str(), ) .unwrap(), ); default_headers } } struct StatsUpdater<'a> { stats: &'a RwLock, request_start_time: Instant, rate_limited_time: Duration, } impl<'a> StatsUpdater<'a> { fn new(stats: &'a RwLock) -> Self { Self { stats, request_start_time: Instant::now(), rate_limited_time: Duration::default(), } } fn add_rate_limited_time(&mut self, duration: Duration) { self.rate_limited_time += duration; } } impl<'a> Drop for StatsUpdater<'a> { fn drop(&mut self) { let mut stats = self.stats.write().unwrap(); stats.request_count += 1; stats.elapsed_time += Instant::now().duration_since(self.request_start_time); stats.rate_limited_time += self.rate_limited_time; } } #[async_trait] impl RpcSender for HttpSender { fn get_transport_stats(&self) -> RpcTransportStats { self.stats.read().unwrap().clone() } async fn send( &self, request: RpcRequest, params: serde_json::Value, ) -> Result { let mut stats_updater = StatsUpdater::new(&self.stats); let request_id = self.request_id.fetch_add(1, Ordering::Relaxed); let request_json = request.build_request_json(request_id, params).to_string(); let mut too_many_requests_retries = 5; loop { let response = { let client = self.client.clone(); let request_json = request_json.clone(); client .post(&self.url) .header(CONTENT_TYPE, "application/json") .body(request_json) .send() .await }?; if !response.status().is_success() { if response.status() == StatusCode::TOO_MANY_REQUESTS && too_many_requests_retries > 0 { let mut duration = Duration::from_millis(500); if let Some(retry_after) = response.headers().get(RETRY_AFTER) { if let Ok(retry_after) = retry_after.to_str() { if let Ok(retry_after) = retry_after.parse::() { if retry_after < 120 { duration = Duration::from_secs(retry_after); } } } } too_many_requests_retries -= 1; debug!( "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}", response, too_many_requests_retries, duration ); sleep(duration).await; stats_updater.add_rate_limited_time(duration); continue; } return Err(response.error_for_status().unwrap_err().into()); } let mut json = response.json::().await?; if json["error"].is_object() { return match serde_json::from_value::(json["error"].clone()) { Ok(rpc_error_object) => { let data = match rpc_error_object.code { custom_error::JSON_RPC_SERVER_ERROR_SEND_TRANSACTION_PREFLIGHT_FAILURE => { match serde_json::from_value::(json["error"]["data"].clone()) { Ok(data) => RpcResponseErrorData::SendTransactionPreflightFailure(data), Err(err) => { debug!("Failed to deserialize RpcSimulateTransactionResult: {:?}", err); RpcResponseErrorData::Empty } } }, custom_error::JSON_RPC_SERVER_ERROR_NODE_UNHEALTHY => { match serde_json::from_value::(json["error"]["data"].clone()) { Ok(custom_error::NodeUnhealthyErrorData {num_slots_behind}) => RpcResponseErrorData::NodeUnhealthy {num_slots_behind}, Err(_err) => { RpcResponseErrorData::Empty } } }, _ => RpcResponseErrorData::Empty }; Err(RpcError::RpcResponseError { code: rpc_error_object.code, message: rpc_error_object.message, data, } .into()) } Err(err) => Err(RpcError::RpcRequestError(format!( "Failed to deserialize RPC error response: {} [{}]", serde_json::to_string(&json["error"]).unwrap(), err )) .into()), }; } return Ok(json["result"].take()); } } fn url(&self) -> String { self.url.clone() } } #[cfg(test)] mod tests { use super::*; #[tokio::test(flavor = "multi_thread")] async fn http_sender_on_tokio_multi_thread() { let http_sender = HttpSender::new("http://localhost:1234".to_string()); let _ = http_sender .send(RpcRequest::GetVersion, serde_json::Value::Null) .await; } #[tokio::test(flavor = "current_thread")] async fn http_sender_on_tokio_current_thread() { let http_sender = HttpSender::new("http://localhost:1234".to_string()); let _ = http_sender .send(RpcRequest::GetVersion, serde_json::Value::Null) .await; } }