Client NAT traversal 0.1

UPnP is now used to request a port on the NAT be forwarded to the local machine.
This obviously only works for NATs that support UPnP, and thus is not a panacea
for all NAT-related connectivity issues.

Notable hacks in this patch include a transmit/receive UDP socket pair to work
around current protocol limitations whereby the full node assumes its peer can
receive on the same UDP port it transmitted from.
This commit is contained in:
Michael Vines 2018-06-29 14:12:26 -07:00 committed by Grimes
parent 4ffb5d157a
commit 0b56d603c2
10 changed files with 138 additions and 85 deletions

View File

@ -72,6 +72,9 @@ rand = "0.5.1"
pnet_datalink = "0.21.0"
tokio = "0.1"
tokio-codec = "0.1"
tokio-core = "0.1.17"
tokio-io = "0.1"
itertools = "0.7.8"
bs58 = "0.2.0"
p2p = "0.5.2"
futures = "0.1.21"

View File

@ -20,5 +20,5 @@ rsync -vPz "$rsync_leader_url"/config/mint.json $SOLANA_CONFIG_DIR/
# shellcheck disable=SC2086 # $solana_client_demo should not be quoted
exec $solana_client_demo \
-n "$count" -l $SOLANA_CONFIG_DIR/leader.json -d \
-n "$count" -l $SOLANA_CONFIG_DIR/leader.json \
< $SOLANA_CONFIG_DIR/mint.json

View File

@ -9,6 +9,7 @@ source "$here"/common.sh
SOLANA_CONFIG_DIR=config-client-demo
leader=${1:-${here}/..} # Default to local solana repo
shift
rsync_leader_url=$(rsync_url "$leader")
@ -19,4 +20,4 @@ rsync -vPz "$rsync_leader_url"/config/mint.json $SOLANA_CONFIG_DIR/
# shellcheck disable=SC2086 # $solana_wallet should not be quoted
exec $solana_wallet \
-l $SOLANA_CONFIG_DIR/leader.json -m $SOLANA_CONFIG_DIR/mint.json -d
-l $SOLANA_CONFIG_DIR/leader.json -m $SOLANA_CONFIG_DIR/mint.json "$@"

View File

@ -8,9 +8,10 @@ extern crate solana;
use atty::{is, Stream};
use getopts::Options;
use rayon::prelude::*;
use solana::crdt::{get_ip_addr, Crdt, ReplicatedData};
use solana::crdt::{Crdt, ReplicatedData};
use solana::hash::Hash;
use solana::mint::Mint;
use solana::nat::udp_public_bind;
use solana::ncp::Ncp;
use solana::signature::{GenKeys, KeyPair, KeyPairUtil};
use solana::streamer::default_window;
@ -40,14 +41,13 @@ fn print_usage(program: &str, opts: Options) {
}
fn sample_tx_count(
thread_addr: Arc<RwLock<SocketAddr>>,
exit: Arc<AtomicBool>,
maxes: Arc<RwLock<Vec<(f64, u64)>>>,
first_count: u64,
v: ReplicatedData,
sample_period: u64,
) {
let mut client = mk_client(&thread_addr, &v);
let mut client = mk_client(&v);
let mut now = Instant::now();
let mut initial_tx_count = client.transaction_count();
let mut max_tps = 0.0;
@ -149,9 +149,7 @@ fn main() {
let mut opts = Options::new();
opts.optopt("l", "", "leader", "leader.json");
opts.optopt("c", "", "client port", "port");
opts.optopt("t", "", "number of threads", &format!("{}", threads));
opts.optflag("d", "dyn", "detect network address dynamically");
opts.optopt(
"s",
"",
@ -179,15 +177,6 @@ fn main() {
print_usage(&program, opts);
return;
}
let mut addr: SocketAddr = "0.0.0.0:8100".parse().unwrap();
if matches.opt_present("c") {
let port = matches.opt_str("c").unwrap().parse().unwrap();
addr.set_port(port);
}
if matches.opt_present("d") {
addr.set_ip(get_ip_addr().unwrap());
}
let client_addr: Arc<RwLock<SocketAddr>> = Arc::new(RwLock::new(addr));
if matches.opt_present("t") {
threads = matches.opt_str("t").unwrap().parse().expect("integer");
}
@ -207,13 +196,7 @@ fn main() {
let signal = Arc::new(AtomicBool::new(false));
let mut c_threads = vec![];
let validators = converge(
&client_addr,
&leader,
signal.clone(),
num_nodes,
&mut c_threads,
);
let validators = converge(&leader, signal.clone(), num_nodes, &mut c_threads);
assert_eq!(validators.len(), num_nodes);
if is(Stream::Stdin) {
@ -233,7 +216,7 @@ fn main() {
eprintln!("failed to parse json: {}", e);
exit(1);
});
let mut client = mk_client(&client_addr, &leader);
let mut client = mk_client(&leader);
println!("Get last ID...");
let mut last_id = client.get_last_id();
@ -260,20 +243,17 @@ fn main() {
.into_iter()
.map(|v| {
let exit = signal.clone();
let thread_addr = client_addr.clone();
let maxes = maxes.clone();
Builder::new()
.name("solana-client-sample".to_string())
.spawn(move || {
sample_tx_count(thread_addr, exit, maxes, first_count, v, sample_period);
sample_tx_count(exit, maxes, first_count, v, sample_period);
})
.unwrap()
})
.collect();
let clients = (0..threads)
.map(|_| mk_client(&client_addr, &leader))
.collect();
let clients = (0..threads).map(|_| mk_client(&leader)).collect();
// generate and send transactions for the specified duration
let time = Duration::new(time_sec, 0);
@ -320,45 +300,41 @@ fn main() {
}
}
fn mk_client(locked_addr: &Arc<RwLock<SocketAddr>>, r: &ReplicatedData) -> ThinClient {
let mut addr = locked_addr.write().unwrap();
let port = addr.port();
let transactions_socket = UdpSocket::bind(addr.clone()).unwrap();
addr.set_port(port + 1);
let requests_socket = UdpSocket::bind(addr.clone()).unwrap();
requests_socket
fn mk_client(r: &ReplicatedData) -> ThinClient {
let transactions_socket_pair = udp_public_bind("transactions");
let requests_socket_pair = udp_public_bind("requests");
requests_socket_pair
.receiver
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
addr.set_port(port + 2);
ThinClient::new(
r.requests_addr,
requests_socket,
requests_socket_pair.sender,
requests_socket_pair.receiver,
r.transactions_addr,
transactions_socket,
transactions_socket_pair.sender,
)
}
fn spy_node(client_addr: &Arc<RwLock<SocketAddr>>) -> (ReplicatedData, UdpSocket) {
let mut addr = client_addr.write().unwrap();
let port = addr.port();
let gossip = UdpSocket::bind(addr.clone()).unwrap();
addr.set_port(port + 1);
let daddr = "0.0.0.0:0".parse().unwrap();
fn spy_node() -> (ReplicatedData, UdpSocket) {
let gossip_socket_pair = udp_public_bind("gossip");
let pubkey = KeyPair::new().pubkey();
let daddr = "0.0.0.0:0".parse().unwrap();
let node = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
//gossip.local_addr().unwrap(),
gossip_socket_pair.addr,
daddr,
daddr,
daddr,
daddr,
);
(node, gossip)
(node, gossip_socket_pair.receiver)
}
fn converge(
client_addr: &Arc<RwLock<SocketAddr>>,
leader: &ReplicatedData,
exit: Arc<AtomicBool>,
num_nodes: usize,
@ -366,7 +342,7 @@ fn converge(
) -> Vec<ReplicatedData> {
//lets spy on the network
let daddr = "0.0.0.0:0".parse().unwrap();
let (spy, spy_gossip) = spy_node(client_addr);
let (spy, spy_gossip) = spy_node();
let mut spy_crdt = Crdt::new(spy);
spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id);

View File

@ -8,9 +8,10 @@ extern crate solana;
use bincode::serialize;
use getopts::{Matches, Options};
use solana::crdt::{get_ip_addr, ReplicatedData};
use solana::crdt::ReplicatedData;
use solana::drone::DroneRequest;
use solana::mint::Mint;
use solana::nat::udp_public_bind;
use solana::signature::{PublicKey, Signature};
use solana::thin_client::ThinClient;
use std::env;
@ -19,7 +20,7 @@ use std::fmt;
use std::fs::File;
use std::io;
use std::io::prelude::*;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream, UdpSocket};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::process::exit;
use std::thread::sleep;
use std::time::Duration;
@ -57,7 +58,6 @@ impl error::Error for WalletError {
struct WalletConfig {
leader: ReplicatedData,
id: Mint,
client_addr: SocketAddr,
drone_addr: SocketAddr,
command: WalletCommand,
}
@ -68,7 +68,6 @@ impl Default for WalletConfig {
WalletConfig {
leader: ReplicatedData::new_leader(&default_addr.clone()),
id: Mint::new(0),
client_addr: default_addr.clone(),
drone_addr: default_addr.clone(),
command: WalletCommand::Balance,
}
@ -122,8 +121,6 @@ fn parse_args(args: Vec<String>) -> Result<WalletConfig, Box<error::Error>> {
let mut opts = Options::new();
opts.optopt("l", "", "leader", "leader.json");
opts.optopt("m", "", "mint", "mint.json");
opts.optopt("c", "", "client port", "port");
opts.optflag("d", "dyn", "detect network address dynamically");
opts.optflag("h", "help", "print help");
let matches = match opts.parse(&args[1..]) {
@ -139,16 +136,6 @@ fn parse_args(args: Vec<String>) -> Result<WalletConfig, Box<error::Error>> {
return Ok(WalletConfig::default());
}
let mut client_addr: SocketAddr = "0.0.0.0:8100".parse().unwrap();
if matches.opt_present("c") {
let port = matches.opt_str("c").unwrap().parse().unwrap();
client_addr.set_port(port);
}
if matches.opt_present("d") {
client_addr.set_ip(get_ip_addr().unwrap());
}
let leader = if matches.opt_present("l") {
read_leader(matches.opt_str("l").unwrap())
} else {
@ -170,7 +157,6 @@ fn parse_args(args: Vec<String>) -> Result<WalletConfig, Box<error::Error>> {
Ok(WalletConfig {
leader,
id,
client_addr,
drone_addr, // TODO: Add an option for this.
command,
})
@ -252,20 +238,20 @@ fn read_mint(path: String) -> Result<Mint, Box<error::Error>> {
Ok(mint)
}
fn mk_client(client_addr: &SocketAddr, r: &ReplicatedData) -> io::Result<ThinClient> {
let mut addr = client_addr.clone();
let port = addr.port();
let transactions_socket = UdpSocket::bind(addr.clone())?;
addr.set_port(port + 1);
let requests_socket = UdpSocket::bind(addr.clone())?;
requests_socket.set_read_timeout(Some(Duration::new(1, 0)))?;
fn mk_client(r: &ReplicatedData) -> io::Result<ThinClient> {
let transactions_socket_pair = udp_public_bind("transactions");
let requests_socket_pair = udp_public_bind("requests");
requests_socket_pair
.receiver
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
addr.set_port(port + 2);
Ok(ThinClient::new(
r.requests_addr,
requests_socket,
requests_socket_pair.sender,
requests_socket_pair.receiver,
r.transactions_addr,
transactions_socket,
transactions_socket_pair.sender,
))
}
@ -283,6 +269,6 @@ fn request_airdrop(drone_addr: &SocketAddr, id: &Mint) {
fn main() -> Result<(), Box<error::Error>> {
env_logger::init();
let config = parse_args(env::args().collect())?;
let mut client = mk_client(&config.client_addr, &config.leader)?;
let mut client = mk_client(&config.leader)?;
process_command(&config, &mut client)
}

View File

@ -99,6 +99,7 @@ impl Drone {
let mut client = ThinClient::new(
self.requests_addr,
requests_socket.try_clone().unwrap(),
requests_socket,
self.transactions_addr,
transactions_socket,
@ -292,6 +293,7 @@ mod tests {
let mut client = ThinClient::new(
leader.data.requests_addr,
requests_socket.try_clone().unwrap(),
requests_socket,
leader.data.transactions_addr,
transactions_socket,

View File

@ -25,6 +25,7 @@ pub mod hash;
pub mod ledger;
pub mod logger;
pub mod mint;
pub mod nat;
pub mod ncp;
pub mod packet;
pub mod payment_plan;

76
src/nat.rs Normal file
View File

@ -0,0 +1,76 @@
//! The `nat` module assists with NAT traversal
extern crate futures;
extern crate p2p;
extern crate tokio_core;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use self::futures::Future;
use self::p2p::UdpSocketExt;
/// A data type representing a public Udp socket
pub struct UdpSocketPair {
pub addr: SocketAddr, // Public address of the socket
pub receiver: UdpSocket, // Locally bound socket that can receive from the public address
pub sender: UdpSocket, // Locally bound socket to send via public address
}
/// Binds a private Udp address to a public address using UPnP if possible
pub fn udp_public_bind(label: &str) -> UdpSocketPair {
let private_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let mut core = tokio_core::reactor::Core::new().unwrap();
let handle = core.handle();
let mc = p2p::P2p::default();
let res = core.run({
tokio_core::net::UdpSocket::bind_public(&private_addr, &handle, &mc)
.map_err(|e| {
info!("Failed to bind public socket for {}: {}", label, e);
})
.and_then(|(socket, public_addr)| Ok((public_addr, socket.local_addr().unwrap())))
});
match res {
Ok((public_addr, local_addr)) => {
info!(
"Using local address {} mapped to UPnP public address {} for {}",
local_addr, public_addr, label
);
// NAT should now be forwarding inbound packets directed at
// |public_addr| to the local |receiver| socket...
let receiver = UdpSocket::bind(local_addr).unwrap();
// ... however for outbound packets, the NAT *will not* rewrite the
// source port from |receiver.local_addr().port()| to |public_addr.port()|.
// This is currently a problem when talking with a fullnode as it
// assumes it can send UDP packets back at the source. This hits the
// NAT as a datagram for |receiver.local_addr().port()| on the NAT's public
// IP, which the NAT promptly discards. As a short term hack, create a
// local UDP socket, |sender|, with the same port as |public_addr.port()|.
//
// TODO: Remove the |sender| socket and deal with the downstream changes to
// the UDP signalling
let mut local_addr_sender = local_addr.clone();
local_addr_sender.set_port(public_addr.port());
let sender = UdpSocket::bind(local_addr_sender).unwrap();
UdpSocketPair {
addr: public_addr,
receiver,
sender,
}
}
Err(_) => {
let sender = UdpSocket::bind(private_addr).unwrap();
let local_addr = sender.local_addr().unwrap();
info!("Using local address {} for {}", local_addr, label);
UdpSocketPair {
addr: private_addr,
receiver: sender.try_clone().unwrap(),
sender,
}
}
}
}

View File

@ -15,7 +15,8 @@ use transaction::Transaction;
/// An object for querying and sending transactions to the network.
pub struct ThinClient {
requests_addr: SocketAddr,
requests_socket: UdpSocket,
requests_sender: UdpSocket,
requests_receiver: UdpSocket,
transactions_addr: SocketAddr,
transactions_socket: UdpSocket,
last_id: Option<Hash>,
@ -30,13 +31,15 @@ impl ThinClient {
/// to a public address before invoking ThinClient methods.
pub fn new(
requests_addr: SocketAddr,
requests_socket: UdpSocket,
requests_sender: UdpSocket,
requests_receiver: UdpSocket,
transactions_addr: SocketAddr,
transactions_socket: UdpSocket,
) -> Self {
let client = ThinClient {
requests_addr,
requests_socket,
requests_sender,
requests_receiver,
transactions_addr,
transactions_socket,
last_id: None,
@ -50,7 +53,7 @@ impl ThinClient {
pub fn recv_response(&self) -> io::Result<Response> {
let mut buf = vec![0u8; 1024];
trace!("start recv_from");
self.requests_socket.recv_from(&mut buf)?;
self.requests_receiver.recv_from(&mut buf)?;
trace!("end recv_from");
let resp = deserialize(&buf).expect("deserialize balance in thin_client");
Ok(resp)
@ -112,7 +115,7 @@ impl ThinClient {
trace!("get_balance");
let req = Request::GetBalance { key: *pubkey };
let data = serialize(&req).expect("serialize GetBalance in pub fn get_balance");
self.requests_socket
self.requests_sender
.send_to(&data, &self.requests_addr)
.expect("buffer error in pub fn get_balance");
let mut done = false;
@ -136,7 +139,7 @@ impl ThinClient {
serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count");
let mut done = false;
while !done {
self.requests_socket
self.requests_sender
.send_to(&data, &self.requests_addr)
.expect("buffer error in pub fn transaction_count");
@ -159,7 +162,8 @@ impl ThinClient {
let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id");
let mut done = false;
while !done {
self.requests_socket
eprintln!("get_last_id send_to {}", &self.requests_addr);
self.requests_sender
.send_to(&data, &self.requests_addr)
.expect("buffer error in pub fn get_last_id");
@ -201,7 +205,7 @@ impl ThinClient {
let data = serialize(&req).expect("serialize GetSignature in pub fn check_signature");
let mut done = false;
while !done {
self.requests_socket
self.requests_sender
.send_to(&data, &self.requests_addr)
.expect("buffer error in pub fn get_last_id");
@ -263,6 +267,7 @@ mod tests {
let mut client = ThinClient::new(
leader.data.requests_addr,
requests_socket.try_clone().unwrap(),
requests_socket,
leader.data.transactions_addr,
transactions_socket,
@ -310,6 +315,7 @@ mod tests {
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new(
leader.data.requests_addr,
requests_socket.try_clone().unwrap(),
requests_socket,
leader.data.transactions_addr,
transactions_socket,
@ -368,6 +374,7 @@ mod tests {
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new(
leader.data.requests_addr,
requests_socket.try_clone().unwrap(),
requests_socket,
leader.data.transactions_addr,
transactions_socket,

View File

@ -246,6 +246,7 @@ fn mk_client(leader: &ReplicatedData) -> ThinClient {
ThinClient::new(
leader.requests_addr,
requests_socket.try_clone().unwrap(),
requests_socket,
leader.transactions_addr,
transactions_socket,