Update the RPC bank on fullnode rotation
This commit is contained in:
parent
87281f6ed5
commit
6de5354b8e
|
@ -178,7 +178,6 @@ impl Fullnode {
|
||||||
let storage_state = StorageState::new();
|
let storage_state = StorageState::new();
|
||||||
|
|
||||||
let rpc_service = JsonRpcService::new(
|
let rpc_service = JsonRpcService::new(
|
||||||
&bank_forks,
|
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), node.info.rpc.port()),
|
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), node.info.rpc.port()),
|
||||||
drone_addr,
|
drone_addr,
|
||||||
|
@ -285,6 +284,13 @@ impl Fullnode {
|
||||||
rotation_info.last_entry_id,
|
rotation_info.last_entry_id,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
if let Some(ref mut rpc_service) = self.rpc_service {
|
||||||
|
// TODO: This is not the correct bank. Instead TVU should pass along the
|
||||||
|
// frozen Bank for each completed block for RPC to use from it's notion of the "best"
|
||||||
|
// available fork (until we want to surface multiple forks to RPC)
|
||||||
|
rpc_service.set_bank(self.bank_forks.read().unwrap().working_bank());
|
||||||
|
}
|
||||||
|
|
||||||
if rotation_info.leader_id == self.id {
|
if rotation_info.leader_id == self.id {
|
||||||
let transition = match self.node_services.tpu.is_leader() {
|
let transition = match self.node_services.tpu.is_leader() {
|
||||||
Some(was_leader) => {
|
Some(was_leader) => {
|
||||||
|
|
56
src/rpc.rs
56
src/rpc.rs
|
@ -1,6 +1,5 @@
|
||||||
//! The `rpc` module implements the Solana RPC interface.
|
//! The `rpc` module implements the Solana RPC interface.
|
||||||
|
|
||||||
use crate::bank_forks::BankForks;
|
|
||||||
use crate::cluster_info::ClusterInfo;
|
use crate::cluster_info::ClusterInfo;
|
||||||
use crate::packet::PACKET_DATA_SIZE;
|
use crate::packet::PACKET_DATA_SIZE;
|
||||||
use crate::rpc_status::RpcSignatureStatus;
|
use crate::rpc_status::RpcSignatureStatus;
|
||||||
|
@ -23,42 +22,52 @@ use std::time::{Duration, Instant};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct JsonRpcRequestProcessor {
|
pub struct JsonRpcRequestProcessor {
|
||||||
pub bank_forks: Arc<RwLock<BankForks>>,
|
bank: Option<Arc<Bank>>,
|
||||||
storage_state: StorageState,
|
storage_state: StorageState,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl JsonRpcRequestProcessor {
|
impl JsonRpcRequestProcessor {
|
||||||
fn bank(&self) -> Arc<Bank> {
|
fn bank(&self) -> Result<&Arc<Bank>> {
|
||||||
self.bank_forks.read().unwrap().working_bank()
|
self.bank.as_ref().ok_or(Error {
|
||||||
|
code: ErrorCode::InternalError,
|
||||||
|
message: "No bank available".into(),
|
||||||
|
data: None,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_bank(&mut self, bank: Arc<Bank>) {
|
||||||
|
self.bank = Some(bank);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new request processor that wraps the given Bank.
|
/// Create a new request processor that wraps the given Bank.
|
||||||
pub fn new(bank_forks: Arc<RwLock<BankForks>>, storage_state: StorageState) -> Self {
|
pub fn new(storage_state: StorageState) -> Self {
|
||||||
JsonRpcRequestProcessor {
|
JsonRpcRequestProcessor {
|
||||||
bank_forks,
|
bank: None,
|
||||||
storage_state,
|
storage_state,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process JSON-RPC request items sent via JSON-RPC.
|
/// Process JSON-RPC request items sent via JSON-RPC.
|
||||||
pub fn get_account_info(&self, pubkey: Pubkey) -> Result<Account> {
|
pub fn get_account_info(&self, pubkey: Pubkey) -> Result<Account> {
|
||||||
self.bank()
|
self.bank()?
|
||||||
.get_account(&pubkey)
|
.get_account(&pubkey)
|
||||||
.ok_or_else(Error::invalid_request)
|
.ok_or_else(Error::invalid_request)
|
||||||
}
|
}
|
||||||
pub fn get_balance(&self, pubkey: Pubkey) -> Result<u64> {
|
pub fn get_balance(&self, pubkey: Pubkey) -> Result<u64> {
|
||||||
let val = self.bank().get_balance(&pubkey);
|
let val = self.bank()?.get_balance(&pubkey);
|
||||||
Ok(val)
|
Ok(val)
|
||||||
}
|
}
|
||||||
fn get_last_id(&self) -> Result<String> {
|
fn get_last_id(&self) -> Result<String> {
|
||||||
let id = self.bank().last_id();
|
let id = self.bank()?.last_id();
|
||||||
Ok(bs58::encode(id).into_string())
|
Ok(bs58::encode(id).into_string())
|
||||||
}
|
}
|
||||||
pub fn get_signature_status(&self, signature: Signature) -> Option<bank::Result<()>> {
|
pub fn get_signature_status(&self, signature: Signature) -> Option<bank::Result<()>> {
|
||||||
self.bank().get_signature_status(&signature)
|
self.bank()
|
||||||
|
.ok()
|
||||||
|
.and_then(|bank| bank.get_signature_status(&signature))
|
||||||
}
|
}
|
||||||
fn get_transaction_count(&self) -> Result<u64> {
|
fn get_transaction_count(&self) -> Result<u64> {
|
||||||
Ok(self.bank().transaction_count() as u64)
|
Ok(self.bank()?.transaction_count() as u64)
|
||||||
}
|
}
|
||||||
fn get_storage_mining_last_id(&self) -> Result<String> {
|
fn get_storage_mining_last_id(&self) -> Result<String> {
|
||||||
let id = self.storage_state.get_last_id();
|
let id = self.storage_state.get_last_id();
|
||||||
|
@ -236,7 +245,7 @@ impl RpcSol for RpcSolImpl {
|
||||||
trace!("request_airdrop id={} tokens={}", id, tokens);
|
trace!("request_airdrop id={} tokens={}", id, tokens);
|
||||||
let pubkey = verify_pubkey(id)?;
|
let pubkey = verify_pubkey(id)?;
|
||||||
|
|
||||||
let last_id = meta.request_processor.read().unwrap().bank().last_id();
|
let last_id = meta.request_processor.read().unwrap().bank()?.last_id();
|
||||||
let transaction = request_airdrop_transaction(&meta.drone_addr, &pubkey, tokens, last_id)
|
let transaction = request_airdrop_transaction(&meta.drone_addr, &pubkey, tokens, last_id)
|
||||||
.map_err(|err| {
|
.map_err(|err| {
|
||||||
info!("request_airdrop_transaction failed: {:?}", err);
|
info!("request_airdrop_transaction failed: {:?}", err);
|
||||||
|
@ -344,17 +353,16 @@ mod tests {
|
||||||
|
|
||||||
fn start_rpc_handler_with_tx(pubkey: Pubkey) -> (MetaIoHandler<Meta>, Meta, Hash, Keypair) {
|
fn start_rpc_handler_with_tx(pubkey: Pubkey) -> (MetaIoHandler<Meta>, Meta, Hash, Keypair) {
|
||||||
let (genesis_block, alice) = GenesisBlock::new(10_000);
|
let (genesis_block, alice) = GenesisBlock::new(10_000);
|
||||||
let bank_forks = BankForks::new(0, Bank::new(&genesis_block));
|
let bank = Arc::new(Bank::new(&genesis_block));
|
||||||
let bank = bank_forks.working_bank();
|
|
||||||
|
|
||||||
let last_id = bank.last_id();
|
let last_id = bank.last_id();
|
||||||
let tx = SystemTransaction::new_move(&alice, pubkey, 20, last_id, 0);
|
let tx = SystemTransaction::new_move(&alice, pubkey, 20, last_id, 0);
|
||||||
bank.process_transaction(&tx).expect("process transaction");
|
bank.process_transaction(&tx).expect("process transaction");
|
||||||
|
|
||||||
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
|
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
|
||||||
Arc::new(RwLock::new(bank_forks)),
|
|
||||||
StorageState::default(),
|
StorageState::default(),
|
||||||
)));
|
)));
|
||||||
|
request_processor.write().unwrap().set_bank(bank);
|
||||||
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default())));
|
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default())));
|
||||||
let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
|
let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
|
||||||
|
|
||||||
|
@ -379,11 +387,10 @@ mod tests {
|
||||||
fn test_rpc_request_processor_new() {
|
fn test_rpc_request_processor_new() {
|
||||||
let (genesis_block, alice) = GenesisBlock::new(10_000);
|
let (genesis_block, alice) = GenesisBlock::new(10_000);
|
||||||
let bob_pubkey = Keypair::new().pubkey();
|
let bob_pubkey = Keypair::new().pubkey();
|
||||||
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, Bank::new(&genesis_block))));
|
let bank = Arc::new(Bank::new(&genesis_block));
|
||||||
let request_processor =
|
let mut request_processor = JsonRpcRequestProcessor::new(StorageState::default());
|
||||||
JsonRpcRequestProcessor::new(bank_forks.clone(), StorageState::default());
|
request_processor.set_bank(bank.clone());
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let bank = bank_forks.read().unwrap().working_bank();
|
|
||||||
let last_id = bank.last_id();
|
let last_id = bank.last_id();
|
||||||
let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0);
|
let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0);
|
||||||
bank.process_transaction(&tx).expect("process transaction");
|
bank.process_transaction(&tx).expect("process transaction");
|
||||||
|
@ -543,16 +550,17 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_rpc_send_bad_tx() {
|
fn test_rpc_send_bad_tx() {
|
||||||
let (genesis_block, _) = GenesisBlock::new(10_000);
|
let (genesis_block, _) = GenesisBlock::new(10_000);
|
||||||
let bank_forks = BankForks::new(0, Bank::new(&genesis_block));
|
let bank = Arc::new(Bank::new(&genesis_block));
|
||||||
|
|
||||||
let mut io = MetaIoHandler::default();
|
let mut io = MetaIoHandler::default();
|
||||||
let rpc = RpcSolImpl;
|
let rpc = RpcSolImpl;
|
||||||
io.extend_with(rpc.to_delegate());
|
io.extend_with(rpc.to_delegate());
|
||||||
let meta = Meta {
|
let meta = Meta {
|
||||||
request_processor: Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
|
request_processor: {
|
||||||
Arc::new(RwLock::new(bank_forks)),
|
let mut request_processor = JsonRpcRequestProcessor::new(StorageState::default());
|
||||||
StorageState::default(),
|
request_processor.set_bank(bank);
|
||||||
))),
|
Arc::new(RwLock::new(request_processor))
|
||||||
|
},
|
||||||
cluster_info: Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))),
|
cluster_info: Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))),
|
||||||
drone_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
|
drone_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
|
||||||
rpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
|
rpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
//! The `rpc_service` module implements the Solana JSON RPC service.
|
//! The `rpc_service` module implements the Solana JSON RPC service.
|
||||||
|
|
||||||
use crate::bank_forks::BankForks;
|
|
||||||
use crate::cluster_info::ClusterInfo;
|
use crate::cluster_info::ClusterInfo;
|
||||||
use crate::rpc::*;
|
use crate::rpc::*;
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::storage_stage::StorageState;
|
use crate::storage_stage::StorageState;
|
||||||
use jsonrpc_core::MetaIoHandler;
|
use jsonrpc_core::MetaIoHandler;
|
||||||
use jsonrpc_http_server::{hyper, AccessControlAllowOrigin, DomainsValidation, ServerBuilder};
|
use jsonrpc_http_server::{hyper, AccessControlAllowOrigin, DomainsValidation, ServerBuilder};
|
||||||
|
use solana_runtime::bank::Bank;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
|
@ -23,7 +23,6 @@ pub struct JsonRpcService {
|
||||||
|
|
||||||
impl JsonRpcService {
|
impl JsonRpcService {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
bank_forks: &Arc<RwLock<BankForks>>,
|
|
||||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
rpc_addr: SocketAddr,
|
rpc_addr: SocketAddr,
|
||||||
drone_addr: SocketAddr,
|
drone_addr: SocketAddr,
|
||||||
|
@ -31,10 +30,7 @@ impl JsonRpcService {
|
||||||
) -> Self {
|
) -> Self {
|
||||||
info!("rpc bound to {:?}", rpc_addr);
|
info!("rpc bound to {:?}", rpc_addr);
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
|
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(storage_state)));
|
||||||
bank_forks.clone(),
|
|
||||||
storage_state,
|
|
||||||
)));
|
|
||||||
let request_processor_ = request_processor.clone();
|
let request_processor_ = request_processor.clone();
|
||||||
|
|
||||||
let info = cluster_info.clone();
|
let info = cluster_info.clone();
|
||||||
|
@ -75,6 +71,10 @@ impl JsonRpcService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_bank(&mut self, bank: Arc<Bank>) {
|
||||||
|
self.request_processor.write().unwrap().set_bank(bank);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn exit(&self) {
|
pub fn exit(&self) {
|
||||||
self.exit.store(true, Ordering::Relaxed);
|
self.exit.store(true, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
@ -105,7 +105,7 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_rpc_new() {
|
fn test_rpc_new() {
|
||||||
let (genesis_block, alice) = GenesisBlock::new(10_000);
|
let (genesis_block, alice) = GenesisBlock::new(10_000);
|
||||||
let bank_forks = BankForks::new(0, Bank::new(&genesis_block));
|
let bank = Bank::new(&genesis_block);
|
||||||
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default())));
|
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default())));
|
||||||
let rpc_addr = SocketAddr::new(
|
let rpc_addr = SocketAddr::new(
|
||||||
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
||||||
|
@ -115,13 +115,9 @@ mod tests {
|
||||||
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
||||||
solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(),
|
solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(),
|
||||||
);
|
);
|
||||||
let rpc_service = JsonRpcService::new(
|
let mut rpc_service =
|
||||||
&Arc::new(RwLock::new(bank_forks)),
|
JsonRpcService::new(&cluster_info, rpc_addr, drone_addr, StorageState::default());
|
||||||
&cluster_info,
|
rpc_service.set_bank(Arc::new(bank));
|
||||||
rpc_addr,
|
|
||||||
drone_addr,
|
|
||||||
StorageState::default(),
|
|
||||||
);
|
|
||||||
let thread = rpc_service.thread_hdl.thread();
|
let thread = rpc_service.thread_hdl.thread();
|
||||||
assert_eq!(thread.name().unwrap(), "solana-jsonrpc");
|
assert_eq!(thread.name().unwrap(), "solana-jsonrpc");
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue