Use cluster_info in rpc to get current leader addresses (#1480)

This commit is contained in:
Tyera Eulberg 2018-10-12 14:25:56 -06:00 committed by GitHub
parent 14a9ef4bbe
commit 1515bba9c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 38 deletions

View File

@ -3,7 +3,6 @@
use bank::Bank;
use broadcast_stage::BroadcastStage;
use cluster_info::{ClusterInfo, Node, NodeInfo};
use drone::DRONE_PORT;
use entry::Entry;
use hash::Hash;
use leader_scheduler::LeaderScheduler;
@ -259,23 +258,6 @@ impl Fullnode {
.expect("Failed to clone respond socket"),
));
// TODO: this code assumes this node is the leader
let mut drone_addr = node.info.contact_info.tpu;
drone_addr.set_port(DRONE_PORT);
// Use custom RPC port, if provided (`Some(port)`)
// RPC port may be any open port on the node
// If rpc_port == `None`, node will listen on the default RPC_PORT from Rpc module
// If rpc_port == `Some(0)`, node will dynamically choose any open port. Useful for tests.
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), rpc_port.unwrap_or(RPC_PORT));
let rpc_service = JsonRpcService::new(
&bank,
node.info.contact_info.tpu,
drone_addr,
rpc_addr,
exit.clone(),
);
let last_entry_id = &ledger_tail
.last()
.expect("Expected at least one entry in the ledger")
@ -287,6 +269,15 @@ impl Fullnode {
ClusterInfo::new(node.info).expect("ClusterInfo::new"),
));
// Use custom RPC port, if provided (`Some(port)`)
// RPC port may be any open port on the node
// If rpc_port == `None`, node will listen on the default RPC_PORT from Rpc module
// If rpc_port == `Some(0)`, node will dynamically choose any open port. Useful for tests.
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), rpc_port.unwrap_or(RPC_PORT));
// TODO: The RPC service assumes that there is a drone running on the leader
// Drone location/id will need to be handled a different way as soon as leader rotation begins
let rpc_service = JsonRpcService::new(&bank, &cluster_info, rpc_addr, exit.clone());
let ncp = Ncp::new(
&cluster_info,
shared_window.clone(),

View File

@ -3,7 +3,8 @@
use bank::{Bank, BankError};
use bincode::deserialize;
use bs58;
use cluster_info::FULLNODE_PORT_RANGE;
use cluster_info::{ClusterInfo, FULLNODE_PORT_RANGE};
use drone::DRONE_PORT;
use jsonrpc_core::*;
use jsonrpc_http_server::*;
use jsonrpc_macros::pubsub::Sink;
@ -17,7 +18,7 @@ use std::mem;
use std::net::{SocketAddr, UdpSocket};
use std::result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Duration;
use std::time::Instant;
@ -33,12 +34,12 @@ pub struct JsonRpcService {
impl JsonRpcService {
pub fn new(
bank: &Arc<Bank>,
transactions_addr: SocketAddr,
drone_addr: SocketAddr,
cluster_info: &Arc<RwLock<ClusterInfo>>,
rpc_addr: SocketAddr,
exit: Arc<AtomicBool>,
) -> Self {
let request_processor = JsonRpcRequestProcessor::new(bank.clone());
let info = cluster_info.clone();
let exit_pubsub = exit.clone();
let thread_hdl = Builder::new()
.name("solana-jsonrpc".to_string())
@ -50,8 +51,7 @@ impl JsonRpcService {
let server =
ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request<hyper::Body>| Meta {
request_processor: request_processor.clone(),
transactions_addr,
drone_addr,
cluster_info: info.clone(),
rpc_addr,
exit: exit_pubsub.clone(),
}).threads(4)
@ -88,8 +88,7 @@ impl Service for JsonRpcService {
#[derive(Clone)]
pub struct Meta {
pub request_processor: JsonRpcRequestProcessor,
pub transactions_addr: SocketAddr,
pub drone_addr: SocketAddr,
pub cluster_info: Arc<RwLock<ClusterInfo>>,
pub rpc_addr: SocketAddr,
pub exit: Arc<AtomicBool>,
}
@ -198,6 +197,8 @@ impl RpcSol for RpcSolImpl {
meta.request_processor.get_transaction_count()
}
fn request_airdrop(&self, meta: Self::Metadata, id: String, tokens: u64) -> Result<String> {
let mut drone_addr = get_leader_addr(&meta.cluster_info)?;
drone_addr.set_port(DRONE_PORT);
let pubkey_vec = bs58::decode(id)
.into_vec()
.map_err(|_| Error::invalid_request())?;
@ -205,8 +206,8 @@ impl RpcSol for RpcSolImpl {
return Err(Error::invalid_request());
}
let pubkey = Pubkey::new(&pubkey_vec);
let signature = request_airdrop(&meta.drone_addr, &pubkey, tokens)
.map_err(|_| Error::internal_error())?;
let signature =
request_airdrop(&drone_addr, &pubkey, tokens).map_err(|_| Error::internal_error())?;
let now = Instant::now();
let mut signature_status;
loop {
@ -221,13 +222,14 @@ impl RpcSol for RpcSolImpl {
}
}
fn send_transaction(&self, meta: Self::Metadata, data: Vec<u8>) -> Result<String> {
let transactions_addr = get_leader_addr(&meta.cluster_info)?;
let tx: Transaction = deserialize(&data).map_err(|err| {
debug!("send_transaction: deserialize error: {:?}", err);
Error::invalid_request()
})?;
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
transactions_socket
.send_to(&data, &meta.transactions_addr)
.send_to(&data, transactions_addr)
.map_err(|err| {
debug!("send_transaction: send_to error: {:?}", err);
Error::internal_error()
@ -311,15 +313,27 @@ impl JsonRpcRequestProcessor {
}
}
fn get_leader_addr(cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<SocketAddr> {
if let Some(leader_data) = cluster_info.read().unwrap().leader_data() {
Ok(leader_data.contact_info.tpu)
} else {
Err(Error {
code: ErrorCode::InternalError,
message: "No leader detected".into(),
data: None,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use bank::Bank;
use cluster_info::NodeInfo;
use jsonrpc_core::Response;
use mint::Mint;
use signature::{Keypair, KeypairUtil};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use system_transaction::SystemTransaction;
use transaction::Transaction;
@ -334,8 +348,9 @@ mod tests {
bank.process_transaction(&tx).expect("process transaction");
let request_processor = JsonRpcRequestProcessor::new(Arc::new(bank));
let transactions_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let drone_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let cluster_info = Arc::new(RwLock::new(
ClusterInfo::new(NodeInfo::new_unspecified()).unwrap(),
));
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let exit = Arc::new(AtomicBool::new(false));
@ -344,8 +359,7 @@ mod tests {
io.extend_with(rpc.to_delegate());
let meta = Meta {
request_processor,
transactions_addr,
drone_addr,
cluster_info,
rpc_addr,
exit,
};
@ -406,8 +420,9 @@ mod tests {
let req = r#"{"jsonrpc":"2.0","id":1,"method":"confirmTransaction","params":[1234567890]}"#;
let meta = Meta {
request_processor: JsonRpcRequestProcessor::new(Arc::new(bank)),
transactions_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
drone_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
cluster_info: Arc::new(RwLock::new(
ClusterInfo::new(NodeInfo::new_unspecified()).unwrap(),
)),
rpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
exit: Arc::new(AtomicBool::new(false)),
};
@ -433,8 +448,9 @@ mod tests {
r#"{"jsonrpc":"2.0","id":1,"method":"confirmTransaction","params":["a1b2c3d4e5"]}"#;
let meta = Meta {
request_processor: JsonRpcRequestProcessor::new(Arc::new(bank)),
transactions_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
drone_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
cluster_info: Arc::new(RwLock::new(
ClusterInfo::new(NodeInfo::new_unspecified()).unwrap(),
)),
rpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
exit: Arc::new(AtomicBool::new(false)),
};
@ -449,4 +465,25 @@ mod tests {
.expect("actual response deserialization");
assert_eq!(expected, result);
}
#[test]
fn test_rpc_get_leader_addr() {
let cluster_info = Arc::new(RwLock::new(
ClusterInfo::new(NodeInfo::new_unspecified()).unwrap(),
));
assert_eq!(
get_leader_addr(&cluster_info),
Err(Error {
code: ErrorCode::InternalError,
message: "No leader detected".into(),
data: None,
})
);
let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
cluster_info.write().unwrap().insert(&leader);
cluster_info.write().unwrap().set_leader(leader.id);
assert_eq!(
get_leader_addr(&cluster_info),
Ok(socketaddr!("127.0.0.1:1234"))
);
}
}