Open multiple sockets for transaction UDP port (#1128)

* Reuse UDP port and open multiple sockets for transaction address

* Fixed failing crdt tests

* Add tests for reusing UDP ports

* Address review comments

* Updated bench-streamer to use multiple receive sockets

* Fix minimum number of recv sockets for bench-streamer

* Address review comments

Fixes #1132

* Moved bind_to function to nat.rs
This commit is contained in:
Pankaj Garg 2018-09-06 14:13:40 -07:00 committed by GitHub
parent 072d0b67e4
commit 05460eec0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 114 additions and 41 deletions

View File

@ -82,6 +82,7 @@ jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc", rev = "4b6060b
itertools = "0.7.8" itertools = "0.7.8"
log = "0.4.2" log = "0.4.2"
matches = "0.1.6" matches = "0.1.6"
nix = "0.11.0"
pnet_datalink = "0.21.0" pnet_datalink = "0.21.0"
rand = "0.5.1" rand = "0.5.1"
rayon = "1.0.0" rayon = "1.0.0"
@ -91,6 +92,7 @@ sha2 = "0.7.0"
serde = "1.0.27" serde = "1.0.27"
serde_derive = "1.0.27" serde_derive = "1.0.27"
serde_json = "1.0.10" serde_json = "1.0.10"
socket2 = "0.3.8"
sys-info = "0.5.6" sys-info = "0.5.6"
tokio = "0.1" tokio = "0.1"
tokio-codec = "0.1" tokio-codec = "0.1"

0
src/bank.rs Executable file → Normal file
View File

View File

@ -1,9 +1,13 @@
extern crate clap;
extern crate solana; extern crate solana;
use clap::{App, Arg};
use solana::nat::bind_to;
use solana::packet::{Packet, PacketRecycler, BLOB_SIZE, PACKET_DATA_SIZE}; use solana::packet::{Packet, PacketRecycler, BLOB_SIZE, PACKET_DATA_SIZE};
use solana::result::Result; use solana::result::Result;
use solana::streamer::{receiver, PacketReceiver}; use solana::streamer::{receiver, PacketReceiver};
use std::net::{SocketAddr, UdpSocket}; use std::cmp::max;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::Arc; use std::sync::Arc;
@ -55,27 +59,56 @@ fn sink(
} }
fn main() -> Result<()> { fn main() -> Result<()> {
let read = UdpSocket::bind("127.0.0.1:0")?; let mut num_sockets = 1usize;
read.set_read_timeout(Some(Duration::new(1, 0)))?;
let matches = App::new("solana-bench-streamer")
.arg(
Arg::with_name("num-recv-sockets")
.long("num-recv-sockets")
.value_name("NUM")
.takes_value(true)
.help("Use NUM receive sockets"),
)
.get_matches();
if let Some(n) = matches.value_of("num-recv-sockets") {
num_sockets = max(num_sockets, n.to_string().parse().expect("integer"));
}
let mut port = 0;
let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let addr = read.local_addr()?;
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let pack_recycler = PacketRecycler::default(); let pack_recycler = PacketRecycler::default();
let (s_reader, r_reader) = channel(); let mut read_channels = Vec::new();
let t_reader = receiver( let mut read_threads = Vec::new();
Arc::new(read), for _ in 0..num_sockets {
exit.clone(), let read = bind_to(port);
pack_recycler.clone(), read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
s_reader,
); addr = read.local_addr().unwrap();
port = addr.port();
let (s_reader, r_reader) = channel();
read_channels.push(r_reader);
read_threads.push(receiver(
Arc::new(read),
exit.clone(),
pack_recycler.clone(),
s_reader,
));
}
let t_producer1 = producer(&addr, &pack_recycler, exit.clone()); let t_producer1 = producer(&addr, &pack_recycler, exit.clone());
let t_producer2 = producer(&addr, &pack_recycler, exit.clone()); let t_producer2 = producer(&addr, &pack_recycler, exit.clone());
let t_producer3 = producer(&addr, &pack_recycler, exit.clone()); let t_producer3 = producer(&addr, &pack_recycler, exit.clone());
let rvs = Arc::new(AtomicUsize::new(0)); let rvs = Arc::new(AtomicUsize::new(0));
let t_sink = sink(pack_recycler.clone(), exit.clone(), rvs.clone(), r_reader); let sink_threads: Vec<_> = read_channels
.into_iter()
.map(|r_reader| sink(pack_recycler.clone(), exit.clone(), rvs.clone(), r_reader))
.collect();
let start = SystemTime::now(); let start = SystemTime::now();
let start_val = rvs.load(Ordering::Relaxed); let start_val = rvs.load(Ordering::Relaxed);
sleep(Duration::new(5, 0)); sleep(Duration::new(5, 0));
@ -86,10 +119,14 @@ fn main() -> Result<()> {
let fcount = (end_val - start_val) as f64; let fcount = (end_val - start_val) as f64;
println!("performance: {:?}", fcount / ftime); println!("performance: {:?}", fcount / ftime);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
t_reader.join()?; for t_reader in read_threads {
t_reader.join()?;
}
t_producer1.join()?; t_producer1.join()?;
t_producer2.join()?; t_producer2.join()?;
t_producer3.join()?; t_producer3.join()?;
t_sink.join()?; for t_sink in sink_threads {
t_sink.join()?;
}
Ok(()) Ok(())
} }

View File

@ -19,7 +19,7 @@ use counter::Counter;
use hash::Hash; use hash::Hash;
use ledger::LedgerWindow; use ledger::LedgerWindow;
use log::Level; use log::Level;
use nat::bind_in_range; use nat::{bind_in_range, bind_to};
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use rayon::prelude::*; use rayon::prelude::*;
@ -1264,7 +1264,7 @@ pub struct Sockets {
pub gossip: UdpSocket, pub gossip: UdpSocket,
pub requests: UdpSocket, pub requests: UdpSocket,
pub replicate: UdpSocket, pub replicate: UdpSocket,
pub transaction: UdpSocket, pub transaction: Vec<UdpSocket>,
pub respond: UdpSocket, pub respond: UdpSocket,
pub broadcast: UdpSocket, pub broadcast: UdpSocket,
pub repair: UdpSocket, pub repair: UdpSocket,
@ -1304,7 +1304,7 @@ impl Node {
gossip, gossip,
requests, requests,
replicate, replicate,
transaction, transaction: vec![transaction],
respond, respond,
broadcast, broadcast,
repair, repair,
@ -1322,16 +1322,6 @@ impl Node {
} }
}; };
fn bind_to(port: u16) -> UdpSocket {
let addr = socketaddr!(0, port);
match UdpSocket::bind(addr) {
Ok(socket) => socket,
Err(err) => {
panic!("Failed to bind to {:?}, err: {}", addr, err);
}
}
};
let (gossip_port, gossip) = if ncp.port() != 0 { let (gossip_port, gossip) = if ncp.port() != 0 {
(ncp.port(), bind_to(ncp.port())) (ncp.port(), bind_to(ncp.port()))
} else { } else {
@ -1342,6 +1332,12 @@ impl Node {
let (requests_port, requests) = bind(); let (requests_port, requests) = bind();
let (transaction_port, transaction) = bind(); let (transaction_port, transaction) = bind();
let mut transaction_sockets = vec![transaction];
for _ in 0..4 {
transaction_sockets.push(bind_to(transaction_port));
}
let (_, repair) = bind(); let (_, repair) = bind();
let (_, broadcast) = bind(); let (_, broadcast) = bind();
let (_, retransmit) = bind(); let (_, retransmit) = bind();
@ -1365,7 +1361,7 @@ impl Node {
gossip, gossip,
requests, requests,
replicate, replicate,
transaction, transaction: transaction_sockets,
respond, respond,
broadcast, broadcast,
repair, repair,
@ -2059,7 +2055,10 @@ mod tests {
assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.replicate.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.replicate.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.transaction.local_addr().unwrap().ip(), ip); assert!(node.sockets.transaction.len() > 1);
for tx_socket in node.sockets.transaction.iter() {
assert_eq!(tx_socket.local_addr().unwrap().ip(), ip);
}
assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip);
assert!(node.sockets.gossip.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.gossip.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0);
@ -2068,8 +2067,12 @@ mod tests {
assert!(node.sockets.replicate.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); assert!(node.sockets.replicate.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1);
assert!(node.sockets.requests.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.requests.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0);
assert!(node.sockets.requests.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); assert!(node.sockets.requests.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1);
assert!(node.sockets.transaction.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); let tx_port = node.sockets.transaction[0].local_addr().unwrap().port();
assert!(node.sockets.transaction.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); assert!(tx_port >= FULLNODE_PORT_RANGE.0);
assert!(tx_port < FULLNODE_PORT_RANGE.1);
for tx_socket in node.sockets.transaction.iter() {
assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port);
}
assert!(node.sockets.repair.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.repair.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0);
assert!(node.sockets.repair.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); assert!(node.sockets.repair.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1);
} }
@ -2081,7 +2084,10 @@ mod tests {
assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.replicate.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.replicate.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.transaction.local_addr().unwrap().ip(), ip); assert!(node.sockets.transaction.len() > 1);
for tx_socket in node.sockets.transaction.iter() {
assert_eq!(tx_socket.local_addr().unwrap().ip(), ip);
}
assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050); assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050);
@ -2089,8 +2095,12 @@ mod tests {
assert!(node.sockets.replicate.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); assert!(node.sockets.replicate.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1);
assert!(node.sockets.requests.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.requests.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0);
assert!(node.sockets.requests.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); assert!(node.sockets.requests.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1);
assert!(node.sockets.transaction.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); let tx_port = node.sockets.transaction[0].local_addr().unwrap().port();
assert!(node.sockets.transaction.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); assert!(tx_port >= FULLNODE_PORT_RANGE.0);
assert!(tx_port < FULLNODE_PORT_RANGE.1);
for tx_socket in node.sockets.transaction.iter() {
assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port);
}
assert!(node.sockets.repair.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.repair.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0);
assert!(node.sockets.repair.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); assert!(node.sockets.repair.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1);
} }

View File

@ -16,11 +16,12 @@ pub struct FetchStage {
impl FetchStage { impl FetchStage {
pub fn new( pub fn new(
socket: Arc<UdpSocket>, sockets: Vec<UdpSocket>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
recycler: &PacketRecycler, recycler: &PacketRecycler,
) -> (Self, PacketReceiver) { ) -> (Self, PacketReceiver) {
Self::new_multi_socket(vec![socket], exit, recycler) let tx_sockets = sockets.into_iter().map(Arc::new).collect();
Self::new_multi_socket(tx_sockets, exit, recycler)
} }
pub fn new_multi_socket( pub fn new_multi_socket(
sockets: Vec<Arc<UdpSocket>>, sockets: Vec<Arc<UdpSocket>>,

View File

@ -16,6 +16,7 @@ pub mod broadcast_stage;
pub mod budget; pub mod budget;
pub mod choose_gossip_peer_strategy; pub mod choose_gossip_peer_strategy;
pub mod client; pub mod client;
#[macro_use]
pub mod crdt; pub mod crdt;
pub mod drone; pub mod drone;
pub mod entry; pub mod entry;
@ -69,6 +70,7 @@ extern crate jsonrpc_macros;
extern crate jsonrpc_http_server; extern crate jsonrpc_http_server;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate nix;
extern crate rayon; extern crate rayon;
extern crate ring; extern crate ring;
extern crate serde; extern crate serde;
@ -77,6 +79,7 @@ extern crate serde_derive;
extern crate pnet_datalink; extern crate pnet_datalink;
extern crate serde_json; extern crate serde_json;
extern crate sha2; extern crate sha2;
extern crate socket2;
extern crate sys_info; extern crate sys_info;
extern crate untrusted; extern crate untrusted;

View File

@ -2,10 +2,14 @@
extern crate reqwest; extern crate reqwest;
use nix::sys::socket::setsockopt;
use nix::sys::socket::sockopt::ReusePort;
use pnet_datalink as datalink; use pnet_datalink as datalink;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use socket2::{Domain, SockAddr, Socket, Type};
use std::io; use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::os::unix::io::AsRawFd;
/// A data type representing a public Udp socket /// A data type representing a public Udp socket
pub struct UdpSocketPair { pub struct UdpSocketPair {
@ -72,12 +76,15 @@ pub fn get_ip_addr() -> Option<IpAddr> {
pub fn bind_in_range(range: (u16, u16)) -> io::Result<UdpSocket> { pub fn bind_in_range(range: (u16, u16)) -> io::Result<UdpSocket> {
let (start, end) = range; let (start, end) = range;
let mut tries_left = end - start; let mut tries_left = end - start;
let sock = Socket::new(Domain::ipv4(), Type::dgram(), None).unwrap();
let sock_fd = sock.as_raw_fd();
setsockopt(sock_fd, ReusePort, &true).unwrap();
loop { loop {
let rand_port = thread_rng().gen_range(start, end); let rand_port = thread_rng().gen_range(start, end);
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rand_port); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rand_port);
match UdpSocket::bind(addr) { match sock.bind(&SockAddr::from(addr)) {
Result::Ok(val) => break Result::Ok(val), Result::Ok(_) => break Result::Ok(sock.into_udp_socket()),
Result::Err(err) => if err.kind() != io::ErrorKind::AddrInUse || tries_left == 0 { Result::Err(err) => if err.kind() != io::ErrorKind::AddrInUse || tries_left == 0 {
return Err(err); return Err(err);
}, },
@ -86,6 +93,19 @@ pub fn bind_in_range(range: (u16, u16)) -> io::Result<UdpSocket> {
} }
} }
pub fn bind_to(port: u16) -> UdpSocket {
let sock = Socket::new(Domain::ipv4(), Type::dgram(), None).unwrap();
let sock_fd = sock.as_raw_fd();
setsockopt(sock_fd, ReusePort, &true).unwrap();
let addr = socketaddr!(0, port);
match sock.bind(&SockAddr::from(addr)) {
Ok(_) => sock.into_udp_socket(),
Err(err) => {
panic!("Failed to bind to {:?}, err: {}", addr, err);
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use nat::parse_port_or_addr; use nat::parse_port_or_addr;

0
src/request.rs Executable file → Normal file
View File

0
src/request_processor.rs Executable file → Normal file
View File

View File

@ -56,7 +56,7 @@ impl Tpu {
bank: &Arc<Bank>, bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>, crdt: &Arc<RwLock<Crdt>>,
tick_duration: Option<Duration>, tick_duration: Option<Duration>,
transactions_socket: UdpSocket, transactions_sockets: Vec<UdpSocket>,
blob_recycler: &BlobRecycler, blob_recycler: &BlobRecycler,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
ledger_path: &str, ledger_path: &str,
@ -65,7 +65,7 @@ impl Tpu {
let packet_recycler = PacketRecycler::default(); let packet_recycler = PacketRecycler::default();
let (fetch_stage, packet_receiver) = let (fetch_stage, packet_receiver) =
FetchStage::new(Arc::new(transactions_socket), exit, &packet_recycler); FetchStage::new(transactions_sockets, exit, &packet_recycler);
let (sigverify_stage, verified_receiver) = let (sigverify_stage, verified_receiver) =
SigVerifyStage::new(packet_receiver, sigverify_disabled); SigVerifyStage::new(packet_receiver, sigverify_disabled);

0
src/vote_stage.rs Executable file → Normal file
View File