From 0b56d603c238ca4dc79a7712a6cace92cc68db70 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Fri, 29 Jun 2018 14:12:26 -0700 Subject: [PATCH] Client NAT traversal 0.1 UPnP is now used to request a port on the NAT be forwarded to the local machine. This obviously only works for NATs that support UPnP, and thus is not a panacea for all NAT-related connectivity issues. Notable hacks in this patch include a transmit/receive UDP socket pair to work around current protocol limitations whereby the full node assumes its peer can receive on the same UDP port it transmitted from. --- Cargo.toml | 3 ++ multinode-demo/client.sh | 2 +- multinode-demo/wallet.sh | 3 +- src/bin/client-demo.rs | 70 ++++++++++++------------------------ src/bin/wallet.rs | 42 ++++++++-------------- src/drone.rs | 2 ++ src/lib.rs | 1 + src/nat.rs | 76 ++++++++++++++++++++++++++++++++++++++++ src/thin_client.rs | 23 +++++++----- tests/multinode.rs | 1 + 10 files changed, 138 insertions(+), 85 deletions(-) create mode 100644 src/nat.rs diff --git a/Cargo.toml b/Cargo.toml index d0bd0461e..a35ffa2ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,6 +72,9 @@ rand = "0.5.1" pnet_datalink = "0.21.0" tokio = "0.1" tokio-codec = "0.1" +tokio-core = "0.1.17" tokio-io = "0.1" itertools = "0.7.8" bs58 = "0.2.0" +p2p = "0.5.2" +futures = "0.1.21" diff --git a/multinode-demo/client.sh b/multinode-demo/client.sh index 32e1313bb..79493abdc 100755 --- a/multinode-demo/client.sh +++ b/multinode-demo/client.sh @@ -20,5 +20,5 @@ rsync -vPz "$rsync_leader_url"/config/mint.json $SOLANA_CONFIG_DIR/ # shellcheck disable=SC2086 # $solana_client_demo should not be quoted exec $solana_client_demo \ - -n "$count" -l $SOLANA_CONFIG_DIR/leader.json -d \ + -n "$count" -l $SOLANA_CONFIG_DIR/leader.json \ < $SOLANA_CONFIG_DIR/mint.json diff --git a/multinode-demo/wallet.sh b/multinode-demo/wallet.sh index fc34fbb03..fcba90e14 100755 --- a/multinode-demo/wallet.sh +++ b/multinode-demo/wallet.sh @@ -9,6 +9,7 @@ source "$here"/common.sh SOLANA_CONFIG_DIR=config-client-demo leader=${1:-${here}/..} # Default to local solana repo +shift rsync_leader_url=$(rsync_url "$leader") @@ -19,4 +20,4 @@ rsync -vPz "$rsync_leader_url"/config/mint.json $SOLANA_CONFIG_DIR/ # shellcheck disable=SC2086 # $solana_wallet should not be quoted exec $solana_wallet \ - -l $SOLANA_CONFIG_DIR/leader.json -m $SOLANA_CONFIG_DIR/mint.json -d + -l $SOLANA_CONFIG_DIR/leader.json -m $SOLANA_CONFIG_DIR/mint.json "$@" diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index b819a17de..e1a0b192a 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -8,9 +8,10 @@ extern crate solana; use atty::{is, Stream}; use getopts::Options; use rayon::prelude::*; -use solana::crdt::{get_ip_addr, Crdt, ReplicatedData}; +use solana::crdt::{Crdt, ReplicatedData}; use solana::hash::Hash; use solana::mint::Mint; +use solana::nat::udp_public_bind; use solana::ncp::Ncp; use solana::signature::{GenKeys, KeyPair, KeyPairUtil}; use solana::streamer::default_window; @@ -40,14 +41,13 @@ fn print_usage(program: &str, opts: Options) { } fn sample_tx_count( - thread_addr: Arc>, exit: Arc, maxes: Arc>>, first_count: u64, v: ReplicatedData, sample_period: u64, ) { - let mut client = mk_client(&thread_addr, &v); + let mut client = mk_client(&v); let mut now = Instant::now(); let mut initial_tx_count = client.transaction_count(); let mut max_tps = 0.0; @@ -149,9 +149,7 @@ fn main() { let mut opts = Options::new(); opts.optopt("l", "", "leader", "leader.json"); - opts.optopt("c", "", "client port", "port"); opts.optopt("t", "", "number of threads", &format!("{}", threads)); - opts.optflag("d", "dyn", "detect network address dynamically"); opts.optopt( "s", "", @@ -179,15 +177,6 @@ fn main() { print_usage(&program, opts); return; } - let mut addr: SocketAddr = "0.0.0.0:8100".parse().unwrap(); - if matches.opt_present("c") { - let port = matches.opt_str("c").unwrap().parse().unwrap(); - addr.set_port(port); - } - if matches.opt_present("d") { - addr.set_ip(get_ip_addr().unwrap()); - } - let client_addr: Arc> = Arc::new(RwLock::new(addr)); if matches.opt_present("t") { threads = matches.opt_str("t").unwrap().parse().expect("integer"); } @@ -207,13 +196,7 @@ fn main() { let signal = Arc::new(AtomicBool::new(false)); let mut c_threads = vec![]; - let validators = converge( - &client_addr, - &leader, - signal.clone(), - num_nodes, - &mut c_threads, - ); + let validators = converge(&leader, signal.clone(), num_nodes, &mut c_threads); assert_eq!(validators.len(), num_nodes); if is(Stream::Stdin) { @@ -233,7 +216,7 @@ fn main() { eprintln!("failed to parse json: {}", e); exit(1); }); - let mut client = mk_client(&client_addr, &leader); + let mut client = mk_client(&leader); println!("Get last ID..."); let mut last_id = client.get_last_id(); @@ -260,20 +243,17 @@ fn main() { .into_iter() .map(|v| { let exit = signal.clone(); - let thread_addr = client_addr.clone(); let maxes = maxes.clone(); Builder::new() .name("solana-client-sample".to_string()) .spawn(move || { - sample_tx_count(thread_addr, exit, maxes, first_count, v, sample_period); + sample_tx_count(exit, maxes, first_count, v, sample_period); }) .unwrap() }) .collect(); - let clients = (0..threads) - .map(|_| mk_client(&client_addr, &leader)) - .collect(); + let clients = (0..threads).map(|_| mk_client(&leader)).collect(); // generate and send transactions for the specified duration let time = Duration::new(time_sec, 0); @@ -320,45 +300,41 @@ fn main() { } } -fn mk_client(locked_addr: &Arc>, r: &ReplicatedData) -> ThinClient { - let mut addr = locked_addr.write().unwrap(); - let port = addr.port(); - let transactions_socket = UdpSocket::bind(addr.clone()).unwrap(); - addr.set_port(port + 1); - let requests_socket = UdpSocket::bind(addr.clone()).unwrap(); - requests_socket +fn mk_client(r: &ReplicatedData) -> ThinClient { + let transactions_socket_pair = udp_public_bind("transactions"); + let requests_socket_pair = udp_public_bind("requests"); + + requests_socket_pair + .receiver .set_read_timeout(Some(Duration::new(1, 0))) .unwrap(); - addr.set_port(port + 2); ThinClient::new( r.requests_addr, - requests_socket, + requests_socket_pair.sender, + requests_socket_pair.receiver, r.transactions_addr, - transactions_socket, + transactions_socket_pair.sender, ) } -fn spy_node(client_addr: &Arc>) -> (ReplicatedData, UdpSocket) { - let mut addr = client_addr.write().unwrap(); - let port = addr.port(); - let gossip = UdpSocket::bind(addr.clone()).unwrap(); - addr.set_port(port + 1); - let daddr = "0.0.0.0:0".parse().unwrap(); +fn spy_node() -> (ReplicatedData, UdpSocket) { + let gossip_socket_pair = udp_public_bind("gossip"); let pubkey = KeyPair::new().pubkey(); + let daddr = "0.0.0.0:0".parse().unwrap(); let node = ReplicatedData::new( pubkey, - gossip.local_addr().unwrap(), + //gossip.local_addr().unwrap(), + gossip_socket_pair.addr, daddr, daddr, daddr, daddr, ); - (node, gossip) + (node, gossip_socket_pair.receiver) } fn converge( - client_addr: &Arc>, leader: &ReplicatedData, exit: Arc, num_nodes: usize, @@ -366,7 +342,7 @@ fn converge( ) -> Vec { //lets spy on the network let daddr = "0.0.0.0:0".parse().unwrap(); - let (spy, spy_gossip) = spy_node(client_addr); + let (spy, spy_gossip) = spy_node(); let mut spy_crdt = Crdt::new(spy); spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); diff --git a/src/bin/wallet.rs b/src/bin/wallet.rs index 19ce3493a..cdaf43a7f 100644 --- a/src/bin/wallet.rs +++ b/src/bin/wallet.rs @@ -8,9 +8,10 @@ extern crate solana; use bincode::serialize; use getopts::{Matches, Options}; -use solana::crdt::{get_ip_addr, ReplicatedData}; +use solana::crdt::ReplicatedData; use solana::drone::DroneRequest; use solana::mint::Mint; +use solana::nat::udp_public_bind; use solana::signature::{PublicKey, Signature}; use solana::thin_client::ThinClient; use std::env; @@ -19,7 +20,7 @@ use std::fmt; use std::fs::File; use std::io; use std::io::prelude::*; -use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream, UdpSocket}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}; use std::process::exit; use std::thread::sleep; use std::time::Duration; @@ -57,7 +58,6 @@ impl error::Error for WalletError { struct WalletConfig { leader: ReplicatedData, id: Mint, - client_addr: SocketAddr, drone_addr: SocketAddr, command: WalletCommand, } @@ -68,7 +68,6 @@ impl Default for WalletConfig { WalletConfig { leader: ReplicatedData::new_leader(&default_addr.clone()), id: Mint::new(0), - client_addr: default_addr.clone(), drone_addr: default_addr.clone(), command: WalletCommand::Balance, } @@ -122,8 +121,6 @@ fn parse_args(args: Vec) -> Result> { let mut opts = Options::new(); opts.optopt("l", "", "leader", "leader.json"); opts.optopt("m", "", "mint", "mint.json"); - opts.optopt("c", "", "client port", "port"); - opts.optflag("d", "dyn", "detect network address dynamically"); opts.optflag("h", "help", "print help"); let matches = match opts.parse(&args[1..]) { @@ -139,16 +136,6 @@ fn parse_args(args: Vec) -> Result> { return Ok(WalletConfig::default()); } - let mut client_addr: SocketAddr = "0.0.0.0:8100".parse().unwrap(); - if matches.opt_present("c") { - let port = matches.opt_str("c").unwrap().parse().unwrap(); - client_addr.set_port(port); - } - - if matches.opt_present("d") { - client_addr.set_ip(get_ip_addr().unwrap()); - } - let leader = if matches.opt_present("l") { read_leader(matches.opt_str("l").unwrap()) } else { @@ -170,7 +157,6 @@ fn parse_args(args: Vec) -> Result> { Ok(WalletConfig { leader, id, - client_addr, drone_addr, // TODO: Add an option for this. command, }) @@ -252,20 +238,20 @@ fn read_mint(path: String) -> Result> { Ok(mint) } -fn mk_client(client_addr: &SocketAddr, r: &ReplicatedData) -> io::Result { - let mut addr = client_addr.clone(); - let port = addr.port(); - let transactions_socket = UdpSocket::bind(addr.clone())?; - addr.set_port(port + 1); - let requests_socket = UdpSocket::bind(addr.clone())?; - requests_socket.set_read_timeout(Some(Duration::new(1, 0)))?; +fn mk_client(r: &ReplicatedData) -> io::Result { + let transactions_socket_pair = udp_public_bind("transactions"); + let requests_socket_pair = udp_public_bind("requests"); + requests_socket_pair + .receiver + .set_read_timeout(Some(Duration::new(1, 0))) + .unwrap(); - addr.set_port(port + 2); Ok(ThinClient::new( r.requests_addr, - requests_socket, + requests_socket_pair.sender, + requests_socket_pair.receiver, r.transactions_addr, - transactions_socket, + transactions_socket_pair.sender, )) } @@ -283,6 +269,6 @@ fn request_airdrop(drone_addr: &SocketAddr, id: &Mint) { fn main() -> Result<(), Box> { env_logger::init(); let config = parse_args(env::args().collect())?; - let mut client = mk_client(&config.client_addr, &config.leader)?; + let mut client = mk_client(&config.leader)?; process_command(&config, &mut client) } diff --git a/src/drone.rs b/src/drone.rs index edf10392c..53c673c02 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -99,6 +99,7 @@ impl Drone { let mut client = ThinClient::new( self.requests_addr, + requests_socket.try_clone().unwrap(), requests_socket, self.transactions_addr, transactions_socket, @@ -292,6 +293,7 @@ mod tests { let mut client = ThinClient::new( leader.data.requests_addr, + requests_socket.try_clone().unwrap(), requests_socket, leader.data.transactions_addr, transactions_socket, diff --git a/src/lib.rs b/src/lib.rs index 0bde6e98b..956f056af 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,7 @@ pub mod hash; pub mod ledger; pub mod logger; pub mod mint; +pub mod nat; pub mod ncp; pub mod packet; pub mod payment_plan; diff --git a/src/nat.rs b/src/nat.rs new file mode 100644 index 000000000..0de46d115 --- /dev/null +++ b/src/nat.rs @@ -0,0 +1,76 @@ +//! The `nat` module assists with NAT traversal + +extern crate futures; +extern crate p2p; +extern crate tokio_core; + +use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; + +use self::futures::Future; +use self::p2p::UdpSocketExt; + +/// A data type representing a public Udp socket +pub struct UdpSocketPair { + pub addr: SocketAddr, // Public address of the socket + pub receiver: UdpSocket, // Locally bound socket that can receive from the public address + pub sender: UdpSocket, // Locally bound socket to send via public address +} + +/// Binds a private Udp address to a public address using UPnP if possible +pub fn udp_public_bind(label: &str) -> UdpSocketPair { + let private_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); + + let mut core = tokio_core::reactor::Core::new().unwrap(); + let handle = core.handle(); + let mc = p2p::P2p::default(); + let res = core.run({ + tokio_core::net::UdpSocket::bind_public(&private_addr, &handle, &mc) + .map_err(|e| { + info!("Failed to bind public socket for {}: {}", label, e); + }) + .and_then(|(socket, public_addr)| Ok((public_addr, socket.local_addr().unwrap()))) + }); + + match res { + Ok((public_addr, local_addr)) => { + info!( + "Using local address {} mapped to UPnP public address {} for {}", + local_addr, public_addr, label + ); + + // NAT should now be forwarding inbound packets directed at + // |public_addr| to the local |receiver| socket... + let receiver = UdpSocket::bind(local_addr).unwrap(); + + // ... however for outbound packets, the NAT *will not* rewrite the + // source port from |receiver.local_addr().port()| to |public_addr.port()|. + // This is currently a problem when talking with a fullnode as it + // assumes it can send UDP packets back at the source. This hits the + // NAT as a datagram for |receiver.local_addr().port()| on the NAT's public + // IP, which the NAT promptly discards. As a short term hack, create a + // local UDP socket, |sender|, with the same port as |public_addr.port()|. + // + // TODO: Remove the |sender| socket and deal with the downstream changes to + // the UDP signalling + let mut local_addr_sender = local_addr.clone(); + local_addr_sender.set_port(public_addr.port()); + let sender = UdpSocket::bind(local_addr_sender).unwrap(); + + UdpSocketPair { + addr: public_addr, + receiver, + sender, + } + } + Err(_) => { + let sender = UdpSocket::bind(private_addr).unwrap(); + let local_addr = sender.local_addr().unwrap(); + info!("Using local address {} for {}", local_addr, label); + UdpSocketPair { + addr: private_addr, + receiver: sender.try_clone().unwrap(), + sender, + } + } + } +} diff --git a/src/thin_client.rs b/src/thin_client.rs index a2920c999..b25db7721 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -15,7 +15,8 @@ use transaction::Transaction; /// An object for querying and sending transactions to the network. pub struct ThinClient { requests_addr: SocketAddr, - requests_socket: UdpSocket, + requests_sender: UdpSocket, + requests_receiver: UdpSocket, transactions_addr: SocketAddr, transactions_socket: UdpSocket, last_id: Option, @@ -30,13 +31,15 @@ impl ThinClient { /// to a public address before invoking ThinClient methods. pub fn new( requests_addr: SocketAddr, - requests_socket: UdpSocket, + requests_sender: UdpSocket, + requests_receiver: UdpSocket, transactions_addr: SocketAddr, transactions_socket: UdpSocket, ) -> Self { let client = ThinClient { requests_addr, - requests_socket, + requests_sender, + requests_receiver, transactions_addr, transactions_socket, last_id: None, @@ -50,7 +53,7 @@ impl ThinClient { pub fn recv_response(&self) -> io::Result { let mut buf = vec![0u8; 1024]; trace!("start recv_from"); - self.requests_socket.recv_from(&mut buf)?; + self.requests_receiver.recv_from(&mut buf)?; trace!("end recv_from"); let resp = deserialize(&buf).expect("deserialize balance in thin_client"); Ok(resp) @@ -112,7 +115,7 @@ impl ThinClient { trace!("get_balance"); let req = Request::GetBalance { key: *pubkey }; let data = serialize(&req).expect("serialize GetBalance in pub fn get_balance"); - self.requests_socket + self.requests_sender .send_to(&data, &self.requests_addr) .expect("buffer error in pub fn get_balance"); let mut done = false; @@ -136,7 +139,7 @@ impl ThinClient { serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count"); let mut done = false; while !done { - self.requests_socket + self.requests_sender .send_to(&data, &self.requests_addr) .expect("buffer error in pub fn transaction_count"); @@ -159,7 +162,8 @@ impl ThinClient { let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id"); let mut done = false; while !done { - self.requests_socket + eprintln!("get_last_id send_to {}", &self.requests_addr); + self.requests_sender .send_to(&data, &self.requests_addr) .expect("buffer error in pub fn get_last_id"); @@ -201,7 +205,7 @@ impl ThinClient { let data = serialize(&req).expect("serialize GetSignature in pub fn check_signature"); let mut done = false; while !done { - self.requests_socket + self.requests_sender .send_to(&data, &self.requests_addr) .expect("buffer error in pub fn get_last_id"); @@ -263,6 +267,7 @@ mod tests { let mut client = ThinClient::new( leader.data.requests_addr, + requests_socket.try_clone().unwrap(), requests_socket, leader.data.transactions_addr, transactions_socket, @@ -310,6 +315,7 @@ mod tests { let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( leader.data.requests_addr, + requests_socket.try_clone().unwrap(), requests_socket, leader.data.transactions_addr, transactions_socket, @@ -368,6 +374,7 @@ mod tests { let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( leader.data.requests_addr, + requests_socket.try_clone().unwrap(), requests_socket, leader.data.transactions_addr, transactions_socket, diff --git a/tests/multinode.rs b/tests/multinode.rs index 9ef891631..bd2acc3a2 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -246,6 +246,7 @@ fn mk_client(leader: &ReplicatedData) -> ThinClient { ThinClient::new( leader.requests_addr, + requests_socket.try_clone().unwrap(), requests_socket, leader.transactions_addr, transactions_socket,