Use multiple sockets for receiving blobs on validators (#1228)

* Use multiple sockets for receiving blobs on validators

- The blobs that are broadcasted by leader or retransmitted by peer
  validators are received on replicate_port
- Using reuse_addr/reuse_port, multiple sockets can be opened for
  the same port
- This allows the kernel to queue data to user space app on multiple
  socket queues, preventing over-running one queue
- This helps with reducing packets dropped due to queue over-runs

Fixes #1224

* Fixed failing tests
This commit is contained in:
Pankaj Garg 2018-09-14 16:56:06 -07:00 committed by GitHub
parent 4196cf43e8
commit e142aafca9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 53 additions and 22 deletions

View File

@ -1210,7 +1210,7 @@ impl Crdt {
pub struct Sockets { pub struct Sockets {
pub gossip: UdpSocket, pub gossip: UdpSocket,
pub requests: UdpSocket, pub requests: UdpSocket,
pub replicate: UdpSocket, pub replicate: Vec<UdpSocket>,
pub transaction: Vec<UdpSocket>, pub transaction: Vec<UdpSocket>,
pub respond: UdpSocket, pub respond: UdpSocket,
pub broadcast: UdpSocket, pub broadcast: UdpSocket,
@ -1250,7 +1250,7 @@ impl Node {
sockets: Sockets { sockets: Sockets {
gossip, gossip,
requests, requests,
replicate, replicate: vec![replicate],
transaction: vec![transaction], transaction: vec![transaction],
respond, respond,
broadcast, broadcast,
@ -1270,7 +1270,9 @@ impl Node {
bind() bind()
}; };
let (replicate_port, replicate) = bind(); let (replicate_port, replicate_sockets) =
multi_bind_in_range(FULLNODE_PORT_RANGE, 8).expect("tvu multi_bind");
let (requests_port, requests) = bind(); let (requests_port, requests) = bind();
let (transaction_port, transaction_sockets) = let (transaction_port, transaction_sockets) =
@ -1299,7 +1301,7 @@ impl Node {
sockets: Sockets { sockets: Sockets {
gossip, gossip,
requests, requests,
replicate, replicate: replicate_sockets,
transaction: transaction_sockets, transaction: transaction_sockets,
respond, respond,
broadcast, broadcast,
@ -1992,7 +1994,10 @@ mod tests {
let ip = Ipv4Addr::from(0); let ip = Ipv4Addr::from(0);
let node = Node::new_with_external_ip(Keypair::new().pubkey(), &socketaddr!(ip, 0)); let node = Node::new_with_external_ip(Keypair::new().pubkey(), &socketaddr!(ip, 0));
assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.replicate.local_addr().unwrap().ip(), ip); assert!(node.sockets.replicate.len() > 1);
for tx_socket in node.sockets.replicate.iter() {
assert_eq!(tx_socket.local_addr().unwrap().ip(), ip);
}
assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip);
assert!(node.sockets.transaction.len() > 1); assert!(node.sockets.transaction.len() > 1);
for tx_socket in node.sockets.transaction.iter() { for tx_socket in node.sockets.transaction.iter() {
@ -2002,8 +2007,12 @@ mod tests {
assert!(node.sockets.gossip.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.gossip.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0);
assert!(node.sockets.gossip.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); assert!(node.sockets.gossip.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1);
assert!(node.sockets.replicate.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); let tx_port = node.sockets.replicate[0].local_addr().unwrap().port();
assert!(node.sockets.replicate.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); assert!(tx_port >= FULLNODE_PORT_RANGE.0);
assert!(tx_port < FULLNODE_PORT_RANGE.1);
for tx_socket in node.sockets.replicate.iter() {
assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port);
}
assert!(node.sockets.requests.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.requests.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0);
assert!(node.sockets.requests.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); assert!(node.sockets.requests.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1);
let tx_port = node.sockets.transaction[0].local_addr().unwrap().port(); let tx_port = node.sockets.transaction[0].local_addr().unwrap().port();
@ -2021,7 +2030,10 @@ mod tests {
let ip = IpAddr::V4(Ipv4Addr::from(0)); let ip = IpAddr::V4(Ipv4Addr::from(0));
let node = Node::new_with_external_ip(Keypair::new().pubkey(), &socketaddr!(0, 8050)); let node = Node::new_with_external_ip(Keypair::new().pubkey(), &socketaddr!(0, 8050));
assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.replicate.local_addr().unwrap().ip(), ip); assert!(node.sockets.replicate.len() > 1);
for tx_socket in node.sockets.replicate.iter() {
assert_eq!(tx_socket.local_addr().unwrap().ip(), ip);
}
assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip);
assert!(node.sockets.transaction.len() > 1); assert!(node.sockets.transaction.len() > 1);
for tx_socket in node.sockets.transaction.iter() { for tx_socket in node.sockets.transaction.iter() {
@ -2030,8 +2042,12 @@ mod tests {
assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050); assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050);
assert!(node.sockets.replicate.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); let tx_port = node.sockets.replicate[0].local_addr().unwrap().port();
assert!(node.sockets.replicate.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); assert!(tx_port >= FULLNODE_PORT_RANGE.0);
assert!(tx_port < FULLNODE_PORT_RANGE.1);
for tx_socket in node.sockets.replicate.iter() {
assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port);
}
assert!(node.sockets.requests.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.requests.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0);
assert!(node.sockets.requests.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); assert!(node.sockets.requests.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1);
let tx_port = node.sockets.transaction[0].local_addr().unwrap().port(); let tx_port = node.sockets.transaction[0].local_addr().unwrap().port();

View File

@ -76,18 +76,18 @@ impl Tvu {
crdt: Arc<RwLock<Crdt>>, crdt: Arc<RwLock<Crdt>>,
window: SharedWindow, window: SharedWindow,
blob_recycler: BlobRecycler, blob_recycler: BlobRecycler,
replicate_socket: UdpSocket, replicate_sockets: Vec<UdpSocket>,
repair_socket: UdpSocket, repair_socket: UdpSocket,
retransmit_socket: UdpSocket, retransmit_socket: UdpSocket,
ledger_path: Option<&str>, ledger_path: Option<&str>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Self { ) -> Self {
let repair_socket = Arc::new(repair_socket); let repair_socket = Arc::new(repair_socket);
let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket( let mut blob_sockets: Vec<Arc<UdpSocket>> =
vec![Arc::new(replicate_socket), repair_socket.clone()], replicate_sockets.into_iter().map(Arc::new).collect();
exit.clone(), blob_sockets.push(repair_socket.clone());
&blob_recycler, let (fetch_stage, blob_fetch_receiver) =
); BlobFetchStage::new_multi_socket(blob_sockets, exit.clone(), &blob_recycler);
//TODO //TODO
//the packets coming out of blob_receiver need to be sent to the GPU and verified //the packets coming out of blob_receiver need to be sent to the GPU and verified
//then sent to the window, which does the erasure coding reconstruction //then sent to the window, which does the erasure coding reconstruction
@ -200,8 +200,15 @@ pub mod tests {
// simulate target peer // simulate target peer
let recycler = BlobRecycler::default(); let recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let blob_sockets: Vec<Arc<UdpSocket>> = target2
.sockets
.replicate
.into_iter()
.map(Arc::new)
.collect();
let t_receiver = streamer::blob_receiver( let t_receiver = streamer::blob_receiver(
Arc::new(target2.sockets.replicate), blob_sockets[0].clone(),
exit.clone(), exit.clone(),
recycler.clone(), recycler.clone(),
s_reader, s_reader,

View File

@ -302,6 +302,7 @@ mod test {
use crdt::{Crdt, Node}; use crdt::{Crdt, Node};
use logger; use logger;
use packet::{BlobRecycler, PACKET_DATA_SIZE}; use packet::{BlobRecycler, PACKET_DATA_SIZE};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -361,9 +362,12 @@ mod test {
); );
let t_responder = { let t_responder = {
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
let blob_sockets: Vec<Arc<UdpSocket>> =
tn.sockets.replicate.into_iter().map(Arc::new).collect();
let t_responder = responder( let t_responder = responder(
"window_send_test", "window_send_test",
Arc::new(tn.sockets.replicate), blob_sockets[0].clone(),
resp_recycler.clone(), resp_recycler.clone(),
r_responder, r_responder,
); );
@ -431,9 +435,11 @@ mod test {
); );
let t_responder = { let t_responder = {
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
let blob_sockets: Vec<Arc<UdpSocket>> =
tn.sockets.replicate.into_iter().map(Arc::new).collect();
let t_responder = responder( let t_responder = responder(
"window_send_test", "window_send_test",
Arc::new(tn.sockets.replicate), blob_sockets[0].clone(),
resp_recycler.clone(), resp_recycler.clone(),
r_responder, r_responder,
); );
@ -494,9 +500,11 @@ mod test {
); );
let t_responder = { let t_responder = {
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
let blob_sockets: Vec<Arc<UdpSocket>> =
tn.sockets.replicate.into_iter().map(Arc::new).collect();
let t_responder = responder( let t_responder = responder(
"window_send_test", "window_send_test",
Arc::new(tn.sockets.replicate), blob_sockets[0].clone(),
resp_recycler.clone(), resp_recycler.clone(),
r_responder, r_responder,
); );

View File

@ -17,7 +17,7 @@ use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, Ncp, UdpSocket) { fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, Ncp, UdpSocket) {
let tn = Node::new_localhost(); let mut tn = Node::new_localhost();
let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new"); let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new");
let c = Arc::new(RwLock::new(crdt)); let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![])); let w = Arc::new(RwLock::new(vec![]));
@ -29,7 +29,7 @@ fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, Ncp, UdpSocket) {
tn.sockets.gossip, tn.sockets.gossip,
exit, exit,
); );
(c, d, tn.sockets.replicate) (c, d, tn.sockets.replicate.pop().unwrap())
} }
/// Test that the network converges. /// Test that the network converges.