From d3fac8a06f96cea1c49f1922c20bf8567a2161ed Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Sat, 25 Aug 2018 10:24:16 -0700 Subject: [PATCH] Dynamically bind to available UDP ports in Fullnode (#920) * Dynamically bind to available UDP ports in Fullnode * Added tests for dynamic port binding - Also removed hard coding of port range from CRDT --- src/bin/fullnode.rs | 20 +++++- src/crdt.rs | 154 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 170 insertions(+), 4 deletions(-) diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 72ac52cca..ab0e4c918 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -72,11 +72,27 @@ fn main() -> () { } let leader_pubkey = keypair.pubkey(); - let repl_clone = repl_data.clone(); let ledger_path = matches.value_of("ledger").unwrap(); - let node = TestNode::new_with_bind_addr(repl_data, bind_addr); + let port_range = (8100, 10000); + let node = if let Some(_t) = matches.value_of("testnet") { + TestNode::new_with_external_ip( + leader_pubkey, + repl_data.contact_info.ncp.ip(), + port_range, + 0, + ) + } else { + TestNode::new_with_external_ip( + leader_pubkey, + repl_data.contact_info.ncp.ip(), + port_range, + repl_data.contact_info.ncp.port(), + ) + }; + let repl_clone = node.data.clone(); + let mut drone_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), DRONE_PORT); let testnet_addr = matches.value_of("testnet").map(|addr_str| { let addr: SocketAddr = addr_str.parse().unwrap(); diff --git a/src/crdt.rs b/src/crdt.rs index ad0e7a374..85b498b05 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -20,6 +20,7 @@ use counter::Counter; use hash::Hash; use ledger::LedgerWindow; use log::Level; +use nat::udp_random_bind; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; use pnet_datalink as datalink; use rand::{thread_rng, RngCore}; @@ -30,7 +31,7 @@ use std; use std::collections::HashMap; use std::collections::VecDeque; use std::io::Cursor; -use std::net::{IpAddr, SocketAddr, UdpSocket}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder, JoinHandle}; @@ -1354,6 +1355,74 @@ impl TestNode { }, } } + pub fn new_with_external_ip( + pubkey: Pubkey, + ip: IpAddr, + port_range: (u16, u16), + ncp_port: u16, + ) -> TestNode { + fn bind(port_range: (u16, u16)) -> (u16, UdpSocket) { + match udp_random_bind(port_range.0, port_range.1, 5) { + Ok(socket) => (socket.local_addr().unwrap().port(), socket), + Err(err) => { + panic!("Failed to bind to {:?}", err); + } + } + }; + + fn bind_to(port: u16) -> UdpSocket { + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port); + match UdpSocket::bind(addr) { + Ok(socket) => socket, + Err(err) => { + panic!("Failed to bind to {:?}: {:?}", addr, err); + } + } + }; + + let (gossip_port, gossip) = if ncp_port != 0 { + (ncp_port, bind_to(ncp_port)) + } else { + bind(port_range) + }; + let (replicate_port, replicate) = bind(port_range); + let (requests_port, requests) = bind(port_range); + let (transaction_port, transaction) = bind(port_range); + let (repair_port, repair) = bind(port_range); + + // Responses are sent from the same Udp port as requests are received + // from, in hopes that a NAT sitting in the middle will route the + // response Udp packet correctly back to the requester. + let respond = requests.try_clone().unwrap(); + + let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap(); + let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); + let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); + + let node_info = NodeInfo::new( + pubkey, + SocketAddr::new(ip, gossip_port), + SocketAddr::new(ip, replicate_port), + SocketAddr::new(ip, requests_port), + SocketAddr::new(ip, transaction_port), + SocketAddr::new(ip, repair_port), + ); + + TestNode { + data: node_info, + sockets: Sockets { + gossip, + gossip_send, + requests, + replicate, + transaction, + respond, + broadcast, + repair, + retransmit, + }, + } + } } fn report_time_spent(label: &str, time: &Duration, extra: &str) { @@ -1366,7 +1435,7 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) { #[cfg(test)] mod tests { use crdt::{ - parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, GOSSIP_PURGE_MILLIS, + parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, TestNode, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, }; use entry::Entry; @@ -1377,6 +1446,7 @@ mod tests { use result::Error; use signature::{Keypair, KeypairUtil, Pubkey}; use std::fs::remove_dir_all; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -2069,4 +2139,84 @@ mod tests { crdt.insert(&network_entry_point); assert!(crdt.leader_data().is_none()); } + + #[test] + fn new_with_external_ip_test_random() { + let sockaddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8080); + let node = + TestNode::new_with_external_ip(Keypair::new().pubkey(), sockaddr.ip(), (8100, 8200), 0); + + assert_eq!( + node.sockets.gossip.local_addr().unwrap().ip(), + sockaddr.ip() + ); + assert_eq!( + node.sockets.replicate.local_addr().unwrap().ip(), + sockaddr.ip() + ); + assert_eq!( + node.sockets.requests.local_addr().unwrap().ip(), + sockaddr.ip() + ); + assert_eq!( + node.sockets.transaction.local_addr().unwrap().ip(), + sockaddr.ip() + ); + assert_eq!( + node.sockets.repair.local_addr().unwrap().ip(), + sockaddr.ip() + ); + + assert!(node.sockets.gossip.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.gossip.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.replicate.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.replicate.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.requests.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.requests.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.transaction.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.transaction.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.repair.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.repair.local_addr().unwrap().port() <= 8200); + } + + #[test] + fn new_with_external_ip_test_gossip() { + let sockaddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8080); + let node = TestNode::new_with_external_ip( + Keypair::new().pubkey(), + sockaddr.ip(), + (8100, 8200), + 8050, + ); + assert_eq!( + node.sockets.gossip.local_addr().unwrap().ip(), + sockaddr.ip() + ); + assert_eq!( + node.sockets.replicate.local_addr().unwrap().ip(), + sockaddr.ip() + ); + assert_eq!( + node.sockets.requests.local_addr().unwrap().ip(), + sockaddr.ip() + ); + assert_eq!( + node.sockets.transaction.local_addr().unwrap().ip(), + sockaddr.ip() + ); + assert_eq!( + node.sockets.repair.local_addr().unwrap().ip(), + sockaddr.ip() + ); + + assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050); + assert!(node.sockets.replicate.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.replicate.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.requests.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.requests.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.transaction.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.transaction.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.repair.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.repair.local_addr().unwrap().port() <= 8200); + } }