Remove RPU; replace with RPC

This commit is contained in:
Tyera Eulberg 2018-11-05 10:50:58 -07:00 committed by Tyera Eulberg
parent 52491b467a
commit f683817b48
18 changed files with 245 additions and 666 deletions

View File

@ -638,7 +638,7 @@ fn main() {
let leader = leader.unwrap(); let leader = leader.unwrap();
println!("leader is at {} {}", leader.contact_info.rpu, leader.id); println!("leader RPC is at {} {}", leader.contact_info.rpc, leader.id);
let mut client = mk_client(&leader); let mut client = mk_client(&leader);
let mut barrier_client = mk_client(&leader); let mut barrier_client = mk_client(&leader);
@ -821,7 +821,7 @@ fn converge(
v = spy_ref v = spy_ref
.table .table
.values() .values()
.filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpu)) .filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpc))
.cloned() .cloned()
.collect(); .collect();

View File

@ -52,7 +52,13 @@ fn main() {
.value_name("DIR") .value_name("DIR")
.takes_value(true) .takes_value(true)
.required(true) .required(true)
.help("use DIR as persistent ledger location"), .help("Use DIR as persistent ledger location"),
).arg(
Arg::with_name("rpc")
.long("rpc")
.value_name("PORT")
.takes_value(true)
.help("Custom RPC port for this node"),
).get_matches(); ).get_matches();
let (keypair, ncp) = if let Some(i) = matches.value_of("identity") { let (keypair, ncp) = if let Some(i) = matches.value_of("identity") {
@ -102,6 +108,17 @@ fn main() {
// Remove this line to enable leader rotation // Remove this line to enable leader rotation
leader_scheduler.use_only_bootstrap_leader = true; leader_scheduler.use_only_bootstrap_leader = true;
let rpc_port = if let Some(port) = matches.value_of("rpc") {
let port_number = port.to_string().parse().expect("integer");
if port_number == 0 {
eprintln!("Invalid RPC port requested: {:?}", port);
exit(1);
}
Some(port_number)
} else {
None
};
let mut fullnode = Fullnode::new( let mut fullnode = Fullnode::new(
node, node,
ledger_path, ledger_path,
@ -110,6 +127,7 @@ fn main() {
network, network,
false, false,
leader_scheduler, leader_scheduler,
rpc_port,
); );
let mut client = mk_client(&leader); let mut client = mk_client(&leader);

View File

@ -1,20 +1,9 @@
use cluster_info::{NodeInfo, FULLNODE_PORT_RANGE}; use cluster_info::{NodeInfo, FULLNODE_PORT_RANGE};
use netutil::bind_in_range; use netutil::bind_in_range;
use std::time::Duration;
use thin_client::ThinClient; use thin_client::ThinClient;
pub fn mk_client(r: &NodeInfo) -> ThinClient { pub fn mk_client(r: &NodeInfo) -> ThinClient {
let (_, requests_socket) = bind_in_range(FULLNODE_PORT_RANGE).unwrap();
let (_, transactions_socket) = bind_in_range(FULLNODE_PORT_RANGE).unwrap(); let (_, transactions_socket) = bind_in_range(FULLNODE_PORT_RANGE).unwrap();
requests_socket ThinClient::new(r.contact_info.rpc, r.contact_info.tpu, transactions_socket)
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
ThinClient::new(
r.contact_info.rpu,
requests_socket,
r.contact_info.tpu,
transactions_socket,
)
} }

View File

@ -19,11 +19,12 @@ use hash::Hash;
use leader_scheduler::LeaderScheduler; use leader_scheduler::LeaderScheduler;
use ledger::LedgerWindow; use ledger::LedgerWindow;
use log::Level; use log::Level;
use netutil::{bind_in_range, bind_to, multi_bind_in_range}; use netutil::{bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range};
use packet::{to_blob, Blob, SharedBlob, BLOB_SIZE}; use packet::{to_blob, Blob, SharedBlob, BLOB_SIZE};
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use rayon::prelude::*; use rayon::prelude::*;
use result::{Error, Result}; use result::{Error, Result};
use rpc::RPC_PORT;
use signature::{Keypair, KeypairUtil}; use signature::{Keypair, KeypairUtil};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std; use std;
@ -79,12 +80,14 @@ pub struct ContactInfo {
pub ncp: SocketAddr, pub ncp: SocketAddr,
/// address to connect to for replication /// address to connect to for replication
pub tvu: SocketAddr, pub tvu: SocketAddr,
/// address to connect to when this node is leader
pub rpu: SocketAddr,
/// transactions address /// transactions address
pub tpu: SocketAddr, pub tpu: SocketAddr,
/// storage data address /// storage data address
pub storage_addr: SocketAddr, pub storage_addr: SocketAddr,
/// address to which to send JSON-RPC requests
pub rpc: SocketAddr,
/// websocket for JSON-RPC push notifications
pub rpc_pubsub: SocketAddr,
/// if this struture changes update this value as well /// if this struture changes update this value as well
/// Always update `NodeInfo` version too /// Always update `NodeInfo` version too
/// This separate version for addresses allows us to use the `Vote` /// This separate version for addresses allows us to use the `Vote`
@ -115,9 +118,10 @@ impl NodeInfo {
id: Pubkey, id: Pubkey,
ncp: SocketAddr, ncp: SocketAddr,
tvu: SocketAddr, tvu: SocketAddr,
rpu: SocketAddr,
tpu: SocketAddr, tpu: SocketAddr,
storage_addr: SocketAddr, storage_addr: SocketAddr,
rpc: SocketAddr,
rpc_pubsub: SocketAddr,
) -> Self { ) -> Self {
NodeInfo { NodeInfo {
id, id,
@ -125,9 +129,10 @@ impl NodeInfo {
contact_info: ContactInfo { contact_info: ContactInfo {
ncp, ncp,
tvu, tvu,
rpu,
tpu, tpu,
storage_addr, storage_addr,
rpc,
rpc_pubsub,
version: 0, version: 0,
}, },
leader_id: Pubkey::default(), leader_id: Pubkey::default(),
@ -142,6 +147,7 @@ impl NodeInfo {
socketaddr!("127.0.0.1:1236"), socketaddr!("127.0.0.1:1236"),
socketaddr!("127.0.0.1:1237"), socketaddr!("127.0.0.1:1237"),
socketaddr!("127.0.0.1:1238"), socketaddr!("127.0.0.1:1238"),
socketaddr!("127.0.0.1:1239"),
) )
} }
@ -150,14 +156,14 @@ impl NodeInfo {
pub fn new_unspecified() -> Self { pub fn new_unspecified() -> Self {
let addr = socketaddr!(0, 0); let addr = socketaddr!(0, 0);
assert!(addr.ip().is_unspecified()); assert!(addr.ip().is_unspecified());
Self::new(Keypair::new().pubkey(), addr, addr, addr, addr, addr) Self::new(Keypair::new().pubkey(), addr, addr, addr, addr, addr, addr)
} }
#[cfg(test)] #[cfg(test)]
/// NodeInfo with multicast addresses for adversarial testing. /// NodeInfo with multicast addresses for adversarial testing.
pub fn new_multicast() -> Self { pub fn new_multicast() -> Self {
let addr = socketaddr!("224.0.1.255:1000"); let addr = socketaddr!("224.0.1.255:1000");
assert!(addr.ip().is_multicast()); assert!(addr.ip().is_multicast());
Self::new(Keypair::new().pubkey(), addr, addr, addr, addr, addr) Self::new(Keypair::new().pubkey(), addr, addr, addr, addr, addr, addr)
} }
fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr { fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr {
let mut nxt_addr = *addr; let mut nxt_addr = *addr;
@ -168,14 +174,16 @@ impl NodeInfo {
let transactions_addr = *bind_addr; let transactions_addr = *bind_addr;
let gossip_addr = Self::next_port(&bind_addr, 1); let gossip_addr = Self::next_port(&bind_addr, 1);
let replicate_addr = Self::next_port(&bind_addr, 2); let replicate_addr = Self::next_port(&bind_addr, 2);
let requests_addr = Self::next_port(&bind_addr, 3); let rpc_addr = SocketAddr::new(bind_addr.ip(), RPC_PORT);
let rpc_pubsub_addr = SocketAddr::new(bind_addr.ip(), RPC_PORT + 1);
NodeInfo::new( NodeInfo::new(
pubkey, pubkey,
gossip_addr, gossip_addr,
replicate_addr, replicate_addr,
requests_addr,
transactions_addr, transactions_addr,
"0.0.0.0:0".parse().unwrap(), "0.0.0.0:0".parse().unwrap(),
rpc_addr,
rpc_pubsub_addr,
) )
} }
pub fn new_with_socketaddr(bind_addr: &SocketAddr) -> Self { pub fn new_with_socketaddr(bind_addr: &SocketAddr) -> Self {
@ -185,7 +193,15 @@ impl NodeInfo {
// //
pub fn new_entry_point(gossip_addr: &SocketAddr) -> Self { pub fn new_entry_point(gossip_addr: &SocketAddr) -> Self {
let daddr: SocketAddr = socketaddr!("0.0.0.0:0"); let daddr: SocketAddr = socketaddr!("0.0.0.0:0");
NodeInfo::new(Pubkey::default(), *gossip_addr, daddr, daddr, daddr, daddr) NodeInfo::new(
Pubkey::default(),
*gossip_addr,
daddr,
daddr,
daddr,
daddr,
daddr,
)
} }
} }
@ -281,13 +297,13 @@ impl ClusterInfo {
let nodes: Vec<_> = self let nodes: Vec<_> = self
.table .table
.values() .values()
.filter(|n| Self::is_valid_address(&n.contact_info.rpu)) .filter(|n| Self::is_valid_address(&n.contact_info.rpc))
.cloned() .cloned()
.map(|node| { .map(|node| {
format!( format!(
" ncp: {:20} | {}{}\n \ " ncp: {:20} | {}{}\n \
rpu: {:20} |\n \ tpu: {:20} |\n \
tpu: {:20} |\n", rpc: {:20} |\n",
node.contact_info.ncp.to_string(), node.contact_info.ncp.to_string(),
node.id, node.id,
if node.id == leader_id { if node.id == leader_id {
@ -295,8 +311,8 @@ impl ClusterInfo {
} else { } else {
"" ""
}, },
node.contact_info.rpu.to_string(), node.contact_info.tpu.to_string(),
node.contact_info.tpu.to_string() node.contact_info.rpc.to_string()
) )
}).collect(); }).collect();
@ -323,7 +339,7 @@ impl ClusterInfo {
self.table self.table
.values() .values()
.filter(|x| x.id != me) .filter(|x| x.id != me)
.filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpu)) .filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpc))
.cloned() .cloned()
.collect() .collect()
} }
@ -1187,7 +1203,7 @@ impl ClusterInfo {
let pubkey = Keypair::new().pubkey(); let pubkey = Keypair::new().pubkey();
let daddr = socketaddr_any!(); let daddr = socketaddr_any!();
let node = NodeInfo::new(pubkey, daddr, daddr, daddr, daddr, daddr); let node = NodeInfo::new(pubkey, daddr, daddr, daddr, daddr, daddr, daddr);
(node, gossip_socket) (node, gossip_socket)
} }
} }
@ -1195,10 +1211,8 @@ impl ClusterInfo {
#[derive(Debug)] #[derive(Debug)]
pub struct Sockets { pub struct Sockets {
pub gossip: UdpSocket, pub gossip: UdpSocket,
pub requests: UdpSocket,
pub replicate: Vec<UdpSocket>, pub replicate: Vec<UdpSocket>,
pub transaction: Vec<UdpSocket>, pub transaction: Vec<UdpSocket>,
pub respond: UdpSocket,
pub broadcast: UdpSocket, pub broadcast: UdpSocket,
pub repair: UdpSocket, pub repair: UdpSocket,
pub retransmit: UdpSocket, pub retransmit: UdpSocket,
@ -1219,10 +1233,13 @@ impl Node {
let transaction = UdpSocket::bind("127.0.0.1:0").unwrap(); let transaction = UdpSocket::bind("127.0.0.1:0").unwrap();
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
let requests = UdpSocket::bind("127.0.0.1:0").unwrap();
let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); let repair = UdpSocket::bind("127.0.0.1:0").unwrap();
let rpc_port = find_available_port_in_range((1024, 65535)).unwrap();
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_port);
let rpc_pubsub_port = find_available_port_in_range((1024, 65535)).unwrap();
let rpc_pubsub_addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_pubsub_port);
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
let storage = UdpSocket::bind("0.0.0.0:0").unwrap(); let storage = UdpSocket::bind("0.0.0.0:0").unwrap();
@ -1230,18 +1247,17 @@ impl Node {
pubkey, pubkey,
gossip.local_addr().unwrap(), gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(), replicate.local_addr().unwrap(),
requests.local_addr().unwrap(),
transaction.local_addr().unwrap(), transaction.local_addr().unwrap(),
storage.local_addr().unwrap(), storage.local_addr().unwrap(),
rpc_addr,
rpc_pubsub_addr,
); );
Node { Node {
info, info,
sockets: Sockets { sockets: Sockets {
gossip, gossip,
requests,
replicate: vec![replicate], replicate: vec![replicate],
transaction: vec![transaction], transaction: vec![transaction],
respond,
broadcast, broadcast,
repair, repair,
retransmit, retransmit,
@ -1262,8 +1278,6 @@ impl Node {
let (replicate_port, replicate_sockets) = let (replicate_port, replicate_sockets) =
multi_bind_in_range(FULLNODE_PORT_RANGE, 8).expect("tvu multi_bind"); multi_bind_in_range(FULLNODE_PORT_RANGE, 8).expect("tvu multi_bind");
let (requests_port, requests) = bind();
let (transaction_port, transaction_sockets) = let (transaction_port, transaction_sockets) =
multi_bind_in_range(FULLNODE_PORT_RANGE, 32).expect("tpu multi_bind"); multi_bind_in_range(FULLNODE_PORT_RANGE, 32).expect("tpu multi_bind");
@ -1272,18 +1286,14 @@ impl Node {
let (_, retransmit) = bind(); let (_, retransmit) = bind();
let (storage_port, _) = bind(); let (storage_port, _) = bind();
// Responses are sent from the same Udp port as requests are received
// from, in hopes that a NAT sitting in the middle will route the
// response Udp packet correctly back to the requester.
let respond = requests.try_clone().unwrap();
let info = NodeInfo::new( let info = NodeInfo::new(
pubkey, pubkey,
SocketAddr::new(ncp.ip(), gossip_port), SocketAddr::new(ncp.ip(), gossip_port),
SocketAddr::new(ncp.ip(), replicate_port), SocketAddr::new(ncp.ip(), replicate_port),
SocketAddr::new(ncp.ip(), requests_port),
SocketAddr::new(ncp.ip(), transaction_port), SocketAddr::new(ncp.ip(), transaction_port),
SocketAddr::new(ncp.ip(), storage_port), SocketAddr::new(ncp.ip(), storage_port),
SocketAddr::new(ncp.ip(), RPC_PORT),
SocketAddr::new(ncp.ip(), RPC_PORT + 1),
); );
trace!("new NodeInfo: {:?}", info); trace!("new NodeInfo: {:?}", info);
@ -1291,10 +1301,8 @@ impl Node {
info, info,
sockets: Sockets { sockets: Sockets {
gossip, gossip,
requests,
replicate: replicate_sockets, replicate: replicate_sockets,
transaction: transaction_sockets, transaction: transaction_sockets,
respond,
broadcast, broadcast,
repair, repair,
retransmit, retransmit,
@ -1374,8 +1382,9 @@ mod tests {
assert_eq!(d1.id, keypair.pubkey()); assert_eq!(d1.id, keypair.pubkey());
assert_eq!(d1.contact_info.ncp, socketaddr!("127.0.0.1:1235")); assert_eq!(d1.contact_info.ncp, socketaddr!("127.0.0.1:1235"));
assert_eq!(d1.contact_info.tvu, socketaddr!("127.0.0.1:1236")); assert_eq!(d1.contact_info.tvu, socketaddr!("127.0.0.1:1236"));
assert_eq!(d1.contact_info.rpu, socketaddr!("127.0.0.1:1237"));
assert_eq!(d1.contact_info.tpu, socketaddr!("127.0.0.1:1234")); assert_eq!(d1.contact_info.tpu, socketaddr!("127.0.0.1:1234"));
assert_eq!(d1.contact_info.rpc, socketaddr!("127.0.0.1:8899"));
assert_eq!(d1.contact_info.rpc_pubsub, socketaddr!("127.0.0.1:8900"));
} }
#[test] #[test]
fn max_updates() { fn max_updates() {
@ -1480,6 +1489,7 @@ mod tests {
socketaddr!([127, 0, 0, 1], 1236), socketaddr!([127, 0, 0, 1], 1236),
socketaddr!([127, 0, 0, 1], 1237), socketaddr!([127, 0, 0, 1], 1237),
socketaddr!([127, 0, 0, 1], 1238), socketaddr!([127, 0, 0, 1], 1238),
socketaddr!([127, 0, 0, 1], 1239),
); );
cluster_info.insert(&nxt); cluster_info.insert(&nxt);
let rv = cluster_info.window_index_request(0).unwrap(); let rv = cluster_info.window_index_request(0).unwrap();
@ -1494,6 +1504,7 @@ mod tests {
socketaddr!([127, 0, 0, 1], 1236), socketaddr!([127, 0, 0, 1], 1236),
socketaddr!([127, 0, 0, 1], 1237), socketaddr!([127, 0, 0, 1], 1237),
socketaddr!([127, 0, 0, 1], 1238), socketaddr!([127, 0, 0, 1], 1238),
socketaddr!([127, 0, 0, 1], 1239),
); );
cluster_info.insert(&nxt); cluster_info.insert(&nxt);
let mut one = false; let mut one = false;
@ -1520,6 +1531,7 @@ mod tests {
socketaddr!("127.0.0.1:127"), socketaddr!("127.0.0.1:127"),
socketaddr!("127.0.0.1:127"), socketaddr!("127.0.0.1:127"),
socketaddr!("127.0.0.1:127"), socketaddr!("127.0.0.1:127"),
socketaddr!("127.0.0.1:127"),
); );
let mut cluster_info = ClusterInfo::new(me).expect("ClusterInfo::new"); let mut cluster_info = ClusterInfo::new(me).expect("ClusterInfo::new");
@ -1665,6 +1677,7 @@ mod tests {
socketaddr!("127.0.0.1:1236"), socketaddr!("127.0.0.1:1236"),
socketaddr!("127.0.0.1:1237"), socketaddr!("127.0.0.1:1237"),
socketaddr!("127.0.0.1:1238"), socketaddr!("127.0.0.1:1238"),
socketaddr!("127.0.0.1:1239"),
); );
let rv = let rv =
ClusterInfo::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, 0); ClusterInfo::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, 0);
@ -1843,7 +1856,6 @@ mod tests {
for tx_socket in node.sockets.replicate.iter() { for tx_socket in node.sockets.replicate.iter() {
assert_eq!(tx_socket.local_addr().unwrap().ip(), ip); assert_eq!(tx_socket.local_addr().unwrap().ip(), ip);
} }
assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip);
assert!(node.sockets.transaction.len() > 1); assert!(node.sockets.transaction.len() > 1);
for tx_socket in node.sockets.transaction.iter() { for tx_socket in node.sockets.transaction.iter() {
assert_eq!(tx_socket.local_addr().unwrap().ip(), ip); assert_eq!(tx_socket.local_addr().unwrap().ip(), ip);
@ -1858,8 +1870,6 @@ mod tests {
for tx_socket in node.sockets.replicate.iter() { for tx_socket in node.sockets.replicate.iter() {
assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port); assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port);
} }
assert!(node.sockets.requests.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0);
assert!(node.sockets.requests.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1);
let tx_port = node.sockets.transaction[0].local_addr().unwrap().port(); let tx_port = node.sockets.transaction[0].local_addr().unwrap().port();
assert!(tx_port >= FULLNODE_PORT_RANGE.0); assert!(tx_port >= FULLNODE_PORT_RANGE.0);
assert!(tx_port < FULLNODE_PORT_RANGE.1); assert!(tx_port < FULLNODE_PORT_RANGE.1);
@ -1879,7 +1889,6 @@ mod tests {
for tx_socket in node.sockets.replicate.iter() { for tx_socket in node.sockets.replicate.iter() {
assert_eq!(tx_socket.local_addr().unwrap().ip(), ip); assert_eq!(tx_socket.local_addr().unwrap().ip(), ip);
} }
assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip);
assert!(node.sockets.transaction.len() > 1); assert!(node.sockets.transaction.len() > 1);
for tx_socket in node.sockets.transaction.iter() { for tx_socket in node.sockets.transaction.iter() {
assert_eq!(tx_socket.local_addr().unwrap().ip(), ip); assert_eq!(tx_socket.local_addr().unwrap().ip(), ip);
@ -1893,8 +1902,6 @@ mod tests {
for tx_socket in node.sockets.replicate.iter() { for tx_socket in node.sockets.replicate.iter() {
assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port); assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port);
} }
assert!(node.sockets.requests.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0);
assert!(node.sockets.requests.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1);
let tx_port = node.sockets.transaction[0].local_addr().unwrap().port(); let tx_port = node.sockets.transaction[0].local_addr().unwrap().port();
assert!(tx_port >= FULLNODE_PORT_RANGE.0); assert!(tx_port >= FULLNODE_PORT_RANGE.0);
assert!(tx_port < FULLNODE_PORT_RANGE.1); assert!(tx_port < FULLNODE_PORT_RANGE.1);

View File

@ -104,15 +104,13 @@ impl Drone {
pub fn send_airdrop(&mut self, req: DroneRequest) -> Result<Signature, io::Error> { pub fn send_airdrop(&mut self, req: DroneRequest) -> Result<Signature, io::Error> {
let request_amount: u64; let request_amount: u64;
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let leader = poll_gossip_for_leader(self.network_addr, Some(10)) let leader = poll_gossip_for_leader(self.network_addr, Some(10))
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let mut client = ThinClient::new( let mut client = ThinClient::new(
leader.contact_info.rpu, leader.contact_info.rpc,
requests_socket,
leader.contact_info.tpu, leader.contact_info.tpu,
transactions_socket, transactions_socket,
); );
@ -340,7 +338,7 @@ mod tests {
None, None,
&ledger_path, &ledger_path,
false, false,
Some(0), None,
); );
let mut addr: SocketAddr = "0.0.0.0:9900".parse().expect("bind to drone socket"); let mut addr: SocketAddr = "0.0.0.0:9900".parse().expect("bind to drone socket");
@ -353,13 +351,11 @@ mod tests {
Some(150_000), Some(150_000),
); );
let requests_socket = UdpSocket::bind("0.0.0.0:0").expect("drone bind to requests socket");
let transactions_socket = let transactions_socket =
UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket"); UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket");
let mut client = ThinClient::new( let mut client = ThinClient::new(
leader_data.contact_info.rpu, leader_data.contact_info.rpc,
requests_socket,
leader_data.contact_info.tpu, leader_data.contact_info.tpu,
transactions_socket, transactions_socket,
); );
@ -385,15 +381,14 @@ mod tests {
None, None,
false, false,
LeaderScheduler::from_bootstrap_leader(leader_data.id), LeaderScheduler::from_bootstrap_leader(leader_data.id),
None,
); );
let requests_socket = UdpSocket::bind("0.0.0.0:0").expect("drone bind to requests socket");
let transactions_socket = let transactions_socket =
UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket"); UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket");
let mut client = ThinClient::new( let mut client = ThinClient::new(
leader_data.contact_info.rpu, leader_data.contact_info.rpc,
requests_socket,
leader_data.contact_info.tpu, leader_data.contact_info.tpu,
transactions_socket, transactions_socket,
); );

View File

@ -7,13 +7,12 @@ use hash::Hash;
use leader_scheduler::LeaderScheduler; use leader_scheduler::LeaderScheduler;
use ledger::read_ledger; use ledger::read_ledger;
use ncp::Ncp; use ncp::Ncp;
use rpc::{JsonRpcService, RPC_PORT}; use rpc::JsonRpcService;
use rpc_pubsub::PubSubService; use rpc_pubsub::PubSubService;
use rpu::Rpu;
use service::Service; use service::Service;
use signature::{Keypair, KeypairUtil}; use signature::{Keypair, KeypairUtil};
use std::net::SocketAddr;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::Result; use std::thread::Result;
@ -87,7 +86,6 @@ pub struct Fullnode {
keypair: Arc<Keypair>, keypair: Arc<Keypair>,
vote_account_keypair: Arc<Keypair>, vote_account_keypair: Arc<Keypair>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
rpu: Option<Rpu>,
rpc_service: Option<JsonRpcService>, rpc_service: Option<JsonRpcService>,
rpc_pubsub_service: Option<PubSubService>, rpc_pubsub_service: Option<PubSubService>,
ncp: Ncp, ncp: Ncp,
@ -101,9 +99,8 @@ pub struct Fullnode {
retransmit_socket: UdpSocket, retransmit_socket: UdpSocket,
transaction_sockets: Vec<UdpSocket>, transaction_sockets: Vec<UdpSocket>,
broadcast_socket: UdpSocket, broadcast_socket: UdpSocket,
requests_socket: UdpSocket, rpc_addr: SocketAddr,
respond_socket: UdpSocket, rpc_pubsub_addr: SocketAddr,
rpc_port: Option<u16>,
} }
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
@ -137,6 +134,7 @@ impl Fullnode {
leader_addr: Option<SocketAddr>, leader_addr: Option<SocketAddr>,
sigverify_disabled: bool, sigverify_disabled: bool,
leader_scheduler: LeaderScheduler, leader_scheduler: LeaderScheduler,
rpc_port: Option<u16>,
) -> Self { ) -> Self {
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
@ -152,9 +150,11 @@ impl Fullnode {
"starting... local gossip address: {} (advertising {})", "starting... local gossip address: {} (advertising {})",
local_gossip_addr, node.info.contact_info.ncp local_gossip_addr, node.info.contact_info.ncp
); );
let mut rpc_addr = node.info.contact_info.rpc;
if let Some(port) = rpc_port {
rpc_addr.set_port(port);
}
let local_requests_addr = node.sockets.requests.local_addr().unwrap();
let requests_addr = node.info.contact_info.rpu;
let leader_info = leader_addr.map(|i| NodeInfo::new_entry_point(&i)); let leader_info = leader_addr.map(|i| NodeInfo::new_entry_point(&i));
let server = Self::new_with_bank( let server = Self::new_with_bank(
keypair, keypair,
@ -166,21 +166,18 @@ impl Fullnode {
leader_info.as_ref(), leader_info.as_ref(),
ledger_path, ledger_path,
sigverify_disabled, sigverify_disabled,
None, rpc_port,
); );
match leader_addr { match leader_addr {
Some(leader_addr) => { Some(leader_addr) => {
info!( info!(
"validator ready... local request address: {} (advertising {}) connected to: {}", "validator ready... rpc address: {}, connected to: {}",
local_requests_addr, requests_addr, leader_addr rpc_addr, leader_addr
); );
} }
None => { None => {
info!( info!("leader ready... rpc address: {}", rpc_addr);
"leader ready... local request address: {} (advertising {})",
local_requests_addr, requests_addr
);
} }
} }
@ -244,27 +241,27 @@ impl Fullnode {
bank: Bank, bank: Bank,
entry_height: u64, entry_height: u64,
last_id: &Hash, last_id: &Hash,
node: Node, mut node: Node,
bootstrap_leader_info_option: Option<&NodeInfo>, bootstrap_leader_info_option: Option<&NodeInfo>,
ledger_path: &str, ledger_path: &str,
sigverify_disabled: bool, sigverify_disabled: bool,
rpc_port: Option<u16>, rpc_port: Option<u16>,
) -> Self { ) -> Self {
let mut rpc_addr = node.info.contact_info.rpc;
let mut rpc_pubsub_addr = node.info.contact_info.rpc_pubsub;
// Use custom RPC port, if provided (`Some(port)`)
// RPC port may be any valid open port on the node
// If rpc_port == `None`, node will listen on the ports set in NodeInfo
if let Some(port) = rpc_port {
rpc_addr.set_port(port);
node.info.contact_info.rpc = rpc_addr;
rpc_pubsub_addr.set_port(port + 1);
node.info.contact_info.rpc_pubsub = rpc_pubsub_addr;
}
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let bank = Arc::new(bank); let bank = Arc::new(bank);
let rpu = Some(Rpu::new(
&bank,
node.sockets
.requests
.try_clone()
.expect("Failed to clone requests socket"),
node.sockets
.respond
.try_clone()
.expect("Failed to clone respond socket"),
));
let window = new_window(32 * 1024); let window = new_window(32 * 1024);
let shared_window = Arc::new(RwLock::new(window)); let shared_window = Arc::new(RwLock::new(window));
let cluster_info = Arc::new(RwLock::new( let cluster_info = Arc::new(RwLock::new(
@ -272,7 +269,7 @@ impl Fullnode {
)); ));
let (rpc_service, rpc_pubsub_service) = let (rpc_service, rpc_pubsub_service) =
Self::startup_rpc_services(rpc_port, &bank, &cluster_info); Self::startup_rpc_services(rpc_addr, rpc_pubsub_addr, &bank, &cluster_info);
let ncp = Ncp::new( let ncp = Ncp::new(
&cluster_info, &cluster_info,
@ -364,7 +361,6 @@ impl Fullnode {
shared_window, shared_window,
bank, bank,
sigverify_disabled, sigverify_disabled,
rpu,
ncp, ncp,
rpc_service: Some(rpc_service), rpc_service: Some(rpc_service),
rpc_pubsub_service: Some(rpc_pubsub_service), rpc_pubsub_service: Some(rpc_pubsub_service),
@ -376,19 +372,13 @@ impl Fullnode {
retransmit_socket: node.sockets.retransmit, retransmit_socket: node.sockets.retransmit,
transaction_sockets: node.sockets.transaction, transaction_sockets: node.sockets.transaction,
broadcast_socket: node.sockets.broadcast, broadcast_socket: node.sockets.broadcast,
requests_socket: node.sockets.requests, rpc_addr,
respond_socket: node.sockets.respond, rpc_pubsub_addr,
rpc_port,
} }
} }
fn leader_to_validator(&mut self) -> Result<()> { fn leader_to_validator(&mut self) -> Result<()> {
// Close down any services that could have a reference to the bank // Close down any services that could have a reference to the bank
if self.rpu.is_some() {
let old_rpu = self.rpu.take().unwrap();
old_rpu.close()?;
}
if self.rpc_service.is_some() { if self.rpc_service.is_some() {
let old_rpc_service = self.rpc_service.take().unwrap(); let old_rpc_service = self.rpc_service.take().unwrap();
old_rpc_service.close()?; old_rpc_service.close()?;
@ -429,18 +419,12 @@ impl Fullnode {
// Spin up new versions of all the services that relied on the bank, passing in the // Spin up new versions of all the services that relied on the bank, passing in the
// new bank // new bank
self.rpu = Some(Rpu::new( let (rpc_service, rpc_pubsub_service) = Self::startup_rpc_services(
self.rpc_addr,
self.rpc_pubsub_addr,
&new_bank, &new_bank,
self.requests_socket &self.cluster_info,
.try_clone() );
.expect("Failed to clone requests socket"),
self.respond_socket
.try_clone()
.expect("Failed to clone respond socket"),
));
let (rpc_service, rpc_pubsub_service) =
Self::startup_rpc_services(self.rpc_port, &new_bank, &self.cluster_info);
self.rpc_service = Some(rpc_service); self.rpc_service = Some(rpc_service);
self.rpc_pubsub_service = Some(rpc_pubsub_service); self.rpc_pubsub_service = Some(rpc_pubsub_service);
self.bank = new_bank; self.bank = new_bank;
@ -555,9 +539,6 @@ impl Fullnode {
//used for notifying many nodes in parallel to exit //used for notifying many nodes in parallel to exit
pub fn exit(&self) { pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed); self.exit.store(true, Ordering::Relaxed);
if let Some(ref rpu) = self.rpu {
rpu.exit();
}
if let Some(ref rpc_service) = self.rpc_service { if let Some(ref rpc_service) = self.rpc_service {
rpc_service.exit(); rpc_service.exit();
} }
@ -599,22 +580,11 @@ impl Fullnode {
} }
fn startup_rpc_services( fn startup_rpc_services(
rpc_port: Option<u16>, rpc_addr: SocketAddr,
rpc_pubsub_addr: SocketAddr,
bank: &Arc<Bank>, bank: &Arc<Bank>,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
) -> (JsonRpcService, PubSubService) { ) -> (JsonRpcService, PubSubService) {
// Use custom RPC port, if provided (`Some(port)`)
// RPC port may be any open port on the node
// If rpc_port == `None`, node will listen on the default RPC_PORT from Rpc module
// If rpc_port == `Some(0)`, node will dynamically choose any open port for both
// Rpc and RpcPubsub serivces. Useful for tests.
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), rpc_port.unwrap_or(RPC_PORT));
let rpc_pubsub_addr = SocketAddr::new(
IpAddr::V4(Ipv4Addr::from(0)),
rpc_port.map_or(RPC_PORT + 1, |port| if port == 0 { port } else { port + 1 }),
);
// TODO: The RPC service assumes that there is a drone running on the leader // TODO: The RPC service assumes that there is a drone running on the leader
// Drone location/id will need to be handled a different way as soon as leader rotation begins // Drone location/id will need to be handled a different way as soon as leader rotation begins
( (
@ -628,9 +598,6 @@ impl Service for Fullnode {
type JoinReturnType = Option<FullnodeReturnType>; type JoinReturnType = Option<FullnodeReturnType>;
fn join(self) -> Result<Option<FullnodeReturnType>> { fn join(self) -> Result<Option<FullnodeReturnType>> {
if let Some(rpu) = self.rpu {
rpu.join()?;
}
if let Some(rpc_service) = self.rpc_service { if let Some(rpc_service) = self.rpc_service {
rpc_service.join()?; rpc_service.join()?;
} }
@ -702,7 +669,7 @@ mod tests {
Some(&entry), Some(&entry),
&validator_ledger_path, &validator_ledger_path,
false, false,
Some(0), None,
); );
v.close().unwrap(); v.close().unwrap();
remove_dir_all(validator_ledger_path).unwrap(); remove_dir_all(validator_ledger_path).unwrap();
@ -742,7 +709,7 @@ mod tests {
Some(&entry), Some(&entry),
&validator_ledger_path, &validator_ledger_path,
false, false,
Some(0), None,
) )
}).collect(); }).collect();
@ -810,6 +777,7 @@ mod tests {
Some(bootstrap_leader_info.contact_info.ncp), Some(bootstrap_leader_info.contact_info.ncp),
false, false,
LeaderScheduler::new(&leader_scheduler_config), LeaderScheduler::new(&leader_scheduler_config),
None,
); );
// Wait for the leader to transition, ticks should cause the leader to // Wait for the leader to transition, ticks should cause the leader to
@ -900,6 +868,7 @@ mod tests {
Some(bootstrap_leader_info.contact_info.ncp), Some(bootstrap_leader_info.contact_info.ncp),
false, false,
LeaderScheduler::new(&leader_scheduler_config), LeaderScheduler::new(&leader_scheduler_config),
None,
); );
match bootstrap_leader.node_role { match bootstrap_leader.node_role {
@ -918,6 +887,7 @@ mod tests {
Some(bootstrap_leader_info.contact_info.ncp), Some(bootstrap_leader_info.contact_info.ncp),
false, false,
LeaderScheduler::new(&leader_scheduler_config), LeaderScheduler::new(&leader_scheduler_config),
None,
); );
match validator.node_role { match validator.node_role {
@ -997,6 +967,7 @@ mod tests {
Some(leader_ncp), Some(leader_ncp),
false, false,
LeaderScheduler::new(&leader_scheduler_config), LeaderScheduler::new(&leader_scheduler_config),
None,
); );
// Send blobs to the validator from our mock leader // Send blobs to the validator from our mock leader

View File

@ -53,15 +53,11 @@ pub mod poh_service;
pub mod recvmmsg; pub mod recvmmsg;
pub mod replicate_stage; pub mod replicate_stage;
pub mod replicator; pub mod replicator;
pub mod request;
pub mod request_processor;
pub mod request_stage;
pub mod result; pub mod result;
pub mod retransmit_stage; pub mod retransmit_stage;
pub mod rpc; pub mod rpc;
pub mod rpc_pubsub; pub mod rpc_pubsub;
pub mod rpc_request; pub mod rpc_request;
pub mod rpu;
pub mod service; pub mod service;
pub mod signature; pub mod signature;
pub mod sigverify; pub mod sigverify;

View File

@ -429,14 +429,17 @@ pub fn make_consecutive_blobs(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use hash::Hash;
use packet::{ use packet::{
to_packets, Blob, Meta, Packet, Packets, SharedBlob, SharedPackets, NUM_PACKETS, to_packets, Blob, Meta, Packet, Packets, SharedBlob, SharedPackets, NUM_PACKETS,
PACKET_DATA_SIZE, PACKET_DATA_SIZE,
}; };
use request::Request; use signature::{Keypair, KeypairUtil};
use std::io; use std::io;
use std::io::Write; use std::io::Write;
use std::net::UdpSocket; use std::net::UdpSocket;
use system_transaction::SystemTransaction;
use transaction::Transaction;
#[test] #[test]
pub fn packet_send_recv() { pub fn packet_send_recv() {
@ -460,7 +463,9 @@ mod tests {
#[test] #[test]
fn test_to_packets() { fn test_to_packets() {
let tx = Request::GetTransactionCount; let keypair = Keypair::new();
let hash = Hash::new(&[1; 32]);
let tx = Transaction::system_new(&keypair, keypair.pubkey(), 1, hash);
let rv = to_packets(&vec![tx.clone(); 1]); let rv = to_packets(&vec![tx.clone(); 1]);
assert_eq!(rv.len(), 1); assert_eq!(rv.len(), 1);
assert_eq!(rv[0].read().unwrap().packets.len(), 1); assert_eq!(rv[0].read().unwrap().packets.len(), 1);

View File

@ -214,6 +214,7 @@ mod tests {
None, None,
false, false,
LeaderScheduler::from_bootstrap_leader(leader_info.id), LeaderScheduler::from_bootstrap_leader(leader_info.id),
None,
); );
let mut leader_client = mk_client(&leader_info); let mut leader_client = mk_client(&leader_info);

View File

@ -1,43 +0,0 @@
//! The `request` module defines the messages for the thin client.
use hash::Hash;
use signature::Signature;
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
/// Messages a thin client sends to the RPU; each variant is answered with
/// the matching `Response` variant.
pub enum Request {
    /// Fetch the account stored under `key`.
    GetAccount { key: Pubkey },
    /// Fetch the id of the most recent ledger entry known to the bank.
    GetLastId,
    /// Fetch the total number of transactions the bank has processed.
    GetTransactionCount,
    /// Ask whether the bank has seen `signature`.
    GetSignature { signature: Signature },
    /// Fetch the bank's finality measure.
    GetFinality,
}
impl Request {
    /// Verify the request is valid.
    ///
    /// All current variants are structurally valid by construction, so this
    /// always returns `true`; the method exists so requests can be checked
    /// uniformly with other verifiable messages.
    pub fn verify(&self) -> bool {
        true
    }
}
#[derive(Serialize, Deserialize, Debug)]
/// Replies the RPU sends back to thin clients; each variant answers the
/// `Request` variant of the same name.
pub enum Response {
    /// Account lookup result; `account` is `None` when `key` is unknown.
    Account {
        key: Pubkey,
        account: Option<Account>,
    },
    /// Id of the most recent ledger entry.
    LastId {
        id: Hash,
    },
    /// Total transactions processed by the bank.
    TransactionCount {
        transaction_count: u64,
    },
    /// Whether the queried signature has been seen by the bank.
    SignatureStatus {
        signature_status: bool,
    },
    /// Finality measure as reported by the bank (units defined by
    /// `Bank::finality`).
    Finality {
        time: usize,
    },
}

View File

@ -1,66 +0,0 @@
//! The `request_processor` processes thin client Request messages.
use bank::Bank;
use request::{Request, Response};
use std::net::SocketAddr;
use std::sync::Arc;
/// Stateless handler that answers thin-client `Request`s by querying a `Bank`.
pub struct RequestProcessor {
    // Shared ledger/account state all queries are answered from.
    bank: Arc<Bank>,
}
impl RequestProcessor {
    /// Build a processor that answers client queries against the given `Bank`.
    pub fn new(bank: Arc<Bank>) -> Self {
        RequestProcessor { bank }
    }

    /// Answer one client request, pairing the reply with the address
    /// (`rsp_addr`) it should be delivered back to.
    fn process_request(
        &self,
        msg: Request,
        rsp_addr: SocketAddr,
    ) -> Option<(Response, SocketAddr)> {
        let rsp = match msg {
            Request::GetAccount { key } => {
                let account = self.bank.get_account(&key);
                let rsp = (Response::Account { key, account }, rsp_addr);
                debug!("Response::Account {:?}", rsp);
                rsp
            }
            Request::GetLastId => {
                let id = self.bank.last_id();
                let rsp = (Response::LastId { id }, rsp_addr);
                debug!("Response::LastId {:?}", rsp);
                rsp
            }
            Request::GetTransactionCount => {
                let transaction_count = self.bank.transaction_count() as u64;
                let rsp = (Response::TransactionCount { transaction_count }, rsp_addr);
                debug!("Response::TransactionCount {:?}", rsp);
                rsp
            }
            Request::GetSignature { signature } => {
                let signature_status = self.bank.has_signature(&signature);
                let rsp = (Response::SignatureStatus { signature_status }, rsp_addr);
                debug!("Response::Signature {:?}", rsp);
                rsp
            }
            Request::GetFinality => {
                let time = self.bank.finality();
                let rsp = (Response::Finality { time }, rsp_addr);
                debug!("Response::Finality {:?}", rsp);
                rsp
            }
        };
        Some(rsp)
    }

    /// Answer a batch of requests, dropping any that yield no response.
    pub fn process_requests(
        &self,
        reqs: Vec<(Request, SocketAddr)>,
    ) -> Vec<(Response, SocketAddr)> {
        let mut rsps = Vec::with_capacity(reqs.len());
        for (req, rsp_addr) in reqs {
            if let Some(rsp) = self.process_request(req, rsp_addr) {
                rsps.push(rsp);
            }
        }
        rsps
    }
}

View File

@ -1,119 +0,0 @@
//! The `request_stage` processes thin client Request messages.
use bincode::deserialize;
use counter::Counter;
use log::Level;
use packet::{to_blobs, Packets, SharedPackets};
use rayon::prelude::*;
use request::Request;
use request_processor::RequestProcessor;
use result::{Error, Result};
use service::Service;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};
use std::time::Instant;
use streamer::{self, BlobReceiver, BlobSender};
use timing;
/// Stage that deserializes incoming request packets, dispatches them to a
/// `RequestProcessor`, and forwards the serialized responses as blobs.
pub struct RequestStage {
    // Worker thread that runs the receive/process/respond loop.
    thread_hdl: JoinHandle<()>,
    // Kept behind an Arc and exposed publicly so callers can reuse the
    // processor while the stage owns a clone of it.
    pub request_processor: Arc<RequestProcessor>,
}
impl RequestStage {
    /// Deserialize every packet in `p` into a `Request` paired with the
    /// sender's address; packets that fail to deserialize yield `None`.
    /// Deserialization runs in parallel via rayon's `par_iter`.
    pub fn deserialize_requests(p: &Packets) -> Vec<Option<(Request, SocketAddr)>> {
        p.packets
            .par_iter()
            .map(|x| {
                // Only the first `meta.size` bytes of the packet buffer are valid.
                deserialize(&x.data[0..x.meta.size])
                    .map(|req| (req, x.meta.addr()))
                    .ok()
            }).collect()
    }

    /// Pull one batch of packets off `packet_receiver`, answer them through
    /// `request_processor`, and push the serialized responses to
    /// `blob_sender`.
    ///
    /// Returns an error when receiving fails (timeout/disconnect) or when
    /// serializing/sending the response blobs fails.
    pub fn process_request_packets(
        request_processor: &RequestProcessor,
        packet_receiver: &Receiver<SharedPackets>,
        blob_sender: &BlobSender,
    ) -> Result<()> {
        let (batch, batch_len, _recv_time) = streamer::recv_batch(packet_receiver)?;
        debug!(
            "@{:?} request_stage: processing: {}",
            timing::timestamp(),
            batch_len
        );
        // Track how many requests were actually decoded, for the rate log below.
        let mut reqs_len = 0;
        let proc_start = Instant::now();
        for msgs in batch {
            // Drop packets that failed to deserialize.
            let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap())
                .into_iter()
                .filter_map(|x| x)
                .collect();
            reqs_len += reqs.len();
            let rsps = request_processor.process_requests(reqs);
            let blobs = to_blobs(rsps)?;
            if !blobs.is_empty() {
                debug!("process: sending blobs: {}", blobs.len());
                //don't wake up the other side if there is nothing
                blob_sender.send(blobs)?;
            }
        }
        let total_time_s = timing::duration_as_s(&proc_start.elapsed());
        let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
        inc_new_counter_info!("request_stage-time_ms", total_time_ms as usize);
        debug!(
            "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
            timing::timestamp(),
            batch_len,
            total_time_ms,
            reqs_len,
            (reqs_len as f32) / (total_time_s)
        );
        Ok(())
    }

    /// Spawn the request-stage worker thread.
    ///
    /// Returns the stage handle plus the `BlobReceiver` carrying serialized
    /// responses (the responder side of the pipeline consumes it). The loop
    /// exits when the packet channel disconnects; timeouts are ignored and
    /// other errors are logged and retried.
    pub fn new(
        request_processor: RequestProcessor,
        packet_receiver: Receiver<SharedPackets>,
    ) -> (Self, BlobReceiver) {
        let request_processor = Arc::new(request_processor);
        // Clone moved into the worker; the original is kept on the stage struct.
        let request_processor_ = request_processor.clone();
        let (blob_sender, blob_receiver) = channel();
        let thread_hdl = Builder::new()
            .name("solana-request-stage".to_string())
            .spawn(move || loop {
                if let Err(e) = Self::process_request_packets(
                    &request_processor_,
                    &packet_receiver,
                    &blob_sender,
                ) {
                    match e {
                        // Sender gone: shut the stage down.
                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                        // No packets this interval: just poll again.
                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
                        _ => error!("{:?}", e),
                    }
                }
            }).unwrap();
        (
            RequestStage {
                thread_hdl,
                request_processor,
            },
            blob_receiver,
        )
    }
}
impl Service for RequestStage {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}

View File

@ -615,7 +615,6 @@ mod tests {
let last_id = bank.last_id(); let last_id = bank.last_id();
let tx = Transaction::system_move(&alice.keypair(), bob_pubkey, 20, last_id, 0); let tx = Transaction::system_move(&alice.keypair(), bob_pubkey, 20, last_id, 0);
let serial_tx = serialize(&tx).unwrap(); let serial_tx = serialize(&tx).unwrap();
let rpc_port = 22222; // Needs to be distinct known number to not conflict with other tests
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
leader_data.id, leader_data.id,
@ -634,7 +633,7 @@ mod tests {
None, None,
&ledger_path, &ledger_path,
false, false,
Some(rpc_port), None,
); );
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
@ -645,8 +644,7 @@ mod tests {
"method": "sendTransaction", "method": "sendTransaction",
"params": json!(vec![serial_tx]) "params": json!(vec![serial_tx])
}); });
let mut rpc_addr = leader_data.contact_info.ncp; let rpc_addr = leader_data.contact_info.rpc;
rpc_addr.set_port(22222);
let rpc_string = format!("http://{}", rpc_addr.to_string()); let rpc_string = format!("http://{}", rpc_addr.to_string());
let mut response = client let mut response = client
.post(&rpc_string) .post(&rpc_string)

View File

@ -1,88 +0,0 @@
//! The `rpu` module implements the Request Processing Unit, a
//! 3-stage transaction processing pipeline in software. It listens
//! for `Request` messages from clients and replies with `Response`
//! messages.
//!
//! ```text
//! .------.
//! | Bank |
//! `---+--`
//! |
//! .------------------|-------------------.
//! | RPU | |
//! | v |
//! .---------. | .-------. .---------. .---------. | .---------.
//! | Alice |--->| | | | | +---->| Alice |
//! `---------` | | Fetch | | Request | | Respond | | `---------`
//! | | Stage |->| Stage |->| Stage | |
//! .---------. | | | | | | | | .---------.
//! | Bob |--->| | | | | +---->| Bob |
//! `---------` | `-------` `---------` `---------` | `---------`
//! | |
//! | |
//! `--------------------------------------`
//! ```
use bank::Bank;
use request_processor::RequestProcessor;
use request_stage::RequestStage;
use service::Service;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use streamer;
/// The Request Processing Unit: owns the receiver, request, and responder
/// threads that service thin-client `Request` messages (see module docs).
pub struct Rpu {
    // Stage that deserializes requests and produces response blobs.
    request_stage: RequestStage,
    // Streamer receiver and responder thread handles.
    thread_hdls: Vec<JoinHandle<()>>,
    // Flag the streamer threads poll to know when to shut down.
    exit: Arc<AtomicBool>,
}
impl Rpu {
    /// Wire up the RPU pipeline: a streamer receiving packets on
    /// `requests_socket`, a `RequestStage` answering them from `bank`, and a
    /// responder writing replies out `respond_socket`.
    pub fn new(bank: &Arc<Bank>, requests_socket: UdpSocket, respond_socket: UdpSocket) -> Self {
        let exit = Arc::new(AtomicBool::new(false));
        let (packet_sender, packet_receiver) = channel();
        let t_receiver = streamer::receiver(
            Arc::new(requests_socket),
            exit.clone(),
            packet_sender,
            "rpu",
        );

        let request_processor = RequestProcessor::new(bank.clone());
        // blob_receiver carries serialized responses from the stage to the responder.
        let (request_stage, blob_receiver) = RequestStage::new(request_processor, packet_receiver);

        let t_responder = streamer::responder("rpu", Arc::new(respond_socket), blob_receiver);
        let thread_hdls = vec![t_receiver, t_responder];
        Rpu {
            thread_hdls,
            request_stage,
            exit,
        }
    }

    /// Signal the streamer threads to stop; returns immediately without joining.
    pub fn exit(&self) {
        self.exit.store(true, Ordering::Relaxed);
    }

    /// Signal shutdown and then block until all RPU threads have exited.
    pub fn close(self) -> thread::Result<()> {
        self.exit();
        self.join()
    }
}
impl Service for Rpu {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
self.request_stage.join()?;
Ok(())
}
}

View File

@ -4,13 +4,15 @@
//! unstable and may change in future releases. //! unstable and may change in future releases.
use bank::Bank; use bank::Bank;
use bincode::{deserialize, serialize}; use bincode::serialize;
use bs58;
use cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo}; use cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo};
use hash::Hash; use hash::Hash;
use log::Level; use log::Level;
use ncp::Ncp; use ncp::Ncp;
use request::{Request, Response};
use result::{Error, Result}; use result::{Error, Result};
use rpc_request::RpcRequest;
use serde_json;
use signature::{Keypair, Signature}; use signature::{Keypair, Signature};
use solana_sdk::account::Account; use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
@ -33,8 +35,7 @@ use metrics;
/// An object for querying and sending transactions to the network. /// An object for querying and sending transactions to the network.
pub struct ThinClient { pub struct ThinClient {
requests_addr: SocketAddr, rpc_addr: SocketAddr,
requests_socket: UdpSocket,
transactions_addr: SocketAddr, transactions_addr: SocketAddr,
transactions_socket: UdpSocket, transactions_socket: UdpSocket,
last_id: Option<Hash>, last_id: Option<Hash>,
@ -45,18 +46,15 @@ pub struct ThinClient {
} }
impl ThinClient { impl ThinClient {
/// Create a new ThinClient that will interface with Rpu /// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP
/// over `requests_socket` and `transactions_socket`. To receive responses, the caller must bind `socket` /// and the Tpu at `transactions_addr` over `transactions_socket` using UDP.
/// to a public address before invoking ThinClient methods.
pub fn new( pub fn new(
requests_addr: SocketAddr, rpc_addr: SocketAddr,
requests_socket: UdpSocket,
transactions_addr: SocketAddr, transactions_addr: SocketAddr,
transactions_socket: UdpSocket, transactions_socket: UdpSocket,
) -> Self { ) -> Self {
ThinClient { ThinClient {
requests_addr, rpc_addr,
requests_socket,
transactions_addr, transactions_addr,
transactions_socket, transactions_socket,
last_id: None, last_id: None,
@ -67,58 +65,6 @@ impl ThinClient {
} }
} }
pub fn recv_response(&self) -> io::Result<Response> {
let mut buf = vec![0u8; 1024];
trace!("start recv_from");
match self.requests_socket.recv_from(&mut buf) {
Ok((len, from)) => {
trace!("end recv_from got {} {}", len, from);
deserialize(&buf)
.or_else(|_| Err(io::Error::new(io::ErrorKind::Other, "deserialize")))
}
Err(e) => {
trace!("end recv_from got {:?}", e);
Err(e)
}
}
}
pub fn process_response(&mut self, resp: &Response) {
match *resp {
Response::Account {
key,
account: Some(ref account),
} => {
trace!("Response account {:?} {:?}", key, account);
self.balances.insert(key, account.clone());
}
Response::Account { key, account: None } => {
debug!("Response account {}: None ", key);
self.balances.remove(&key);
}
Response::LastId { id } => {
trace!("Response last_id {:?}", id);
self.last_id = Some(id);
}
Response::TransactionCount { transaction_count } => {
trace!("Response transaction count {:?}", transaction_count);
self.transaction_count = transaction_count;
}
Response::SignatureStatus { signature_status } => {
self.signature_status = signature_status;
if signature_status {
trace!("Response found signature");
} else {
trace!("Response signature not found");
}
}
Response::Finality { time } => {
trace!("Response finality {:?}", time);
self.finality = Some(time);
}
}
}
/// Send a signed Transaction to the server for processing. This method /// Send a signed Transaction to the server for processing. This method
/// does not wait for a response. /// does not wait for a response.
pub fn transfer_signed(&self, tx: &Transaction) -> io::Result<Signature> { pub fn transfer_signed(&self, tx: &Transaction) -> io::Result<Signature> {
@ -195,41 +141,36 @@ impl ThinClient {
} }
pub fn get_account_userdata(&mut self, pubkey: &Pubkey) -> io::Result<Option<Vec<u8>>> { pub fn get_account_userdata(&mut self, pubkey: &Pubkey) -> io::Result<Option<Vec<u8>>> {
let req = Request::GetAccount { key: *pubkey }; let params = json!(format!("{}", pubkey));
let data = serialize(&req).expect("serialize GetAccount in pub fn get_account_userdata"); let rpc_string = format!("http://{}", self.rpc_addr.to_string());
self.requests_socket let resp = RpcRequest::GetAccountInfo.make_rpc_request(&rpc_string, 1, Some(params));
.send_to(&data, &self.requests_addr) if let Ok(account_json) = resp {
.expect("buffer error in pub fn get_account_userdata"); let account: Account =
serde_json::from_value(account_json).expect("deserialize account");
loop { return Ok(Some(account.userdata));
let resp = self.recv_response()?;
trace!("recv_response {:?}", resp);
if let Response::Account { key, account } = resp {
if key == *pubkey {
return Ok(account.map(|account| account.userdata));
}
}
} }
Err(io::Error::new(
io::ErrorKind::Other,
"get_account_userdata failed",
))
} }
/// Request the balance of the user holding `pubkey`. This method blocks /// Request the balance of the user holding `pubkey`. This method blocks
/// until the server sends a response. If the response packet is dropped /// until the server sends a response. If the response packet is dropped
/// by the network, this method will hang indefinitely. /// by the network, this method will hang indefinitely.
pub fn get_balance(&mut self, pubkey: &Pubkey) -> io::Result<u64> { pub fn get_balance(&mut self, pubkey: &Pubkey) -> io::Result<u64> {
trace!("get_balance sending request to {}", self.requests_addr); trace!("get_balance sending request to {}", self.rpc_addr);
let req = Request::GetAccount { key: *pubkey }; let params = json!(format!("{}", pubkey));
let data = serialize(&req).expect("serialize GetAccount in pub fn get_balance"); let rpc_string = format!("http://{}", self.rpc_addr.to_string());
self.requests_socket let resp = RpcRequest::GetAccountInfo.make_rpc_request(&rpc_string, 1, Some(params));
.send_to(&data, &self.requests_addr) if let Ok(account_json) = resp {
.expect("buffer error in pub fn get_balance"); let account: Account =
let mut done = false; serde_json::from_value(account_json).expect("deserialize account");
while !done { trace!("Response account {:?} {:?}", pubkey, account);
let resp = self.recv_response()?; self.balances.insert(*pubkey, account.clone());
trace!("recv_response {:?}", resp); } else {
if let Response::Account { key, .. } = &resp { debug!("Response account {}: None ", pubkey);
done = key == pubkey; self.balances.remove(&pubkey);
}
self.process_response(&resp);
} }
trace!("get_balance {:?}", self.balances.get(pubkey)); trace!("get_balance {:?}", self.balances.get(pubkey));
// TODO: This is a hard coded call to introspect the balance of a budget_dsl contract // TODO: This is a hard coded call to introspect the balance of a budget_dsl contract
@ -243,25 +184,18 @@ impl ThinClient {
/// Request the finality from the leader node /// Request the finality from the leader node
pub fn get_finality(&mut self) -> usize { pub fn get_finality(&mut self) -> usize {
trace!("get_finality"); trace!("get_finality");
let req = Request::GetFinality;
let data = serialize(&req).expect("serialize GetFinality in pub fn get_finality");
let mut done = false; let mut done = false;
let rpc_string = format!("http://{}", self.rpc_addr.to_string());
while !done { while !done {
debug!("get_finality send_to {}", &self.requests_addr); debug!("get_finality send_to {}", &self.rpc_addr);
self.requests_socket let resp = RpcRequest::GetFinality.make_rpc_request(&rpc_string, 1, None);
.send_to(&data, &self.requests_addr)
.expect("buffer error in pub fn get_finality");
match self.recv_response() { if let Ok(value) = resp {
Ok(resp) => { done = true;
if let Response::Finality { .. } = resp { let finality = value.as_u64().unwrap() as usize;
done = true; self.finality = Some(finality);
} } else {
self.process_response(&resp); debug!("thin_client get_finality error: {:?}", resp);
}
Err(e) => {
debug!("thin_client get_finality error: {}", e);
}
} }
} }
self.finality.expect("some finality") self.finality.expect("some finality")
@ -271,21 +205,16 @@ impl ThinClient {
/// this method will try again 5 times. /// this method will try again 5 times.
pub fn transaction_count(&mut self) -> u64 { pub fn transaction_count(&mut self) -> u64 {
debug!("transaction_count"); debug!("transaction_count");
let req = Request::GetTransactionCount;
let data =
serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count");
let mut tries_left = 5; let mut tries_left = 5;
let rpc_string = format!("http://{}", self.rpc_addr.to_string());
while tries_left > 0 { while tries_left > 0 {
self.requests_socket let resp = RpcRequest::GetTransactionCount.make_rpc_request(&rpc_string, 1, None);
.send_to(&data, &self.requests_addr)
.expect("buffer error in pub fn transaction_count");
if let Ok(resp) = self.recv_response() { if let Ok(value) = resp {
debug!("transaction_count recv_response: {:?}", resp); debug!("transaction_count recv_response: {:?}", value);
if let Response::TransactionCount { .. } = resp { tries_left = 0;
tries_left = 0; let transaction_count = value.as_u64().unwrap();
} self.transaction_count = transaction_count;
self.process_response(&resp);
} else { } else {
tries_left -= 1; tries_left -= 1;
} }
@ -297,25 +226,20 @@ impl ThinClient {
/// until the server sends a response. /// until the server sends a response.
pub fn get_last_id(&mut self) -> Hash { pub fn get_last_id(&mut self) -> Hash {
trace!("get_last_id"); trace!("get_last_id");
let req = Request::GetLastId;
let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id");
let mut done = false; let mut done = false;
let rpc_string = format!("http://{}", self.rpc_addr.to_string());
while !done { while !done {
debug!("get_last_id send_to {}", &self.requests_addr); debug!("get_last_id send_to {}", &self.rpc_addr);
self.requests_socket let resp = RpcRequest::GetLastId.make_rpc_request(&rpc_string, 1, None);
.send_to(&data, &self.requests_addr)
.expect("buffer error in pub fn get_last_id");
match self.recv_response() { if let Ok(value) = resp {
Ok(resp) => { done = true;
if let Response::LastId { .. } = resp { let last_id_str = value.as_str().unwrap();
done = true; let last_id_vec = bs58::decode(last_id_str).into_vec().unwrap();
} let last_id = Hash::new(&last_id_vec);
self.process_response(&resp); self.last_id = Some(last_id);
} } else {
Err(e) => { debug!("thin_client get_last_id error: {:?}", resp);
debug!("thin_client get_last_id error: {}", e);
}
} }
} }
self.last_id.expect("some last_id") self.last_id.expect("some last_id")
@ -377,22 +301,25 @@ impl ThinClient {
/// until the server sends a response. /// until the server sends a response.
pub fn check_signature(&mut self, signature: &Signature) -> bool { pub fn check_signature(&mut self, signature: &Signature) -> bool {
trace!("check_signature"); trace!("check_signature");
let req = Request::GetSignature { let params = json!(format!("{}", signature));
signature: *signature,
};
let data = serialize(&req).expect("serialize GetSignature in pub fn check_signature");
let now = Instant::now(); let now = Instant::now();
let rpc_string = format!("http://{}", self.rpc_addr.to_string());
let mut done = false; let mut done = false;
while !done { while !done {
self.requests_socket let resp = RpcRequest::ConfirmTransaction.make_rpc_request(
.send_to(&data, &self.requests_addr) &rpc_string,
.expect("buffer error in pub fn get_last_id"); 1,
Some(params.clone()),
);
if let Ok(resp) = self.recv_response() { if let Ok(confirmation) = resp {
if let Response::SignatureStatus { .. } = resp { done = true;
done = true; self.signature_status = confirmation.as_bool().unwrap();
if self.signature_status {
trace!("Response found signature");
} else {
trace!("Response signature not found");
} }
self.process_response(&resp);
} }
} }
metrics::submit( metrics::submit(
@ -500,6 +427,7 @@ pub fn retry_get_balance(
mod tests { mod tests {
use super::*; use super::*;
use bank::Bank; use bank::Bank;
use bincode::deserialize;
use cluster_info::Node; use cluster_info::Node;
use fullnode::Fullnode; use fullnode::Fullnode;
use leader_scheduler::LeaderScheduler; use leader_scheduler::LeaderScheduler;
@ -512,7 +440,6 @@ mod tests {
use vote_program::VoteProgram; use vote_program::VoteProgram;
#[test] #[test]
#[ignore]
fn test_thin_client() { fn test_thin_client() {
logger::setup(); logger::setup();
let leader_keypair = Arc::new(Keypair::new()); let leader_keypair = Arc::new(Keypair::new());
@ -540,19 +467,21 @@ mod tests {
None, None,
&ledger_path, &ledger_path,
false, false,
Some(0), None,
); );
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new( let mut client = ThinClient::new(
leader_data.contact_info.rpu, leader_data.contact_info.rpc,
requests_socket,
leader_data.contact_info.tpu, leader_data.contact_info.tpu,
transactions_socket, transactions_socket,
); );
let transaction_count = client.transaction_count();
assert_eq!(transaction_count, 0);
let finality = client.get_finality();
assert_eq!(finality, 18446744073709551615);
let last_id = client.get_last_id(); let last_id = client.get_last_id();
let signature = client let signature = client
.transfer(500, &alice.keypair(), bob_pubkey, &last_id) .transfer(500, &alice.keypair(), bob_pubkey, &last_id)
@ -560,6 +489,8 @@ mod tests {
client.poll_for_signature(&signature).unwrap(); client.poll_for_signature(&signature).unwrap();
let balance = client.get_balance(&bob_pubkey); let balance = client.get_balance(&bob_pubkey);
assert_eq!(balance.unwrap(), 500); assert_eq!(balance.unwrap(), 500);
let transaction_count = client.transaction_count();
assert_eq!(transaction_count, 1);
server.close().unwrap(); server.close().unwrap();
remove_dir_all(ledger_path).unwrap(); remove_dir_all(ledger_path).unwrap();
} }
@ -593,19 +524,14 @@ mod tests {
None, None,
&ledger_path, &ledger_path,
false, false,
Some(0), None,
); );
//TODO: remove this sleep, or add a retry so CI is stable //TODO: remove this sleep, or add a retry so CI is stable
sleep(Duration::from_millis(300)); sleep(Duration::from_millis(300));
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(5, 0)))
.unwrap();
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new( let mut client = ThinClient::new(
leader_data.contact_info.rpu, leader_data.contact_info.rpc,
requests_socket,
leader_data.contact_info.tpu, leader_data.contact_info.tpu,
transactions_socket, transactions_socket,
); );
@ -660,18 +586,13 @@ mod tests {
None, None,
&ledger_path, &ledger_path,
false, false,
Some(0), None,
); );
sleep(Duration::from_millis(300)); sleep(Duration::from_millis(300));
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(5, 0)))
.unwrap();
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new( let mut client = ThinClient::new(
leader_data.contact_info.rpu, leader_data.contact_info.rpc,
requests_socket,
leader_data.contact_info.tpu, leader_data.contact_info.tpu,
transactions_socket, transactions_socket,
); );
@ -714,18 +635,13 @@ mod tests {
None, None,
&ledger_path, &ledger_path,
false, false,
Some(0), None,
); );
sleep(Duration::from_millis(300)); sleep(Duration::from_millis(300));
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(5, 0)))
.unwrap();
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new( let mut client = ThinClient::new(
leader_data.contact_info.rpu, leader_data.contact_info.rpc,
requests_socket,
leader_data.contact_info.tpu, leader_data.contact_info.tpu,
transactions_socket, transactions_socket,
); );
@ -788,12 +704,8 @@ mod tests {
// set a bogus address, see that we don't hang // set a bogus address, see that we don't hang
logger::setup(); logger::setup();
let addr = "0.0.0.0:1234".parse().unwrap(); let addr = "0.0.0.0:1234".parse().unwrap();
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::from_millis(250)))
.unwrap();
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new(addr, requests_socket, addr, transactions_socket); let mut client = ThinClient::new(addr, addr, transactions_socket);
assert_eq!(client.transaction_count(), 0); assert_eq!(client.transaction_count(), 0);
} }
@ -825,18 +737,13 @@ mod tests {
None, None,
&ledger_path, &ledger_path,
false, false,
Some(0), None,
); );
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(5, 0)))
.unwrap();
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new( let mut client = ThinClient::new(
leader_data.contact_info.rpu, leader_data.contact_info.rpc,
requests_socket,
leader_data.contact_info.tpu, leader_data.contact_info.tpu,
transactions_socket, transactions_socket,
); );

View File

@ -267,7 +267,7 @@ pub mod tests {
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
let t_responder = streamer::responder( let t_responder = streamer::responder(
"test_replicate", "test_replicate",
Arc::new(leader.sockets.requests), Arc::new(leader.sockets.retransmit),
r_responder, r_responder,
); );

View File

@ -1167,8 +1167,6 @@ mod tests {
create_tmp_genesis("wallet_request_airdrop", 10_000_000, leader_data.id, 1000); create_tmp_genesis("wallet_request_airdrop", 10_000_000, leader_data.id, 1000);
let mut bank = Bank::new(&alice); let mut bank = Bank::new(&alice);
let rpc_port = 11111; // Needs to be distinct known number to not conflict with other tests
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
leader_data.id, leader_data.id,
))); )));
@ -1186,7 +1184,7 @@ mod tests {
None, None,
&ledger_path, &ledger_path,
false, false,
Some(rpc_port), None,
); );
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
@ -1194,9 +1192,7 @@ mod tests {
run_local_drone(alice.keypair(), leader_data.contact_info.ncp, sender); run_local_drone(alice.keypair(), leader_data.contact_info.ncp, sender);
let drone_addr = receiver.recv().unwrap(); let drone_addr = receiver.recv().unwrap();
let mut addr = leader_data.contact_info.ncp; let rpc_addr = format!("http://{}", leader_data.contact_info.rpc.to_string());
addr.set_port(rpc_port);
let rpc_addr = format!("http://{}", addr.to_string());
let signature = request_airdrop(&drone_addr, &bob_pubkey, 50); let signature = request_airdrop(&drone_addr, &bob_pubkey, 50);
assert!(signature.is_ok()); assert!(signature.is_ok());
@ -1377,7 +1373,6 @@ mod tests {
let mut config_payer = WalletConfig::default(); let mut config_payer = WalletConfig::default();
let mut config_witness = WalletConfig::default(); let mut config_witness = WalletConfig::default();
let rpc_port = 11223; // Needs to be distinct known number to not conflict with other tests
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
leader_data.id, leader_data.id,
))); )));
@ -1394,7 +1389,7 @@ mod tests {
None, None,
&ledger_path, &ledger_path,
false, false,
Some(rpc_port), None,
); );
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
@ -1405,8 +1400,7 @@ mod tests {
config_payer.leader = leader_data1; config_payer.leader = leader_data1;
config_witness.leader = leader_data2; config_witness.leader = leader_data2;
let mut rpc_addr = leader_data.contact_info.ncp; let rpc_addr = leader_data.contact_info.rpc;
rpc_addr.set_port(rpc_port);
config_payer.rpc_addr = format!("http://{}", rpc_addr.to_string()); config_payer.rpc_addr = format!("http://{}", rpc_addr.to_string());
config_witness.rpc_addr = config_payer.rpc_addr.clone(); config_witness.rpc_addr = config_payer.rpc_addr.clone();

View File

@ -42,7 +42,6 @@ fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<ClusterInfo>>, Pubkey) {
let me = spy.info.id.clone(); let me = spy.info.id.clone();
let daddr = "0.0.0.0:0".parse().unwrap(); let daddr = "0.0.0.0:0".parse().unwrap();
spy.info.contact_info.tvu = daddr; spy.info.contact_info.tvu = daddr;
spy.info.contact_info.rpu = spy.sockets.transaction[0].local_addr().unwrap();
let mut spy_cluster_info = ClusterInfo::new(spy.info).expect("ClusterInfo::new"); let mut spy_cluster_info = ClusterInfo::new(spy.info).expect("ClusterInfo::new");
spy_cluster_info.insert(&leader); spy_cluster_info.insert(&leader);
spy_cluster_info.set_leader(leader.id); spy_cluster_info.set_leader(leader.id);
@ -141,6 +140,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
None, None,
false, false,
LeaderScheduler::from_bootstrap_leader(leader_pubkey), LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
); );
// start up another validator from zero, converge and then check // start up another validator from zero, converge and then check
@ -157,6 +157,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
Some(leader_data.contact_info.ncp), Some(leader_data.contact_info.ncp),
false, false,
LeaderScheduler::from_bootstrap_leader(leader_pubkey), LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
); );
// Send validator some tokens to vote // Send validator some tokens to vote
@ -233,6 +234,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
None, None,
false, false,
LeaderScheduler::from_bootstrap_leader(leader_pubkey), LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
); );
let mut nodes = vec![server]; let mut nodes = vec![server];
@ -260,6 +262,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
Some(leader_data.contact_info.ncp), Some(leader_data.contact_info.ncp),
false, false,
LeaderScheduler::from_bootstrap_leader(leader_pubkey), LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
); );
nodes.push(val); nodes.push(val);
} }
@ -297,6 +300,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
Some(leader_data.contact_info.ncp), Some(leader_data.contact_info.ncp),
false, false,
LeaderScheduler::from_bootstrap_leader(leader_pubkey), LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
); );
nodes.push(val); nodes.push(val);
//contains the leader and new node //contains the leader and new node
@ -365,6 +369,7 @@ fn test_multi_node_basic() {
None, None,
false, false,
LeaderScheduler::from_bootstrap_leader(leader_pubkey), LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
); );
let mut nodes = vec![server]; let mut nodes = vec![server];
@ -389,6 +394,7 @@ fn test_multi_node_basic() {
Some(leader_data.contact_info.ncp), Some(leader_data.contact_info.ncp),
false, false,
LeaderScheduler::from_bootstrap_leader(leader_pubkey), LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
); );
nodes.push(val); nodes.push(val);
} }
@ -442,6 +448,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
None, None,
false, false,
LeaderScheduler::from_bootstrap_leader(leader_pubkey), LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
); );
let leader_balance = let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(500)).unwrap(); send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(500)).unwrap();
@ -463,6 +470,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
Some(leader_data.contact_info.ncp), Some(leader_data.contact_info.ncp),
false, false,
LeaderScheduler::from_bootstrap_leader(leader_pubkey), LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
); );
let mut client = mk_client(&validator_data); let mut client = mk_client(&validator_data);
let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance)); let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance));
@ -488,6 +496,7 @@ fn create_leader(ledger_path: &str, leader_keypair: Arc<Keypair>) -> (NodeInfo,
None, None,
false, false,
LeaderScheduler::from_bootstrap_leader(leader_data.id), LeaderScheduler::from_bootstrap_leader(leader_data.id),
None,
); );
(leader_data, leader_fullnode) (leader_data, leader_fullnode)
} }
@ -549,6 +558,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
Some(leader_data.contact_info.ncp), Some(leader_data.contact_info.ncp),
false, false,
LeaderScheduler::from_bootstrap_leader(leader_data.id), LeaderScheduler::from_bootstrap_leader(leader_data.id),
None,
); );
// trigger broadcast, validator should catch up from leader, whose window contains // trigger broadcast, validator should catch up from leader, whose window contains
@ -615,6 +625,7 @@ fn test_multi_node_dynamic_network() {
None, None,
true, true,
LeaderScheduler::from_bootstrap_leader(leader_pubkey), LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
); );
info!("{} LEADER", leader_data.id); info!("{} LEADER", leader_data.id);
@ -678,6 +689,7 @@ fn test_multi_node_dynamic_network() {
Some(leader_data.contact_info.ncp), Some(leader_data.contact_info.ncp),
true, true,
LeaderScheduler::from_bootstrap_leader(leader_pubkey), LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
); );
(rd, val) (rd, val)
}).unwrap() }).unwrap()
@ -823,6 +835,7 @@ fn test_leader_to_validator_transition() {
Some(leader_info.contact_info.ncp), Some(leader_info.contact_info.ncp),
false, false,
LeaderScheduler::new(&leader_scheduler_config), LeaderScheduler::new(&leader_scheduler_config),
None,
); );
// Make an extra node for our leader to broadcast to, // Make an extra node for our leader to broadcast to,
@ -962,6 +975,7 @@ fn test_leader_validator_basic() {
Some(leader_info.contact_info.ncp), Some(leader_info.contact_info.ncp),
false, false,
LeaderScheduler::new(&leader_scheduler_config), LeaderScheduler::new(&leader_scheduler_config),
None,
); );
// Start the leader fullnode // Start the leader fullnode
@ -973,6 +987,7 @@ fn test_leader_validator_basic() {
Some(leader_info.contact_info.ncp), Some(leader_info.contact_info.ncp),
false, false,
LeaderScheduler::new(&leader_scheduler_config), LeaderScheduler::new(&leader_scheduler_config),
None,
); );
// Wait for convergence // Wait for convergence
@ -1148,6 +1163,7 @@ fn test_dropped_handoff_recovery() {
Some(bootstrap_leader_info.contact_info.ncp), Some(bootstrap_leader_info.contact_info.ncp),
false, false,
LeaderScheduler::new(&leader_scheduler_config), LeaderScheduler::new(&leader_scheduler_config),
None,
); );
let mut nodes = vec![bootstrap_leader]; let mut nodes = vec![bootstrap_leader];
@ -1170,6 +1186,7 @@ fn test_dropped_handoff_recovery() {
Some(bootstrap_leader_info.contact_info.ncp), Some(bootstrap_leader_info.contact_info.ncp),
false, false,
LeaderScheduler::new(&leader_scheduler_config), LeaderScheduler::new(&leader_scheduler_config),
None,
); );
nodes.push(validator); nodes.push(validator);
@ -1195,6 +1212,7 @@ fn test_dropped_handoff_recovery() {
Some(bootstrap_leader_info.contact_info.ncp), Some(bootstrap_leader_info.contact_info.ncp),
false, false,
LeaderScheduler::new(&leader_scheduler_config), LeaderScheduler::new(&leader_scheduler_config),
None,
); );
// Wait for catchup // Wait for catchup
@ -1311,6 +1329,7 @@ fn test_full_leader_validator_network() {
Some(bootstrap_leader_info.contact_info.ncp), Some(bootstrap_leader_info.contact_info.ncp),
false, false,
LeaderScheduler::new(&leader_scheduler_config), LeaderScheduler::new(&leader_scheduler_config),
None,
))); )));
let mut nodes: Vec<Arc<RwLock<Fullnode>>> = vec![bootstrap_leader.clone()]; let mut nodes: Vec<Arc<RwLock<Fullnode>>> = vec![bootstrap_leader.clone()];
@ -1337,6 +1356,7 @@ fn test_full_leader_validator_network() {
Some(bootstrap_leader_info.contact_info.ncp), Some(bootstrap_leader_info.contact_info.ncp),
false, false,
LeaderScheduler::new(&leader_scheduler_config), LeaderScheduler::new(&leader_scheduler_config),
None,
))); )));
nodes.push(validator.clone()); nodes.push(validator.clone());
@ -1451,16 +1471,10 @@ fn test_full_leader_validator_network() {
} }
fn mk_client(leader: &NodeInfo) -> ThinClient { fn mk_client(leader: &NodeInfo) -> ThinClient {
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
assert!(ClusterInfo::is_valid_address(&leader.contact_info.rpu));
assert!(ClusterInfo::is_valid_address(&leader.contact_info.tpu)); assert!(ClusterInfo::is_valid_address(&leader.contact_info.tpu));
ThinClient::new( ThinClient::new(
leader.contact_info.rpu, leader.contact_info.rpc,
requests_socket,
leader.contact_info.tpu, leader.contact_info.tpu,
transactions_socket, transactions_socket,
) )