bank fork rpc (#3351)

This commit is contained in:
anatoly yakovenko 2019-03-18 14:18:43 -07:00 committed by GitHub
parent efc39ffdde
commit 211c81f2a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 73 deletions

View File

@ -32,10 +32,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver; use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::thread::sleep; use std::thread::Result;
use std::thread::JoinHandle;
use std::thread::{spawn, Result};
use std::time::Duration;
pub struct FullnodeConfig { pub struct FullnodeConfig {
pub sigverify_disabled: bool, pub sigverify_disabled: bool,
@ -69,7 +66,6 @@ pub struct Fullnode {
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
rpc_service: Option<JsonRpcService>, rpc_service: Option<JsonRpcService>,
rpc_pubsub_service: Option<PubSubService>, rpc_pubsub_service: Option<PubSubService>,
rpc_working_bank_handle: JoinHandle<()>,
gossip_service: GossipService, gossip_service: GossipService,
poh_recorder: Arc<Mutex<PohRecorder>>, poh_recorder: Arc<Mutex<PohRecorder>>,
poh_service: PohService, poh_service: PohService,
@ -147,6 +143,7 @@ impl Fullnode {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), node.info.rpc.port()), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), node.info.rpc.port()),
storage_state.clone(), storage_state.clone(),
config.rpc_config.clone(), config.rpc_config.clone(),
bank_forks.clone(),
&exit, &exit,
); );
@ -232,19 +229,6 @@ impl Fullnode {
&blocktree, &blocktree,
&exit, &exit,
); );
let exit_ = exit.clone();
let bank_forks_ = bank_forks.clone();
let rpc_service_rp = rpc_service.request_processor.clone();
let rpc_working_bank_handle = spawn(move || loop {
if exit_.load(Ordering::Relaxed) {
break;
}
let bank = bank_forks_.read().unwrap().working_bank();
trace!("rpc working bank {} {}", bank.slot(), bank.last_blockhash());
rpc_service_rp.write().unwrap().set_bank(&bank);
let timer = Duration::from_millis(100);
sleep(timer);
});
inc_new_counter_info!("fullnode-new", 1); inc_new_counter_info!("fullnode-new", 1);
Self { Self {
@ -252,7 +236,6 @@ impl Fullnode {
gossip_service, gossip_service,
rpc_service: Some(rpc_service), rpc_service: Some(rpc_service),
rpc_pubsub_service: Some(rpc_pubsub_service), rpc_pubsub_service: Some(rpc_pubsub_service),
rpc_working_bank_handle,
tpu, tpu,
tvu, tvu,
exit, exit,
@ -316,7 +299,6 @@ impl Service for Fullnode {
rpc_pubsub_service.join()?; rpc_pubsub_service.join()?;
} }
self.rpc_working_bank_handle.join()?;
self.gossip_service.join()?; self.gossip_service.join()?;
self.tpu.join()?; self.tpu.join()?;
self.tvu.join()?; self.tvu.join()?;

View File

@ -1,15 +1,16 @@
//! The `rpc` module implements the Solana RPC interface. //! The `rpc` module implements the Solana RPC interface.
use crate::bank_forks::BankForks;
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::packet::PACKET_DATA_SIZE; use crate::packet::PACKET_DATA_SIZE;
use crate::storage_stage::StorageState; use crate::storage_stage::StorageState;
use bincode::{deserialize, serialize}; use bincode::{deserialize, serialize};
use bs58; use bs58;
use jsonrpc_core::{Error, ErrorCode, Metadata, Result}; use jsonrpc_core::{Error, Metadata, Result};
use jsonrpc_derive::rpc; use jsonrpc_derive::rpc;
use solana_client::rpc_signature_status::RpcSignatureStatus; use solana_client::rpc_signature_status::RpcSignatureStatus;
use solana_drone::drone::request_airdrop_transaction; use solana_drone::drone::request_airdrop_transaction;
use solana_runtime::bank::{self, Bank}; use solana_runtime::bank;
use solana_sdk::account::Account; use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature; use solana_sdk::signature::Signature;
@ -38,32 +39,25 @@ impl Default for JsonRpcConfig {
#[derive(Clone)] #[derive(Clone)]
pub struct JsonRpcRequestProcessor { pub struct JsonRpcRequestProcessor {
bank: Option<Arc<Bank>>, bank_forks: Arc<RwLock<BankForks>>,
storage_state: StorageState, storage_state: StorageState,
config: JsonRpcConfig, config: JsonRpcConfig,
fullnode_exit: Arc<AtomicBool>, fullnode_exit: Arc<AtomicBool>,
} }
impl JsonRpcRequestProcessor { impl JsonRpcRequestProcessor {
fn bank(&self) -> Result<&Arc<Bank>> { fn bank(&self) -> Arc<bank::Bank> {
self.bank.as_ref().ok_or(Error { self.bank_forks.read().unwrap().working_bank()
code: ErrorCode::InternalError,
message: "No bank available".into(),
data: None,
})
}
pub fn set_bank(&mut self, bank: &Arc<Bank>) {
self.bank = Some(bank.clone());
} }
pub fn new( pub fn new(
storage_state: StorageState, storage_state: StorageState,
config: JsonRpcConfig, config: JsonRpcConfig,
bank_forks: Arc<RwLock<BankForks>>,
fullnode_exit: &Arc<AtomicBool>, fullnode_exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
JsonRpcRequestProcessor { JsonRpcRequestProcessor {
bank: None, bank_forks,
storage_state, storage_state,
config, config,
fullnode_exit: fullnode_exit.clone(), fullnode_exit: fullnode_exit.clone(),
@ -71,29 +65,26 @@ impl JsonRpcRequestProcessor {
} }
pub fn get_account_info(&self, pubkey: &Pubkey) -> Result<Account> { pub fn get_account_info(&self, pubkey: &Pubkey) -> Result<Account> {
self.bank()? self.bank()
.get_account(&pubkey) .get_account(&pubkey)
.ok_or_else(Error::invalid_request) .ok_or_else(Error::invalid_request)
} }
pub fn get_balance(&self, pubkey: &Pubkey) -> Result<u64> { pub fn get_balance(&self, pubkey: &Pubkey) -> u64 {
let val = self.bank()?.get_balance(&pubkey); self.bank().get_balance(&pubkey)
Ok(val)
} }
fn get_recent_blockhash(&self) -> Result<String> { fn get_recent_blockhash(&self) -> String {
let id = self.bank()?.last_blockhash(); let id = self.bank().last_blockhash();
Ok(bs58::encode(id).into_string()) bs58::encode(id).into_string()
} }
pub fn get_signature_status(&self, signature: Signature) -> Option<bank::Result<()>> { pub fn get_signature_status(&self, signature: Signature) -> Option<bank::Result<()>> {
self.bank() self.bank().get_signature_status(&signature)
.ok()
.and_then(|bank| bank.get_signature_status(&signature))
} }
fn get_transaction_count(&self) -> Result<u64> { fn get_transaction_count(&self) -> Result<u64> {
Ok(self.bank()?.transaction_count() as u64) Ok(self.bank().transaction_count() as u64)
} }
fn get_storage_blockhash(&self) -> Result<String> { fn get_storage_blockhash(&self) -> Result<String> {
@ -235,15 +226,16 @@ impl RpcSol for RpcSolImpl {
fn get_balance(&self, meta: Self::Metadata, id: String) -> Result<u64> { fn get_balance(&self, meta: Self::Metadata, id: String) -> Result<u64> {
info!("get_balance rpc request received: {:?}", id); info!("get_balance rpc request received: {:?}", id);
let pubkey = verify_pubkey(id)?; let pubkey = verify_pubkey(id)?;
meta.request_processor.read().unwrap().get_balance(&pubkey) Ok(meta.request_processor.read().unwrap().get_balance(&pubkey))
} }
fn get_recent_blockhash(&self, meta: Self::Metadata) -> Result<String> { fn get_recent_blockhash(&self, meta: Self::Metadata) -> Result<String> {
info!("get_recent_blockhash rpc request received"); info!("get_recent_blockhash rpc request received");
meta.request_processor Ok(meta
.request_processor
.read() .read()
.unwrap() .unwrap()
.get_recent_blockhash() .get_recent_blockhash())
} }
fn get_signature_status(&self, meta: Self::Metadata, id: String) -> Result<RpcSignatureStatus> { fn get_signature_status(&self, meta: Self::Metadata, id: String) -> Result<RpcSignatureStatus> {
@ -303,7 +295,7 @@ impl RpcSol for RpcSolImpl {
.request_processor .request_processor
.read() .read()
.unwrap() .unwrap()
.bank()? .bank()
.last_blockhash(); .last_blockhash();
let transaction = request_airdrop_transaction(&drone_addr, &pubkey, lamports, blockhash) let transaction = request_airdrop_transaction(&drone_addr, &pubkey, lamports, blockhash)
.map_err(|err| { .map_err(|err| {
@ -419,8 +411,8 @@ mod tests {
use std::thread; use std::thread;
fn start_rpc_handler_with_tx(pubkey: &Pubkey) -> (MetaIoHandler<Meta>, Meta, Hash, Keypair) { fn start_rpc_handler_with_tx(pubkey: &Pubkey) -> (MetaIoHandler<Meta>, Meta, Hash, Keypair) {
let (genesis_block, alice) = GenesisBlock::new(10_000); let (bank_forks, alice) = new_bank_forks();
let bank = Arc::new(Bank::new(&genesis_block)); let bank = bank_forks.read().unwrap().working_bank();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let blockhash = bank.last_blockhash(); let blockhash = bank.last_blockhash();
@ -430,9 +422,9 @@ mod tests {
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
StorageState::default(), StorageState::default(),
JsonRpcConfig::default(), JsonRpcConfig::default(),
bank_forks,
&exit, &exit,
))); )));
request_processor.write().unwrap().set_bank(&bank);
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
ContactInfo::default(), ContactInfo::default(),
))); )));
@ -452,13 +444,16 @@ mod tests {
#[test] #[test]
fn test_rpc_request_processor_new() { fn test_rpc_request_processor_new() {
let (genesis_block, alice) = GenesisBlock::new(10_000);
let bob_pubkey = Keypair::new().pubkey(); let bob_pubkey = Keypair::new().pubkey();
let bank = Arc::new(Bank::new(&genesis_block));
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let mut request_processor = let (bank_forks, alice) = new_bank_forks();
JsonRpcRequestProcessor::new(StorageState::default(), JsonRpcConfig::default(), &exit); let bank = bank_forks.read().unwrap().working_bank();
request_processor.set_bank(&bank); let request_processor = JsonRpcRequestProcessor::new(
StorageState::default(),
JsonRpcConfig::default(),
bank_forks,
&exit,
);
thread::spawn(move || { thread::spawn(move || {
let blockhash = bank.last_blockhash(); let blockhash = bank.last_blockhash();
let tx = SystemTransaction::new_move(&alice, &bob_pubkey, 20, blockhash, 0); let tx = SystemTransaction::new_move(&alice, &bob_pubkey, 20, blockhash, 0);
@ -618,8 +613,6 @@ mod tests {
#[test] #[test]
fn test_rpc_send_bad_tx() { fn test_rpc_send_bad_tx() {
let (genesis_block, _) = GenesisBlock::new(10_000);
let bank = Arc::new(Bank::new(&genesis_block));
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let mut io = MetaIoHandler::default(); let mut io = MetaIoHandler::default();
@ -627,12 +620,12 @@ mod tests {
io.extend_with(rpc.to_delegate()); io.extend_with(rpc.to_delegate());
let meta = Meta { let meta = Meta {
request_processor: { request_processor: {
let mut request_processor = JsonRpcRequestProcessor::new( let request_processor = JsonRpcRequestProcessor::new(
StorageState::default(), StorageState::default(),
JsonRpcConfig::default(), JsonRpcConfig::default(),
new_bank_forks().0,
&exit, &exit,
); );
request_processor.set_bank(&bank);
Arc::new(RwLock::new(request_processor)) Arc::new(RwLock::new(request_processor))
}, },
cluster_info: Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( cluster_info: Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
@ -694,11 +687,24 @@ mod tests {
); );
} }
fn new_bank_forks() -> (Arc<RwLock<BankForks>>, Keypair) {
let (genesis_block, alice) = GenesisBlock::new(10_000);
let bank = bank::Bank::new(&genesis_block);
(
Arc::new(RwLock::new(BankForks::new(bank.slot(), bank))),
alice,
)
}
#[test] #[test]
fn test_rpc_request_processor_config_default_trait_fullnode_exit_fails() { fn test_rpc_request_processor_config_default_trait_fullnode_exit_fails() {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let request_processor = let request_processor = JsonRpcRequestProcessor::new(
JsonRpcRequestProcessor::new(StorageState::default(), JsonRpcConfig::default(), &exit); StorageState::default(),
JsonRpcConfig::default(),
new_bank_forks().0,
&exit,
);
assert_eq!(request_processor.fullnode_exit(), Ok(false)); assert_eq!(request_processor.fullnode_exit(), Ok(false));
assert_eq!(exit.load(Ordering::Relaxed), false); assert_eq!(exit.load(Ordering::Relaxed), false);
} }
@ -708,8 +714,12 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let mut config = JsonRpcConfig::default(); let mut config = JsonRpcConfig::default();
config.enable_fullnode_exit = true; config.enable_fullnode_exit = true;
let request_processor = let request_processor = JsonRpcRequestProcessor::new(
JsonRpcRequestProcessor::new(StorageState::default(), config, &exit); StorageState::default(),
config,
new_bank_forks().0,
&exit,
);
assert_eq!(request_processor.fullnode_exit(), Ok(true)); assert_eq!(request_processor.fullnode_exit(), Ok(true));
assert_eq!(exit.load(Ordering::Relaxed), true); assert_eq!(exit.load(Ordering::Relaxed), true);
} }

View File

@ -1,12 +1,12 @@
//! The `rpc_service` module implements the Solana JSON RPC service. //! The `rpc_service` module implements the Solana JSON RPC service.
use crate::bank_forks::BankForks;
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::rpc::*; use crate::rpc::*;
use crate::service::Service; use crate::service::Service;
use crate::storage_stage::StorageState; use crate::storage_stage::StorageState;
use jsonrpc_core::MetaIoHandler; use jsonrpc_core::MetaIoHandler;
use jsonrpc_http_server::{hyper, AccessControlAllowOrigin, DomainsValidation, ServerBuilder}; use jsonrpc_http_server::{hyper, AccessControlAllowOrigin, DomainsValidation, ServerBuilder};
use solana_runtime::bank::Bank;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -24,6 +24,7 @@ impl JsonRpcService {
rpc_addr: SocketAddr, rpc_addr: SocketAddr,
storage_state: StorageState, storage_state: StorageState,
config: JsonRpcConfig, config: JsonRpcConfig,
bank_forks: Arc<RwLock<BankForks>>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
info!("rpc bound to {:?}", rpc_addr); info!("rpc bound to {:?}", rpc_addr);
@ -31,6 +32,7 @@ impl JsonRpcService {
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
storage_state, storage_state,
config, config,
bank_forks,
exit, exit,
))); )));
let request_processor_ = request_processor.clone(); let request_processor_ = request_processor.clone();
@ -69,10 +71,6 @@ impl JsonRpcService {
request_processor, request_processor,
} }
} }
pub fn set_bank(&mut self, bank: &Arc<Bank>) {
self.request_processor.write().unwrap().set_bank(bank);
}
} }
impl Service for JsonRpcService { impl Service for JsonRpcService {
@ -104,14 +102,15 @@ mod tests {
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(), solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(),
); );
let mut rpc_service = JsonRpcService::new( let bank_forks = Arc::new(RwLock::new(BankForks::new(bank.slot(), bank)));
let rpc_service = JsonRpcService::new(
&cluster_info, &cluster_info,
rpc_addr, rpc_addr,
StorageState::default(), StorageState::default(),
JsonRpcConfig::default(), JsonRpcConfig::default(),
bank_forks,
&exit, &exit,
); );
rpc_service.set_bank(&Arc::new(bank));
let thread = rpc_service.thread_hdl.thread(); let thread = rpc_service.thread_hdl.thread();
assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); assert_eq!(thread.name().unwrap(), "solana-jsonrpc");
@ -122,7 +121,6 @@ mod tests {
.read() .read()
.unwrap() .unwrap()
.get_balance(&alice.pubkey()) .get_balance(&alice.pubkey())
.unwrap()
); );
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
rpc_service.join().unwrap(); rpc_service.join().unwrap();