diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index 97b4326ca1..11e86ca6ef 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -32,10 +32,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::Receiver; use std::sync::{Arc, Mutex, RwLock}; -use std::thread::sleep; -use std::thread::JoinHandle; -use std::thread::{spawn, Result}; -use std::time::Duration; +use std::thread::Result; pub struct FullnodeConfig { pub sigverify_disabled: bool, @@ -69,7 +66,6 @@ pub struct Fullnode { exit: Arc, rpc_service: Option, rpc_pubsub_service: Option, - rpc_working_bank_handle: JoinHandle<()>, gossip_service: GossipService, poh_recorder: Arc>, poh_service: PohService, @@ -147,6 +143,7 @@ impl Fullnode { SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), node.info.rpc.port()), storage_state.clone(), config.rpc_config.clone(), + bank_forks.clone(), &exit, ); @@ -232,19 +229,6 @@ impl Fullnode { &blocktree, &exit, ); - let exit_ = exit.clone(); - let bank_forks_ = bank_forks.clone(); - let rpc_service_rp = rpc_service.request_processor.clone(); - let rpc_working_bank_handle = spawn(move || loop { - if exit_.load(Ordering::Relaxed) { - break; - } - let bank = bank_forks_.read().unwrap().working_bank(); - trace!("rpc working bank {} {}", bank.slot(), bank.last_blockhash()); - rpc_service_rp.write().unwrap().set_bank(&bank); - let timer = Duration::from_millis(100); - sleep(timer); - }); inc_new_counter_info!("fullnode-new", 1); Self { @@ -252,7 +236,6 @@ impl Fullnode { gossip_service, rpc_service: Some(rpc_service), rpc_pubsub_service: Some(rpc_pubsub_service), - rpc_working_bank_handle, tpu, tvu, exit, @@ -316,7 +299,6 @@ impl Service for Fullnode { rpc_pubsub_service.join()?; } - self.rpc_working_bank_handle.join()?; self.gossip_service.join()?; self.tpu.join()?; self.tvu.join()?; diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 9c1a23400d..446f31fefe 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -1,15 +1,16 @@ //! The `rpc` module implements the Solana RPC interface. +use crate::bank_forks::BankForks; use crate::cluster_info::ClusterInfo; use crate::packet::PACKET_DATA_SIZE; use crate::storage_stage::StorageState; use bincode::{deserialize, serialize}; use bs58; -use jsonrpc_core::{Error, ErrorCode, Metadata, Result}; +use jsonrpc_core::{Error, Metadata, Result}; use jsonrpc_derive::rpc; use solana_client::rpc_signature_status::RpcSignatureStatus; use solana_drone::drone::request_airdrop_transaction; -use solana_runtime::bank::{self, Bank}; +use solana_runtime::bank; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; @@ -38,32 +39,25 @@ impl Default for JsonRpcConfig { #[derive(Clone)] pub struct JsonRpcRequestProcessor { - bank: Option>, + bank_forks: Arc>, storage_state: StorageState, config: JsonRpcConfig, fullnode_exit: Arc, } impl JsonRpcRequestProcessor { - fn bank(&self) -> Result<&Arc> { - self.bank.as_ref().ok_or(Error { - code: ErrorCode::InternalError, - message: "No bank available".into(), - data: None, - }) - } - - pub fn set_bank(&mut self, bank: &Arc) { - self.bank = Some(bank.clone()); + fn bank(&self) -> Arc { + self.bank_forks.read().unwrap().working_bank() } pub fn new( storage_state: StorageState, config: JsonRpcConfig, + bank_forks: Arc>, fullnode_exit: &Arc, ) -> Self { JsonRpcRequestProcessor { - bank: None, + bank_forks, storage_state, config, fullnode_exit: fullnode_exit.clone(), @@ -71,29 +65,26 @@ impl JsonRpcRequestProcessor { } pub fn get_account_info(&self, pubkey: &Pubkey) -> Result { - self.bank()? + self.bank() .get_account(&pubkey) .ok_or_else(Error::invalid_request) } - pub fn get_balance(&self, pubkey: &Pubkey) -> Result { - let val = self.bank()?.get_balance(&pubkey); - Ok(val) + pub fn get_balance(&self, pubkey: &Pubkey) -> u64 { + self.bank().get_balance(&pubkey) } - fn get_recent_blockhash(&self) -> Result { - let id = self.bank()?.last_blockhash(); - Ok(bs58::encode(id).into_string()) + fn get_recent_blockhash(&self) -> String { + let id = self.bank().last_blockhash(); + bs58::encode(id).into_string() } pub fn get_signature_status(&self, signature: Signature) -> Option> { - self.bank() - .ok() - .and_then(|bank| bank.get_signature_status(&signature)) + self.bank().get_signature_status(&signature) } fn get_transaction_count(&self) -> Result { - Ok(self.bank()?.transaction_count() as u64) + Ok(self.bank().transaction_count() as u64) } fn get_storage_blockhash(&self) -> Result { @@ -235,15 +226,16 @@ impl RpcSol for RpcSolImpl { fn get_balance(&self, meta: Self::Metadata, id: String) -> Result { info!("get_balance rpc request received: {:?}", id); let pubkey = verify_pubkey(id)?; - meta.request_processor.read().unwrap().get_balance(&pubkey) + Ok(meta.request_processor.read().unwrap().get_balance(&pubkey)) } fn get_recent_blockhash(&self, meta: Self::Metadata) -> Result { info!("get_recent_blockhash rpc request received"); - meta.request_processor + Ok(meta + .request_processor .read() .unwrap() - .get_recent_blockhash() + .get_recent_blockhash()) } fn get_signature_status(&self, meta: Self::Metadata, id: String) -> Result { @@ -303,7 +295,7 @@ impl RpcSol for RpcSolImpl { .request_processor .read() .unwrap() - .bank()? + .bank() .last_blockhash(); let transaction = request_airdrop_transaction(&drone_addr, &pubkey, lamports, blockhash) .map_err(|err| { @@ -419,8 +411,8 @@ mod tests { use std::thread; fn start_rpc_handler_with_tx(pubkey: &Pubkey) -> (MetaIoHandler, Meta, Hash, Keypair) { - let (genesis_block, alice) = GenesisBlock::new(10_000); - let bank = Arc::new(Bank::new(&genesis_block)); + let (bank_forks, alice) = new_bank_forks(); + let bank = bank_forks.read().unwrap().working_bank(); let exit = Arc::new(AtomicBool::new(false)); let blockhash = bank.last_blockhash(); @@ -430,9 +422,9 @@ mod tests { let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( StorageState::default(), JsonRpcConfig::default(), + bank_forks, &exit, ))); - request_processor.write().unwrap().set_bank(&bank); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( ContactInfo::default(), ))); @@ -452,13 +444,16 @@ mod tests { #[test] fn test_rpc_request_processor_new() { - let (genesis_block, alice) = GenesisBlock::new(10_000); let bob_pubkey = Keypair::new().pubkey(); - let bank = Arc::new(Bank::new(&genesis_block)); let exit = Arc::new(AtomicBool::new(false)); - let mut request_processor = - JsonRpcRequestProcessor::new(StorageState::default(), JsonRpcConfig::default(), &exit); - request_processor.set_bank(&bank); + let (bank_forks, alice) = new_bank_forks(); + let bank = bank_forks.read().unwrap().working_bank(); + let request_processor = JsonRpcRequestProcessor::new( + StorageState::default(), + JsonRpcConfig::default(), + bank_forks, + &exit, + ); thread::spawn(move || { let blockhash = bank.last_blockhash(); let tx = SystemTransaction::new_move(&alice, &bob_pubkey, 20, blockhash, 0); @@ -618,8 +613,6 @@ mod tests { #[test] fn test_rpc_send_bad_tx() { - let (genesis_block, _) = GenesisBlock::new(10_000); - let bank = Arc::new(Bank::new(&genesis_block)); let exit = Arc::new(AtomicBool::new(false)); let mut io = MetaIoHandler::default(); @@ -627,12 +620,12 @@ mod tests { io.extend_with(rpc.to_delegate()); let meta = Meta { request_processor: { - let mut request_processor = JsonRpcRequestProcessor::new( + let request_processor = JsonRpcRequestProcessor::new( StorageState::default(), JsonRpcConfig::default(), + new_bank_forks().0, &exit, ); - request_processor.set_bank(&bank); Arc::new(RwLock::new(request_processor)) }, cluster_info: Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( @@ -694,11 +687,24 @@ mod tests { ); } + fn new_bank_forks() -> (Arc>, Keypair) { + let (genesis_block, alice) = GenesisBlock::new(10_000); + let bank = bank::Bank::new(&genesis_block); + ( + Arc::new(RwLock::new(BankForks::new(bank.slot(), bank))), + alice, + ) + } + #[test] fn test_rpc_request_processor_config_default_trait_fullnode_exit_fails() { let exit = Arc::new(AtomicBool::new(false)); - let request_processor = - JsonRpcRequestProcessor::new(StorageState::default(), JsonRpcConfig::default(), &exit); + let request_processor = JsonRpcRequestProcessor::new( + StorageState::default(), + JsonRpcConfig::default(), + new_bank_forks().0, + &exit, + ); assert_eq!(request_processor.fullnode_exit(), Ok(false)); assert_eq!(exit.load(Ordering::Relaxed), false); } @@ -708,8 +714,12 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let mut config = JsonRpcConfig::default(); config.enable_fullnode_exit = true; - let request_processor = - JsonRpcRequestProcessor::new(StorageState::default(), config, &exit); + let request_processor = JsonRpcRequestProcessor::new( + StorageState::default(), + config, + new_bank_forks().0, + &exit, + ); assert_eq!(request_processor.fullnode_exit(), Ok(true)); assert_eq!(exit.load(Ordering::Relaxed), true); } diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 4d50dee165..eb5e1b7ede 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -1,12 +1,12 @@ //! The `rpc_service` module implements the Solana JSON RPC service. +use crate::bank_forks::BankForks; use crate::cluster_info::ClusterInfo; use crate::rpc::*; use crate::service::Service; use crate::storage_stage::StorageState; use jsonrpc_core::MetaIoHandler; use jsonrpc_http_server::{hyper, AccessControlAllowOrigin, DomainsValidation, ServerBuilder}; -use solana_runtime::bank::Bank; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; @@ -24,6 +24,7 @@ impl JsonRpcService { rpc_addr: SocketAddr, storage_state: StorageState, config: JsonRpcConfig, + bank_forks: Arc>, exit: &Arc, ) -> Self { info!("rpc bound to {:?}", rpc_addr); @@ -31,6 +32,7 @@ impl JsonRpcService { let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( storage_state, config, + bank_forks, exit, ))); let request_processor_ = request_processor.clone(); @@ -69,10 +71,6 @@ impl JsonRpcService { request_processor, } } - - pub fn set_bank(&mut self, bank: &Arc) { - self.request_processor.write().unwrap().set_bank(bank); - } } impl Service for JsonRpcService { @@ -104,14 +102,15 @@ mod tests { IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(), ); - let mut rpc_service = JsonRpcService::new( + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank.slot(), bank))); + let rpc_service = JsonRpcService::new( &cluster_info, rpc_addr, StorageState::default(), JsonRpcConfig::default(), + bank_forks, &exit, ); - rpc_service.set_bank(&Arc::new(bank)); let thread = rpc_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); @@ -122,7 +121,6 @@ mod tests { .read() .unwrap() .get_balance(&alice.pubkey()) - .unwrap() ); exit.store(true, Ordering::Relaxed); rpc_service.join().unwrap();