Change JsonRpc exit to use wait->close (#5566)
* Add wait-close-join pattern to rpc_service * Create ValidatorExit struct
This commit is contained in:
parent
159e518671
commit
bb558acdf0
|
@ -5,6 +5,7 @@ use crate::cluster_info::ClusterInfo;
|
|||
use crate::contact_info::ContactInfo;
|
||||
use crate::packet::PACKET_DATA_SIZE;
|
||||
use crate::storage_stage::StorageState;
|
||||
use crate::validator::ValidatorExit;
|
||||
use crate::version::VERSION;
|
||||
use bincode::{deserialize, serialize};
|
||||
use jsonrpc_core::{Error, Metadata, Result};
|
||||
|
@ -18,7 +19,6 @@ use solana_sdk::signature::Signature;
|
|||
use solana_sdk::transaction::{self, Transaction};
|
||||
use solana_vote_api::vote_state::{VoteState, MAX_LOCKOUT_HISTORY};
|
||||
use std::net::{SocketAddr, UdpSocket};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::sleep;
|
||||
use std::time::{Duration, Instant};
|
||||
|
@ -43,7 +43,7 @@ pub struct JsonRpcRequestProcessor {
|
|||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
storage_state: StorageState,
|
||||
config: JsonRpcConfig,
|
||||
fullnode_exit: Arc<AtomicBool>,
|
||||
validator_exit: Arc<RwLock<Option<ValidatorExit>>>,
|
||||
}
|
||||
|
||||
impl JsonRpcRequestProcessor {
|
||||
|
@ -55,13 +55,13 @@ impl JsonRpcRequestProcessor {
|
|||
storage_state: StorageState,
|
||||
config: JsonRpcConfig,
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
fullnode_exit: &Arc<AtomicBool>,
|
||||
validator_exit: &Arc<RwLock<Option<ValidatorExit>>>,
|
||||
) -> Self {
|
||||
JsonRpcRequestProcessor {
|
||||
bank_forks,
|
||||
storage_state,
|
||||
config,
|
||||
fullnode_exit: fullnode_exit.clone(),
|
||||
validator_exit: validator_exit.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -185,7 +185,9 @@ impl JsonRpcRequestProcessor {
|
|||
pub fn fullnode_exit(&self) -> Result<bool> {
|
||||
if self.config.enable_fullnode_exit {
|
||||
warn!("fullnode_exit request...");
|
||||
self.fullnode_exit.store(true, Ordering::Relaxed);
|
||||
if let Some(x) = self.validator_exit.write().unwrap().take() {
|
||||
x.exit()
|
||||
}
|
||||
Ok(true)
|
||||
} else {
|
||||
debug!("fullnode_exit ignored");
|
||||
|
@ -660,7 +662,7 @@ impl RpcSol for RpcSolImpl {
|
|||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
pub mod tests {
|
||||
use super::*;
|
||||
use crate::contact_info::ContactInfo;
|
||||
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
|
||||
|
@ -671,6 +673,7 @@ mod tests {
|
|||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
use solana_sdk::system_transaction;
|
||||
use solana_sdk::transaction::TransactionError;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::thread;
|
||||
|
||||
const TEST_MINT_LAMPORTS: u64 = 10_000;
|
||||
|
@ -682,6 +685,7 @@ mod tests {
|
|||
let bank = bank_forks.read().unwrap().working_bank();
|
||||
let leader_pubkey = *bank.collector_id();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let validator_exit = create_validator_exit(&exit);
|
||||
|
||||
let blockhash = bank.confirmed_last_blockhash().0;
|
||||
let tx = system_transaction::transfer(&alice, pubkey, 20, blockhash);
|
||||
|
@ -694,7 +698,7 @@ mod tests {
|
|||
StorageState::default(),
|
||||
JsonRpcConfig::default(),
|
||||
bank_forks,
|
||||
&exit,
|
||||
&validator_exit,
|
||||
)));
|
||||
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
|
||||
ContactInfo::default(),
|
||||
|
@ -722,13 +726,14 @@ mod tests {
|
|||
fn test_rpc_request_processor_new() {
|
||||
let bob_pubkey = Pubkey::new_rand();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let validator_exit = create_validator_exit(&exit);
|
||||
let (bank_forks, alice) = new_bank_forks();
|
||||
let bank = bank_forks.read().unwrap().working_bank();
|
||||
let request_processor = JsonRpcRequestProcessor::new(
|
||||
StorageState::default(),
|
||||
JsonRpcConfig::default(),
|
||||
bank_forks,
|
||||
&exit,
|
||||
&validator_exit,
|
||||
);
|
||||
thread::spawn(move || {
|
||||
let blockhash = bank.confirmed_last_blockhash().0;
|
||||
|
@ -1037,6 +1042,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_rpc_send_bad_tx() {
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let validator_exit = create_validator_exit(&exit);
|
||||
|
||||
let mut io = MetaIoHandler::default();
|
||||
let rpc = RpcSolImpl;
|
||||
|
@ -1047,7 +1053,7 @@ mod tests {
|
|||
StorageState::default(),
|
||||
JsonRpcConfig::default(),
|
||||
new_bank_forks().0,
|
||||
&exit,
|
||||
&validator_exit,
|
||||
);
|
||||
Arc::new(RwLock::new(request_processor))
|
||||
},
|
||||
|
@ -1117,14 +1123,22 @@ mod tests {
|
|||
)
|
||||
}
|
||||
|
||||
pub fn create_validator_exit(exit: &Arc<AtomicBool>) -> Arc<RwLock<Option<ValidatorExit>>> {
|
||||
let mut validator_exit = ValidatorExit::default();
|
||||
let exit_ = exit.clone();
|
||||
validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed)));
|
||||
Arc::new(RwLock::new(Some(validator_exit)))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rpc_request_processor_config_default_trait_fullnode_exit_fails() {
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let validator_exit = create_validator_exit(&exit);
|
||||
let request_processor = JsonRpcRequestProcessor::new(
|
||||
StorageState::default(),
|
||||
JsonRpcConfig::default(),
|
||||
new_bank_forks().0,
|
||||
&exit,
|
||||
&validator_exit,
|
||||
);
|
||||
assert_eq!(request_processor.fullnode_exit(), Ok(false));
|
||||
assert_eq!(exit.load(Ordering::Relaxed), false);
|
||||
|
@ -1133,13 +1147,14 @@ mod tests {
|
|||
#[test]
|
||||
fn test_rpc_request_processor_allow_fullnode_exit_config() {
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let validator_exit = create_validator_exit(&exit);
|
||||
let mut config = JsonRpcConfig::default();
|
||||
config.enable_fullnode_exit = true;
|
||||
let request_processor = JsonRpcRequestProcessor::new(
|
||||
StorageState::default(),
|
||||
config,
|
||||
new_bank_forks().0,
|
||||
&exit,
|
||||
&validator_exit,
|
||||
);
|
||||
assert_eq!(request_processor.fullnode_exit(), Ok(true));
|
||||
assert_eq!(exit.load(Ordering::Relaxed), true);
|
||||
|
|
|
@ -5,17 +5,18 @@ use crate::cluster_info::ClusterInfo;
|
|||
use crate::rpc::*;
|
||||
use crate::service::Service;
|
||||
use crate::storage_stage::StorageState;
|
||||
use crate::validator::ValidatorExit;
|
||||
use jsonrpc_core::MetaIoHandler;
|
||||
use jsonrpc_http_server::CloseHandle;
|
||||
use jsonrpc_http_server::{
|
||||
hyper, AccessControlAllowOrigin, DomainsValidation, RequestMiddleware, RequestMiddlewareAction,
|
||||
ServerBuilder,
|
||||
};
|
||||
use std::net::SocketAddr;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::{self, sleep, Builder, JoinHandle};
|
||||
use std::time::Duration;
|
||||
use std::thread::{self, Builder, JoinHandle};
|
||||
use tokio::prelude::Future;
|
||||
|
||||
pub struct JsonRpcService {
|
||||
|
@ -23,6 +24,8 @@ pub struct JsonRpcService {
|
|||
|
||||
#[cfg(test)]
|
||||
pub request_processor: Arc<RwLock<JsonRpcRequestProcessor>>, // Used only by test_rpc_new()...
|
||||
|
||||
close_handle: Option<CloseHandle>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
|
@ -88,7 +91,7 @@ impl JsonRpcService {
|
|||
config: JsonRpcConfig,
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
ledger_path: &Path,
|
||||
exit: &Arc<AtomicBool>,
|
||||
validator_exit: &Arc<RwLock<Option<ValidatorExit>>>,
|
||||
) -> Self {
|
||||
info!("rpc bound to {:?}", rpc_addr);
|
||||
info!("rpc configuration: {:?}", config);
|
||||
|
@ -96,14 +99,14 @@ impl JsonRpcService {
|
|||
storage_state,
|
||||
config,
|
||||
bank_forks,
|
||||
exit,
|
||||
validator_exit,
|
||||
)));
|
||||
let request_processor_ = request_processor.clone();
|
||||
|
||||
let cluster_info = cluster_info.clone();
|
||||
let exit_ = exit.clone();
|
||||
let ledger_path = ledger_path.to_path_buf();
|
||||
|
||||
let (close_handle_sender, close_handle_receiver) = channel();
|
||||
let thread_hdl = Builder::new()
|
||||
.name("solana-jsonrpc".to_string())
|
||||
.spawn(move || {
|
||||
|
@ -126,16 +129,30 @@ impl JsonRpcService {
|
|||
return;
|
||||
}
|
||||
|
||||
while !exit_.load(Ordering::Relaxed) {
|
||||
sleep(Duration::from_millis(100));
|
||||
}
|
||||
server.unwrap().close();
|
||||
let server = server.unwrap();
|
||||
close_handle_sender.send(server.close_handle()).unwrap();
|
||||
server.wait();
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let close_handle = close_handle_receiver.recv().unwrap();
|
||||
let close_handle_ = close_handle.clone();
|
||||
let mut validator_exit_write = validator_exit.write().unwrap();
|
||||
validator_exit_write
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.register_exit(Box::new(move || close_handle_.close()));
|
||||
Self {
|
||||
thread_hdl,
|
||||
#[cfg(test)]
|
||||
request_processor,
|
||||
close_handle: Some(close_handle),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn exit(&mut self) {
|
||||
if let Some(c) = self.close_handle.take() {
|
||||
c.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -153,9 +170,11 @@ mod tests {
|
|||
use super::*;
|
||||
use crate::contact_info::ContactInfo;
|
||||
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
|
||||
use crate::rpc::tests::create_validator_exit;
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_sdk::signature::KeypairUtil;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
||||
#[test]
|
||||
fn test_rpc_new() {
|
||||
|
@ -165,6 +184,7 @@ mod tests {
|
|||
..
|
||||
} = create_genesis_block(10_000);
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let validator_exit = create_validator_exit(&exit);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
|
||||
ContactInfo::default(),
|
||||
|
@ -174,14 +194,14 @@ mod tests {
|
|||
solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(),
|
||||
);
|
||||
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank.slot(), bank)));
|
||||
let rpc_service = JsonRpcService::new(
|
||||
let mut rpc_service = JsonRpcService::new(
|
||||
&cluster_info,
|
||||
rpc_addr,
|
||||
StorageState::default(),
|
||||
JsonRpcConfig::default(),
|
||||
bank_forks,
|
||||
&PathBuf::from("farf"),
|
||||
&exit,
|
||||
&validator_exit,
|
||||
);
|
||||
let thread = rpc_service.thread_hdl.thread();
|
||||
assert_eq!(thread.name().unwrap(), "solana-jsonrpc");
|
||||
|
@ -194,7 +214,7 @@ mod tests {
|
|||
.unwrap()
|
||||
.get_balance(&mint_keypair.pubkey())
|
||||
);
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
rpc_service.exit();
|
||||
rpc_service.join().unwrap();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,9 +67,26 @@ impl Default for ValidatorConfig {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ValidatorExit {
|
||||
exits: Vec<Box<FnOnce() + Send + Sync>>,
|
||||
}
|
||||
|
||||
impl ValidatorExit {
|
||||
pub fn register_exit(&mut self, exit: Box<FnOnce() -> () + Send + Sync>) {
|
||||
self.exits.push(exit);
|
||||
}
|
||||
|
||||
pub fn exit(self) {
|
||||
for exit in self.exits {
|
||||
exit();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Validator {
|
||||
pub id: Pubkey,
|
||||
exit: Arc<AtomicBool>,
|
||||
validator_exit: Arc<RwLock<Option<ValidatorExit>>>,
|
||||
rpc_service: Option<JsonRpcService>,
|
||||
rpc_pubsub_service: Option<PubSubService>,
|
||||
gossip_service: GossipService,
|
||||
|
@ -140,6 +157,11 @@ impl Validator {
|
|||
let bank = bank_forks[bank_info.bank_slot].clone();
|
||||
let bank_forks = Arc::new(RwLock::new(bank_forks));
|
||||
|
||||
let mut validator_exit = ValidatorExit::default();
|
||||
let exit_ = exit.clone();
|
||||
validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed)));
|
||||
let validator_exit = Arc::new(RwLock::new(Some(validator_exit)));
|
||||
|
||||
node.info.wallclock = timestamp();
|
||||
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(
|
||||
node.info.clone(),
|
||||
|
@ -162,7 +184,7 @@ impl Validator {
|
|||
config.rpc_config.clone(),
|
||||
bank_forks.clone(),
|
||||
ledger_path,
|
||||
&exit,
|
||||
&validator_exit,
|
||||
))
|
||||
};
|
||||
|
||||
|
@ -318,19 +340,21 @@ impl Validator {
|
|||
rpc_pubsub_service,
|
||||
tpu,
|
||||
tvu,
|
||||
exit,
|
||||
poh_service,
|
||||
poh_recorder,
|
||||
ip_echo_server,
|
||||
validator_exit,
|
||||
}
|
||||
}
|
||||
|
||||
// Used for notifying many nodes in parallel to exit
|
||||
pub fn exit(&self) {
|
||||
self.exit.store(true, Ordering::Relaxed);
|
||||
pub fn exit(&mut self) {
|
||||
if let Some(x) = self.validator_exit.write().unwrap().take() {
|
||||
x.exit()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn close(self) -> Result<()> {
|
||||
pub fn close(mut self) -> Result<()> {
|
||||
self.exit();
|
||||
self.join()
|
||||
}
|
||||
|
@ -549,7 +573,7 @@ mod tests {
|
|||
let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
|
||||
|
||||
let mut ledger_paths = vec![];
|
||||
let validators: Vec<Validator> = (0..2)
|
||||
let mut validators: Vec<Validator> = (0..2)
|
||||
.map(|_| {
|
||||
let validator_keypair = Keypair::new();
|
||||
let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
|
||||
|
@ -575,7 +599,7 @@ mod tests {
|
|||
.collect();
|
||||
|
||||
// Each validator can exit in parallel to speed many sequential calls to `join`
|
||||
validators.iter().for_each(|v| v.exit());
|
||||
validators.iter_mut().for_each(|v| v.exit());
|
||||
// While join is called sequentially, the above exit call notified all the
|
||||
// validators to exit from all their threads
|
||||
validators.into_iter().for_each(|validator| {
|
||||
|
|
|
@ -257,8 +257,8 @@ impl LocalCluster {
|
|||
cluster
|
||||
}
|
||||
|
||||
pub fn exit(&self) {
|
||||
for node in self.fullnodes.values() {
|
||||
pub fn exit(&mut self) {
|
||||
for node in self.fullnodes.values_mut() {
|
||||
node.exit();
|
||||
}
|
||||
}
|
||||
|
@ -587,7 +587,7 @@ impl Cluster for LocalCluster {
|
|||
|
||||
fn restart_node(&mut self, pubkey: Pubkey) {
|
||||
// Shut down the fullnode
|
||||
let node = self.fullnodes.remove(&pubkey).unwrap();
|
||||
let mut node = self.fullnodes.remove(&pubkey).unwrap();
|
||||
node.exit();
|
||||
node.join().unwrap();
|
||||
|
||||
|
|
Loading…
Reference in New Issue