Plumb BankForks into RPC subsystem
This commit is contained in:
parent
13d018e3e1
commit
4d5e2c8a4d
|
@ -100,7 +100,7 @@ pub struct Fullnode {
|
|||
rpc_service: Option<JsonRpcService>,
|
||||
rpc_pubsub_service: Option<PubSubService>,
|
||||
gossip_service: GossipService,
|
||||
bank_forks: BankForks,
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
sigverify_disabled: bool,
|
||||
tpu_sockets: Vec<UdpSocket>,
|
||||
broadcast_socket: UdpSocket,
|
||||
|
@ -149,6 +149,7 @@ impl Fullnode {
|
|||
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let blocktree = Arc::new(blocktree);
|
||||
let bank_forks = Arc::new(RwLock::new(bank_forks));
|
||||
|
||||
node.info.wallclock = timestamp();
|
||||
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_keypair(
|
||||
|
@ -172,7 +173,7 @@ impl Fullnode {
|
|||
let storage_state = StorageState::new();
|
||||
|
||||
let rpc_service = JsonRpcService::new(
|
||||
&bank,
|
||||
&bank_forks,
|
||||
&cluster_info,
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), node.info.rpc.port()),
|
||||
drone_addr,
|
||||
|
@ -422,7 +423,7 @@ impl Fullnode {
|
|||
match self.rotation_receiver.recv_timeout(timeout) {
|
||||
Ok(tick_height) => {
|
||||
trace!("{:?}: rotate at tick_height={}", self.id, tick_height);
|
||||
let bank = self.bank_forks.working_bank();
|
||||
let bank = self.bank_forks.read().unwrap().working_bank();
|
||||
let (next_leader, max_tick_height) = self.get_next_leader(&bank, tick_height);
|
||||
let transition =
|
||||
self.rotate(&bank, next_leader, max_tick_height, 0, &bank.last_id());
|
||||
|
|
44
src/rpc.rs
44
src/rpc.rs
|
@ -1,5 +1,6 @@
|
|||
//! The `rpc` module implements the Solana RPC interface.
|
||||
|
||||
use crate::bank_forks::BankForks;
|
||||
use crate::cluster_info::ClusterInfo;
|
||||
use crate::packet::PACKET_DATA_SIZE;
|
||||
use crate::rpc_status::RpcSignatureStatus;
|
||||
|
@ -22,38 +23,42 @@ use std::time::{Duration, Instant};
|
|||
|
||||
#[derive(Clone)]
|
||||
pub struct JsonRpcRequestProcessor {
|
||||
pub bank: Arc<Bank>,
|
||||
pub bank_forks: Arc<RwLock<BankForks>>,
|
||||
storage_state: StorageState,
|
||||
}
|
||||
|
||||
impl JsonRpcRequestProcessor {
|
||||
fn bank(&self) -> Arc<Bank> {
|
||||
self.bank_forks.read().unwrap().working_bank()
|
||||
}
|
||||
|
||||
/// Create a new request processor that wraps the given Bank.
|
||||
pub fn new(bank: Arc<Bank>, storage_state: StorageState) -> Self {
|
||||
pub fn new(bank_forks: Arc<RwLock<BankForks>>, storage_state: StorageState) -> Self {
|
||||
JsonRpcRequestProcessor {
|
||||
bank,
|
||||
bank_forks,
|
||||
storage_state,
|
||||
}
|
||||
}
|
||||
|
||||
/// Process JSON-RPC request items sent via JSON-RPC.
|
||||
pub fn get_account_info(&self, pubkey: Pubkey) -> Result<Account> {
|
||||
self.bank
|
||||
self.bank()
|
||||
.get_account(&pubkey)
|
||||
.ok_or_else(Error::invalid_request)
|
||||
}
|
||||
pub fn get_balance(&self, pubkey: Pubkey) -> Result<u64> {
|
||||
let val = self.bank.get_balance(&pubkey);
|
||||
let val = self.bank().get_balance(&pubkey);
|
||||
Ok(val)
|
||||
}
|
||||
fn get_last_id(&self) -> Result<String> {
|
||||
let id = self.bank.last_id();
|
||||
let id = self.bank().last_id();
|
||||
Ok(bs58::encode(id).into_string())
|
||||
}
|
||||
pub fn get_signature_status(&self, signature: Signature) -> Option<bank::Result<()>> {
|
||||
self.bank.get_signature_status(&signature)
|
||||
self.bank().get_signature_status(&signature)
|
||||
}
|
||||
fn get_transaction_count(&self) -> Result<u64> {
|
||||
Ok(self.bank.transaction_count() as u64)
|
||||
Ok(self.bank().transaction_count() as u64)
|
||||
}
|
||||
fn get_storage_mining_last_id(&self) -> Result<String> {
|
||||
let id = self.storage_state.get_last_id();
|
||||
|
@ -231,7 +236,7 @@ impl RpcSol for RpcSolImpl {
|
|||
trace!("request_airdrop id={} tokens={}", id, tokens);
|
||||
let pubkey = verify_pubkey(id)?;
|
||||
|
||||
let last_id = meta.request_processor.read().unwrap().bank.last_id();
|
||||
let last_id = meta.request_processor.read().unwrap().bank().last_id();
|
||||
let transaction = request_airdrop_transaction(&meta.drone_addr, &pubkey, tokens, last_id)
|
||||
.map_err(|err| {
|
||||
info!("request_airdrop_transaction failed: {:?}", err);
|
||||
|
@ -339,14 +344,15 @@ mod tests {
|
|||
|
||||
fn start_rpc_handler_with_tx(pubkey: Pubkey) -> (MetaIoHandler<Meta>, Meta, Hash, Keypair) {
|
||||
let (genesis_block, alice) = GenesisBlock::new(10_000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let bank_forks = BankForks::new(0, Bank::new(&genesis_block));
|
||||
let bank = bank_forks.working_bank();
|
||||
|
||||
let last_id = bank.last_id();
|
||||
let tx = SystemTransaction::new_move(&alice, pubkey, 20, last_id, 0);
|
||||
bank.process_transaction(&tx).expect("process transaction");
|
||||
|
||||
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
|
||||
Arc::new(bank),
|
||||
Arc::new(RwLock::new(bank_forks)),
|
||||
StorageState::default(),
|
||||
)));
|
||||
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default())));
|
||||
|
@ -373,16 +379,14 @@ mod tests {
|
|||
fn test_rpc_request_processor_new() {
|
||||
let (genesis_block, alice) = GenesisBlock::new(10_000);
|
||||
let bob_pubkey = Keypair::new().pubkey();
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let arc_bank = Arc::new(bank);
|
||||
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, Bank::new(&genesis_block))));
|
||||
let request_processor =
|
||||
JsonRpcRequestProcessor::new(arc_bank.clone(), StorageState::default());
|
||||
JsonRpcRequestProcessor::new(bank_forks.clone(), StorageState::default());
|
||||
thread::spawn(move || {
|
||||
let last_id = arc_bank.last_id();
|
||||
let bank = bank_forks.read().unwrap().working_bank();
|
||||
let last_id = bank.last_id();
|
||||
let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0);
|
||||
arc_bank
|
||||
.process_transaction(&tx)
|
||||
.expect("process transaction");
|
||||
bank.process_transaction(&tx).expect("process transaction");
|
||||
})
|
||||
.join()
|
||||
.unwrap();
|
||||
|
@ -539,14 +543,14 @@ mod tests {
|
|||
#[test]
|
||||
fn test_rpc_send_bad_tx() {
|
||||
let (genesis_block, _) = GenesisBlock::new(10_000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let bank_forks = BankForks::new(0, Bank::new(&genesis_block));
|
||||
|
||||
let mut io = MetaIoHandler::default();
|
||||
let rpc = RpcSolImpl;
|
||||
io.extend_with(rpc.to_delegate());
|
||||
let meta = Meta {
|
||||
request_processor: Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
|
||||
Arc::new(bank),
|
||||
Arc::new(RwLock::new(bank_forks)),
|
||||
StorageState::default(),
|
||||
))),
|
||||
cluster_info: Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))),
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
//! The `rpc_service` module implements the Solana JSON RPC service.
|
||||
|
||||
use crate::bank_forks::BankForks;
|
||||
use crate::cluster_info::ClusterInfo;
|
||||
use crate::rpc::*;
|
||||
use crate::service::Service;
|
||||
use crate::storage_stage::StorageState;
|
||||
use jsonrpc_core::MetaIoHandler;
|
||||
use jsonrpc_http_server::{hyper, AccessControlAllowOrigin, DomainsValidation, ServerBuilder};
|
||||
use solana_runtime::bank::Bank;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
@ -18,12 +18,12 @@ pub const RPC_PORT: u16 = 8899;
|
|||
pub struct JsonRpcService {
|
||||
thread_hdl: JoinHandle<()>,
|
||||
exit: Arc<AtomicBool>,
|
||||
request_processor: Arc<RwLock<JsonRpcRequestProcessor>>,
|
||||
pub request_processor: Arc<RwLock<JsonRpcRequestProcessor>>, // Used only by tests...
|
||||
}
|
||||
|
||||
impl JsonRpcService {
|
||||
pub fn new(
|
||||
bank: &Arc<Bank>,
|
||||
bank_forks: &Arc<RwLock<BankForks>>,
|
||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||
rpc_addr: SocketAddr,
|
||||
drone_addr: SocketAddr,
|
||||
|
@ -32,10 +32,9 @@ impl JsonRpcService {
|
|||
info!("rpc bound to {:?}", rpc_addr);
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
|
||||
bank.clone(),
|
||||
bank_forks.clone(),
|
||||
storage_state,
|
||||
)));
|
||||
request_processor.write().unwrap().bank = bank.clone();
|
||||
let request_processor_ = request_processor.clone();
|
||||
|
||||
let info = cluster_info.clone();
|
||||
|
@ -76,10 +75,6 @@ impl JsonRpcService {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn set_bank(&mut self, bank: &Arc<Bank>) {
|
||||
self.request_processor.write().unwrap().bank = bank.clone();
|
||||
}
|
||||
|
||||
pub fn exit(&self) {
|
||||
self.exit.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
@ -102,6 +97,7 @@ impl Service for JsonRpcService {
|
|||
mod tests {
|
||||
use super::*;
|
||||
use crate::cluster_info::NodeInfo;
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_sdk::genesis_block::GenesisBlock;
|
||||
use solana_sdk::signature::KeypairUtil;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
|
@ -109,7 +105,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_rpc_new() {
|
||||
let (genesis_block, alice) = GenesisBlock::new(10_000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let bank_forks = BankForks::new(0, Bank::new(&genesis_block));
|
||||
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default())));
|
||||
let rpc_addr = SocketAddr::new(
|
||||
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
||||
|
@ -120,7 +116,7 @@ mod tests {
|
|||
solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(),
|
||||
);
|
||||
let rpc_service = JsonRpcService::new(
|
||||
&Arc::new(bank),
|
||||
&Arc::new(RwLock::new(bank_forks)),
|
||||
&cluster_info,
|
||||
rpc_addr,
|
||||
drone_addr,
|
||||
|
|
Loading…
Reference in New Issue