242 lines
8.6 KiB
Rust
242 lines
8.6 KiB
Rust
//! Nonblocking [`RpcSender`] over HTTP.
|
|
|
|
use {
|
|
crate::rpc_sender::*,
|
|
async_trait::async_trait,
|
|
log::*,
|
|
reqwest::{
|
|
self,
|
|
header::{self, CONTENT_TYPE, RETRY_AFTER},
|
|
StatusCode,
|
|
},
|
|
solana_rpc_client_api::{
|
|
client_error::Result,
|
|
custom_error,
|
|
error_object::RpcErrorObject,
|
|
request::{RpcError, RpcRequest, RpcResponseErrorData},
|
|
response::RpcSimulateTransactionResult,
|
|
},
|
|
std::{
|
|
sync::{
|
|
atomic::{AtomicU64, Ordering},
|
|
Arc, RwLock,
|
|
},
|
|
time::{Duration, Instant},
|
|
},
|
|
tokio::time::sleep,
|
|
};
|
|
|
|
pub struct HttpSender {
|
|
client: Arc<reqwest::Client>,
|
|
url: String,
|
|
request_id: AtomicU64,
|
|
stats: RwLock<RpcTransportStats>,
|
|
}
|
|
|
|
/// Nonblocking [`RpcSender`] over HTTP.
|
|
impl HttpSender {
|
|
/// Create an HTTP RPC sender.
|
|
///
|
|
/// The URL is an HTTP URL, usually for port 8899, as in
|
|
/// "http://localhost:8899". The sender has a default timeout of 30 seconds.
|
|
pub fn new<U: ToString>(url: U) -> Self {
|
|
Self::new_with_timeout(url, Duration::from_secs(30))
|
|
}
|
|
|
|
/// Create an HTTP RPC sender.
|
|
///
|
|
/// The URL is an HTTP URL, usually for port 8899.
|
|
pub fn new_with_timeout<U: ToString>(url: U, timeout: Duration) -> Self {
|
|
Self::new_with_client(
|
|
url,
|
|
reqwest::Client::builder()
|
|
.default_headers(Self::default_headers())
|
|
.timeout(timeout)
|
|
.pool_idle_timeout(timeout)
|
|
.build()
|
|
.expect("build rpc client"),
|
|
)
|
|
}
|
|
|
|
/// Create an HTTP RPC sender.
|
|
///
|
|
/// Most flexible way to create a sender. Pass a created `reqwest::Client`.
|
|
pub fn new_with_client<U: ToString>(url: U, client: reqwest::Client) -> Self {
|
|
Self {
|
|
client: Arc::new(client),
|
|
url: url.to_string(),
|
|
request_id: AtomicU64::new(0),
|
|
stats: RwLock::new(RpcTransportStats::default()),
|
|
}
|
|
}
|
|
|
|
/// Create default headers used by HTTP Sender.
|
|
pub fn default_headers() -> header::HeaderMap {
|
|
let mut default_headers = header::HeaderMap::new();
|
|
default_headers.append(
|
|
header::HeaderName::from_static("solana-client"),
|
|
header::HeaderValue::from_str(
|
|
format!("rust/{}", solana_version::Version::default()).as_str(),
|
|
)
|
|
.unwrap(),
|
|
);
|
|
default_headers
|
|
}
|
|
}
|
|
|
|
struct StatsUpdater<'a> {
|
|
stats: &'a RwLock<RpcTransportStats>,
|
|
request_start_time: Instant,
|
|
rate_limited_time: Duration,
|
|
}
|
|
|
|
impl<'a> StatsUpdater<'a> {
|
|
fn new(stats: &'a RwLock<RpcTransportStats>) -> Self {
|
|
Self {
|
|
stats,
|
|
request_start_time: Instant::now(),
|
|
rate_limited_time: Duration::default(),
|
|
}
|
|
}
|
|
|
|
fn add_rate_limited_time(&mut self, duration: Duration) {
|
|
self.rate_limited_time += duration;
|
|
}
|
|
}
|
|
|
|
impl<'a> Drop for StatsUpdater<'a> {
|
|
fn drop(&mut self) {
|
|
let mut stats = self.stats.write().unwrap();
|
|
stats.request_count += 1;
|
|
stats.elapsed_time += Instant::now().duration_since(self.request_start_time);
|
|
stats.rate_limited_time += self.rate_limited_time;
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl RpcSender for HttpSender {
|
|
fn get_transport_stats(&self) -> RpcTransportStats {
|
|
self.stats.read().unwrap().clone()
|
|
}
|
|
|
|
async fn send(
|
|
&self,
|
|
request: RpcRequest,
|
|
params: serde_json::Value,
|
|
) -> Result<serde_json::Value> {
|
|
let mut stats_updater = StatsUpdater::new(&self.stats);
|
|
|
|
let request_id = self.request_id.fetch_add(1, Ordering::Relaxed);
|
|
let request_json = request.build_request_json(request_id, params).to_string();
|
|
|
|
let mut too_many_requests_retries = 5;
|
|
loop {
|
|
let response = {
|
|
let client = self.client.clone();
|
|
let request_json = request_json.clone();
|
|
client
|
|
.post(&self.url)
|
|
.header(CONTENT_TYPE, "application/json")
|
|
.body(request_json)
|
|
.send()
|
|
.await
|
|
}?;
|
|
|
|
if !response.status().is_success() {
|
|
if response.status() == StatusCode::TOO_MANY_REQUESTS
|
|
&& too_many_requests_retries > 0
|
|
{
|
|
let mut duration = Duration::from_millis(500);
|
|
if let Some(retry_after) = response.headers().get(RETRY_AFTER) {
|
|
if let Ok(retry_after) = retry_after.to_str() {
|
|
if let Ok(retry_after) = retry_after.parse::<u64>() {
|
|
if retry_after < 120 {
|
|
duration = Duration::from_secs(retry_after);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
too_many_requests_retries -= 1;
|
|
debug!(
|
|
"Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
|
|
response, too_many_requests_retries, duration
|
|
);
|
|
|
|
sleep(duration).await;
|
|
stats_updater.add_rate_limited_time(duration);
|
|
continue;
|
|
}
|
|
return Err(response.error_for_status().unwrap_err().into());
|
|
}
|
|
|
|
let mut json = response.json::<serde_json::Value>().await?;
|
|
if json["error"].is_object() {
|
|
return match serde_json::from_value::<RpcErrorObject>(json["error"].clone()) {
|
|
Ok(rpc_error_object) => {
|
|
let data = match rpc_error_object.code {
|
|
custom_error::JSON_RPC_SERVER_ERROR_SEND_TRANSACTION_PREFLIGHT_FAILURE => {
|
|
match serde_json::from_value::<RpcSimulateTransactionResult>(json["error"]["data"].clone()) {
|
|
Ok(data) => RpcResponseErrorData::SendTransactionPreflightFailure(data),
|
|
Err(err) => {
|
|
debug!("Failed to deserialize RpcSimulateTransactionResult: {:?}", err);
|
|
RpcResponseErrorData::Empty
|
|
}
|
|
}
|
|
},
|
|
custom_error::JSON_RPC_SERVER_ERROR_NODE_UNHEALTHY => {
|
|
match serde_json::from_value::<custom_error::NodeUnhealthyErrorData>(json["error"]["data"].clone()) {
|
|
Ok(custom_error::NodeUnhealthyErrorData {num_slots_behind}) => RpcResponseErrorData::NodeUnhealthy {num_slots_behind},
|
|
Err(_err) => {
|
|
RpcResponseErrorData::Empty
|
|
}
|
|
}
|
|
},
|
|
_ => RpcResponseErrorData::Empty
|
|
};
|
|
|
|
Err(RpcError::RpcResponseError {
|
|
code: rpc_error_object.code,
|
|
message: rpc_error_object.message,
|
|
data,
|
|
}
|
|
.into())
|
|
}
|
|
Err(err) => Err(RpcError::RpcRequestError(format!(
|
|
"Failed to deserialize RPC error response: {} [{}]",
|
|
serde_json::to_string(&json["error"]).unwrap(),
|
|
err
|
|
))
|
|
.into()),
|
|
};
|
|
}
|
|
return Ok(json["result"].take());
|
|
}
|
|
}
|
|
|
|
fn url(&self) -> String {
|
|
self.url.clone()
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn http_sender_on_tokio_multi_thread() {
|
|
let http_sender = HttpSender::new("http://localhost:1234".to_string());
|
|
let _ = http_sender
|
|
.send(RpcRequest::GetVersion, serde_json::Value::Null)
|
|
.await;
|
|
}
|
|
|
|
#[tokio::test(flavor = "current_thread")]
|
|
async fn http_sender_on_tokio_current_thread() {
|
|
let http_sender = HttpSender::new("http://localhost:1234".to_string());
|
|
let _ = http_sender
|
|
.send(RpcRequest::GetVersion, serde_json::Value::Null)
|
|
.await;
|
|
}
|
|
}
|