Fullnode rpc to exit with unsafe config

This commit is contained in:
Anatoly Yakovenko 2019-03-03 22:01:09 -08:00 committed by Grimes
parent a3016aebaf
commit c299dd390e
11 changed files with 201 additions and 12 deletions

View File

@ -7,6 +7,8 @@ use crate::contact_info::ContactInfo;
use crate::gossip_service::discover; use crate::gossip_service::discover;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::system_transaction::SystemTransaction;
use std::thread::sleep;
use std::time::Duration;
/// Spend and verify from every node in the network /// Spend and verify from every node in the network
pub fn spend_and_verify_all_nodes( pub fn spend_and_verify_all_nodes(
@ -39,3 +41,17 @@ pub fn spend_and_verify_all_nodes(
} }
} }
} }
pub fn fullnode_exit(entry_point_info: &ContactInfo, nodes: usize) {
let cluster_nodes = discover(&entry_point_info, nodes);
assert!(cluster_nodes.len() >= nodes);
for node in &cluster_nodes {
let mut client = mk_client(&node);
assert!(client.fullnode_exit().unwrap());
}
sleep(Duration::from_millis(250));
for node in &cluster_nodes {
let mut client = mk_client(&node);
assert!(client.fullnode_exit().is_err());
}
}

View File

@ -10,6 +10,7 @@ use crate::entry::Entry;
use crate::gossip_service::GossipService; use crate::gossip_service::GossipService;
use crate::poh_recorder::PohRecorder; use crate::poh_recorder::PohRecorder;
use crate::poh_service::{PohService, PohServiceConfig}; use crate::poh_service::{PohService, PohServiceConfig};
use crate::rpc::JsonRpcConfig;
use crate::rpc_pubsub_service::PubSubService; use crate::rpc_pubsub_service::PubSubService;
use crate::rpc_service::JsonRpcService; use crate::rpc_service::JsonRpcService;
use crate::rpc_subscriptions::RpcSubscriptions; use crate::rpc_subscriptions::RpcSubscriptions;
@ -65,6 +66,7 @@ pub struct FullnodeConfig {
pub storage_rotate_count: u64, pub storage_rotate_count: u64,
pub tick_config: PohServiceConfig, pub tick_config: PohServiceConfig,
pub account_paths: Option<String>, pub account_paths: Option<String>,
pub rpc_config: JsonRpcConfig,
} }
impl Default for FullnodeConfig { impl Default for FullnodeConfig {
fn default() -> Self { fn default() -> Self {
@ -79,6 +81,7 @@ impl Default for FullnodeConfig {
storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE, storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE,
tick_config: PohServiceConfig::default(), tick_config: PohServiceConfig::default(),
account_paths: None, account_paths: None,
rpc_config: JsonRpcConfig::default(),
} }
} }
} }
@ -165,6 +168,8 @@ impl Fullnode {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), node.info.rpc.port()), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), node.info.rpc.port()),
drone_addr, drone_addr,
storage_state.clone(), storage_state.clone(),
config.rpc_config.clone(),
exit.clone(),
); );
let subscriptions = Arc::new(RpcSubscriptions::default()); let subscriptions = Arc::new(RpcSubscriptions::default());
@ -232,6 +237,7 @@ impl Fullnode {
ledger_signal_receiver, ledger_signal_receiver,
&subscriptions, &subscriptions,
&poh_recorder, &poh_recorder,
exit.clone(),
); );
let tpu = Tpu::new( let tpu = Tpu::new(
id, id,
@ -242,6 +248,7 @@ impl Fullnode {
node.sockets.broadcast, node.sockets.broadcast,
config.sigverify_disabled, config.sigverify_disabled,
&blocktree, &blocktree,
exit.clone(),
); );
let exit_ = exit.clone(); let exit_ = exit.clone();
let bank_forks_ = bank_forks.clone(); let bank_forks_ = bank_forks.clone();

View File

@ -3,6 +3,7 @@ use crate::client::mk_client;
use crate::cluster_info::{Node, NodeInfo}; use crate::cluster_info::{Node, NodeInfo};
use crate::fullnode::{Fullnode, FullnodeConfig}; use crate::fullnode::{Fullnode, FullnodeConfig};
use crate::gossip_service::discover; use crate::gossip_service::discover;
use crate::rpc::JsonRpcConfig;
use crate::service::Service; use crate::service::Service;
use crate::thin_client::retry_get_balance; use crate::thin_client::retry_get_balance;
use crate::thin_client::ThinClient; use crate::thin_client::ThinClient;
@ -28,6 +29,25 @@ pub struct LocalCluster {
impl LocalCluster { impl LocalCluster {
pub fn new(num_nodes: usize, cluster_lamports: u64, lamports_per_node: u64) -> Self { pub fn new(num_nodes: usize, cluster_lamports: u64, lamports_per_node: u64) -> Self {
Self::new_with_config(
num_nodes,
cluster_lamports,
lamports_per_node,
&FullnodeConfig::default(),
)
}
pub fn new_unsafe(num_nodes: usize, cluster_lamports: u64, lamports_per_node: u64) -> Self {
let mut unsafe_rpc = FullnodeConfig::default();
unsafe_rpc.rpc_config = JsonRpcConfig::Unsafe;
Self::new_with_config(num_nodes, cluster_lamports, lamports_per_node, &unsafe_rpc)
}
pub fn new_with_config(
num_nodes: usize,
cluster_lamports: u64,
lamports_per_node: u64,
fullnode_config: &FullnodeConfig,
) -> Self {
let leader_keypair = Arc::new(Keypair::new()); let leader_keypair = Arc::new(Keypair::new());
let leader_pubkey = leader_keypair.pubkey(); let leader_pubkey = leader_keypair.pubkey();
let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
@ -39,7 +59,6 @@ impl LocalCluster {
ledger_paths.push(genesis_ledger_path.clone()); ledger_paths.push(genesis_ledger_path.clone());
ledger_paths.push(leader_ledger_path.clone()); ledger_paths.push(leader_ledger_path.clone());
let voting_keypair = VotingKeypair::new_local(&leader_keypair); let voting_keypair = VotingKeypair::new_local(&leader_keypair);
let fullnode_config = FullnodeConfig::default();
let leader_node_info = leader_node.info.clone(); let leader_node_info = leader_node.info.clone();
let leader_server = Fullnode::new( let leader_server = Fullnode::new(
leader_node, leader_node,
@ -47,7 +66,7 @@ impl LocalCluster {
&leader_ledger_path, &leader_ledger_path,
voting_keypair, voting_keypair,
None, None,
&fullnode_config, fullnode_config,
); );
let mut fullnodes = vec![leader_server]; let mut fullnodes = vec![leader_server];
let mut client = mk_client(&leader_node_info); let mut client = mk_client(&leader_node_info);
@ -84,7 +103,7 @@ impl LocalCluster {
&ledger_path, &ledger_path,
voting_keypair, voting_keypair,
Some(&leader_node_info), Some(&leader_node_info),
&fullnode_config, fullnode_config,
); );
fullnodes.push(validator_server); fullnodes.push(validator_server);
} }
@ -189,4 +208,12 @@ mod test {
let network = LocalCluster::new(1, 100, 3); let network = LocalCluster::new(1, 100, 3);
drop(network) drop(network)
} }
#[test]
fn test_local_cluster_start_and_exit_unsafe() {
solana_logger::setup();
let network = LocalCluster::new_unsafe(1, 100, 3);
drop(network)
}
} }

View File

@ -16,14 +16,29 @@ use solana_sdk::signature::Signature;
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
use std::mem; use std::mem;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::sleep; use std::thread::sleep;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
#[derive(Clone)]
pub enum JsonRpcConfig {
Safe,
Unsafe,
}
impl Default for JsonRpcConfig {
fn default() -> Self {
JsonRpcConfig::Safe
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct JsonRpcRequestProcessor { pub struct JsonRpcRequestProcessor {
bank: Option<Arc<Bank>>, bank: Option<Arc<Bank>>,
storage_state: StorageState, storage_state: StorageState,
config: JsonRpcConfig,
fullnode_exit: Arc<AtomicBool>,
} }
impl JsonRpcRequestProcessor { impl JsonRpcRequestProcessor {
@ -39,10 +54,16 @@ impl JsonRpcRequestProcessor {
self.bank = Some(bank.clone()); self.bank = Some(bank.clone());
} }
pub fn new(storage_state: StorageState) -> Self { pub fn new(
storage_state: StorageState,
config: JsonRpcConfig,
fullnode_exit: Arc<AtomicBool>,
) -> Self {
JsonRpcRequestProcessor { JsonRpcRequestProcessor {
bank: None, bank: None,
storage_state, storage_state,
config,
fullnode_exit,
} }
} }
@ -87,6 +108,20 @@ impl JsonRpcRequestProcessor {
.storage_state .storage_state
.get_pubkeys_for_entry_height(entry_height)) .get_pubkeys_for_entry_height(entry_height))
} }
pub fn fullnode_exit(&self) -> Result<bool> {
match self.config {
JsonRpcConfig::Safe => {
debug!("safe config, fullnode_exit ignored");
Ok(false)
}
JsonRpcConfig::Unsafe => {
warn!("JsonRPC fullnode_exit request...");
self.fullnode_exit.store(true, Ordering::Relaxed);
Ok(true)
}
}
}
} }
fn get_leader_addr(cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<SocketAddr> { fn get_leader_addr(cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<SocketAddr> {
@ -182,6 +217,9 @@ pub trait RpcSol {
_: Self::Metadata, _: Self::Metadata,
_: u64, _: u64,
) -> Result<Vec<Pubkey>>; ) -> Result<Vec<Pubkey>>;
#[rpc(meta, name = "fullnodeExit")]
fn fullnode_exit(&self, _: Self::Metadata) -> Result<bool>;
} }
pub struct RpcSolImpl; pub struct RpcSolImpl;
@ -360,6 +398,10 @@ impl RpcSol for RpcSolImpl {
.unwrap() .unwrap()
.get_storage_pubkeys_for_entry_height(entry_height) .get_storage_pubkeys_for_entry_height(entry_height)
} }
fn fullnode_exit(&self, meta: Self::Metadata) -> Result<bool> {
meta.request_processor.read().unwrap().fullnode_exit()
}
} }
#[cfg(test)] #[cfg(test)]
@ -377,6 +419,7 @@ mod tests {
fn start_rpc_handler_with_tx(pubkey: Pubkey) -> (MetaIoHandler<Meta>, Meta, Hash, Keypair) { fn start_rpc_handler_with_tx(pubkey: Pubkey) -> (MetaIoHandler<Meta>, Meta, Hash, Keypair) {
let (genesis_block, alice) = GenesisBlock::new(10_000); let (genesis_block, alice) = GenesisBlock::new(10_000);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let exit = Arc::new(AtomicBool::new(false));
let blockhash = bank.last_blockhash(); let blockhash = bank.last_blockhash();
let tx = SystemTransaction::new_move(&alice, pubkey, 20, blockhash, 0); let tx = SystemTransaction::new_move(&alice, pubkey, 20, blockhash, 0);
@ -384,6 +427,8 @@ mod tests {
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
StorageState::default(), StorageState::default(),
JsonRpcConfig::default(),
exit,
))); )));
request_processor.write().unwrap().set_bank(&bank); request_processor.write().unwrap().set_bank(&bank);
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default())));
@ -411,7 +456,9 @@ mod tests {
let (genesis_block, alice) = GenesisBlock::new(10_000); let (genesis_block, alice) = GenesisBlock::new(10_000);
let bob_pubkey = Keypair::new().pubkey(); let bob_pubkey = Keypair::new().pubkey();
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let mut request_processor = JsonRpcRequestProcessor::new(StorageState::default()); let exit = Arc::new(AtomicBool::new(false));
let mut request_processor =
JsonRpcRequestProcessor::new(StorageState::default(), JsonRpcConfig::default(), exit);
request_processor.set_bank(&bank); request_processor.set_bank(&bank);
thread::spawn(move || { thread::spawn(move || {
let blockhash = bank.last_blockhash(); let blockhash = bank.last_blockhash();
@ -574,13 +621,18 @@ mod tests {
fn test_rpc_send_bad_tx() { fn test_rpc_send_bad_tx() {
let (genesis_block, _) = GenesisBlock::new(10_000); let (genesis_block, _) = GenesisBlock::new(10_000);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let exit = Arc::new(AtomicBool::new(false));
let mut io = MetaIoHandler::default(); let mut io = MetaIoHandler::default();
let rpc = RpcSolImpl; let rpc = RpcSolImpl;
io.extend_with(rpc.to_delegate()); io.extend_with(rpc.to_delegate());
let meta = Meta { let meta = Meta {
request_processor: { request_processor: {
let mut request_processor = JsonRpcRequestProcessor::new(StorageState::default()); let mut request_processor = JsonRpcRequestProcessor::new(
StorageState::default(),
JsonRpcConfig::default(),
exit,
);
request_processor.set_bank(&bank); request_processor.set_bank(&bank);
Arc::new(RwLock::new(request_processor)) Arc::new(RwLock::new(request_processor))
}, },
@ -651,4 +703,40 @@ mod tests {
Err(Error::invalid_request()) Err(Error::invalid_request())
); );
} }
#[test]
fn test_rpc_request_processor_default_exit_is_a_noop() {
let exit = Arc::new(AtomicBool::new(false));
let request_processor = JsonRpcRequestProcessor::new(
StorageState::default(),
JsonRpcConfig::default(),
exit.clone(),
);
assert_eq!(request_processor.fullnode_exit(), false);
assert_eq!(exit.load(Ordering::Relaxed), false);
}
#[test]
fn test_rpc_request_processor_safe_exit_is_a_noop() {
let exit = Arc::new(AtomicBool::new(false));
let request_processor = JsonRpcRequestProcessor::new(
StorageState::default(),
JsonRpcConfig::Safe,
exit.clone(),
);
assert_eq!(request_processor.fullnode_exit(), false);
assert_eq!(exit.load(Ordering::Relaxed), false);
}
#[test]
fn test_rpc_request_processor_unsafe_exit() {
let exit = Arc::new(AtomicBool::new(false));
let request_processor = JsonRpcRequestProcessor::new(
StorageState::default(),
JsonRpcConfig::Unsafe,
exit.clone(),
);
assert_eq!(request_processor.fullnode_exit(), true);
assert_eq!(exit.load(Ordering::Relaxed), true);
}
} }

View File

@ -140,6 +140,7 @@ pub enum RpcRequest {
GetStorageBlockhash, GetStorageBlockhash,
GetStorageEntryHeight, GetStorageEntryHeight,
GetStoragePubkeysForEntryHeight, GetStoragePubkeysForEntryHeight,
FullnodeExit,
} }
impl RpcRequest { impl RpcRequest {
@ -160,6 +161,7 @@ impl RpcRequest {
RpcRequest::GetStorageBlockhash => "getStorageBlockhash", RpcRequest::GetStorageBlockhash => "getStorageBlockhash",
RpcRequest::GetStorageEntryHeight => "getStorageEntryHeight", RpcRequest::GetStorageEntryHeight => "getStorageEntryHeight",
RpcRequest::GetStoragePubkeysForEntryHeight => "getStoragePubkeysForEntryHeight", RpcRequest::GetStoragePubkeysForEntryHeight => "getStoragePubkeysForEntryHeight",
RpcRequest::FullnodeExit => "fullnodeExit",
}; };
let mut request = json!({ let mut request = json!({
"jsonrpc": jsonrpc, "jsonrpc": jsonrpc,

View File

@ -27,10 +27,15 @@ impl JsonRpcService {
rpc_addr: SocketAddr, rpc_addr: SocketAddr,
drone_addr: SocketAddr, drone_addr: SocketAddr,
storage_state: StorageState, storage_state: StorageState,
config: JsonRpcConfig,
exit: Arc<AtomicBool>,
) -> Self { ) -> Self {
info!("rpc bound to {:?}", rpc_addr); info!("rpc bound to {:?}", rpc_addr);
let exit = Arc::new(AtomicBool::new(false)); let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(storage_state))); storage_state,
config,
exit.clone(),
)));
let request_processor_ = request_processor.clone(); let request_processor_ = request_processor.clone();
let info = cluster_info.clone(); let info = cluster_info.clone();
@ -105,6 +110,7 @@ mod tests {
#[test] #[test]
fn test_rpc_new() { fn test_rpc_new() {
let (genesis_block, alice) = GenesisBlock::new(10_000); let (genesis_block, alice) = GenesisBlock::new(10_000);
let exit = Arc::new(AtomicBool::new(false));
let bank = Bank::new(&genesis_block); let bank = Bank::new(&genesis_block);
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default())));
let rpc_addr = SocketAddr::new( let rpc_addr = SocketAddr::new(
@ -115,8 +121,14 @@ mod tests {
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(), solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(),
); );
let mut rpc_service = let mut rpc_service = JsonRpcService::new(
JsonRpcService::new(&cluster_info, rpc_addr, drone_addr, StorageState::default()); &cluster_info,
rpc_addr,
drone_addr,
StorageState::default(),
JsonRpcConfig::default(),
exit,
);
rpc_service.set_bank(&Arc::new(bank)); rpc_service.set_bank(&Arc::new(bank));
let thread = rpc_service.thread_hdl.thread(); let thread = rpc_service.thread_hdl.thread();
assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); assert_eq!(thread.name().unwrap(), "solana-jsonrpc");

View File

@ -364,6 +364,23 @@ impl ThinClient {
}; };
} }
} }
pub fn fullnode_exit(&mut self) -> io::Result<bool> {
trace!("fullnode_exit sending request to {}", self.rpc_addr);
let response = self
.rpc_client
.make_rpc_request(1, RpcRequest::FullnodeExit, None)
.map_err(|error| {
debug!("Response from {} fullndoe_exit: {}", self.rpc_addr, error);
io::Error::new(io::ErrorKind::Other, "FullodeExit request failure")
})?;
serde_json::from_value(response).map_err(|error| {
debug!(
"ParseError: from {} fullndoe_exit: {}",
self.rpc_addr, error
);
io::Error::new(io::ErrorKind::Other, "FullodeExit parse failure")
})
}
} }
impl Drop for ThinClient { impl Drop for ThinClient {

View File

@ -82,10 +82,10 @@ impl Tpu {
broadcast_socket: UdpSocket, broadcast_socket: UdpSocket,
sigverify_disabled: bool, sigverify_disabled: bool,
blocktree: &Arc<Blocktree>, blocktree: &Arc<Blocktree>,
exit: Arc<AtomicBool>,
) -> Self { ) -> Self {
cluster_info.write().unwrap().set_leader(id); cluster_info.write().unwrap().set_leader(id);
let exit = Arc::new(AtomicBool::new(false));
let (packet_sender, packet_receiver) = channel(); let (packet_sender, packet_receiver) = channel();
let fetch_stage = let fetch_stage =
FetchStage::new_with_sender(transactions_sockets, exit.clone(), &packet_sender.clone()); FetchStage::new_with_sender(transactions_sockets, exit.clone(), &packet_sender.clone());

View File

@ -67,11 +67,11 @@ impl Tvu {
ledger_signal_receiver: Receiver<bool>, ledger_signal_receiver: Receiver<bool>,
subscriptions: &Arc<RpcSubscriptions>, subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
exit: Arc<AtomicBool>,
) -> Self ) -> Self
where where
T: 'static + KeypairUtil + Sync + Send, T: 'static + KeypairUtil + Sync + Send,
{ {
let exit = Arc::new(AtomicBool::new(false));
let keypair: Arc<Keypair> = cluster_info let keypair: Arc<Keypair> = cluster_info
.read() .read()
.expect("Unable to read from cluster_info during Tvu creation") .expect("Unable to read from cluster_info during Tvu creation")
@ -221,6 +221,7 @@ pub mod tests {
.expect("Expected to successfully open ledger"); .expect("Expected to successfully open ledger");
let bank = bank_forks.working_bank(); let bank = bank_forks.working_bank();
let (poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); let (poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank);
let exit = Arc::new(AtomicBool::new(false));
let tvu = Tvu::new( let tvu = Tvu::new(
Some(Arc::new(Keypair::new())), Some(Arc::new(Keypair::new())),
&Arc::new(RwLock::new(bank_forks)), &Arc::new(RwLock::new(bank_forks)),
@ -240,6 +241,7 @@ pub mod tests {
l_receiver, l_receiver,
&Arc::new(RpcSubscriptions::default()), &Arc::new(RpcSubscriptions::default()),
&poh_recorder, &poh_recorder,
exit,
); );
tvu.close().expect("close"); tvu.close().expect("close");
poh_service.close().expect("close"); poh_service.close().expect("close");

View File

@ -38,3 +38,20 @@ fn test_spend_and_verify_all_nodes_3() -> () {
num_nodes, num_nodes,
); );
} }
#[test]
#[should_panic]
fn test_fullnode_exit_safe_config_should_panic_2() -> () {
solana_logger::setup();
let num_nodes = 2;
let local = LocalCluster::new(num_nodes, 10_000, 100);
cluster_tests::fullnode_exit(&local.entry_point_info, num_nodes);
}
#[test]
fn test_fullnode_exit_unsafe_config_2() -> () {
solana_logger::setup();
let num_nodes = 2;
let local = LocalCluster::new_unsafe(num_nodes, 10_000, 100);
cluster_tests::fullnode_exit(&local.entry_point_info, num_nodes);
}

View File

@ -131,6 +131,7 @@ fn test_replay() {
ledger_signal_receiver, ledger_signal_receiver,
&Arc::new(RpcSubscriptions::default()), &Arc::new(RpcSubscriptions::default()),
&poh_recorder, &poh_recorder,
exit.clone(),
); );
let mut alice_ref_balance = starting_balance; let mut alice_ref_balance = starting_balance;