From 1515bba9c6df80842298269b6d6f5684de0c82db Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Fri, 12 Oct 2018 14:25:56 -0600 Subject: [PATCH] Use cluster_info in rpc to get current leader addresses (#1480) --- src/fullnode.rs | 27 ++++++----------- src/rpc.rs | 77 ++++++++++++++++++++++++++++++++++++------------- 2 files changed, 66 insertions(+), 38 deletions(-) diff --git a/src/fullnode.rs b/src/fullnode.rs index 13f9bc5e8..c5cfe2c4d 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -3,7 +3,6 @@ use bank::Bank; use broadcast_stage::BroadcastStage; use cluster_info::{ClusterInfo, Node, NodeInfo}; -use drone::DRONE_PORT; use entry::Entry; use hash::Hash; use leader_scheduler::LeaderScheduler; @@ -259,23 +258,6 @@ impl Fullnode { .expect("Failed to clone respond socket"), )); - // TODO: this code assumes this node is the leader - let mut drone_addr = node.info.contact_info.tpu; - drone_addr.set_port(DRONE_PORT); - - // Use custom RPC port, if provided (`Some(port)`) - // RPC port may be any open port on the node - // If rpc_port == `None`, node will listen on the default RPC_PORT from Rpc module - // If rpc_port == `Some(0)`, node will dynamically choose any open port. Useful for tests. - let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), rpc_port.unwrap_or(RPC_PORT)); - let rpc_service = JsonRpcService::new( - &bank, - node.info.contact_info.tpu, - drone_addr, - rpc_addr, - exit.clone(), - ); - let last_entry_id = &ledger_tail .last() .expect("Expected at least one entry in the ledger") @@ -287,6 +269,15 @@ impl Fullnode { ClusterInfo::new(node.info).expect("ClusterInfo::new"), )); + // Use custom RPC port, if provided (`Some(port)`) + // RPC port may be any open port on the node + // If rpc_port == `None`, node will listen on the default RPC_PORT from Rpc module + // If rpc_port == `Some(0)`, node will dynamically choose any open port. Useful for tests. + let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), rpc_port.unwrap_or(RPC_PORT)); + // TODO: The RPC service assumes that there is a drone running on the leader + // Drone location/id will need to be handled a different way as soon as leader rotation begins + let rpc_service = JsonRpcService::new(&bank, &cluster_info, rpc_addr, exit.clone()); + let ncp = Ncp::new( &cluster_info, shared_window.clone(), diff --git a/src/rpc.rs b/src/rpc.rs index d7f36445f..7c9ad0323 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -3,7 +3,8 @@ use bank::{Bank, BankError}; use bincode::deserialize; use bs58; -use cluster_info::FULLNODE_PORT_RANGE; +use cluster_info::{ClusterInfo, FULLNODE_PORT_RANGE}; +use drone::DRONE_PORT; use jsonrpc_core::*; use jsonrpc_http_server::*; use jsonrpc_macros::pubsub::Sink; @@ -17,7 +18,7 @@ use std::mem; use std::net::{SocketAddr, UdpSocket}; use std::result; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; use std::time::Instant; @@ -33,12 +34,12 @@ pub struct JsonRpcService { impl JsonRpcService { pub fn new( bank: &Arc, - transactions_addr: SocketAddr, - drone_addr: SocketAddr, + cluster_info: &Arc>, rpc_addr: SocketAddr, exit: Arc, ) -> Self { let request_processor = JsonRpcRequestProcessor::new(bank.clone()); + let info = cluster_info.clone(); let exit_pubsub = exit.clone(); let thread_hdl = Builder::new() .name("solana-jsonrpc".to_string()) @@ -50,8 +51,7 @@ impl JsonRpcService { let server = ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request| Meta { request_processor: request_processor.clone(), - transactions_addr, - drone_addr, + cluster_info: info.clone(), rpc_addr, exit: exit_pubsub.clone(), }).threads(4) @@ -88,8 +88,7 @@ impl Service for JsonRpcService { #[derive(Clone)] pub struct Meta { pub request_processor: JsonRpcRequestProcessor, - pub transactions_addr: SocketAddr, - pub drone_addr: SocketAddr, + pub cluster_info: Arc>, pub rpc_addr: SocketAddr, pub exit: Arc, } @@ -198,6 +197,8 @@ impl RpcSol for RpcSolImpl { meta.request_processor.get_transaction_count() } fn request_airdrop(&self, meta: Self::Metadata, id: String, tokens: u64) -> Result { + let mut drone_addr = get_leader_addr(&meta.cluster_info)?; + drone_addr.set_port(DRONE_PORT); let pubkey_vec = bs58::decode(id) .into_vec() .map_err(|_| Error::invalid_request())?; @@ -205,8 +206,8 @@ impl RpcSol for RpcSolImpl { return Err(Error::invalid_request()); } let pubkey = Pubkey::new(&pubkey_vec); - let signature = request_airdrop(&meta.drone_addr, &pubkey, tokens) - .map_err(|_| Error::internal_error())?; + let signature = + request_airdrop(&drone_addr, &pubkey, tokens).map_err(|_| Error::internal_error())?; let now = Instant::now(); let mut signature_status; loop { @@ -221,13 +222,14 @@ impl RpcSol for RpcSolImpl { } } fn send_transaction(&self, meta: Self::Metadata, data: Vec) -> Result { + let transactions_addr = get_leader_addr(&meta.cluster_info)?; let tx: Transaction = deserialize(&data).map_err(|err| { debug!("send_transaction: deserialize error: {:?}", err); Error::invalid_request() })?; let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); transactions_socket - .send_to(&data, &meta.transactions_addr) + .send_to(&data, transactions_addr) .map_err(|err| { debug!("send_transaction: send_to error: {:?}", err); Error::internal_error() @@ -311,15 +313,27 @@ impl JsonRpcRequestProcessor { } } +fn get_leader_addr(cluster_info: &Arc>) -> Result { + if let Some(leader_data) = cluster_info.read().unwrap().leader_data() { + Ok(leader_data.contact_info.tpu) + } else { + Err(Error { + code: ErrorCode::InternalError, + message: "No leader detected".into(), + data: None, + }) + } +} + #[cfg(test)] mod tests { use super::*; use bank::Bank; + use cluster_info::NodeInfo; use jsonrpc_core::Response; use mint::Mint; use signature::{Keypair, KeypairUtil}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use std::sync::Arc; use system_transaction::SystemTransaction; use transaction::Transaction; @@ -334,8 +348,9 @@ mod tests { bank.process_transaction(&tx).expect("process transaction"); let request_processor = JsonRpcRequestProcessor::new(Arc::new(bank)); - let transactions_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); - let drone_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); + let cluster_info = Arc::new(RwLock::new( + ClusterInfo::new(NodeInfo::new_unspecified()).unwrap(), + )); let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); let exit = Arc::new(AtomicBool::new(false)); @@ -344,8 +359,7 @@ mod tests { io.extend_with(rpc.to_delegate()); let meta = Meta { request_processor, - transactions_addr, - drone_addr, + cluster_info, rpc_addr, exit, }; @@ -406,8 +420,9 @@ mod tests { let req = r#"{"jsonrpc":"2.0","id":1,"method":"confirmTransaction","params":[1234567890]}"#; let meta = Meta { request_processor: JsonRpcRequestProcessor::new(Arc::new(bank)), - transactions_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), - drone_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), + cluster_info: Arc::new(RwLock::new( + ClusterInfo::new(NodeInfo::new_unspecified()).unwrap(), + )), rpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), exit: Arc::new(AtomicBool::new(false)), }; @@ -433,8 +448,9 @@ mod tests { r#"{"jsonrpc":"2.0","id":1,"method":"confirmTransaction","params":["a1b2c3d4e5"]}"#; let meta = Meta { request_processor: JsonRpcRequestProcessor::new(Arc::new(bank)), - transactions_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), - drone_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), + cluster_info: Arc::new(RwLock::new( + ClusterInfo::new(NodeInfo::new_unspecified()).unwrap(), + )), rpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), exit: Arc::new(AtomicBool::new(false)), }; @@ -449,4 +465,25 @@ mod tests { .expect("actual response deserialization"); assert_eq!(expected, result); } + #[test] + fn test_rpc_get_leader_addr() { + let cluster_info = Arc::new(RwLock::new( + ClusterInfo::new(NodeInfo::new_unspecified()).unwrap(), + )); + assert_eq!( + get_leader_addr(&cluster_info), + Err(Error { + code: ErrorCode::InternalError, + message: "No leader detected".into(), + data: None, + }) + ); + let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); + cluster_info.write().unwrap().insert(&leader); + cluster_info.write().unwrap().set_leader(leader.id); + assert_eq!( + get_leader_addr(&cluster_info), + Ok(socketaddr!("127.0.0.1:1234")) + ); + } }