From 3d077fb656c8a729ee86c27716c0f53f6a07c804 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Wed, 30 Dec 2020 20:55:41 -0800 Subject: [PATCH] Revert "Upgrade in-tree tokio 0.2 usage to tokio 0.3" This reverts commit 444ed768dcacb073607a43401a3674c7b6061552. --- Cargo.lock | 57 +++++++++++++++++------------ core/Cargo.toml | 2 +- core/src/bigtable_upload_service.rs | 8 ++-- core/src/rpc.rs | 38 +++++++++---------- core/src/rpc_service.rs | 21 ++++++----- ledger-tool/Cargo.toml | 2 +- ledger-tool/src/bigtable.rs | 2 +- ledger/Cargo.toml | 2 +- ledger/src/bigtable_upload.rs | 3 +- 9 files changed, 75 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fc9b97648..29664a7e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2228,6 +2228,18 @@ dependencies = [ "slab", ] +[[package]] +name = "mio-named-pipes" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656" +dependencies = [ + "log 0.4.11", + "mio 0.6.22", + "miow 0.3.6", + "winapi 0.3.8", +] + [[package]] name = "mio-uds" version = "0.6.8" @@ -2705,12 +2717,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7505eeebd78492e0f6108f7171c4948dbb120ee8119d9d77d0afa5469bef67f" -[[package]] -name = "pin-project-lite" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b063f57ec186e6140e2b8b6921e5f1bd89c7356dda5b33acc5401203ca6131c" - [[package]] name = "pin-utils" version = "0.1.0" @@ -3164,7 +3170,7 @@ dependencies = [ "mime_guess", "native-tls", "percent-encoding 2.1.0", - "pin-project-lite 0.1.5", + "pin-project-lite", "rustls", "serde", "serde_json", @@ -3687,7 +3693,7 @@ dependencies = [ "solana-runtime", "solana-sdk", "tarpc", - "tokio 0.3.5", + "tokio 0.3.2", "tokio-serde", ] @@ -3699,7 +3705,7 @@ dependencies = [ "serde", "solana-sdk", "tarpc", - "tokio 0.3.5", + "tokio 0.3.2", ] [[package]] @@ -3715,7 +3721,7 @@ dependencies = [ "solana-runtime", "solana-sdk", "tarpc", - "tokio 0.3.5", + "tokio 0.3.2", "tokio-serde", ] @@ -4050,7 +4056,7 @@ dependencies = [ "tempfile", "thiserror", "tokio 0.1.22", - "tokio 0.3.5", + "tokio 0.2.22", "tokio-codec", "tokio-fs", "tokio-io", @@ -4357,7 +4363,7 @@ dependencies = [ "solana-vote-program", "tempfile", "thiserror", - "tokio 0.3.5", + "tokio 0.2.22", "trees", ] @@ -4393,7 +4399,7 @@ dependencies = [ "solana-version", "solana-vote-program", "tempfile", - "tokio 0.3.5", + "tokio 0.2.22", ] [[package]] @@ -4670,7 +4676,7 @@ dependencies = [ "solana-program 1.6.0", "solana-runtime", "solana-sdk", - "tokio 0.3.5", + "tokio 0.3.2", ] [[package]] @@ -5439,7 +5445,7 @@ dependencies = [ "serde", "static_assertions", "tarpc-plugins", - "tokio 0.3.5", + "tokio 0.3.2", "tokio-serde", "tokio-util 0.4.0", ] @@ -5640,22 +5646,27 @@ dependencies = [ "futures-core", "iovec", "lazy_static", + "libc", "memchr 2.3.3", "mio 0.6.22", + "mio-named-pipes", + "mio-uds", "num_cpus", - "pin-project-lite 0.1.5", + "pin-project-lite", + "signal-hook-registry", "slab", "tokio-macros 0.2.5", + "winapi 0.3.8", ] [[package]] name = "tokio" -version = "0.3.5" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a12a3eb39ee2c231be64487f1fcbe726c8f2514876a55480a5ab8559fc374252" +checksum = "71f1b20504fd0aa9dab3ae17e8c4dd9431e5e08fd6921689f9745a4004883a17" dependencies = [ - "autocfg 1.0.0", "bytes 0.6.0", + "fnv", "futures-core", "lazy_static", "libc", @@ -5663,7 +5674,7 @@ dependencies = [ "mio 0.7.6", "num_cpus", "parking_lot 0.11.0", - "pin-project-lite 0.2.0", + "pin-project-lite", "signal-hook-registry", "slab", "tokio-macros 0.3.1", @@ -5918,7 +5929,7 @@ dependencies = [ "futures-core", "futures-sink", "log 0.4.11", - "pin-project-lite 0.1.5", + "pin-project-lite", "tokio 0.2.22", ] @@ -5932,8 +5943,8 @@ dependencies = [ "futures-core", "futures-sink", "log 0.4.11", - "pin-project-lite 0.1.5", - "tokio 0.3.5", + "pin-project-lite", + "tokio 0.3.2", ] [[package]] diff --git a/core/Cargo.toml b/core/Cargo.toml index 89514500d..ff1436837 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -76,7 +76,7 @@ solana-vote-program = { path = "../programs/vote", version = "1.6.0" } spl-token-v2-0 = { package = "spl-token", version = "=3.0.1", features = ["no-entrypoint"] } tempfile = "3.1.0" thiserror = "1.0" -tokio = { version = "0.3", features = ["full"] } +tokio = { version = "0.2", features = ["full"] } tokio_01 = { version = "0.1", package = "tokio" } tokio_01_bytes = { version = "0.4.7", package = "bytes" } tokio_fs_01 = { version = "0.1", package = "tokio-fs" } diff --git a/core/src/bigtable_upload_service.rs b/core/src/bigtable_upload_service.rs index e862200d1..23db772af 100644 --- a/core/src/bigtable_upload_service.rs +++ b/core/src/bigtable_upload_service.rs @@ -5,7 +5,7 @@ use std::{ sync::{Arc, RwLock}, thread::{self, Builder, JoinHandle}, }; -use tokio::runtime::Runtime; +use tokio::runtime; // Delay uploading the largest confirmed root for this many slots. This is done in an attempt to // ensure that the `CacheBlockTimeService` has had enough time to add the block time for the root @@ -21,7 +21,7 @@ pub struct BigTableUploadService { impl BigTableUploadService { pub fn new( - runtime: Arc, + runtime_handle: runtime::Handle, bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage, blockstore: Arc, block_commitment_cache: Arc>, @@ -32,7 +32,7 @@ impl BigTableUploadService { .name("bigtable-upload".to_string()) .spawn(move || { Self::run( - runtime, + runtime_handle, bigtable_ledger_storage, blockstore, block_commitment_cache, @@ -45,7 +45,7 @@ impl BigTableUploadService { } fn run( - runtime: Arc, + runtime: runtime::Handle, bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage, blockstore: Arc, block_commitment_cache: Arc>, diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 170383e3f..f67ece5d7 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -78,7 +78,7 @@ use std::{ Arc, Mutex, RwLock, }, }; -use tokio::runtime::Runtime; +use tokio::runtime; pub const MAX_REQUEST_PAYLOAD_SIZE: usize = 50 * (1 << 10); // 50kB pub const PERFORMANCE_SAMPLES_LIMIT: usize = 720; @@ -122,7 +122,7 @@ pub struct JsonRpcRequestProcessor { cluster_info: Arc, genesis_hash: Hash, transaction_sender: Arc>>, - runtime: Arc, + runtime_handle: runtime::Handle, bigtable_ledger_storage: Option, optimistically_confirmed_bank: Arc>, } @@ -201,7 +201,7 @@ impl JsonRpcRequestProcessor { health: Arc, cluster_info: Arc, genesis_hash: Hash, - runtime: Arc, + runtime: &runtime::Runtime, bigtable_ledger_storage: Option, optimistically_confirmed_bank: Arc>, ) -> (Self, Receiver) { @@ -217,7 +217,7 @@ impl JsonRpcRequestProcessor { cluster_info, genesis_hash, transaction_sender: Arc::new(Mutex::new(sender)), - runtime, + runtime_handle: runtime.handle().clone(), bigtable_ledger_storage, optimistically_confirmed_bank, }, @@ -253,7 +253,7 @@ impl JsonRpcRequestProcessor { cluster_info, genesis_hash, transaction_sender: Arc::new(Mutex::new(sender)), - runtime: Arc::new(Runtime::new().expect("Runtime")), + runtime_handle: runtime::Runtime::new().unwrap().handle().clone(), bigtable_ledger_storage: None, optimistically_confirmed_bank: Arc::new(RwLock::new(OptimisticallyConfirmedBank { bank: bank.clone(), @@ -669,7 +669,7 @@ impl JsonRpcRequestProcessor { if result.is_err() { if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { return Ok(self - .runtime + .runtime_handle .block_on(bigtable_ledger_storage.get_confirmed_block(slot)) .ok() .map(|confirmed_block| confirmed_block.encode(encoding))); @@ -712,7 +712,7 @@ impl JsonRpcRequestProcessor { // [start_slot..end_slot] can be fetched from BigTable. if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { return Ok(self - .runtime + .runtime_handle .block_on( bigtable_ledger_storage .get_confirmed_blocks(start_slot, (end_slot - start_slot) as usize), @@ -748,7 +748,7 @@ impl JsonRpcRequestProcessor { // range can be fetched from BigTable. if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { return Ok(self - .runtime + .runtime_handle .block_on(bigtable_ledger_storage.get_confirmed_blocks(start_slot, limit)) .unwrap_or_else(|_| vec![])); } @@ -775,7 +775,7 @@ impl JsonRpcRequestProcessor { if result.is_err() || matches!(result, Ok(None)) { if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { return Ok(self - .runtime + .runtime_handle .block_on(bigtable_ledger_storage.get_confirmed_block(slot)) .ok() .and_then(|confirmed_block| confirmed_block.block_time)); @@ -851,7 +851,7 @@ impl JsonRpcRequestProcessor { }) .or_else(|| { if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { - self.runtime + self.runtime_handle .block_on(bigtable_ledger_storage.get_signature_status(&signature)) .map(Some) .unwrap_or(None) @@ -919,7 +919,7 @@ impl JsonRpcRequestProcessor { None => { if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { return self - .runtime + .runtime_handle .block_on(bigtable_ledger_storage.get_confirmed_transaction(&signature)) .unwrap_or(None) .map(|confirmed| confirmed.encode(encoding)); @@ -986,7 +986,7 @@ impl JsonRpcRequestProcessor { before = results.last().map(|x| x.signature); } - let bigtable_results = self.runtime.block_on( + let bigtable_results = self.runtime_handle.block_on( bigtable_ledger_storage.get_confirmed_signatures_for_address( &address, before.as_ref(), @@ -1019,7 +1019,7 @@ impl JsonRpcRequestProcessor { if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { let bigtable_slot = self - .runtime + .runtime_handle .block_on(bigtable_ledger_storage.get_first_available_block()) .unwrap_or(None) .unwrap_or(slot); @@ -2927,7 +2927,7 @@ pub mod tests { RpcHealth::stub(), cluster_info.clone(), Hash::default(), - Arc::new(tokio::runtime::Runtime::new().unwrap()), + &runtime::Runtime::new().unwrap(), None, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), ); @@ -4327,7 +4327,7 @@ pub mod tests { health.clone(), cluster_info, Hash::default(), - Arc::new(tokio::runtime::Runtime::new().unwrap()), + &runtime::Runtime::new().unwrap(), None, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), ); @@ -4523,7 +4523,7 @@ pub mod tests { RpcHealth::stub(), cluster_info, Hash::default(), - Arc::new(tokio::runtime::Runtime::new().unwrap()), + &runtime::Runtime::new().unwrap(), None, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), ); @@ -4555,7 +4555,7 @@ pub mod tests { RpcHealth::stub(), cluster_info, Hash::default(), - Arc::new(tokio::runtime::Runtime::new().unwrap()), + &runtime::Runtime::new().unwrap(), None, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), ); @@ -4646,7 +4646,7 @@ pub mod tests { RpcHealth::stub(), cluster_info, Hash::default(), - Arc::new(tokio::runtime::Runtime::new().unwrap()), + &runtime::Runtime::new().unwrap(), None, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), ); @@ -5826,7 +5826,7 @@ pub mod tests { RpcHealth::stub(), cluster_info, Hash::default(), - Arc::new(tokio::runtime::Runtime::new().unwrap()), + &runtime::Runtime::new().unwrap(), None, optimistically_confirmed_bank.clone(), ); diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 7b5853116..b6b950121 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -32,6 +32,7 @@ use std::{ sync::{mpsc::channel, Arc, Mutex, RwLock}, thread::{self, Builder, JoinHandle}, }; +use tokio::runtime; pub struct JsonRpcService { thread_hdl: JoinHandle<()>, @@ -40,6 +41,7 @@ pub struct JsonRpcService { pub request_processor: JsonRpcRequestProcessor, // Used only by test_rpc_new()... close_handle: Option, + runtime: runtime::Runtime, } struct RpcRequestMiddleware { @@ -280,13 +282,12 @@ impl JsonRpcService { )); let tpu_address = cluster_info.my_contact_info().tpu; - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .thread_name("rpc-runtime") - .enable_all() - .build() - .expect("Runtime"), - ); + let mut runtime = runtime::Builder::new() + .threaded_scheduler() + .thread_name("rpc-runtime") + .enable_all() + .build() + .expect("Runtime"); let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false)); @@ -300,7 +301,7 @@ impl JsonRpcService { info!("BigTable ledger storage initialized"); let bigtable_ledger_upload_service = Arc::new(BigTableUploadService::new( - runtime.clone(), + runtime.handle().clone(), bigtable_ledger_storage.clone(), blockstore.clone(), block_commitment_cache.clone(), @@ -329,7 +330,7 @@ impl JsonRpcService { health.clone(), cluster_info.clone(), genesis_hash, - runtime, + &runtime, bigtable_ledger_storage, optimistically_confirmed_bank, ); @@ -403,6 +404,7 @@ impl JsonRpcService { .register_exit(Box::new(move || close_handle_.close())); Self { thread_hdl, + runtime, #[cfg(test)] request_processor: test_request_processor, close_handle: Some(close_handle), @@ -416,6 +418,7 @@ impl JsonRpcService { } pub fn join(self) -> thread::Result<()> { + self.runtime.shutdown_background(); self.thread_hdl.join() } } diff --git a/ledger-tool/Cargo.toml b/ledger-tool/Cargo.toml index de0461977..69100a34d 100644 --- a/ledger-tool/Cargo.toml +++ b/ledger-tool/Cargo.toml @@ -35,7 +35,7 @@ solana-transaction-status = { path = "../transaction-status", version = "1.6.0" solana-version = { path = "../version", version = "1.6.0" } solana-vote-program = { path = "../programs/vote", version = "1.6.0" } tempfile = "3.1.0" -tokio = { version = "0.3", features = ["full"] } +tokio = { version = "0.2.22", features = ["full"] } [dev-dependencies] assert_cmd = "1.0" diff --git a/ledger-tool/src/bigtable.rs b/ledger-tool/src/bigtable.rs index 29ce60bce..ae177a9cf 100644 --- a/ledger-tool/src/bigtable.rs +++ b/ledger-tool/src/bigtable.rs @@ -378,7 +378,7 @@ impl BigTableSubCommand for App<'_, '_> { } pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) { - let runtime = tokio::runtime::Runtime::new().unwrap(); + let mut runtime = tokio::runtime::Runtime::new().unwrap(); let future = match matches.subcommand() { ("upload", Some(arg_matches)) => { diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index c7cefbe34..eb0ef293d 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -50,7 +50,7 @@ solana-storage-proto = { path = "../storage-proto", version = "1.6.0" } solana-vote-program = { path = "../programs/vote", version = "1.6.0" } tempfile = "3.1.0" thiserror = "1.0" -tokio = { version = "0.3", features = ["full"] } +tokio = { version = "0.2.22", features = ["full"] } trees = "0.2.1" [dependencies.rocksdb] diff --git a/ledger/src/bigtable_upload.rs b/ledger/src/bigtable_upload.rs index 2f81b5f2b..2ebc29f59 100644 --- a/ledger/src/bigtable_upload.rs +++ b/ledger/src/bigtable_upload.rs @@ -11,6 +11,7 @@ use std::{ }, time::Duration, }; +use tokio::time::delay_for; // Attempt to upload this many blocks in parallel const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32; @@ -80,7 +81,7 @@ pub async fn upload_confirmed_blocks( Err(err) => { error!("get_confirmed_blocks for {} failed: {:?}", start_slot, err); // Consider exponential backoff... - tokio::time::sleep(Duration::from_secs(2)).await; + delay_for(Duration::from_secs(2)).await; } } };