diff --git a/Cargo.lock b/Cargo.lock index 0d53e5dbc..18e998b95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5095,7 +5095,6 @@ dependencies = [ "tarpc", "tokio", "tokio-serde", - "tokio-stream", ] [[package]] diff --git a/banks-server/Cargo.toml b/banks-server/Cargo.toml index e4ada7291..4c29dcd30 100644 --- a/banks-server/Cargo.toml +++ b/banks-server/Cargo.toml @@ -21,7 +21,6 @@ solana-send-transaction-service = { workspace = true } tarpc = { workspace = true, features = ["full"] } tokio = { workspace = true, features = ["full"] } tokio-serde = { workspace = true, features = ["bincode"] } -tokio-stream = { workspace = true } [lib] crate-type = ["lib"] diff --git a/banks-server/src/lib.rs b/banks-server/src/lib.rs index 33c1ae5c0..e8dc9f5e3 100644 --- a/banks-server/src/lib.rs +++ b/banks-server/src/lib.rs @@ -1,3 +1,2 @@ #![allow(clippy::integer_arithmetic)] pub mod banks_server; -pub mod rpc_banks_service; diff --git a/banks-server/src/rpc_banks_service.rs b/banks-server/src/rpc_banks_service.rs deleted file mode 100644 index f3224fb64..000000000 --- a/banks-server/src/rpc_banks_service.rs +++ /dev/null @@ -1,135 +0,0 @@ -//! The `rpc_banks_service` module implements the Solana Banks RPC API. - -use { - crate::banks_server::start_tcp_server, - futures::{future::FutureExt, pin_mut, prelude::stream::StreamExt, select}, - solana_client::connection_cache::ConnectionCache, - solana_runtime::{bank_forks::BankForks, commitment::BlockCommitmentCache}, - std::{ - net::SocketAddr, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, RwLock, - }, - thread::{self, Builder, JoinHandle}, - }, - tokio::{ - runtime::Runtime, - time::{self, Duration}, - }, - tokio_stream::wrappers::IntervalStream, -}; - -pub struct RpcBanksService { - thread_hdl: JoinHandle<()>, -} - -/// Run the TCP service until `exit` is set to true -async fn start_abortable_tcp_server( - listen_addr: SocketAddr, - tpu_addr: SocketAddr, - bank_forks: Arc>, - block_commitment_cache: Arc>, - connection_cache: Arc, - exit: Arc, -) { - let server = start_tcp_server( - listen_addr, - tpu_addr, - bank_forks.clone(), - block_commitment_cache.clone(), - connection_cache, - exit.clone(), - ) - .fuse(); - let interval = IntervalStream::new(time::interval(Duration::from_millis(100))).fuse(); - pin_mut!(server, interval); - loop { - select! { - _ = server => {}, - _ = interval.select_next_some() => { - if exit.load(Ordering::Relaxed) { - break; - } - } - } - } -} - -impl RpcBanksService { - fn run( - listen_addr: SocketAddr, - tpu_addr: SocketAddr, - bank_forks: Arc>, - block_commitment_cache: Arc>, - connection_cache: Arc, - exit: Arc, - ) { - let server = start_abortable_tcp_server( - listen_addr, - tpu_addr, - bank_forks, - block_commitment_cache, - connection_cache, - exit, - ); - Runtime::new().unwrap().block_on(server); - } - - pub fn new( - listen_addr: SocketAddr, - tpu_addr: SocketAddr, - bank_forks: &Arc>, - block_commitment_cache: &Arc>, - connection_cache: &Arc, - exit: &Arc, - ) -> Self { - let bank_forks = bank_forks.clone(); - let block_commitment_cache = block_commitment_cache.clone(); - let connection_cache = connection_cache.clone(); - let exit = exit.clone(); - let thread_hdl = Builder::new() - .name("solRpcBanksSvc".to_string()) - .spawn(move || { - Self::run( - listen_addr, - tpu_addr, - bank_forks, - block_commitment_cache, - connection_cache, - exit, - ) - }) - .unwrap(); - - Self { thread_hdl } - } - - pub fn join(self) -> thread::Result<()> { - self.thread_hdl.join() - } -} - -#[cfg(test)] -mod tests { - use {super::*, solana_runtime::bank::Bank}; - - #[test] - fn test_rpc_banks_server_exit() { - let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::default_for_tests()))); - let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); - let connection_cache = Arc::new(ConnectionCache::default()); - let exit = Arc::new(AtomicBool::new(false)); - let addr = "127.0.0.1:0".parse().unwrap(); - let service = RpcBanksService::new( - addr, - addr, - &bank_forks, - &block_commitment_cache, - &connection_cache, - &exit, - ); - exit.store(true, Ordering::Relaxed); - service.join().unwrap(); - } -} diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index b28ab1c94..c1cd6a158 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -4565,7 +4565,6 @@ dependencies = [ "tarpc", "tokio", "tokio-serde", - "tokio-stream", ] [[package]]