From 6de5354b8e19ccf3b591f4e855913b78a93b820c Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Fri, 22 Feb 2019 17:15:15 -0800 Subject: [PATCH] Update the RPC bank on fullnode rotation --- src/fullnode.rs | 8 ++++++- src/rpc.rs | 56 ++++++++++++++++++++++++++-------------------- src/rpc_service.rs | 24 +++++++++----------- 3 files changed, 49 insertions(+), 39 deletions(-) diff --git a/src/fullnode.rs b/src/fullnode.rs index b8feb29106..bc400d7b15 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -178,7 +178,6 @@ impl Fullnode { let storage_state = StorageState::new(); let rpc_service = JsonRpcService::new( - &bank_forks, &cluster_info, SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), node.info.rpc.port()), drone_addr, @@ -285,6 +284,13 @@ impl Fullnode { rotation_info.last_entry_id, ); + if let Some(ref mut rpc_service) = self.rpc_service { + // TODO: This is not the correct bank. Instead TVU should pass along the + // frozen Bank for each completed block for RPC to use from it's notion of the "best" + // available fork (until we want to surface multiple forks to RPC) + rpc_service.set_bank(self.bank_forks.read().unwrap().working_bank()); + } + if rotation_info.leader_id == self.id { let transition = match self.node_services.tpu.is_leader() { Some(was_leader) => { diff --git a/src/rpc.rs b/src/rpc.rs index 6b950e5d64..2f7644b325 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,6 +1,5 @@ //! The `rpc` module implements the Solana RPC interface. -use crate::bank_forks::BankForks; use crate::cluster_info::ClusterInfo; use crate::packet::PACKET_DATA_SIZE; use crate::rpc_status::RpcSignatureStatus; @@ -23,42 +22,52 @@ use std::time::{Duration, Instant}; #[derive(Clone)] pub struct JsonRpcRequestProcessor { - pub bank_forks: Arc>, + bank: Option>, storage_state: StorageState, } impl JsonRpcRequestProcessor { - fn bank(&self) -> Arc { - self.bank_forks.read().unwrap().working_bank() + fn bank(&self) -> Result<&Arc> { + self.bank.as_ref().ok_or(Error { + code: ErrorCode::InternalError, + message: "No bank available".into(), + data: None, + }) + } + + pub fn set_bank(&mut self, bank: Arc) { + self.bank = Some(bank); } /// Create a new request processor that wraps the given Bank. - pub fn new(bank_forks: Arc>, storage_state: StorageState) -> Self { + pub fn new(storage_state: StorageState) -> Self { JsonRpcRequestProcessor { - bank_forks, + bank: None, storage_state, } } /// Process JSON-RPC request items sent via JSON-RPC. pub fn get_account_info(&self, pubkey: Pubkey) -> Result { - self.bank() + self.bank()? .get_account(&pubkey) .ok_or_else(Error::invalid_request) } pub fn get_balance(&self, pubkey: Pubkey) -> Result { - let val = self.bank().get_balance(&pubkey); + let val = self.bank()?.get_balance(&pubkey); Ok(val) } fn get_last_id(&self) -> Result { - let id = self.bank().last_id(); + let id = self.bank()?.last_id(); Ok(bs58::encode(id).into_string()) } pub fn get_signature_status(&self, signature: Signature) -> Option> { - self.bank().get_signature_status(&signature) + self.bank() + .ok() + .and_then(|bank| bank.get_signature_status(&signature)) } fn get_transaction_count(&self) -> Result { - Ok(self.bank().transaction_count() as u64) + Ok(self.bank()?.transaction_count() as u64) } fn get_storage_mining_last_id(&self) -> Result { let id = self.storage_state.get_last_id(); @@ -236,7 +245,7 @@ impl RpcSol for RpcSolImpl { trace!("request_airdrop id={} tokens={}", id, tokens); let pubkey = verify_pubkey(id)?; - let last_id = meta.request_processor.read().unwrap().bank().last_id(); + let last_id = meta.request_processor.read().unwrap().bank()?.last_id(); let transaction = request_airdrop_transaction(&meta.drone_addr, &pubkey, tokens, last_id) .map_err(|err| { info!("request_airdrop_transaction failed: {:?}", err); @@ -344,17 +353,16 @@ mod tests { fn start_rpc_handler_with_tx(pubkey: Pubkey) -> (MetaIoHandler, Meta, Hash, Keypair) { let (genesis_block, alice) = GenesisBlock::new(10_000); - let bank_forks = BankForks::new(0, Bank::new(&genesis_block)); - let bank = bank_forks.working_bank(); + let bank = Arc::new(Bank::new(&genesis_block)); let last_id = bank.last_id(); let tx = SystemTransaction::new_move(&alice, pubkey, 20, last_id, 0); bank.process_transaction(&tx).expect("process transaction"); let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( - Arc::new(RwLock::new(bank_forks)), StorageState::default(), ))); + request_processor.write().unwrap().set_bank(bank); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))); let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); @@ -379,11 +387,10 @@ mod tests { fn test_rpc_request_processor_new() { let (genesis_block, alice) = GenesisBlock::new(10_000); let bob_pubkey = Keypair::new().pubkey(); - let bank_forks = Arc::new(RwLock::new(BankForks::new(0, Bank::new(&genesis_block)))); - let request_processor = - JsonRpcRequestProcessor::new(bank_forks.clone(), StorageState::default()); + let bank = Arc::new(Bank::new(&genesis_block)); + let mut request_processor = JsonRpcRequestProcessor::new(StorageState::default()); + request_processor.set_bank(bank.clone()); thread::spawn(move || { - let bank = bank_forks.read().unwrap().working_bank(); let last_id = bank.last_id(); let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0); bank.process_transaction(&tx).expect("process transaction"); @@ -543,16 +550,17 @@ mod tests { #[test] fn test_rpc_send_bad_tx() { let (genesis_block, _) = GenesisBlock::new(10_000); - let bank_forks = BankForks::new(0, Bank::new(&genesis_block)); + let bank = Arc::new(Bank::new(&genesis_block)); let mut io = MetaIoHandler::default(); let rpc = RpcSolImpl; io.extend_with(rpc.to_delegate()); let meta = Meta { - request_processor: Arc::new(RwLock::new(JsonRpcRequestProcessor::new( - Arc::new(RwLock::new(bank_forks)), - StorageState::default(), - ))), + request_processor: { + let mut request_processor = JsonRpcRequestProcessor::new(StorageState::default()); + request_processor.set_bank(bank); + Arc::new(RwLock::new(request_processor)) + }, cluster_info: Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))), drone_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), rpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), diff --git a/src/rpc_service.rs b/src/rpc_service.rs index 0bdb35e315..0d522331be 100644 --- a/src/rpc_service.rs +++ b/src/rpc_service.rs @@ -1,12 +1,12 @@ //! The `rpc_service` module implements the Solana JSON RPC service. -use crate::bank_forks::BankForks; use crate::cluster_info::ClusterInfo; use crate::rpc::*; use crate::service::Service; use crate::storage_stage::StorageState; use jsonrpc_core::MetaIoHandler; use jsonrpc_http_server::{hyper, AccessControlAllowOrigin, DomainsValidation, ServerBuilder}; +use solana_runtime::bank::Bank; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; @@ -23,7 +23,6 @@ pub struct JsonRpcService { impl JsonRpcService { pub fn new( - bank_forks: &Arc>, cluster_info: &Arc>, rpc_addr: SocketAddr, drone_addr: SocketAddr, @@ -31,10 +30,7 @@ impl JsonRpcService { ) -> Self { info!("rpc bound to {:?}", rpc_addr); let exit = Arc::new(AtomicBool::new(false)); - let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( - bank_forks.clone(), - storage_state, - ))); + let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(storage_state))); let request_processor_ = request_processor.clone(); let info = cluster_info.clone(); @@ -75,6 +71,10 @@ impl JsonRpcService { } } + pub fn set_bank(&mut self, bank: Arc) { + self.request_processor.write().unwrap().set_bank(bank); + } + pub fn exit(&self) { self.exit.store(true, Ordering::Relaxed); } @@ -105,7 +105,7 @@ mod tests { #[test] fn test_rpc_new() { let (genesis_block, alice) = GenesisBlock::new(10_000); - let bank_forks = BankForks::new(0, Bank::new(&genesis_block)); + let bank = Bank::new(&genesis_block); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))); let rpc_addr = SocketAddr::new( IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), @@ -115,13 +115,9 @@ mod tests { IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(), ); - let rpc_service = JsonRpcService::new( - &Arc::new(RwLock::new(bank_forks)), - &cluster_info, - rpc_addr, - drone_addr, - StorageState::default(), - ); + let mut rpc_service = + JsonRpcService::new(&cluster_info, rpc_addr, drone_addr, StorageState::default()); + rpc_service.set_bank(Arc::new(bank)); let thread = rpc_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-jsonrpc");