From c299dd390e7c40b330f087d4fc74f19ee3962645 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 3 Mar 2019 22:01:09 -0800 Subject: [PATCH] Fullnode rpc to exit with unsafe config --- core/src/cluster_tests.rs | 16 +++++++ core/src/fullnode.rs | 7 +++ core/src/local_cluster.rs | 33 ++++++++++++-- core/src/rpc.rs | 94 +++++++++++++++++++++++++++++++++++++-- core/src/rpc_request.rs | 2 + core/src/rpc_service.rs | 20 +++++++-- core/src/thin_client.rs | 17 +++++++ core/src/tpu.rs | 2 +- core/src/tvu.rs | 4 +- tests/local_cluster.rs | 17 +++++++ tests/tvu.rs | 1 + 11 files changed, 201 insertions(+), 12 deletions(-) diff --git a/core/src/cluster_tests.rs b/core/src/cluster_tests.rs index 053a90f7d7..abc9a58b54 100644 --- a/core/src/cluster_tests.rs +++ b/core/src/cluster_tests.rs @@ -7,6 +7,8 @@ use crate::contact_info::ContactInfo; use crate::gossip_service::discover; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; +use std::thread::sleep; +use std::time::Duration; /// Spend and verify from every node in the network pub fn spend_and_verify_all_nodes( @@ -39,3 +41,17 @@ pub fn spend_and_verify_all_nodes( } } } + +pub fn fullnode_exit(entry_point_info: &ContactInfo, nodes: usize) { + let cluster_nodes = discover(&entry_point_info, nodes); + assert!(cluster_nodes.len() >= nodes); + for node in &cluster_nodes { + let mut client = mk_client(&node); + assert!(client.fullnode_exit().unwrap()); + } + sleep(Duration::from_millis(250)); + for node in &cluster_nodes { + let mut client = mk_client(&node); + assert!(client.fullnode_exit().is_err()); + } +} diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index 5437827082..cee23c49e9 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -10,6 +10,7 @@ use crate::entry::Entry; use crate::gossip_service::GossipService; use crate::poh_recorder::PohRecorder; use crate::poh_service::{PohService, PohServiceConfig}; +use crate::rpc::JsonRpcConfig; use crate::rpc_pubsub_service::PubSubService; use crate::rpc_service::JsonRpcService; use crate::rpc_subscriptions::RpcSubscriptions; @@ -65,6 +66,7 @@ pub struct FullnodeConfig { pub storage_rotate_count: u64, pub tick_config: PohServiceConfig, pub account_paths: Option, + pub rpc_config: JsonRpcConfig, } impl Default for FullnodeConfig { fn default() -> Self { @@ -79,6 +81,7 @@ impl Default for FullnodeConfig { storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE, tick_config: PohServiceConfig::default(), account_paths: None, + rpc_config: JsonRpcConfig::default(), } } } @@ -165,6 +168,8 @@ impl Fullnode { SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), node.info.rpc.port()), drone_addr, storage_state.clone(), + config.rpc_config.clone(), + exit.clone(), ); let subscriptions = Arc::new(RpcSubscriptions::default()); @@ -232,6 +237,7 @@ impl Fullnode { ledger_signal_receiver, &subscriptions, &poh_recorder, + exit.clone(), ); let tpu = Tpu::new( id, @@ -242,6 +248,7 @@ impl Fullnode { node.sockets.broadcast, config.sigverify_disabled, &blocktree, + exit.clone(), ); let exit_ = exit.clone(); let bank_forks_ = bank_forks.clone(); diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs index fe6681b767..7eb53a34fc 100644 --- a/core/src/local_cluster.rs +++ b/core/src/local_cluster.rs @@ -3,6 +3,7 @@ use crate::client::mk_client; use crate::cluster_info::{Node, NodeInfo}; use crate::fullnode::{Fullnode, FullnodeConfig}; use crate::gossip_service::discover; +use crate::rpc::JsonRpcConfig; use crate::service::Service; use crate::thin_client::retry_get_balance; use crate::thin_client::ThinClient; @@ -28,6 +29,25 @@ pub struct LocalCluster { impl LocalCluster { pub fn new(num_nodes: usize, cluster_lamports: u64, lamports_per_node: u64) -> Self { + Self::new_with_config( + num_nodes, + cluster_lamports, + lamports_per_node, + &FullnodeConfig::default(), + ) + } + pub fn new_unsafe(num_nodes: usize, cluster_lamports: u64, lamports_per_node: u64) -> Self { + let mut unsafe_rpc = FullnodeConfig::default(); + unsafe_rpc.rpc_config = JsonRpcConfig::Unsafe; + Self::new_with_config(num_nodes, cluster_lamports, lamports_per_node, &unsafe_rpc) + } + + pub fn new_with_config( + num_nodes: usize, + cluster_lamports: u64, + lamports_per_node: u64, + fullnode_config: &FullnodeConfig, + ) -> Self { let leader_keypair = Arc::new(Keypair::new()); let leader_pubkey = leader_keypair.pubkey(); let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); @@ -39,7 +59,6 @@ impl LocalCluster { ledger_paths.push(genesis_ledger_path.clone()); ledger_paths.push(leader_ledger_path.clone()); let voting_keypair = VotingKeypair::new_local(&leader_keypair); - let fullnode_config = FullnodeConfig::default(); let leader_node_info = leader_node.info.clone(); let leader_server = Fullnode::new( leader_node, @@ -47,7 +66,7 @@ impl LocalCluster { &leader_ledger_path, voting_keypair, None, - &fullnode_config, + fullnode_config, ); let mut fullnodes = vec![leader_server]; let mut client = mk_client(&leader_node_info); @@ -84,7 +103,7 @@ impl LocalCluster { &ledger_path, voting_keypair, Some(&leader_node_info), - &fullnode_config, + fullnode_config, ); fullnodes.push(validator_server); } @@ -189,4 +208,12 @@ mod test { let network = LocalCluster::new(1, 100, 3); drop(network) } + + #[test] + fn test_local_cluster_start_and_exit_unsafe() { + solana_logger::setup(); + let network = LocalCluster::new_unsafe(1, 100, 3); + drop(network) + } + } diff --git a/core/src/rpc.rs b/core/src/rpc.rs index abcf556ffd..9fec5931c7 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -16,14 +16,29 @@ use solana_sdk::signature::Signature; use solana_sdk::transaction::Transaction; use std::mem; use std::net::{SocketAddr, UdpSocket}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::{Duration, Instant}; +#[derive(Clone)] +pub enum JsonRpcConfig { + Safe, + Unsafe, +} + +impl Default for JsonRpcConfig { + fn default() -> Self { + JsonRpcConfig::Safe + } +} + #[derive(Clone)] pub struct JsonRpcRequestProcessor { bank: Option>, storage_state: StorageState, + config: JsonRpcConfig, + fullnode_exit: Arc, } impl JsonRpcRequestProcessor { @@ -39,10 +54,16 @@ impl JsonRpcRequestProcessor { self.bank = Some(bank.clone()); } - pub fn new(storage_state: StorageState) -> Self { + pub fn new( + storage_state: StorageState, + config: JsonRpcConfig, + fullnode_exit: Arc, + ) -> Self { JsonRpcRequestProcessor { bank: None, storage_state, + config, + fullnode_exit, } } @@ -87,6 +108,20 @@ impl JsonRpcRequestProcessor { .storage_state .get_pubkeys_for_entry_height(entry_height)) } + + pub fn fullnode_exit(&self) -> Result { + match self.config { + JsonRpcConfig::Safe => { + debug!("safe config, fullnode_exit ignored"); + Ok(false) + } + JsonRpcConfig::Unsafe => { + warn!("JsonRPC fullnode_exit request..."); + self.fullnode_exit.store(true, Ordering::Relaxed); + Ok(true) + } + } + } } fn get_leader_addr(cluster_info: &Arc>) -> Result { @@ -182,6 +217,9 @@ pub trait RpcSol { _: Self::Metadata, _: u64, ) -> Result>; + + #[rpc(meta, name = "fullnodeExit")] + fn fullnode_exit(&self, _: Self::Metadata) -> Result; } pub struct RpcSolImpl; @@ -360,6 +398,10 @@ impl RpcSol for RpcSolImpl { .unwrap() .get_storage_pubkeys_for_entry_height(entry_height) } + + fn fullnode_exit(&self, meta: Self::Metadata) -> Result { + meta.request_processor.read().unwrap().fullnode_exit() + } } #[cfg(test)] @@ -377,6 +419,7 @@ mod tests { fn start_rpc_handler_with_tx(pubkey: Pubkey) -> (MetaIoHandler, Meta, Hash, Keypair) { let (genesis_block, alice) = GenesisBlock::new(10_000); let bank = Arc::new(Bank::new(&genesis_block)); + let exit = Arc::new(AtomicBool::new(false)); let blockhash = bank.last_blockhash(); let tx = SystemTransaction::new_move(&alice, pubkey, 20, blockhash, 0); @@ -384,6 +427,8 @@ mod tests { let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( StorageState::default(), + JsonRpcConfig::default(), + exit, ))); request_processor.write().unwrap().set_bank(&bank); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))); @@ -411,7 +456,9 @@ mod tests { let (genesis_block, alice) = GenesisBlock::new(10_000); let bob_pubkey = Keypair::new().pubkey(); let bank = Arc::new(Bank::new(&genesis_block)); - let mut request_processor = JsonRpcRequestProcessor::new(StorageState::default()); + let exit = Arc::new(AtomicBool::new(false)); + let mut request_processor = + JsonRpcRequestProcessor::new(StorageState::default(), JsonRpcConfig::default(), exit); request_processor.set_bank(&bank); thread::spawn(move || { let blockhash = bank.last_blockhash(); @@ -574,13 +621,18 @@ mod tests { fn test_rpc_send_bad_tx() { let (genesis_block, _) = GenesisBlock::new(10_000); let bank = Arc::new(Bank::new(&genesis_block)); + let exit = Arc::new(AtomicBool::new(false)); let mut io = MetaIoHandler::default(); let rpc = RpcSolImpl; io.extend_with(rpc.to_delegate()); let meta = Meta { request_processor: { - let mut request_processor = JsonRpcRequestProcessor::new(StorageState::default()); + let mut request_processor = JsonRpcRequestProcessor::new( + StorageState::default(), + JsonRpcConfig::default(), + exit, + ); request_processor.set_bank(&bank); Arc::new(RwLock::new(request_processor)) }, @@ -651,4 +703,40 @@ mod tests { Err(Error::invalid_request()) ); } + + #[test] + fn test_rpc_request_processor_default_exit_is_a_noop() { + let exit = Arc::new(AtomicBool::new(false)); + let request_processor = JsonRpcRequestProcessor::new( + StorageState::default(), + JsonRpcConfig::default(), + exit.clone(), + ); + assert_eq!(request_processor.fullnode_exit(), false); + assert_eq!(exit.load(Ordering::Relaxed), false); + } + #[test] + fn test_rpc_request_processor_safe_exit_is_a_noop() { + let exit = Arc::new(AtomicBool::new(false)); + let request_processor = JsonRpcRequestProcessor::new( + StorageState::default(), + JsonRpcConfig::Safe, + exit.clone(), + ); + assert_eq!(request_processor.fullnode_exit(), false); + assert_eq!(exit.load(Ordering::Relaxed), false); + } + + #[test] + fn test_rpc_request_processor_unsafe_exit() { + let exit = Arc::new(AtomicBool::new(false)); + let request_processor = JsonRpcRequestProcessor::new( + StorageState::default(), + JsonRpcConfig::Unsafe, + exit.clone(), + ); + assert_eq!(request_processor.fullnode_exit(), true); + assert_eq!(exit.load(Ordering::Relaxed), true); + } + } diff --git a/core/src/rpc_request.rs b/core/src/rpc_request.rs index aef8ad500f..e0c8030e95 100644 --- a/core/src/rpc_request.rs +++ b/core/src/rpc_request.rs @@ -140,6 +140,7 @@ pub enum RpcRequest { GetStorageBlockhash, GetStorageEntryHeight, GetStoragePubkeysForEntryHeight, + FullnodeExit, } impl RpcRequest { @@ -160,6 +161,7 @@ impl RpcRequest { RpcRequest::GetStorageBlockhash => "getStorageBlockhash", RpcRequest::GetStorageEntryHeight => "getStorageEntryHeight", RpcRequest::GetStoragePubkeysForEntryHeight => "getStoragePubkeysForEntryHeight", + RpcRequest::FullnodeExit => "fullnodeExit", }; let mut request = json!({ "jsonrpc": jsonrpc, diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index b98424c110..3d4e4db025 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -27,10 +27,15 @@ impl JsonRpcService { rpc_addr: SocketAddr, drone_addr: SocketAddr, storage_state: StorageState, + config: JsonRpcConfig, + exit: Arc, ) -> Self { info!("rpc bound to {:?}", rpc_addr); - let exit = Arc::new(AtomicBool::new(false)); - let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(storage_state))); + let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( + storage_state, + config, + exit.clone(), + ))); let request_processor_ = request_processor.clone(); let info = cluster_info.clone(); @@ -105,6 +110,7 @@ mod tests { #[test] fn test_rpc_new() { let (genesis_block, alice) = GenesisBlock::new(10_000); + let exit = Arc::new(AtomicBool::new(false)); let bank = Bank::new(&genesis_block); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))); let rpc_addr = SocketAddr::new( @@ -115,8 +121,14 @@ mod tests { IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(), ); - let mut rpc_service = - JsonRpcService::new(&cluster_info, rpc_addr, drone_addr, StorageState::default()); + let mut rpc_service = JsonRpcService::new( + &cluster_info, + rpc_addr, + drone_addr, + StorageState::default(), + JsonRpcConfig::default(), + exit, + ); rpc_service.set_bank(&Arc::new(bank)); let thread = rpc_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); diff --git a/core/src/thin_client.rs b/core/src/thin_client.rs index b5213ec125..35d881e75e 100644 --- a/core/src/thin_client.rs +++ b/core/src/thin_client.rs @@ -364,6 +364,23 @@ impl ThinClient { }; } } + pub fn fullnode_exit(&mut self) -> io::Result { + trace!("fullnode_exit sending request to {}", self.rpc_addr); + let response = self + .rpc_client + .make_rpc_request(1, RpcRequest::FullnodeExit, None) + .map_err(|error| { + debug!("Response from {} fullndoe_exit: {}", self.rpc_addr, error); + io::Error::new(io::ErrorKind::Other, "FullodeExit request failure") + })?; + serde_json::from_value(response).map_err(|error| { + debug!( + "ParseError: from {} fullndoe_exit: {}", + self.rpc_addr, error + ); + io::Error::new(io::ErrorKind::Other, "FullodeExit parse failure") + }) + } } impl Drop for ThinClient { diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 2e19f7ad99..f1526146d0 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -82,10 +82,10 @@ impl Tpu { broadcast_socket: UdpSocket, sigverify_disabled: bool, blocktree: &Arc, + exit: Arc, ) -> Self { cluster_info.write().unwrap().set_leader(id); - let exit = Arc::new(AtomicBool::new(false)); let (packet_sender, packet_receiver) = channel(); let fetch_stage = FetchStage::new_with_sender(transactions_sockets, exit.clone(), &packet_sender.clone()); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index be0f8785b5..b48d180a74 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -67,11 +67,11 @@ impl Tvu { ledger_signal_receiver: Receiver, subscriptions: &Arc, poh_recorder: &Arc>, + exit: Arc, ) -> Self where T: 'static + KeypairUtil + Sync + Send, { - let exit = Arc::new(AtomicBool::new(false)); let keypair: Arc = cluster_info .read() .expect("Unable to read from cluster_info during Tvu creation") @@ -221,6 +221,7 @@ pub mod tests { .expect("Expected to successfully open ledger"); let bank = bank_forks.working_bank(); let (poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); + let exit = Arc::new(AtomicBool::new(false)); let tvu = Tvu::new( Some(Arc::new(Keypair::new())), &Arc::new(RwLock::new(bank_forks)), @@ -240,6 +241,7 @@ pub mod tests { l_receiver, &Arc::new(RpcSubscriptions::default()), &poh_recorder, + exit, ); tvu.close().expect("close"); poh_service.close().expect("close"); diff --git a/tests/local_cluster.rs b/tests/local_cluster.rs index 3ce99e7ad7..3611ca8183 100644 --- a/tests/local_cluster.rs +++ b/tests/local_cluster.rs @@ -38,3 +38,20 @@ fn test_spend_and_verify_all_nodes_3() -> () { num_nodes, ); } + +#[test] +#[should_panic] +fn test_fullnode_exit_safe_config_should_panic_2() -> () { + solana_logger::setup(); + let num_nodes = 2; + let local = LocalCluster::new(num_nodes, 10_000, 100); + cluster_tests::fullnode_exit(&local.entry_point_info, num_nodes); +} + +#[test] +fn test_fullnode_exit_unsafe_config_2() -> () { + solana_logger::setup(); + let num_nodes = 2; + let local = LocalCluster::new_unsafe(num_nodes, 10_000, 100); + cluster_tests::fullnode_exit(&local.entry_point_info, num_nodes); +} diff --git a/tests/tvu.rs b/tests/tvu.rs index 02cccb84e3..8d369f179d 100644 --- a/tests/tvu.rs +++ b/tests/tvu.rs @@ -131,6 +131,7 @@ fn test_replay() { ledger_signal_receiver, &Arc::new(RpcSubscriptions::default()), &poh_recorder, + exit.clone(), ); let mut alice_ref_balance = starting_balance;