diff --git a/Cargo.toml b/Cargo.toml index f0c26403e..695086375 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,6 +82,7 @@ jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc", rev = "4b6060b itertools = "0.7.8" log = "0.4.2" matches = "0.1.6" +nix = "0.11.0" pnet_datalink = "0.21.0" rand = "0.5.1" rayon = "1.0.0" @@ -91,6 +92,7 @@ sha2 = "0.7.0" serde = "1.0.27" serde_derive = "1.0.27" serde_json = "1.0.10" +socket2 = "0.3.8" sys-info = "0.5.6" tokio = "0.1" tokio-codec = "0.1" diff --git a/src/bank.rs b/src/bank.rs old mode 100755 new mode 100644 diff --git a/src/bin/bench-streamer.rs b/src/bin/bench-streamer.rs index d5d8c9b5f..54d43a402 100644 --- a/src/bin/bench-streamer.rs +++ b/src/bin/bench-streamer.rs @@ -1,9 +1,13 @@ +extern crate clap; extern crate solana; +use clap::{App, Arg}; +use solana::nat::bind_to; use solana::packet::{Packet, PacketRecycler, BLOB_SIZE, PACKET_DATA_SIZE}; use solana::result::Result; use solana::streamer::{receiver, PacketReceiver}; -use std::net::{SocketAddr, UdpSocket}; +use std::cmp::max; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::channel; use std::sync::Arc; @@ -55,27 +59,56 @@ fn sink( } fn main() -> Result<()> { - let read = UdpSocket::bind("127.0.0.1:0")?; - read.set_read_timeout(Some(Duration::new(1, 0)))?; + let mut num_sockets = 1usize; + + let matches = App::new("solana-bench-streamer") + .arg( + Arg::with_name("num-recv-sockets") + .long("num-recv-sockets") + .value_name("NUM") + .takes_value(true) + .help("Use NUM receive sockets"), + ) + .get_matches(); + + if let Some(n) = matches.value_of("num-recv-sockets") { + num_sockets = max(num_sockets, n.to_string().parse().expect("integer")); + } + + let mut port = 0; + let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); - let addr = read.local_addr()?; let exit = Arc::new(AtomicBool::new(false)); let pack_recycler = PacketRecycler::default(); - let (s_reader, r_reader) = channel(); - let t_reader = receiver( - Arc::new(read), - exit.clone(), - pack_recycler.clone(), - s_reader, - ); + let mut read_channels = Vec::new(); + let mut read_threads = Vec::new(); + for _ in 0..num_sockets { + let read = bind_to(port); + read.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); + + addr = read.local_addr().unwrap(); + port = addr.port(); + + let (s_reader, r_reader) = channel(); + read_channels.push(r_reader); + read_threads.push(receiver( + Arc::new(read), + exit.clone(), + pack_recycler.clone(), + s_reader, + )); + } + let t_producer1 = producer(&addr, &pack_recycler, exit.clone()); let t_producer2 = producer(&addr, &pack_recycler, exit.clone()); let t_producer3 = producer(&addr, &pack_recycler, exit.clone()); let rvs = Arc::new(AtomicUsize::new(0)); - let t_sink = sink(pack_recycler.clone(), exit.clone(), rvs.clone(), r_reader); - + let sink_threads: Vec<_> = read_channels + .into_iter() + .map(|r_reader| sink(pack_recycler.clone(), exit.clone(), rvs.clone(), r_reader)) + .collect(); let start = SystemTime::now(); let start_val = rvs.load(Ordering::Relaxed); sleep(Duration::new(5, 0)); @@ -86,10 +119,14 @@ fn main() -> Result<()> { let fcount = (end_val - start_val) as f64; println!("performance: {:?}", fcount / ftime); exit.store(true, Ordering::Relaxed); - t_reader.join()?; + for t_reader in read_threads { + t_reader.join()?; + } t_producer1.join()?; t_producer2.join()?; t_producer3.join()?; - t_sink.join()?; + for t_sink in sink_threads { + t_sink.join()?; + } Ok(()) } diff --git a/src/crdt.rs b/src/crdt.rs index d4ad10ec1..789efb07b 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -19,7 +19,7 @@ use counter::Counter; use hash::Hash; use ledger::LedgerWindow; use log::Level; -use nat::bind_in_range; +use nat::{bind_in_range, bind_to}; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; use rand::{thread_rng, Rng}; use rayon::prelude::*; @@ -1264,7 +1264,7 @@ pub struct Sockets { pub gossip: UdpSocket, pub requests: UdpSocket, pub replicate: UdpSocket, - pub transaction: UdpSocket, + pub transaction: Vec, pub respond: UdpSocket, pub broadcast: UdpSocket, pub repair: UdpSocket, @@ -1304,7 +1304,7 @@ impl Node { gossip, requests, replicate, - transaction, + transaction: vec![transaction], respond, broadcast, repair, @@ -1322,16 +1322,6 @@ impl Node { } }; - fn bind_to(port: u16) -> UdpSocket { - let addr = socketaddr!(0, port); - match UdpSocket::bind(addr) { - Ok(socket) => socket, - Err(err) => { - panic!("Failed to bind to {:?}, err: {}", addr, err); - } - } - }; - let (gossip_port, gossip) = if ncp.port() != 0 { (ncp.port(), bind_to(ncp.port())) } else { @@ -1342,6 +1332,12 @@ impl Node { let (requests_port, requests) = bind(); let (transaction_port, transaction) = bind(); + let mut transaction_sockets = vec![transaction]; + + for _ in 0..4 { + transaction_sockets.push(bind_to(transaction_port)); + } + let (_, repair) = bind(); let (_, broadcast) = bind(); let (_, retransmit) = bind(); @@ -1365,7 +1361,7 @@ impl Node { gossip, requests, replicate, - transaction, + transaction: transaction_sockets, respond, broadcast, repair, @@ -2059,7 +2055,10 @@ mod tests { assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.replicate.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip); - assert_eq!(node.sockets.transaction.local_addr().unwrap().ip(), ip); + assert!(node.sockets.transaction.len() > 1); + for tx_socket in node.sockets.transaction.iter() { + assert_eq!(tx_socket.local_addr().unwrap().ip(), ip); + } assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip); assert!(node.sockets.gossip.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); @@ -2068,8 +2067,12 @@ mod tests { assert!(node.sockets.replicate.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); assert!(node.sockets.requests.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.requests.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); - assert!(node.sockets.transaction.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); - assert!(node.sockets.transaction.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); + let tx_port = node.sockets.transaction[0].local_addr().unwrap().port(); + assert!(tx_port >= FULLNODE_PORT_RANGE.0); + assert!(tx_port < FULLNODE_PORT_RANGE.1); + for tx_socket in node.sockets.transaction.iter() { + assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port); + } assert!(node.sockets.repair.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.repair.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); } @@ -2081,7 +2084,10 @@ mod tests { assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.replicate.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip); - assert_eq!(node.sockets.transaction.local_addr().unwrap().ip(), ip); + assert!(node.sockets.transaction.len() > 1); + for tx_socket in node.sockets.transaction.iter() { + assert_eq!(tx_socket.local_addr().unwrap().ip(), ip); + } assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050); @@ -2089,8 +2095,12 @@ mod tests { assert!(node.sockets.replicate.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); assert!(node.sockets.requests.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.requests.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); - assert!(node.sockets.transaction.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); - assert!(node.sockets.transaction.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); + let tx_port = node.sockets.transaction[0].local_addr().unwrap().port(); + assert!(tx_port >= FULLNODE_PORT_RANGE.0); + assert!(tx_port < FULLNODE_PORT_RANGE.1); + for tx_socket in node.sockets.transaction.iter() { + assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port); + } assert!(node.sockets.repair.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.repair.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); } diff --git a/src/fetch_stage.rs b/src/fetch_stage.rs index 5d0f62a63..d445101bb 100644 --- a/src/fetch_stage.rs +++ b/src/fetch_stage.rs @@ -16,11 +16,12 @@ pub struct FetchStage { impl FetchStage { pub fn new( - socket: Arc, + sockets: Vec, exit: Arc, recycler: &PacketRecycler, ) -> (Self, PacketReceiver) { - Self::new_multi_socket(vec![socket], exit, recycler) + let tx_sockets = sockets.into_iter().map(Arc::new).collect(); + Self::new_multi_socket(tx_sockets, exit, recycler) } pub fn new_multi_socket( sockets: Vec>, diff --git a/src/lib.rs b/src/lib.rs index bd8b941b7..e4e042820 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,6 +16,7 @@ pub mod broadcast_stage; pub mod budget; pub mod choose_gossip_peer_strategy; pub mod client; +#[macro_use] pub mod crdt; pub mod drone; pub mod entry; @@ -69,6 +70,7 @@ extern crate jsonrpc_macros; extern crate jsonrpc_http_server; #[macro_use] extern crate log; +extern crate nix; extern crate rayon; extern crate ring; extern crate serde; @@ -77,6 +79,7 @@ extern crate serde_derive; extern crate pnet_datalink; extern crate serde_json; extern crate sha2; +extern crate socket2; extern crate sys_info; extern crate untrusted; diff --git a/src/nat.rs b/src/nat.rs index abd606130..fd2ac6ff7 100644 --- a/src/nat.rs +++ b/src/nat.rs @@ -2,10 +2,14 @@ extern crate reqwest; +use nix::sys::socket::setsockopt; +use nix::sys::socket::sockopt::ReusePort; use pnet_datalink as datalink; use rand::{thread_rng, Rng}; +use socket2::{Domain, SockAddr, Socket, Type}; use std::io; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; +use std::os::unix::io::AsRawFd; /// A data type representing a public Udp socket pub struct UdpSocketPair { @@ -72,12 +76,15 @@ pub fn get_ip_addr() -> Option { pub fn bind_in_range(range: (u16, u16)) -> io::Result { let (start, end) = range; let mut tries_left = end - start; + let sock = Socket::new(Domain::ipv4(), Type::dgram(), None).unwrap(); + let sock_fd = sock.as_raw_fd(); + setsockopt(sock_fd, ReusePort, &true).unwrap(); loop { let rand_port = thread_rng().gen_range(start, end); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rand_port); - match UdpSocket::bind(addr) { - Result::Ok(val) => break Result::Ok(val), + match sock.bind(&SockAddr::from(addr)) { + Result::Ok(_) => break Result::Ok(sock.into_udp_socket()), Result::Err(err) => if err.kind() != io::ErrorKind::AddrInUse || tries_left == 0 { return Err(err); }, @@ -86,6 +93,19 @@ pub fn bind_in_range(range: (u16, u16)) -> io::Result { } } +pub fn bind_to(port: u16) -> UdpSocket { + let sock = Socket::new(Domain::ipv4(), Type::dgram(), None).unwrap(); + let sock_fd = sock.as_raw_fd(); + setsockopt(sock_fd, ReusePort, &true).unwrap(); + let addr = socketaddr!(0, port); + match sock.bind(&SockAddr::from(addr)) { + Ok(_) => sock.into_udp_socket(), + Err(err) => { + panic!("Failed to bind to {:?}, err: {}", addr, err); + } + } +} + #[cfg(test)] mod tests { use nat::parse_port_or_addr; diff --git a/src/request.rs b/src/request.rs old mode 100755 new mode 100644 diff --git a/src/request_processor.rs b/src/request_processor.rs old mode 100755 new mode 100644 diff --git a/src/tpu.rs b/src/tpu.rs index 9011e56f8..d3404ee75 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -56,7 +56,7 @@ impl Tpu { bank: &Arc, crdt: &Arc>, tick_duration: Option, - transactions_socket: UdpSocket, + transactions_sockets: Vec, blob_recycler: &BlobRecycler, exit: Arc, ledger_path: &str, @@ -65,7 +65,7 @@ impl Tpu { let packet_recycler = PacketRecycler::default(); let (fetch_stage, packet_receiver) = - FetchStage::new(Arc::new(transactions_socket), exit, &packet_recycler); + FetchStage::new(transactions_sockets, exit, &packet_recycler); let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver, sigverify_disabled); diff --git a/src/vote_stage.rs b/src/vote_stage.rs old mode 100755 new mode 100644