rework of netwrk rendezvous

* rename NodeInfo field of Node from "data" to "info"
      (touches a lot of files)

  * update client to use gossip to find leader, a la drone

  * rework multinode scripts
      * move more stuff into rust
      * added usage to all
      * no more rsync unless you're a validator (TODO: whack that, too)
  * fullnode doesn't bail if drone isn't up yet, just keeps trying
  * drone doesn't bail if network isn't up yet, just keeps trying
This commit is contained in:
Rob Walker 2018-08-31 00:10:39 -07:00
parent eb4e5a7bd0
commit 176e806d94
19 changed files with 385 additions and 400 deletions

View File

@ -70,7 +70,7 @@ echo "--- Wallet sanity"
echo "--- Node count"
(
set -x
./multinode-demo/client.sh "$PWD" 3 -c
./multinode-demo/client.sh localhost --num-nodes 3 --converge-only
) || flag_error
killBackgroundCommands

View File

@ -1,68 +1,37 @@
#!/bin/bash -e
#
USAGE=" usage: $0 [leader_url] [num_nodes] [--loop] [extra args]
Run bench-tps against the specified network
leader_url URL to the leader (defaults to ..)
num_nodes Minimum number of nodes to look for while converging
--loop Add this flag to cause the program to loop infinitely
\"extra args\" Any additional arguments are pass along to solana-bench-tps
"
here=$(dirname "$0")
# shellcheck source=multinode-demo/common.sh
source "$here"/common.sh
leader=$1
if [[ -n $leader ]]; then
if [[ $leader == "-h" || $leader == "--help" ]]; then
echo "$USAGE"
exit 0
usage() {
if [[ -n $1 ]]; then
echo "$*"
echo
fi
shift
echo "usage: $0 [network entry point] [extra args]"
echo
echo " Run bench-tps against the specified network"
echo
echo " extra args: additional arguments are pass along to solana-bench-tps"
echo
exit 1
}
# this is a little hacky
if [[ ${1:0:2} != "--" ]]; then
read -r _ leader_address shift < <(find_leader "${@:1:1}")
else
if [[ -d "$SNAP" ]]; then
leader=testnet.solana.com # Default to testnet when running as a Snap
else
leader=$here/.. # Default to local solana repo
fi
read -r _ leader_address shift < <(find_leader)
fi
shift "$shift"
count=$1
if [[ -n $count ]]; then
shift
else
count=1
fi
loop=
if [[ $1 = --loop ]]; then
loop=1
shift
fi
client_json="$SOLANA_CONFIG_CLIENT_DIR"/client.json
[[ -r $client_json ]] || $solana_keygen -o "$client_json"
rsync_leader_url=$(rsync_url "$leader")
(
set -x
mkdir -p "$SOLANA_CONFIG_CLIENT_DIR"
$rsync -vPz "$rsync_leader_url"/config/leader.json "$SOLANA_CONFIG_CLIENT_DIR"/
client_json="$SOLANA_CONFIG_CLIENT_DIR"/client.json
[[ -r $client_json ]] || $solana_keygen -o "$client_json"
)
iteration=0
set -x
while true; do
$solana_bench_tps \
-n "$count" \
-l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json \
-k "$SOLANA_CONFIG_CLIENT_DIR"/client.json \
"$@"
[[ -n $loop ]] || exit 0
iteration=$((iteration + 1))
echo ------------------------------------------------------------------------
echo "Iteration: $iteration"
echo ------------------------------------------------------------------------
done
$solana_bench_tps \
--network "$leader_address" \
--keypair "$SOLANA_CONFIG_CLIENT_DIR"/client.json \
"$@"

View File

@ -159,3 +159,46 @@ rsync_url() { # adds the 'rsync://` prefix to URLs that need it
# Default to rsync:// URL
echo "rsync://$url"
}
# called from drone, validator, client
find_leader() {
declare leader leader_address
declare shift=0
if [[ -d $SNAP ]]; then
# Exit if mode is not yet configured
# (typically the case after the Snap is first installed)
[[ -n $(snapctl get mode) ]] || exit 0
# Select leader from the Snap configuration
leader_address=$(snapctl get leader-address)
if [[ -z $leader_address ]]; then
# Assume public testnet by default
leader_address=35.227.93.37:8001 # testnet.solana.com
fi
leader=$leader_address
else
if [[ -z $1 ]]; then
leader=${here}/.. # Default to local tree for rsync
leader_address=127.0.0.1:8001 # Default to local leader
elif [[ -z $2 ]]; then
leader=$1
declare leader_ip
leader_ip=$(dig +short "${leader%:*}" | head -n1)
if [[ -z $leader_ip ]]; then
usage "Error: unable to resolve IP address for $leader"
fi
leader_address=${leader_ip}:8001
shift=1
else
leader=$1
leader_address=$2
shift=2
fi
fi
echo "$leader" "$leader_address" "$shift"
}

View File

@ -2,32 +2,25 @@
#
# Starts an instance of solana-drone
#
# usage: $0 <rsync network path to solana repo on leader machine>
#
here=$(dirname "$0")
# shellcheck source=multinode-demo/common.sh
source "$here"/common.sh
SOLANA_CONFIG_DIR="$SOLANA_CONFIG_DIR"-drone
# shellcheck source=scripts/oom-score-adj.sh
source "$here"/../scripts/oom-score-adj.sh
if [[ -d "$SNAP" ]]; then
# Exit if mode is not yet configured
# (typically the case after the Snap is first installed)
[[ -n "$(snapctl get mode)" ]] || exit 0
# Select leader from the Snap configuration
leader_address="$(snapctl get leader-address)"
if [[ -z "$leader_address" ]]; then
# Assume drone is running on the same node as the leader by default
leader_address="localhost"
usage() {
if [[ -n $1 ]]; then
echo "$*"
echo
fi
leader="$leader_address"
else
leader=${1:-${here}/..} # Default to local tree for data
fi
echo "usage: $0 [network entry point]"
echo
echo " Run an airdrop drone for the specified network"
echo
exit 1
}
read -r _ leader_address shift < <(find_leader "${@:1:1}")
shift "$shift"
[[ -f "$SOLANA_CONFIG_PRIVATE_DIR"/mint.json ]] || {
echo "$SOLANA_CONFIG_PRIVATE_DIR/mint.json not found, create it by running:"
@ -36,17 +29,12 @@ fi
exit 1
}
rsync_leader_url=$(rsync_url "$leader")
set -ex
mkdir -p "$SOLANA_CONFIG_DIR"
$rsync -vPz "$rsync_leader_url"/config/leader.json "$SOLANA_CONFIG_DIR"/
trap 'kill "$pid" && wait "$pid"' INT TERM
$solana_drone \
-l "$SOLANA_CONFIG_DIR"/leader.json -k "$SOLANA_CONFIG_PRIVATE_DIR"/mint.json \
--timeout 120 \
--keypair "$SOLANA_CONFIG_PRIVATE_DIR"/mint.json \
--network "$leader_address" \
> >($drone_logger) 2>&1 &
pid=$!
oom_score_adj "$pid" 1000
wait "$pid"

View File

@ -14,9 +14,12 @@ usage() {
echo "$*"
echo
fi
echo "usage: $0 [-x] [rsync network path to solana repo on leader machine] [network ip address of leader]"
echo ""
echo " -x: runs a new, dynamically-configured validator"
echo "usage: $0 [-x] [rsync network path to leader] [network entry point]"
echo
echo " Start a validator on the specified network"
echo
echo " -x: runs a new, dynamically-configured validator"
echo
exit 1
}
@ -35,34 +38,8 @@ if [[ -n $3 ]]; then
usage
fi
if [[ -d $SNAP ]]; then
# Exit if mode is not yet configured
# (typically the case after the Snap is first installed)
[[ -n $(snapctl get mode) ]] || exit 0
# Select leader from the Snap configuration
leader_address=$(snapctl get leader-address)
if [[ -z $leader_address ]]; then
# Assume public testnet by default
leader_address=35.227.93.37 # testnet.solana.com
fi
leader=$leader_address
else
if [[ -z $1 ]]; then
leader=${1:-${here}/..} # Default to local tree for data
leader_address=${2:-127.0.0.1} # Default to local leader
elif [[ -z $2 ]]; then
leader=$1
leader_address=$(dig +short "${leader%:*}" | head -n1)
if [[ -z $leader_address ]]; then
usage "Error: unable to resolve IP address for $leader"
fi
else
leader=$1
leader_address=$2
fi
fi
leader_port=8001
read -r leader leader_address shift < <(find_leader "${@:1:2}")
shift "$shift"
if [[ -n $SOLANA_CUDA ]]; then
program=$solana_fullnode_cuda
@ -109,7 +86,7 @@ $rsync -vPr "$rsync_leader_url"/config/ "$SOLANA_LEADER_CONFIG_DIR"
trap 'kill "$pid" && wait "$pid"' INT TERM
$program \
--identity "$validator_json_path" \
--testnet "$leader_address:$leader_port" \
--network "$leader_address" \
--ledger "$SOLANA_LEADER_CONFIG_DIR"/ledger \
> >($validator_logger) 2>&1 &
pid=$!

View File

@ -12,21 +12,19 @@ use rayon::prelude::*;
use solana::client::mk_client;
use solana::crdt::{Crdt, NodeInfo};
use solana::drone::DRONE_PORT;
use solana::fullnode::Config;
use solana::hash::Hash;
use solana::logger;
use solana::metrics;
use solana::ncp::Ncp;
use solana::service::Service;
use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil};
use solana::thin_client::ThinClient;
use solana::thin_client::{poll_gossip_for_leader, ThinClient};
use solana::timing::{duration_as_ms, duration_as_s};
use solana::transaction::Transaction;
use solana::wallet::request_airdrop;
use solana::window::default_window;
use std::collections::VecDeque;
use std::fs::File;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::net::SocketAddr;
use std::process::exit;
use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
use std::sync::{Arc, RwLock};
@ -282,6 +280,7 @@ fn airdrop_tokens(client: &mut ThinClient, leader: &NodeInfo, id: &Keypair, tx_c
let starting_balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
metrics_submit_token_balance(starting_balance);
println!("starting balance {}", starting_balance);
if starting_balance < tx_count {
let airdrop_amount = tx_count - starting_balance;
@ -299,13 +298,14 @@ fn airdrop_tokens(client: &mut ThinClient, leader: &NodeInfo, id: &Keypair, tx_c
let mut current_balance = starting_balance;
for _ in 0..20 {
sleep(Duration::from_millis(500));
current_balance = client
.poll_get_balance(&id.pubkey())
.unwrap_or(starting_balance);
current_balance = client.poll_get_balance(&id.pubkey()).unwrap_or_else(|e| {
println!("airdrop error {}", e);
starting_balance
});
if starting_balance != current_balance {
break;
}
println!(".");
println!("current balance {}...", current_balance);
}
metrics_submit_token_balance(current_balance);
if current_balance - starting_balance != airdrop_amount {
@ -394,12 +394,13 @@ fn main() {
let matches = App::new("solana-bench-tps")
.version(crate_version!())
.arg(
Arg::with_name("leader")
.short("l")
.long("leader")
.value_name("PATH")
Arg::with_name("network")
.short("n")
.long("network")
.value_name("HOST:PORT")
.takes_value(true)
.help("/path/to/leader.json"),
.required(true)
.help("rendezvous with the network at this gossip entry point"),
)
.arg(
Arg::with_name("keypair")
@ -411,32 +412,33 @@ fn main() {
.help("/path/to/id.json"),
)
.arg(
Arg::with_name("num_nodes")
.short("n")
.long("nodes")
.value_name("NUMBER")
Arg::with_name("num-nodes")
.short("N")
.long("num-nodes")
.value_name("NUM")
.takes_value(true)
.help("number of nodes to converge to"),
.help("wait for NUM nodes to converge"),
)
.arg(
Arg::with_name("threads")
.short("t")
.long("threads")
.value_name("NUMBER")
.value_name("NUM")
.takes_value(true)
.help("number of threads"),
)
.arg(
Arg::with_name("seconds")
.short("s")
.long("sec")
.value_name("NUMBER")
.long("seconds")
.value_name("NUM")
.takes_value(true)
.help("send transactions for this many seconds"),
)
.arg(
Arg::with_name("converge_only")
Arg::with_name("converge-only")
.short("c")
.long("converge-only")
.help("exit immediately after converging"),
)
.arg(
@ -453,13 +455,14 @@ fn main() {
)
.get_matches();
let leader: NodeInfo;
if let Some(l) = matches.value_of("leader") {
leader = read_leader(l).node_info;
} else {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
leader = NodeInfo::new_leader(&server_addr);
};
let network = matches
.value_of("network")
.unwrap()
.parse()
.unwrap_or_else(|e| {
eprintln!("failed to parse network: {}", e);
exit(1)
});
let id = read_keypair(matches.value_of("keypair").unwrap()).expect("client keypair");
@ -483,6 +486,8 @@ fn main() {
sustained = true;
}
let leader = poll_gossip_for_leader(network, None).expect("unable to find leader on network");
let exit_signal = Arc::new(AtomicBool::new(false));
let mut c_threads = vec![];
let (validators, leader) = converge(&leader, &exit_signal, num_nodes, &mut c_threads);
@ -510,9 +515,10 @@ fn main() {
exit(1);
}
if matches.is_present("converge_only") {
if matches.is_present("converge-only") {
return;
}
let leader = leader.unwrap();
println!("leader is at {} {}", leader.contact_info.rpu, leader.id);
@ -678,7 +684,7 @@ fn converge(
.unwrap()
.table
.values()
.filter(|x| Crdt::is_valid_address(x.contact_info.rpu))
.filter(|x| Crdt::is_valid_address(&x.contact_info.rpu))
.cloned()
.collect();
@ -698,8 +704,3 @@ fn converge(
let leader = spy_ref.read().unwrap().leader_data().cloned();
(v, leader)
}
fn read_leader(path: &str) -> Config {
let file = File::open(path).unwrap_or_else(|_| panic!("file not found: {}", path));
serde_json::from_reader(file).unwrap_or_else(|_| panic!("failed to parse {}", path))
}

View File

@ -10,35 +10,43 @@ extern crate tokio_codec;
use bincode::{deserialize, serialize};
use bytes::Bytes;
use clap::{App, Arg};
use solana::crdt::NodeInfo;
use solana::drone::{Drone, DroneRequest, DRONE_PORT};
use solana::fullnode::Config;
use solana::logger;
use solana::metrics::set_panic_hook;
use solana::signature::read_keypair;
use solana::thin_client::poll_gossip_for_leader;
use std::error;
use std::fs::File;
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::net::{Ipv4Addr, SocketAddr};
use std::process::exit;
use std::sync::{Arc, Mutex};
use std::thread;
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio_codec::{BytesCodec, Decoder};
macro_rules! socketaddr {
($ip:expr, $port:expr) => {
SocketAddr::from((Ipv4Addr::from($ip), $port))
};
($str:expr) => {{
let a: SocketAddr = $str.parse().unwrap();
a
}};
}
fn main() -> Result<(), Box<error::Error>> {
logger::setup();
set_panic_hook("drone");
let matches = App::new("drone")
.version(crate_version!())
.arg(
Arg::with_name("leader")
.short("l")
.long("leader")
.value_name("PATH")
Arg::with_name("network")
.short("n")
.long("network")
.value_name("HOST:PORT")
.takes_value(true)
.help("/path/to/leader.json"),
.required(true)
.help("rendezvous with the network at this gossip entry point"),
)
.arg(
Arg::with_name("keypair")
@ -47,7 +55,7 @@ fn main() -> Result<(), Box<error::Error>> {
.value_name("PATH")
.takes_value(true)
.required(true)
.help("/path/to/mint.json"),
.help("File to read the client's keypair from"),
)
.arg(
Arg::with_name("slice")
@ -72,39 +80,40 @@ fn main() -> Result<(), Box<error::Error>> {
)
.get_matches();
let leader: NodeInfo;
if let Some(l) = matches.value_of("leader") {
leader = read_leader(l).node_info;
} else {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
leader = NodeInfo::new_leader(&server_addr);
};
let network = matches
.value_of("network")
.unwrap()
.parse()
.unwrap_or_else(|e| {
eprintln!("failed to parse network: {}", e);
exit(1)
});
let mint_keypair =
read_keypair(matches.value_of("keypair").expect("keypair")).expect("client keypair");
read_keypair(matches.value_of("keypair").unwrap()).expect("failed to read client keypair");
let time_slice: Option<u64>;
if let Some(secs) = matches.value_of("slice") {
time_slice = Some(secs.to_string().parse().expect("integer"));
time_slice = Some(secs.to_string().parse().expect("failed to parse slice"));
} else {
time_slice = None;
}
let request_cap: Option<u64>;
if let Some(c) = matches.value_of("cap") {
request_cap = Some(c.to_string().parse().expect("integer"));
request_cap = Some(c.to_string().parse().expect("failed to parse cap"));
} else {
request_cap = None;
}
let timeout: Option<u64>;
if let Some(secs) = matches.value_of("timeout") {
timeout = Some(secs.to_string().parse().expect("integer"));
timeout = Some(secs.to_string().parse().expect("failed to parse timeout"));
} else {
timeout = None;
}
let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout)?;
let leader = poll_gossip_for_leader(network, timeout)?;
let drone_addr: SocketAddr = format!("0.0.0.0:{}", DRONE_PORT).parse().unwrap();
let drone_addr = socketaddr!(0, DRONE_PORT);
let drone = Arc::new(Mutex::new(Drone::new(
mint_keypair,
@ -172,8 +181,3 @@ fn main() -> Result<(), Box<error::Error>> {
tokio::run(done);
Ok(())
}
fn read_leader(path: &str) -> Config {
let file = File::open(path).unwrap_or_else(|_| panic!("file not found: {}", path));
serde_json::from_reader(file).unwrap_or_else(|_| panic!("failed to parse {}", path))
}

View File

@ -1,23 +1,27 @@
#[macro_use]
extern crate clap;
extern crate getopts;
#[macro_use]
extern crate log;
extern crate serde_json;
#[macro_use]
extern crate solana;
use clap::{App, Arg};
use solana::client::mk_client;
use solana::crdt::{Node, NodeInfo};
use solana::crdt::Node;
use solana::drone::DRONE_PORT;
use solana::fullnode::{Config, Fullnode};
use solana::logger;
use solana::metrics::set_panic_hook;
use solana::service::Service;
use solana::signature::{Keypair, KeypairUtil};
use solana::thin_client::poll_gossip_for_leader;
use solana::wallet::request_airdrop;
use std::fs::File;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::net::{Ipv4Addr, SocketAddr};
use std::process::exit;
use std::thread::sleep;
use std::time::Duration;
fn main() -> () {
@ -34,12 +38,12 @@ fn main() -> () {
.help("run with the identity found in FILE"),
)
.arg(
Arg::with_name("testnet")
.short("t")
.long("testnet")
Arg::with_name("network")
.short("n")
.long("network")
.value_name("HOST:PORT")
.takes_value(true)
.help("connect to the network at this gossip entry point"),
.help("connect/rendezvous with the network at this gossip entry point"),
)
.arg(
Arg::with_name("ledger")
@ -52,16 +56,12 @@ fn main() -> () {
)
.get_matches();
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
let mut keypair = Keypair::new();
let mut repl_data = NodeInfo::new_leader_with_pubkey(keypair.pubkey(), &bind_addr);
if let Some(i) = matches.value_of("identity") {
let (keypair, ncp) = if let Some(i) = matches.value_of("identity") {
let path = i.to_string();
if let Ok(file) = File::open(path.clone()) {
let parse: serde_json::Result<Config> = serde_json::from_reader(file);
if let Ok(data) = parse {
keypair = data.keypair();
repl_data = data.node_info;
(data.keypair(), data.node_info.contact_info.ncp)
} else {
eprintln!("failed to parse {}", path);
exit(1);
@ -70,60 +70,62 @@ fn main() -> () {
eprintln!("failed to read {}", path);
exit(1);
}
}
let leader_pubkey = keypair.pubkey();
} else {
(Keypair::new(), socketaddr!(0, 8000))
};
let ledger_path = matches.value_of("ledger").unwrap();
let port_range = (8100, 10000);
let node = if let Some(_t) = matches.value_of("testnet") {
Node::new_with_external_ip(
leader_pubkey,
repl_data.contact_info.ncp.ip(),
port_range,
0,
)
} else {
Node::new_with_external_ip(
leader_pubkey,
repl_data.contact_info.ncp.ip(),
port_range,
repl_data.contact_info.ncp.port(),
)
// socketaddr that is initial pointer into the network's gossip (ncp)
let network = matches
.value_of("network")
.map(|network| network.parse().expect("failed to parse network address"));
let node = Node::new_with_external_ip(keypair.pubkey(), &ncp);
// save off some stuff for airdrop
let node_info = node.info.clone();
let pubkey = keypair.pubkey();
let fullnode = Fullnode::new(node, ledger_path, keypair, network, false);
// airdrop stuff, probably goes away at some point
let leader = match network {
Some(network) => {
poll_gossip_for_leader(network, None).expect("can't find leader on network")
}
None => node_info,
};
let repl_clone = node.data.clone();
let mut drone_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), DRONE_PORT);
let testnet_addr = matches.value_of("testnet").map(|addr_str| {
let addr: SocketAddr = addr_str.parse().unwrap();
drone_addr.set_ip(addr.ip());
addr
});
let fullnode = Fullnode::new(node, ledger_path, keypair, testnet_addr, false);
let mut client = mk_client(&leader);
let mut client = mk_client(&repl_clone);
let previous_balance = client.poll_get_balance(&leader_pubkey).unwrap_or(0);
eprintln!("balance is {}", previous_balance);
// TODO: maybe have the drone put itself in gossip somewhere instead of hardcoding?
let drone_addr = match network {
Some(network) => SocketAddr::new(network.ip(), DRONE_PORT),
None => SocketAddr::new(ncp.ip(), DRONE_PORT),
};
if previous_balance == 0 {
eprintln!("requesting airdrop from {}", drone_addr);
request_airdrop(&drone_addr, &leader_pubkey, 50).unwrap_or_else(|_| {
panic!(
"Airdrop failed, is the drone address correct {:?} drone running?",
loop {
let balance = client.poll_get_balance(&pubkey).unwrap_or(0);
info!("balance is {}", balance);
if balance >= 50 {
info!("good to go!");
break;
}
info!("requesting airdrop from {}", drone_addr);
loop {
if request_airdrop(&drone_addr, &pubkey, 50).is_ok() {
break;
}
info!(
"airdrop request, is the drone address correct {:?}, drone running?",
drone_addr
)
});
let balance_ok = client
.poll_balance_with_timeout(
&leader_pubkey,
&Duration::from_millis(100),
&Duration::from_secs(30),
)
.unwrap() > 0;
assert!(balance_ok, "0 balance, airdrop failed?");
);
sleep(Duration::from_secs(2));
}
}
fullnode.join().expect("join");
fullnode.join().expect("to never happen");
}

View File

@ -5,6 +5,7 @@ extern crate bs58;
extern crate clap;
extern crate dirs;
extern crate serde_json;
#[macro_use]
extern crate solana;
use clap::{App, Arg, SubCommand};
@ -63,9 +64,9 @@ struct WalletConfig {
impl Default for WalletConfig {
fn default() -> WalletConfig {
let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
let default_addr = socketaddr!(0, 8000);
WalletConfig {
leader: NodeInfo::new_leader(&default_addr),
leader: NodeInfo::new_with_socketaddr(&default_addr),
id: Keypair::new(),
drone_addr: default_addr,
command: WalletCommand::Balance,
@ -150,7 +151,7 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
leader = read_leader(l)?.node_info;
} else {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
leader = NodeInfo::new_leader(&server_addr);
leader = NodeInfo::new_with_socketaddr(&server_addr);
};
let timeout: Option<u64>;
if let Some(secs) = matches.value_of("timeout") {

View File

@ -12,7 +12,6 @@
//! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
//!
//! Bank needs to provide an interface for us to query the stake weight
use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt};
use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy};
@ -48,6 +47,7 @@ const GOSSIP_PURGE_MILLIS: u64 = 15000;
/// minimum membership table size before we start purging dead nodes
const MIN_TABLE_SIZE: usize = 2;
#[macro_export]
macro_rules! socketaddr {
($ip:expr, $port:expr) => {
SocketAddr::from((Ipv4Addr::from($ip), $port))
@ -57,6 +57,7 @@ macro_rules! socketaddr {
a
}};
}
#[macro_export]
macro_rules! socketaddr_any {
() => {
socketaddr!(0, 0)
@ -164,7 +165,7 @@ impl NodeInfo {
nxt_addr.set_port(addr.port() + nxt);
nxt_addr
}
pub fn new_leader_with_pubkey(pubkey: Pubkey, bind_addr: &SocketAddr) -> Self {
pub fn new_with_pubkey_socketaddr(pubkey: Pubkey, bind_addr: &SocketAddr) -> Self {
let transactions_addr = *bind_addr;
let gossip_addr = Self::next_port(&bind_addr, 1);
let replicate_addr = Self::next_port(&bind_addr, 2);
@ -177,13 +178,14 @@ impl NodeInfo {
transactions_addr,
)
}
pub fn new_leader(bind_addr: &SocketAddr) -> Self {
pub fn new_with_socketaddr(bind_addr: &SocketAddr) -> Self {
let keypair = Keypair::new();
Self::new_leader_with_pubkey(keypair.pubkey(), bind_addr)
Self::new_with_pubkey_socketaddr(keypair.pubkey(), bind_addr)
}
pub fn new_entry_point(gossip_addr: SocketAddr) -> Self {
//
pub fn new_entry_point(gossip_addr: &SocketAddr) -> Self {
let daddr: SocketAddr = socketaddr!("0.0.0.0:0");
NodeInfo::new(Pubkey::default(), gossip_addr, daddr, daddr, daddr)
NodeInfo::new(Pubkey::default(), *gossip_addr, daddr, daddr, daddr)
}
}
@ -460,7 +462,7 @@ impl Crdt {
if me.id == v.id {
//filter myself
false
} else if !(Self::is_valid_address(v.contact_info.tvu)) {
} else if !(Self::is_valid_address(&v.contact_info.tvu)) {
trace!(
"{:x}:broadcast skip not listening {:x} {}",
me.debug_id(),
@ -624,7 +626,7 @@ impl Crdt {
} else if me.leader_id == v.id {
trace!("skip retransmit to leader {:?}", v.id);
false
} else if !(Self::is_valid_address(v.contact_info.tvu)) {
} else if !(Self::is_valid_address(&v.contact_info.tvu)) {
trace!(
"skip nodes that are not listening {:?} {}",
v.id,
@ -691,8 +693,8 @@ impl Crdt {
.values()
.filter(|r| {
r.id != Pubkey::default()
&& (Self::is_valid_address(r.contact_info.tpu)
|| Self::is_valid_address(r.contact_info.tvu))
&& (Self::is_valid_address(&r.contact_info.tpu)
|| Self::is_valid_address(&r.contact_info.tvu))
})
.map(|x| x.ledger_state.last_id)
.collect()
@ -704,7 +706,7 @@ impl Crdt {
let valid: Vec<_> = self
.table
.values()
.filter(|r| r.id != self.id && Self::is_valid_address(r.contact_info.tvu))
.filter(|r| r.id != self.id && Self::is_valid_address(&r.contact_info.tvu))
.collect();
if valid.is_empty() {
Err(CrdtError::NoPeers)?;
@ -1002,7 +1004,7 @@ impl Crdt {
match deserialize(&blob.data[..blob.meta.size]) {
Ok(request) => Crdt::handle_protocol(
obj,
blob.meta.addr(),
&blob.meta.addr(),
request,
window,
ledger_window,
@ -1017,7 +1019,7 @@ impl Crdt {
fn handle_protocol(
me: &Arc<RwLock<Self>>,
from_addr: SocketAddr,
from_addr: &SocketAddr,
request: Protocol,
window: &SharedWindow,
ledger_window: &mut Option<&mut LedgerWindow>,
@ -1051,7 +1053,7 @@ impl Crdt {
// an unspecified address in our table
if from.contact_info.ncp.ip().is_unspecified() {
inc_new_counter_info!("crdt-window-request-updates-unspec-ncp", 1);
from.contact_info.ncp = from_addr;
from.contact_info.ncp = *from_addr;
}
let (from_id, ups, data, liveness) = {
@ -1247,7 +1249,7 @@ impl Crdt {
/// port must not be 0
/// ip must be specified and not mulitcast
/// loopback ip is only allowed in tests
pub fn is_valid_address(addr: SocketAddr) -> bool {
pub fn is_valid_address(addr: &SocketAddr) -> bool {
(addr.port() != 0) && Self::is_valid_ip(addr.ip())
}
@ -1273,7 +1275,7 @@ pub struct Sockets {
}
pub struct Node {
pub data: NodeInfo,
pub info: NodeInfo,
pub sockets: Sockets,
}
@ -1292,7 +1294,7 @@ impl Node {
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
let data = NodeInfo::new(
let info = NodeInfo::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
@ -1300,7 +1302,7 @@ impl Node {
transaction.local_addr().unwrap(),
);
Node {
data,
info,
sockets: Sockets {
gossip,
requests,
@ -1313,17 +1315,12 @@ impl Node {
},
}
}
pub fn new_with_external_ip(
pubkey: Pubkey,
ip: IpAddr,
port_range: (u16, u16),
ncp_port: u16,
) -> Node {
fn bind(port_range: (u16, u16)) -> (u16, UdpSocket) {
match bind_in_range(port_range) {
pub fn new_with_external_ip(pubkey: Pubkey, ncp: &SocketAddr) -> Node {
fn bind() -> (u16, UdpSocket) {
match bind_in_range(SOLANA_PORT_RANGE) {
Ok(socket) => (socket.local_addr().unwrap().port(), socket),
Err(err) => {
panic!("Failed to bind to {:?}", err);
panic!("Failed to bind err: {}", err);
}
}
};
@ -1333,39 +1330,40 @@ impl Node {
match UdpSocket::bind(addr) {
Ok(socket) => socket,
Err(err) => {
panic!("Failed to bind to {:?}: {:?}", addr, err);
panic!("Failed to bind to {:?}, err: {}", addr, err);
}
}
};
let (gossip_port, gossip) = if ncp_port != 0 {
(ncp_port, bind_to(ncp_port))
let (gossip_port, gossip) = if ncp.port() != 0 {
(ncp.port(), bind_to(ncp.port()))
} else {
bind(port_range)
bind()
};
let (replicate_port, replicate) = bind(port_range);
let (requests_port, requests) = bind(port_range);
let (transaction_port, transaction) = bind(port_range);
let (_, repair) = bind(port_range);
let (_, broadcast) = bind(port_range);
let (_, retransmit) = bind(port_range);
let (replicate_port, replicate) = bind();
let (requests_port, requests) = bind();
let (transaction_port, transaction) = bind();
let (_, repair) = bind();
let (_, broadcast) = bind();
let (_, retransmit) = bind();
// Responses are sent from the same Udp port as requests are received
// from, in hopes that a NAT sitting in the middle will route the
// response Udp packet correctly back to the requester.
let respond = requests.try_clone().unwrap();
let node_info = NodeInfo::new(
let info = NodeInfo::new(
pubkey,
SocketAddr::new(ip, gossip_port),
SocketAddr::new(ip, replicate_port),
SocketAddr::new(ip, requests_port),
SocketAddr::new(ip, transaction_port),
SocketAddr::new(ncp.ip(), gossip_port),
SocketAddr::new(ncp.ip(), replicate_port),
SocketAddr::new(ncp.ip(), requests_port),
SocketAddr::new(ncp.ip(), transaction_port),
);
Node {
data: node_info,
info,
sockets: Sockets {
gossip,
requests,
@ -1391,7 +1389,7 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) {
mod tests {
use crdt::{
Crdt, CrdtError, Node, NodeInfo, Protocol, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS,
MIN_TABLE_SIZE,
MIN_TABLE_SIZE, SOLANA_PORT_RANGE,
};
use entry::Entry;
use hash::{hash, Hash};
@ -1444,11 +1442,11 @@ mod tests {
}
#[test]
fn test_new_vote() {
let d = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234"));
let d = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone()).unwrap();
assert_eq!(crdt.table[&d.id].version, 0);
let leader = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1235"));
let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1235"));
assert_ne!(d.id, leader.id);
assert_matches!(
crdt.new_vote(Hash::default()).err(),
@ -1471,7 +1469,7 @@ mod tests {
#[test]
fn test_insert_vote() {
let d = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234"));
let d = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone()).unwrap();
assert_eq!(crdt.table[&d.id].version, 0);
@ -1504,9 +1502,9 @@ mod tests {
copy
}
#[test]
fn replicated_data_new_leader_with_pubkey() {
fn replicated_data_new_with_socketaddr_with_pubkey() {
let keypair = Keypair::new();
let d1 = NodeInfo::new_leader_with_pubkey(
let d1 = NodeInfo::new_with_pubkey_socketaddr(
keypair.pubkey().clone(),
&socketaddr!("127.0.0.1:1234"),
);
@ -1567,7 +1565,7 @@ mod tests {
sorted(&crdt2.table.values().map(|x| x.clone()).collect()),
sorted(&crdt.table.values().map(|x| x.clone()).collect())
);
let d4 = NodeInfo::new_entry_point(socketaddr!("127.0.0.4:1234"));
let d4 = NodeInfo::new_entry_point(&socketaddr!("127.0.0.4:1234"));
crdt.insert(&d4);
let (_key, _ix, ups) = crdt.get_updates_since(0);
assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3]));
@ -1671,7 +1669,7 @@ mod tests {
let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt1.contact_info.ncp);
let nxt2 = NodeInfo::new_entry_point(socketaddr!("127.0.0.3:1234"));
let nxt2 = NodeInfo::new_entry_point(&socketaddr!("127.0.0.3:1234"));
crdt.insert(&nxt2);
// check that the service works
// and that it eventually produces a request for both nodes
@ -1712,9 +1710,9 @@ mod tests {
#[test]
fn purge_test() {
logger::setup();
let me = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234"));
let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
let nxt = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1234"));
let nxt = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234"));
assert_ne!(me.id, nxt.id);
crdt.set_leader(me.id);
crdt.insert(&nxt);
@ -1733,7 +1731,7 @@ mod tests {
let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt.contact_info.ncp);
let mut nxt2 = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1234"));
let mut nxt2 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234"));
assert_ne!(me.id, nxt2.id);
assert_ne!(nxt.id, nxt2.id);
crdt.insert(&nxt2);
@ -1755,14 +1753,14 @@ mod tests {
#[test]
fn purge_leader_test() {
logger::setup();
let me = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234"));
let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
let nxt = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1234"));
let nxt = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234"));
assert_ne!(me.id, nxt.id);
crdt.insert(&nxt);
crdt.set_leader(nxt.id);
let now = crdt.alive[&nxt.id];
let mut nxt2 = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1234"));
let mut nxt2 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234"));
crdt.insert(&nxt2);
while now == crdt.alive[&nxt2.id] {
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
@ -1865,10 +1863,10 @@ mod tests {
fn run_window_request_with_backoff() {
let window = default_window();
let mut me = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234"));
let mut me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
me.leader_id = me.id;
let mock_peer = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234"));
let mock_peer = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let recycler = BlobRecycler::default();
@ -1916,16 +1914,16 @@ mod tests {
#[test]
fn test_update_leader() {
logger::setup();
let me = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234"));
let leader0 = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234"));
let leader1 = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234"));
let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let leader0 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let leader1 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
assert_eq!(crdt.top_leader(), None);
crdt.set_leader(leader0.id);
assert_eq!(crdt.top_leader().unwrap(), leader0.id);
//add a bunch of nodes with a new leader
for _ in 0..10 {
let mut dum = NodeInfo::new_entry_point(socketaddr!("127.0.0.1:1234"));
let mut dum = NodeInfo::new_entry_point(&socketaddr!("127.0.0.1:1234"));
dum.id = Keypair::new().pubkey();
dum.leader_id = leader1.id;
crdt.insert(&dum);
@ -1941,12 +1939,12 @@ mod tests {
#[test]
fn test_valid_last_ids() {
logger::setup();
let mut leader0 = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1234"));
let mut leader0 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234"));
leader0.ledger_state.last_id = hash(b"0");
let mut leader1 = NodeInfo::new_multicast();
leader1.ledger_state.last_id = hash(b"1");
let mut leader2 =
NodeInfo::new_leader_with_pubkey(Pubkey::default(), &socketaddr!("127.0.0.2:1234"));
NodeInfo::new_with_pubkey_socketaddr(Pubkey::default(), &socketaddr!("127.0.0.2:1234"));
leader2.ledger_state.last_id = hash(b"2");
// test that only valid tvu or tpu are retured as nodes
let mut leader3 = NodeInfo::new(
@ -1973,10 +1971,10 @@ mod tests {
let window = default_window();
let recycler = BlobRecycler::default();
let node = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234"));
let node_with_same_addr = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234"));
let node = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let node_with_same_addr = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
assert_ne!(node.id, node_with_same_addr.id);
let node_with_diff_addr = NodeInfo::new_leader(&socketaddr!("127.0.0.1:4321"));
let node_with_diff_addr = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:4321"));
let crdt = Crdt::new(node.clone()).expect("Crdt::new");
assert_eq!(crdt.alive.len(), 0);
@ -1987,7 +1985,7 @@ mod tests {
assert!(
Crdt::handle_protocol(
&obj,
node.contact_info.ncp,
&node.contact_info.ncp,
request,
&window,
&mut None,
@ -1999,7 +1997,7 @@ mod tests {
assert!(
Crdt::handle_protocol(
&obj,
node.contact_info.ncp,
&node.contact_info.ncp,
request,
&window,
&mut None,
@ -2010,7 +2008,7 @@ mod tests {
let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone());
Crdt::handle_protocol(
&obj,
node.contact_info.ncp,
&node.contact_info.ncp,
request,
&window,
&mut None,
@ -2031,13 +2029,13 @@ mod tests {
fn test_is_valid_address() {
assert!(cfg!(test));
let bad_address_port = socketaddr!("127.0.0.1:0");
assert!(!Crdt::is_valid_address(bad_address_port));
assert!(!Crdt::is_valid_address(&bad_address_port));
let bad_address_unspecified = socketaddr!(0, 1234);
assert!(!Crdt::is_valid_address(bad_address_unspecified));
assert!(!Crdt::is_valid_address(&bad_address_unspecified));
let bad_address_multicast = socketaddr!([224, 254, 0, 0], 1234);
assert!(!Crdt::is_valid_address(bad_address_multicast));
assert!(!Crdt::is_valid_address(&bad_address_multicast));
let loopback = socketaddr!("127.0.0.1:1234");
assert!(Crdt::is_valid_address(loopback));
assert!(Crdt::is_valid_address(&loopback));
// assert!(!Crdt::is_valid_ip_internal(loopback.ip(), false));
}
@ -2052,37 +2050,37 @@ mod tests {
socketaddr!("127.0.0.1:1237"),
);
let mut crdt = Crdt::new(node_info).unwrap();
let network_entry_point = NodeInfo::new_entry_point(socketaddr!("127.0.0.1:1239"));
let network_entry_point = NodeInfo::new_entry_point(&socketaddr!("127.0.0.1:1239"));
crdt.insert(&network_entry_point);
assert!(crdt.leader_data().is_none());
}
#[test]
fn new_with_external_ip_test_random() {
let ip = IpAddr::V4(Ipv4Addr::from(0));
let node = Node::new_with_external_ip(Keypair::new().pubkey(), ip, (8100, 8200), 0);
let ip = Ipv4Addr::from(0);
let node = Node::new_with_external_ip(Keypair::new().pubkey(), &socketaddr!(ip, 0));
assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.replicate.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.transaction.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip);
assert!(node.sockets.gossip.local_addr().unwrap().port() >= 8100);
assert!(node.sockets.gossip.local_addr().unwrap().port() < 8200);
assert!(node.sockets.replicate.local_addr().unwrap().port() >= 8100);
assert!(node.sockets.replicate.local_addr().unwrap().port() < 8200);
assert!(node.sockets.requests.local_addr().unwrap().port() >= 8100);
assert!(node.sockets.requests.local_addr().unwrap().port() < 8200);
assert!(node.sockets.transaction.local_addr().unwrap().port() >= 8100);
assert!(node.sockets.transaction.local_addr().unwrap().port() < 8200);
assert!(node.sockets.repair.local_addr().unwrap().port() >= 8100);
assert!(node.sockets.repair.local_addr().unwrap().port() < 8200);
assert!(node.sockets.gossip.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0);
assert!(node.sockets.gossip.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1);
assert!(node.sockets.replicate.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0);
assert!(node.sockets.replicate.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1);
assert!(node.sockets.requests.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0);
assert!(node.sockets.requests.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1);
assert!(node.sockets.transaction.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0);
assert!(node.sockets.transaction.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1);
assert!(node.sockets.repair.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0);
assert!(node.sockets.repair.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1);
}
#[test]
fn new_with_external_ip_test_gossip() {
let ip = IpAddr::V4(Ipv4Addr::from(0));
let node = Node::new_with_external_ip(Keypair::new().pubkey(), ip, (8100, 8200), 8050);
let node = Node::new_with_external_ip(Keypair::new().pubkey(), &socketaddr!(0, 8050));
assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.replicate.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip);
@ -2090,13 +2088,13 @@ mod tests {
assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip);
assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050);
assert!(node.sockets.replicate.local_addr().unwrap().port() >= 8100);
assert!(node.sockets.replicate.local_addr().unwrap().port() < 8200);
assert!(node.sockets.requests.local_addr().unwrap().port() >= 8100);
assert!(node.sockets.requests.local_addr().unwrap().port() < 8200);
assert!(node.sockets.transaction.local_addr().unwrap().port() >= 8100);
assert!(node.sockets.transaction.local_addr().unwrap().port() < 8200);
assert!(node.sockets.repair.local_addr().unwrap().port() >= 8100);
assert!(node.sockets.repair.local_addr().unwrap().port() < 8200);
assert!(node.sockets.replicate.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0);
assert!(node.sockets.replicate.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1);
assert!(node.sockets.requests.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0);
assert!(node.sockets.requests.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1);
assert!(node.sockets.transaction.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0);
assert!(node.sockets.transaction.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1);
assert!(node.sockets.repair.local_addr().unwrap().port() >= SOLANA_PORT_RANGE.0);
assert!(node.sockets.repair.local_addr().unwrap().port() < SOLANA_PORT_RANGE.1);
}
}

View File

@ -283,7 +283,7 @@ mod tests {
let bob_pubkey = Keypair::new().pubkey();
let carlos_pubkey = Keypair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let leader_data = leader.data.clone();
let leader_data = leader.info.clone();
let ledger_path = tmp_ledger_path("send_airdrop");
let server = Fullnode::new_with_bank(

View File

@ -39,7 +39,7 @@ impl Config {
let keypair =
Keypair::from_pkcs8(Input::from(&pkcs8)).expect("from_pkcs8 in fullnode::Config new");
let pubkey = keypair.pubkey();
let node_info = NodeInfo::new_leader_with_pubkey(pubkey, bind_addr);
let node_info = NodeInfo::new_with_pubkey_socketaddr(pubkey, bind_addr);
Config { node_info, pkcs8 }
}
pub fn keypair(&self) -> Keypair {
@ -74,12 +74,12 @@ impl Fullnode {
let local_gossip_addr = node.sockets.gossip.local_addr().unwrap();
info!(
"starting... local gossip address: {} (advertising {})",
local_gossip_addr, node.data.contact_info.ncp
local_gossip_addr, node.info.contact_info.ncp
);
let exit = Arc::new(AtomicBool::new(false));
let local_requests_addr = node.sockets.requests.local_addr().unwrap();
let requests_addr = node.data.contact_info.rpu;
let leader_info = leader_addr.map(NodeInfo::new_entry_point);
let requests_addr = node.info.contact_info.rpu;
let leader_info = leader_addr.map(|i| NodeInfo::new_entry_point(&i));
let server = Self::new_with_bank(
keypair,
bank,
@ -172,7 +172,7 @@ impl Fullnode {
sigverify_disabled: bool,
) -> Self {
if leader_info.is_none() {
node.data.leader_id = node.data.id;
node.info.leader_id = node.info.id;
}
let bank = Arc::new(bank);
@ -186,12 +186,13 @@ impl Fullnode {
);
thread_hdls.extend(rpu.thread_hdls());
let mut drone_addr = node.data.contact_info.tpu;
// TODO: this code assumes this node is the leader
let mut drone_addr = node.info.contact_info.tpu;
drone_addr.set_port(DRONE_PORT);
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), RPC_PORT);
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), RPC_PORT);
let rpc_service = JsonRpcService::new(
&bank,
node.data.contact_info.tpu,
node.info.contact_info.tpu,
drone_addr,
rpc_addr,
exit.clone(),
@ -200,9 +201,9 @@ impl Fullnode {
let blob_recycler = BlobRecycler::default();
let window =
window::new_window_from_entries(ledger_tail, entry_height, &node.data, &blob_recycler);
window::new_window_from_entries(ledger_tail, entry_height, &node.info, &blob_recycler);
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
let crdt = Arc::new(RwLock::new(Crdt::new(node.info).expect("Crdt::new")));
let ncp = Ncp::new(
&crdt,
@ -216,6 +217,7 @@ impl Fullnode {
match leader_info {
Some(leader_info) => {
// Start in validator mode.
// TODO: let Crdt get that data from the network?
crdt.write().unwrap().insert(leader_info);
let tvu = Tvu::new(
keypair,
@ -307,7 +309,7 @@ mod tests {
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone();
let entry = tn.info.clone();
let v = Fullnode::new_with_bank(keypair, bank, 0, &[], tn, Some(&entry), exit, None, false);
v.exit();
v.join().unwrap();
@ -321,7 +323,7 @@ mod tests {
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone();
let entry = tn.info.clone();
Fullnode::new_with_bank(keypair, bank, 0, &[], tn, Some(&entry), exit, None, false)
})
.collect();

View File

@ -92,7 +92,7 @@ mod tests {
fn test_exit() {
let exit = Arc::new(AtomicBool::new(false));
let tn = Node::new_localhost();
let crdt = Crdt::new(tn.data.clone()).expect("Crdt::new");
let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new");
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let d = Ncp::new(&c, w, None, tn.sockets.gossip, exit.clone()).unwrap();

View File

@ -64,8 +64,8 @@ impl ThinClient {
pub fn recv_response(&self) -> io::Result<Response> {
let mut buf = vec![0u8; 1024];
trace!("start recv_from");
self.requests_socket.recv_from(&mut buf)?;
trace!("end recv_from");
let (len, from) = self.requests_socket.recv_from(&mut buf)?;
trace!("end recv_from got {} {}", len, from);
deserialize(&buf).or_else(|_| Err(io::Error::new(io::ErrorKind::Other, "deserialize")))
}
@ -161,7 +161,7 @@ impl ThinClient {
/// until the server sends a response. If the response packet is dropped
/// by the network, this method will hang indefinitely.
pub fn get_balance(&mut self, pubkey: &Pubkey) -> io::Result<i64> {
trace!("get_balance");
trace!("get_balance sending request to {}", self.requests_addr);
let req = Request::GetAccount { key: *pubkey };
let data = serialize(&req).expect("serialize GetAccount in pub fn get_balance");
self.requests_socket
@ -367,7 +367,7 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> R
let crdt = Arc::new(RwLock::new(Crdt::new(node).expect("Crdt::new")));
let window = Arc::new(RwLock::new(vec![]));
let ncp = Ncp::new(&crdt.clone(), window, None, gossip_socket, exit.clone()).unwrap();
let leader_entry_point = NodeInfo::new_entry_point(leader_ncp);
let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp);
crdt.write().unwrap().insert(&leader_entry_point);
sleep(Duration::from_millis(100));
@ -420,7 +420,7 @@ mod tests {
logger::setup();
let leader_keypair = Keypair::new();
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let leader_data = leader.info.clone();
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
@ -473,7 +473,7 @@ mod tests {
let bank = Bank::new(&alice);
let bob_pubkey = Keypair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let leader_data = leader.data.clone();
let leader_data = leader.info.clone();
let ledger_path = tmp_ledger("bad_sig", &alice);
let server = Fullnode::new_with_bank(
@ -533,7 +533,7 @@ mod tests {
let bank = Bank::new(&alice);
let bob_pubkey = Keypair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let leader_data = leader.data.clone();
let leader_data = leader.info.clone();
let ledger_path = tmp_ledger("client_check_signature", &alice);
let server = Fullnode::new_with_bank(

View File

@ -187,17 +187,17 @@ pub mod tests {
let exit = Arc::new(AtomicBool::new(false));
//start crdt_leader
let mut crdt_l = Crdt::new(leader.data.clone()).expect("Crdt::new");
crdt_l.set_leader(leader.data.id);
let mut crdt_l = Crdt::new(leader.info.clone()).expect("Crdt::new");
crdt_l.set_leader(leader.info.id);
let cref_l = Arc::new(RwLock::new(crdt_l));
let dr_l = new_ncp(cref_l, leader.sockets.gossip, exit.clone()).unwrap();
//start crdt2
let mut crdt2 = Crdt::new(target2.data.clone()).expect("Crdt::new");
crdt2.insert(&leader.data);
crdt2.set_leader(leader.data.id);
let leader_id = leader.data.id;
let mut crdt2 = Crdt::new(target2.info.clone()).expect("Crdt::new");
crdt2.insert(&leader.info);
crdt2.set_leader(leader.info.id);
let leader_id = leader.info.id;
let cref2 = Arc::new(RwLock::new(crdt2));
let dr_2 = new_ncp(cref2, target2.sockets.gossip, exit.clone()).unwrap();
@ -225,13 +225,13 @@ pub mod tests {
let starting_balance = 10_000;
let mint = Mint::new(starting_balance);
let replicate_addr = target1.data.contact_info.tvu;
let replicate_addr = target1.info.contact_info.tvu;
let bank = Arc::new(Bank::new(&mint));
//start crdt1
let mut crdt1 = Crdt::new(target1.data.clone()).expect("Crdt::new");
crdt1.insert(&leader.data);
crdt1.set_leader(leader.data.id);
let mut crdt1 = Crdt::new(target1.info.clone()).expect("Crdt::new");
crdt1.insert(&leader.info);
crdt1.set_leader(leader.info.id);
let cref1 = Arc::new(RwLock::new(crdt1));
let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()).unwrap();

View File

@ -254,8 +254,8 @@ pub mod tests {
let bank = Arc::new(Bank::new(&mint));
let node = Node::new_localhost();
let mut crdt = Crdt::new(node.data.clone()).expect("Crdt::new");
crdt.set_leader(node.data.id);
let mut crdt = Crdt::new(node.info.clone()).expect("Crdt::new");
crdt.set_leader(node.info.id);
let blob_recycler = BlobRecycler::default();
let (sender, receiver) = channel();
let exit = Arc::new(AtomicBool::new(false));
@ -293,7 +293,7 @@ pub mod tests {
bank.register_entry_id(&entry.id);
// Create a leader
let leader_data = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
let leader_data = NodeInfo::new_with_socketaddr(&"127.0.0.1:1234".parse().unwrap());
let leader_pubkey = leader_data.id.clone();
let mut leader_crdt = Crdt::new(leader_data).unwrap();
@ -308,7 +308,7 @@ pub mod tests {
// and votes for new last_id
for i in 0..10 {
let mut validator =
NodeInfo::new_leader(&format!("127.0.0.1:234{}", i).parse().unwrap());
NodeInfo::new_with_socketaddr(&format!("127.0.0.1:234{}", i).parse().unwrap());
let vote = Vote {
version: validator.version + 1,
@ -350,7 +350,7 @@ pub mod tests {
// add two more nodes and see that it succeeds
for i in 0..2 {
let mut validator =
NodeInfo::new_leader(&format!("127.0.0.1:234{}", i).parse().unwrap());
NodeInfo::new_with_socketaddr(&format!("127.0.0.1:234{}", i).parse().unwrap());
let vote = Vote {
version: validator.version + 1,

View File

@ -787,7 +787,7 @@ mod test {
logger::setup();
let tn = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
let mut crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new");
let mut crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new");
let me_id = crdt_me.my_data().id;
crdt_me.set_leader(me_id);
let subs = Arc::new(RwLock::new(crdt_me));
@ -831,7 +831,7 @@ mod test {
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.data.contact_info.ncp);
w.meta.set_addr(&tn.info.contact_info.ncp);
}
msgs.push_back(b);
}
@ -858,7 +858,7 @@ mod test {
logger::setup();
let tn = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
let crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new");
let crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new");
let me_id = crdt_me.my_data().id;
let subs = Arc::new(RwLock::new(crdt_me));
@ -901,7 +901,7 @@ mod test {
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.data.contact_info.ncp);
w.meta.set_addr(&tn.info.contact_info.ncp);
}
msgs.push_back(b);
}
@ -921,7 +921,7 @@ mod test {
logger::setup();
let tn = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
let crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new");
let crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new");
let me_id = crdt_me.my_data().id;
let subs = Arc::new(RwLock::new(crdt_me));
@ -964,7 +964,7 @@ mod test {
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.data.contact_info.ncp);
w.meta.set_addr(&tn.info.contact_info.ncp);
}
msgs.push_back(b);
}
@ -984,7 +984,7 @@ mod test {
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.data.contact_info.ncp);
w.meta.set_addr(&tn.info.contact_info.ncp);
}
msgs1.push_back(b);
}

View File

@ -18,7 +18,7 @@ use std::time::Duration;
fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, Ncp, UdpSocket) {
let tn = Node::new_localhost();
let crdt = Crdt::new(tn.data.clone()).expect("Crdt::new");
let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new");
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit).unwrap();

View File

@ -34,10 +34,10 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
let exit = Arc::new(AtomicBool::new(false));
let mut spy = Node::new_localhost();
let daddr = "0.0.0.0:0".parse().unwrap();
let me = spy.data.id.clone();
spy.data.contact_info.tvu = daddr;
spy.data.contact_info.rpu = daddr;
let mut spy_crdt = Crdt::new(spy.data).expect("Crdt::new");
let me = spy.info.id.clone();
spy.info.contact_info.tvu = daddr;
spy.info.contact_info.rpu = daddr;
let mut spy_crdt = Crdt::new(spy.info).expect("Crdt::new");
spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt));
@ -55,7 +55,7 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
.values()
.into_iter()
.filter(|x| x.id != me)
.filter(|x| Crdt::is_valid_address(x.contact_info.rpu))
.filter(|x| Crdt::is_valid_address(&x.contact_info.rpu))
.cloned()
.collect();
if num >= num_nodes as u64 && v.len() >= num_nodes {
@ -118,7 +118,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
let leader_keypair = Keypair::new();
let leader_pubkey = leader_keypair.pubkey().clone();
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let leader_data = leader.info.clone();
let bob_pubkey = Keypair::new().pubkey();
let mut ledger_paths = Vec::new();
@ -149,7 +149,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
// balances
let keypair = Keypair::new();
let validator = Node::new_localhost_with_pubkey(keypair.pubkey());
let validator_data = validator.data.clone();
let validator_data = validator.info.clone();
let validator = Fullnode::new(
validator,
&zero_ledger_path,
@ -198,7 +198,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
let leader_keypair = Keypair::new();
let leader_pubkey = leader_keypair.pubkey().clone();
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let leader_data = leader.info.clone();
let bob_pubkey = Keypair::new().pubkey();
let mut ledger_paths = Vec::new();
@ -322,7 +322,7 @@ fn test_multi_node_basic() {
let leader_keypair = Keypair::new();
let leader_pubkey = leader_keypair.pubkey().clone();
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let leader_data = leader.info.clone();
let bob_pubkey = Keypair::new().pubkey();
let mut ledger_paths = Vec::new();
@ -388,7 +388,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
let mut ledger_paths = Vec::new();
ledger_paths.push(leader_ledger_path.clone());
let leader_data = leader.data.clone();
let leader_data = leader.info.clone();
let leader_fullnode = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, false);
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap();
@ -399,7 +399,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
let keypair = Keypair::new();
let validator = Node::new_localhost_with_pubkey(keypair.pubkey());
let validator_data = validator.data.clone();
let validator_data = validator.info.clone();
let ledger_path = tmp_copy_ledger(&leader_ledger_path, "boot_validator_from_file");
ledger_paths.push(ledger_path.clone());
let val_fullnode = Fullnode::new(
@ -425,7 +425,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
fn create_leader(ledger_path: &str) -> (NodeInfo, Fullnode) {
let leader_keypair = Keypair::new();
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let leader_data = leader.info.clone();
let leader_fullnode = Fullnode::new(leader, &ledger_path, leader_keypair, None, false);
(leader_data, leader_fullnode)
}
@ -472,7 +472,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
// start validator from old ledger
let keypair = Keypair::new();
let validator = Node::new_localhost_with_pubkey(keypair.pubkey());
let validator_data = validator.data.clone();
let validator_data = validator.info.clone();
let val_fullnode = Fullnode::new(
validator,
@ -535,7 +535,7 @@ fn test_multi_node_dynamic_network() {
ledger_paths.push(leader_ledger_path.clone());
let alice_arc = Arc::new(RwLock::new(alice));
let leader_data = leader.data.clone();
let leader_data = leader.info.clone();
let server = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, true);
@ -603,7 +603,7 @@ fn test_multi_node_dynamic_network() {
.name("validator-launch-thread".to_string())
.spawn(move || {
let validator = Node::new_localhost_with_pubkey(keypair.pubkey());
let rd = validator.data.clone();
let rd = validator.info.clone();
info!("starting {} {:x}", keypair.pubkey(), rd.debug_id());
let val = Fullnode::new(
validator,
@ -712,8 +712,8 @@ fn mk_client(leader: &NodeInfo) -> ThinClient {
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
assert!(Crdt::is_valid_address(leader.contact_info.rpu));
assert!(Crdt::is_valid_address(leader.contact_info.tpu));
assert!(Crdt::is_valid_address(&leader.contact_info.rpu));
assert!(Crdt::is_valid_address(&leader.contact_info.tpu));
ThinClient::new(
leader.contact_info.rpu,
requests_socket,