From a4f0d8636a40e4b30b16b8a203ae6c93b1ed5dd7 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Mon, 5 Apr 2021 09:41:00 -0700 Subject: [PATCH] RpcClient no longer panics in a tokio multi-threaded runtime --- Cargo.lock | 1 + client/Cargo.toml | 1 + client/src/http_sender.rs | 83 ++++++++++++++++++++++++++++++--------- client/src/rpc_client.rs | 15 +++++++ programs/bpf/Cargo.lock | 1 + 5 files changed, 82 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ff02892b1..2ccb43033 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4259,6 +4259,7 @@ dependencies = [ "solana-version", "solana-vote-program", "thiserror", + "tokio 1.1.1", "tungstenite", "url 2.2.0", ] diff --git a/client/Cargo.toml b/client/Cargo.toml index dfdc5bb1a..8dfc3e66e 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -32,6 +32,7 @@ solana-transaction-status = { path = "../transaction-status", version = "=1.7.0" solana-version = { path = "../version", version = "=1.7.0" } solana-vote-program = { path = "../programs/vote", version = "=1.7.0" } thiserror = "1.0" +tokio = { version = "1", features = ["full"] } tungstenite = "0.10.1" url = "2.1.1" diff --git a/client/src/http_sender.rs b/client/src/http_sender.rs index a6f967a66..e80adea68 100644 --- a/client/src/http_sender.rs +++ b/client/src/http_sender.rs @@ -8,12 +8,20 @@ use { }, log::*, reqwest::{self, header::CONTENT_TYPE, StatusCode}, - std::{thread::sleep, time::Duration}, + std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + thread::sleep, + time::Duration, + }, }; pub struct HttpSender { - client: reqwest::blocking::Client, + client: Arc, url: String, + request_id: AtomicU64, } impl HttpSender { @@ -22,12 +30,22 @@ impl HttpSender { } pub fn new_with_timeout(url: String, timeout: Duration) -> Self { - let client = reqwest::blocking::Client::builder() - .timeout(timeout) - .build() - .expect("build rpc client"); + // `reqwest::blocking::Client` panics if run in a tokio async context. Shuttle the + // request to a different tokio thread to avoid this + let client = Arc::new( + tokio::task::block_in_place(move || { + reqwest::blocking::Client::builder() + .timeout(timeout) + .build() + }) + .expect("build rpc client"), + ); - Self { client, url } + Self { + client, + url, + request_id: AtomicU64::new(0), + } } } @@ -40,20 +58,26 @@ struct RpcErrorObject { impl RpcSender for HttpSender { fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result { - // Concurrent requests are not supported so reuse the same request id for all requests - let request_id = 1; - - let request_json = request.build_request_json(request_id, params); + let request_id = self.request_id.fetch_add(1, Ordering::Relaxed); + let request_json = request.build_request_json(request_id, params).to_string(); let mut too_many_requests_retries = 5; loop { - match self - .client - .post(&self.url) - .header(CONTENT_TYPE, "application/json") - .body(request_json.to_string()) - .send() - { + // `reqwest::blocking::Client` panics if run in a tokio async context. Shuttle the + // request to a different tokio thread to avoid this + let response = { + let client = self.client.clone(); + let request_json = request_json.clone(); + tokio::task::block_in_place(move || { + client + .post(&self.url) + .header(CONTENT_TYPE, "application/json") + .body(request_json) + .send() + }) + }; + + match response { Ok(response) => { if !response.status().is_success() { if response.status() == StatusCode::TOO_MANY_REQUESTS @@ -72,7 +96,9 @@ impl RpcSender for HttpSender { return Err(response.error_for_status().unwrap_err().into()); } - let json: serde_json::Value = serde_json::from_str(&response.text()?)?; + let response_text = tokio::task::block_in_place(move || response.text())?; + + let json: serde_json::Value = serde_json::from_str(&response_text)?; if json["error"].is_object() { return match serde_json::from_value::(json["error"].clone()) { @@ -122,3 +148,22 @@ impl RpcSender for HttpSender { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test(flavor = "multi_thread")] + async fn http_sender_on_tokio_multi_thread() { + let http_sender = HttpSender::new("http://localhost:1234".to_string()); + let _ = http_sender.send(RpcRequest::GetVersion, serde_json::Value::Null); + } + + #[tokio::test(flavor = "current_thread")] + #[should_panic(expected = "can call blocking only when running on the multi-threaded runtime")] + async fn http_sender_ontokio_current_thread_should_panic() { + // RpcClient::new() will panic in the tokio current-thread runtime due to `tokio::task::block_in_place()` usage, and there + // doesn't seem to be a way to detect whether the tokio runtime is multi_thread or current_thread... + let _ = HttpSender::new("http://localhost:1234".to_string()); + } +} diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index 6c22d4a22..46afccea2 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -1693,6 +1693,21 @@ mod tests { #[test] fn test_send() { + _test_send(); + } + + #[tokio::test(flavor = "current_thread")] + #[should_panic(expected = "can call blocking only when running on the multi-threaded runtime")] + async fn test_send_async_current_thread_should_panic() { + _test_send(); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_send_async_multi_thread() { + _test_send(); + } + + fn _test_send() { let (sender, receiver) = channel(); thread::spawn(move || { let rpc_addr = "0.0.0.0:0".parse().unwrap(); diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index eacde4ee1..f5c6edd16 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -2946,6 +2946,7 @@ dependencies = [ "solana-version", "solana-vote-program", "thiserror", + "tokio 1.1.1", "tungstenite", "url", ]