Dynamically bind to available UDP ports in Fullnode (#920)
* Dynamically bind to available UDP ports in Fullnode * Added tests for dynamic port binding - Also removed hard coding of port range from CRDT
This commit is contained in:
parent
c641ba1006
commit
d3fac8a06f
|
@ -72,11 +72,27 @@ fn main() -> () {
|
||||||
}
|
}
|
||||||
|
|
||||||
let leader_pubkey = keypair.pubkey();
|
let leader_pubkey = keypair.pubkey();
|
||||||
let repl_clone = repl_data.clone();
|
|
||||||
|
|
||||||
let ledger_path = matches.value_of("ledger").unwrap();
|
let ledger_path = matches.value_of("ledger").unwrap();
|
||||||
|
|
||||||
let node = TestNode::new_with_bind_addr(repl_data, bind_addr);
|
let port_range = (8100, 10000);
|
||||||
|
let node = if let Some(_t) = matches.value_of("testnet") {
|
||||||
|
TestNode::new_with_external_ip(
|
||||||
|
leader_pubkey,
|
||||||
|
repl_data.contact_info.ncp.ip(),
|
||||||
|
port_range,
|
||||||
|
0,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
TestNode::new_with_external_ip(
|
||||||
|
leader_pubkey,
|
||||||
|
repl_data.contact_info.ncp.ip(),
|
||||||
|
port_range,
|
||||||
|
repl_data.contact_info.ncp.port(),
|
||||||
|
)
|
||||||
|
};
|
||||||
|
let repl_clone = node.data.clone();
|
||||||
|
|
||||||
let mut drone_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), DRONE_PORT);
|
let mut drone_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), DRONE_PORT);
|
||||||
let testnet_addr = matches.value_of("testnet").map(|addr_str| {
|
let testnet_addr = matches.value_of("testnet").map(|addr_str| {
|
||||||
let addr: SocketAddr = addr_str.parse().unwrap();
|
let addr: SocketAddr = addr_str.parse().unwrap();
|
||||||
|
|
154
src/crdt.rs
154
src/crdt.rs
|
@ -20,6 +20,7 @@ use counter::Counter;
|
||||||
use hash::Hash;
|
use hash::Hash;
|
||||||
use ledger::LedgerWindow;
|
use ledger::LedgerWindow;
|
||||||
use log::Level;
|
use log::Level;
|
||||||
|
use nat::udp_random_bind;
|
||||||
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
|
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
|
||||||
use pnet_datalink as datalink;
|
use pnet_datalink as datalink;
|
||||||
use rand::{thread_rng, RngCore};
|
use rand::{thread_rng, RngCore};
|
||||||
|
@ -30,7 +31,7 @@ use std;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
use std::net::{IpAddr, SocketAddr, UdpSocket};
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
|
||||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::thread::{sleep, Builder, JoinHandle};
|
use std::thread::{sleep, Builder, JoinHandle};
|
||||||
|
@ -1354,6 +1355,74 @@ impl TestNode {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pub fn new_with_external_ip(
|
||||||
|
pubkey: Pubkey,
|
||||||
|
ip: IpAddr,
|
||||||
|
port_range: (u16, u16),
|
||||||
|
ncp_port: u16,
|
||||||
|
) -> TestNode {
|
||||||
|
fn bind(port_range: (u16, u16)) -> (u16, UdpSocket) {
|
||||||
|
match udp_random_bind(port_range.0, port_range.1, 5) {
|
||||||
|
Ok(socket) => (socket.local_addr().unwrap().port(), socket),
|
||||||
|
Err(err) => {
|
||||||
|
panic!("Failed to bind to {:?}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
fn bind_to(port: u16) -> UdpSocket {
|
||||||
|
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
|
||||||
|
match UdpSocket::bind(addr) {
|
||||||
|
Ok(socket) => socket,
|
||||||
|
Err(err) => {
|
||||||
|
panic!("Failed to bind to {:?}: {:?}", addr, err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let (gossip_port, gossip) = if ncp_port != 0 {
|
||||||
|
(ncp_port, bind_to(ncp_port))
|
||||||
|
} else {
|
||||||
|
bind(port_range)
|
||||||
|
};
|
||||||
|
let (replicate_port, replicate) = bind(port_range);
|
||||||
|
let (requests_port, requests) = bind(port_range);
|
||||||
|
let (transaction_port, transaction) = bind(port_range);
|
||||||
|
let (repair_port, repair) = bind(port_range);
|
||||||
|
|
||||||
|
// Responses are sent from the same Udp port as requests are received
|
||||||
|
// from, in hopes that a NAT sitting in the middle will route the
|
||||||
|
// response Udp packet correctly back to the requester.
|
||||||
|
let respond = requests.try_clone().unwrap();
|
||||||
|
|
||||||
|
let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
|
||||||
|
let node_info = NodeInfo::new(
|
||||||
|
pubkey,
|
||||||
|
SocketAddr::new(ip, gossip_port),
|
||||||
|
SocketAddr::new(ip, replicate_port),
|
||||||
|
SocketAddr::new(ip, requests_port),
|
||||||
|
SocketAddr::new(ip, transaction_port),
|
||||||
|
SocketAddr::new(ip, repair_port),
|
||||||
|
);
|
||||||
|
|
||||||
|
TestNode {
|
||||||
|
data: node_info,
|
||||||
|
sockets: Sockets {
|
||||||
|
gossip,
|
||||||
|
gossip_send,
|
||||||
|
requests,
|
||||||
|
replicate,
|
||||||
|
transaction,
|
||||||
|
respond,
|
||||||
|
broadcast,
|
||||||
|
repair,
|
||||||
|
retransmit,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn report_time_spent(label: &str, time: &Duration, extra: &str) {
|
fn report_time_spent(label: &str, time: &Duration, extra: &str) {
|
||||||
|
@ -1366,7 +1435,7 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crdt::{
|
use crdt::{
|
||||||
parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, GOSSIP_PURGE_MILLIS,
|
parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, TestNode, GOSSIP_PURGE_MILLIS,
|
||||||
GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE,
|
GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE,
|
||||||
};
|
};
|
||||||
use entry::Entry;
|
use entry::Entry;
|
||||||
|
@ -1377,6 +1446,7 @@ mod tests {
|
||||||
use result::Error;
|
use result::Error;
|
||||||
use signature::{Keypair, KeypairUtil, Pubkey};
|
use signature::{Keypair, KeypairUtil, Pubkey};
|
||||||
use std::fs::remove_dir_all;
|
use std::fs::remove_dir_all;
|
||||||
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::channel;
|
use std::sync::mpsc::channel;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
|
@ -2069,4 +2139,84 @@ mod tests {
|
||||||
crdt.insert(&network_entry_point);
|
crdt.insert(&network_entry_point);
|
||||||
assert!(crdt.leader_data().is_none());
|
assert!(crdt.leader_data().is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn new_with_external_ip_test_random() {
|
||||||
|
let sockaddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8080);
|
||||||
|
let node =
|
||||||
|
TestNode::new_with_external_ip(Keypair::new().pubkey(), sockaddr.ip(), (8100, 8200), 0);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
node.sockets.gossip.local_addr().unwrap().ip(),
|
||||||
|
sockaddr.ip()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
node.sockets.replicate.local_addr().unwrap().ip(),
|
||||||
|
sockaddr.ip()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
node.sockets.requests.local_addr().unwrap().ip(),
|
||||||
|
sockaddr.ip()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
node.sockets.transaction.local_addr().unwrap().ip(),
|
||||||
|
sockaddr.ip()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
node.sockets.repair.local_addr().unwrap().ip(),
|
||||||
|
sockaddr.ip()
|
||||||
|
);
|
||||||
|
|
||||||
|
assert!(node.sockets.gossip.local_addr().unwrap().port() >= 8100);
|
||||||
|
assert!(node.sockets.gossip.local_addr().unwrap().port() <= 8200);
|
||||||
|
assert!(node.sockets.replicate.local_addr().unwrap().port() >= 8100);
|
||||||
|
assert!(node.sockets.replicate.local_addr().unwrap().port() <= 8200);
|
||||||
|
assert!(node.sockets.requests.local_addr().unwrap().port() >= 8100);
|
||||||
|
assert!(node.sockets.requests.local_addr().unwrap().port() <= 8200);
|
||||||
|
assert!(node.sockets.transaction.local_addr().unwrap().port() >= 8100);
|
||||||
|
assert!(node.sockets.transaction.local_addr().unwrap().port() <= 8200);
|
||||||
|
assert!(node.sockets.repair.local_addr().unwrap().port() >= 8100);
|
||||||
|
assert!(node.sockets.repair.local_addr().unwrap().port() <= 8200);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn new_with_external_ip_test_gossip() {
|
||||||
|
let sockaddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8080);
|
||||||
|
let node = TestNode::new_with_external_ip(
|
||||||
|
Keypair::new().pubkey(),
|
||||||
|
sockaddr.ip(),
|
||||||
|
(8100, 8200),
|
||||||
|
8050,
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
node.sockets.gossip.local_addr().unwrap().ip(),
|
||||||
|
sockaddr.ip()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
node.sockets.replicate.local_addr().unwrap().ip(),
|
||||||
|
sockaddr.ip()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
node.sockets.requests.local_addr().unwrap().ip(),
|
||||||
|
sockaddr.ip()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
node.sockets.transaction.local_addr().unwrap().ip(),
|
||||||
|
sockaddr.ip()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
node.sockets.repair.local_addr().unwrap().ip(),
|
||||||
|
sockaddr.ip()
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050);
|
||||||
|
assert!(node.sockets.replicate.local_addr().unwrap().port() >= 8100);
|
||||||
|
assert!(node.sockets.replicate.local_addr().unwrap().port() <= 8200);
|
||||||
|
assert!(node.sockets.requests.local_addr().unwrap().port() >= 8100);
|
||||||
|
assert!(node.sockets.requests.local_addr().unwrap().port() <= 8200);
|
||||||
|
assert!(node.sockets.transaction.local_addr().unwrap().port() >= 8100);
|
||||||
|
assert!(node.sockets.transaction.local_addr().unwrap().port() <= 8200);
|
||||||
|
assert!(node.sockets.repair.local_addr().unwrap().port() >= 8100);
|
||||||
|
assert!(node.sockets.repair.local_addr().unwrap().port() <= 8200);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue