* move gossip/NCP off assuming anything about its address
  * use a single socket to send and receive gossip
  * remove --addr/-a from CLIs
  * rearrange networking utility code
  * use Arc<UdpSocket> to share the Sync-safe UdpSocket among threads
  * rename TestNode to Node

TODO:

  * re-enable 127.0.0.1 as a valid address in crdt
  * change repair request/response to a similar, single socket
  * pick cloned sockets or Arc<UdpSocket> for all these (rpu uses tryclone())
  * update contact_info with network truthiness instead of what the node
      says?
This commit is contained in:
Rob Walker 2018-08-28 16:32:40 -07:00
parent cb52a335bd
commit 1af4cee63b
28 changed files with 314 additions and 363 deletions

View File

@ -71,7 +71,7 @@ echo "--- Wallet sanity"
echo "--- Node count"
(
set -x
./multinode-demo/client.sh "$PWD" 3 -c --addr 127.0.0.1
./multinode-demo/client.sh "$PWD" 3 -c
) || flag_error
killBackgroundCommands

View File

@ -47,4 +47,4 @@ fi
# shellcheck disable=SC2086 # $solana_wallet should not be quoted
exec $solana_wallet \
-a 127.0.0.1 -l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json -k "$client_id_path" --timeout 10 "$@"
-l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json -k "$client_id_path" --timeout 10 "$@"

View File

@ -63,7 +63,12 @@ fn main() -> Result<()> {
let pack_recycler = PacketRecycler::default();
let (s_reader, r_reader) = channel();
let t_reader = receiver(read, exit.clone(), pack_recycler.clone(), s_reader);
let t_reader = receiver(
Arc::new(read),
exit.clone(),
pack_recycler.clone(),
s_reader,
);
let t_producer1 = producer(&addr, &pack_recycler, exit.clone());
let t_producer2 = producer(&addr, &pack_recycler, exit.clone());
let t_producer3 = producer(&addr, &pack_recycler, exit.clone());

View File

@ -16,7 +16,6 @@ use solana::fullnode::Config;
use solana::hash::Hash;
use solana::logger;
use solana::metrics;
use solana::nat::get_public_ip_addr;
use solana::ncp::Ncp;
use solana::service::Service;
use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil};
@ -440,14 +439,6 @@ fn main() {
.short("c")
.help("exit immediately after converging"),
)
.arg(
Arg::with_name("addr")
.short("a")
.long("addr")
.value_name("IPADDR")
.takes_value(true)
.help("address to advertise to the network"),
)
.arg(
Arg::with_name("sustained")
.long("sustained")
@ -484,18 +475,6 @@ fn main() {
time_sec = s.to_string().parse().expect("integer");
}
let addr = if let Some(s) = matches.value_of("addr") {
s.to_string().parse().unwrap_or_else(|e| {
eprintln!("failed to parse {} as IP address error: {:?}", s, e);
exit(1);
})
} else {
get_public_ip_addr().unwrap_or_else(|e| {
eprintln!("failed to get public IP, try --addr? error: {:?}", e);
exit(1);
})
};
if let Some(s) = matches.value_of("tx_count") {
tx_count = s.to_string().parse().expect("integer");
}
@ -506,7 +485,7 @@ fn main() {
let exit_signal = Arc::new(AtomicBool::new(false));
let mut c_threads = vec![];
let (validators, leader) = converge(&leader, &exit_signal, num_nodes, &mut c_threads, addr);
let (validators, leader) = converge(&leader, &exit_signal, num_nodes, &mut c_threads);
println!(" Node address | Node identifier");
println!("----------------------+------------------");
@ -672,10 +651,9 @@ fn converge(
exit_signal: &Arc<AtomicBool>,
num_nodes: usize,
threads: &mut Vec<JoinHandle<()>>,
addr: IpAddr,
) -> (Vec<NodeInfo>, Option<NodeInfo>) {
//lets spy on the network
let (node, gossip_socket, gossip_send_socket) = Crdt::spy_node(addr);
let (node, gossip_socket) = Crdt::spy_node();
let mut spy_crdt = Crdt::new(node).expect("Crdt::new");
spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id);
@ -686,7 +664,6 @@ fn converge(
window.clone(),
None,
gossip_socket,
gossip_send_socket,
exit_signal.clone(),
).expect("DataReplicator::new");
let mut v: Vec<NodeInfo> = vec![];

View File

@ -15,14 +15,12 @@ use solana::drone::{Drone, DroneRequest, DRONE_PORT};
use solana::fullnode::Config;
use solana::logger;
use solana::metrics::set_panic_hook;
use solana::nat::get_public_ip_addr;
use solana::signature::read_keypair;
use solana::thin_client::poll_gossip_for_leader;
use std::error;
use std::fs::File;
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::process::exit;
use std::sync::{Arc, Mutex};
use std::thread;
use tokio::net::TcpListener;
@ -72,28 +70,8 @@ fn main() -> Result<(), Box<error::Error>> {
.takes_value(true)
.help("Max SECONDS to wait to get necessary gossip from the network"),
)
.arg(
Arg::with_name("addr")
.short("a")
.long("addr")
.value_name("IPADDR")
.takes_value(true)
.help("address to advertise to the network"),
)
.get_matches();
let addr = if let Some(s) = matches.value_of("addr") {
s.to_string().parse().unwrap_or_else(|e| {
eprintln!("failed to parse {} as IP address error: {:?}", s, e);
exit(1);
})
} else {
get_public_ip_addr().unwrap_or_else(|e| {
eprintln!("failed to get public IP, try --addr? error: {:?}", e);
exit(1);
})
};
let leader: NodeInfo;
if let Some(l) = matches.value_of("leader") {
leader = read_leader(l).node_info;
@ -124,9 +102,11 @@ fn main() -> Result<(), Box<error::Error>> {
timeout = None;
}
let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout, addr)?;
eprintln!("hioisdlflkj");
let drone_addr: SocketAddr = format!("0.0.0.0:{}", DRONE_PORT).parse().unwrap();
let socket = TcpListener::bind(&drone_addr).unwrap();
let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout)?;
let drone = Arc::new(Mutex::new(Drone::new(
mint_keypair,
@ -144,7 +124,6 @@ fn main() -> Result<(), Box<error::Error>> {
drone1.lock().unwrap().clear_request_count();
});
let socket = TcpListener::bind(&drone_addr).unwrap();
println!("Drone started. Listening on: {}", drone_addr);
let done = socket
.incoming()

View File

@ -5,9 +5,9 @@ extern crate serde_json;
extern crate solana;
use clap::{App, Arg};
use solana::crdt::{get_ip_addr, parse_port_or_addr};
use solana::crdt::GOSSIP_PORT_RANGE;
use solana::fullnode::Config;
use solana::nat::get_public_ip_addr;
use solana::nat::{get_ip_addr, get_public_ip_addr, parse_port_or_addr};
use solana::signature::read_pkcs8;
use std::io;
use std::net::SocketAddr;
@ -48,13 +48,7 @@ fn main() {
.get_matches();
let bind_addr: SocketAddr = {
let mut bind_addr = parse_port_or_addr({
if let Some(b) = matches.value_of("bind") {
Some(b.to_string())
} else {
None
}
});
let mut bind_addr = parse_port_or_addr(matches.value_of("bind"), GOSSIP_PORT_RANGE.0);
if matches.is_present("local") {
let ip = get_ip_addr().unwrap();
bind_addr.set_ip(ip);

View File

@ -7,7 +7,7 @@ extern crate solana;
use clap::{App, Arg};
use solana::client::mk_client;
use solana::crdt::{NodeInfo, TestNode};
use solana::crdt::{Node, NodeInfo};
use solana::drone::DRONE_PORT;
use solana::fullnode::{Config, Fullnode};
use solana::logger;
@ -78,14 +78,14 @@ fn main() -> () {
let port_range = (8100, 10000);
let node = if let Some(_t) = matches.value_of("testnet") {
TestNode::new_with_external_ip(
Node::new_with_external_ip(
leader_pubkey,
repl_data.contact_info.ncp.ip(),
port_range,
0,
)
} else {
TestNode::new_with_external_ip(
Node::new_with_external_ip(
leader_pubkey,
repl_data.contact_info.ncp.ip(),
port_range,

View File

@ -13,7 +13,6 @@ use solana::crdt::NodeInfo;
use solana::drone::DRONE_PORT;
use solana::fullnode::Config;
use solana::logger;
use solana::nat::get_public_ip_addr;
use solana::signature::{read_keypair, Keypair, KeypairUtil, Pubkey, Signature};
use solana::thin_client::{poll_gossip_for_leader, ThinClient};
use solana::wallet::request_airdrop;
@ -21,7 +20,6 @@ use std::error;
use std::fmt;
use std::fs::File;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::process::exit;
use std::thread::sleep;
use std::time::Duration;
@ -94,14 +92,6 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.takes_value(true)
.help("/path/to/id.json"),
)
.arg(
Arg::with_name("addr")
.short("a")
.long("addr")
.value_name("IPADDR")
.takes_value(true)
.help("address to advertise to the network"),
)
.arg(
Arg::with_name("timeout")
.long("timeout")
@ -155,18 +145,6 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.subcommand(SubCommand::with_name("address").about("Get your public key"))
.get_matches();
let addr = if let Some(s) = matches.value_of("addr") {
s.to_string().parse().unwrap_or_else(|e| {
eprintln!("failed to parse {} as IP address error: {:?}", s, e);
exit(1)
})
} else {
get_public_ip_addr().unwrap_or_else(|e| {
eprintln!("failed to get public IP, try --addr? error: {:?}", e);
exit(1)
})
};
let leader: NodeInfo;
if let Some(l) = matches.value_of("leader") {
leader = read_leader(l)?.node_info;
@ -195,7 +173,7 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
)))
})?;
let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout, addr)?;
let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout)?;
let mut drone_addr = leader.contact_info.tpu;
drone_addr.set_port(DRONE_PORT);

View File

@ -16,31 +16,27 @@ pub struct BlobFetchStage {
impl BlobFetchStage {
pub fn new(
socket: UdpSocket,
socket: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
blob_recycler: &BlobRecycler,
recycler: &BlobRecycler,
) -> (Self, BlobReceiver) {
Self::new_multi_socket(vec![socket], exit, blob_recycler)
Self::new_multi_socket(vec![socket], exit, recycler)
}
pub fn new_multi_socket(
sockets: Vec<UdpSocket>,
sockets: Vec<Arc<UdpSocket>>,
exit: Arc<AtomicBool>,
blob_recycler: &BlobRecycler,
recycler: &BlobRecycler,
) -> (Self, BlobReceiver) {
let (blob_sender, blob_receiver) = channel();
let (sender, receiver) = channel();
let thread_hdls: Vec<_> = sockets
.into_iter()
.map(|socket| {
streamer::blob_receiver(
exit.clone(),
blob_recycler.clone(),
socket,
blob_sender.clone(),
).expect("blob receiver init")
streamer::blob_receiver(socket, exit.clone(), recycler.clone(), sender.clone())
.expect("blob receiver init")
})
.collect();
(BlobFetchStage { exit, thread_hdls }, blob_receiver)
(BlobFetchStage { exit, thread_hdls }, receiver)
}
pub fn close(&self) {

View File

@ -1,11 +1,11 @@
use crdt::NodeInfo;
use nat::udp_random_bind;
use crdt::{NodeInfo, GOSSIP_PORT_RANGE};
use nat::bind_in_range;
use std::time::Duration;
use thin_client::ThinClient;
pub fn mk_client(r: &NodeInfo) -> ThinClient {
let requests_socket = udp_random_bind(8000, 10000, 5).unwrap();
let transactions_socket = udp_random_bind(8000, 10000, 5).unwrap();
let requests_socket = bind_in_range(GOSSIP_PORT_RANGE).unwrap();
let transactions_socket = bind_in_range(GOSSIP_PORT_RANGE).unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))

View File

@ -20,10 +20,9 @@ use counter::Counter;
use hash::Hash;
use ledger::LedgerWindow;
use log::Level;
use nat::udp_random_bind;
use nat::bind_in_range;
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
use pnet_datalink as datalink;
use rand::{thread_rng, RngCore};
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use result::{Error, Result};
use signature::{Keypair, KeypairUtil, Pubkey};
@ -41,6 +40,7 @@ use timing::{duration_as_ms, timestamp};
use transaction::Vote;
use window::{SharedWindow, WindowIndex};
pub const GOSSIP_PORT_RANGE: (u16, u16) = (8000, 10_000);
/// milliseconds we sleep for between gossip requests
const GOSSIP_SLEEP_MILLIS: u64 = 100;
const GOSSIP_PURGE_MILLIS: u64 = 15000;
@ -57,47 +57,6 @@ pub enum CrdtError {
BadGossipAddress,
}
pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address");
if let Some(addrstr) = optstr {
if let Ok(port) = addrstr.parse() {
let mut addr = daddr;
addr.set_port(port);
addr
} else if let Ok(addr) = addrstr.parse() {
addr
} else {
daddr
}
} else {
daddr
}
}
pub fn get_ip_addr() -> Option<IpAddr> {
for iface in datalink::interfaces() {
for p in iface.ips {
if !p.ip().is_loopback() && !p.ip().is_multicast() {
match p.ip() {
IpAddr::V4(addr) => {
if !addr.is_link_local() {
return Some(p.ip());
}
}
IpAddr::V6(_addr) => {
// Select an ipv6 address if the config is selected
#[cfg(feature = "ipv6")]
{
return Some(p.ip());
}
}
}
}
}
}
None
}
/// Structure to be replicated by the network
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct ContactInfo {
@ -529,9 +488,10 @@ impl Crdt {
false
} else if !(Self::is_valid_address(v.contact_info.tvu)) {
trace!(
"{:x}:broadcast skip not listening {:x}",
"{:x}:broadcast skip not listening {:x} {}",
me.debug_id(),
v.debug_id()
v.debug_id(),
v.contact_info.tvu,
);
false
} else {
@ -552,6 +512,7 @@ impl Crdt {
/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to`
/// TODO: move me out of crdt
pub fn broadcast(
me: &NodeInfo,
broadcast_table: &[NodeInfo],
@ -670,6 +631,7 @@ impl Crdt {
/// retransmit messages from the leader to layer 1 nodes
/// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to`
/// TODO: move me out of Crdt
pub fn retransmit(obj: &Arc<RwLock<Self>>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> {
let (me, table): (NodeInfo, Vec<NodeInfo>) = {
// copy to avoid locking during IO
@ -685,12 +647,17 @@ impl Crdt {
.iter()
.filter(|v| {
if me.id == v.id {
trace!("skip retransmit to self {:?}", v.id);
false
} else if me.leader_id == v.id {
trace!("skip retransmit to leader {:?}", v.id);
false
} else if !(Self::is_valid_address(v.contact_info.tvu)) {
trace!("skip nodes that are not listening {:?}", v.id);
trace!(
"skip nodes that are not listening {:?} {}",
v.id,
v.contact_info.tvu
);
false
} else {
true
@ -702,10 +669,11 @@ impl Crdt {
.par_iter()
.map(|v| {
debug!(
"{:x}: retransmit blob {} to {:x}",
"{:x}: retransmit blob {} to {:x} {}",
me.debug_id(),
rblob.get_index().unwrap(),
v.debug_id(),
v.contact_info.tvu,
);
//TODO profile this, may need multiple sockets for par_iter
assert!(rblob.meta.size <= BLOB_SIZE);
@ -728,10 +696,6 @@ impl Crdt {
self.remote.values().fold(max, |a, b| std::cmp::min(a, *b))
}
fn random() -> u64 {
thread_rng().next_u64()
}
// TODO: fill in with real implmentation once staking is implemented
fn get_stake(_id: Pubkey) -> f64 {
1.0
@ -771,7 +735,7 @@ impl Crdt {
if valid.is_empty() {
Err(CrdtError::NoPeers)?;
}
let n = (Self::random() as usize) % valid.len();
let n = thread_rng().gen::<usize>() % valid.len();
let addr = valid[n].contact_info.ncp;
let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix);
let out = serialize(&req)?;
@ -814,8 +778,9 @@ impl Crdt {
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
trace!(
"created gossip request from {:x} to {:x} {}",
"created gossip request from {:x} {:?} to {:x} {}",
self.debug_id(),
self.table[&self.me].clone(),
v.debug_id(),
v.contact_info.ncp
);
@ -1060,9 +1025,14 @@ impl Crdt {
blob: &Blob,
) -> Option<SharedBlob> {
match deserialize(&blob.data[..blob.meta.size]) {
Ok(request) => {
Crdt::handle_protocol(request, obj, window, ledger_window, blob_recycler)
}
Ok(request) => Crdt::handle_protocol(
blob.meta.addr(),
request,
obj,
window,
ledger_window,
blob_recycler,
),
Err(_) => {
warn!("deserialize crdt packet failed");
None
@ -1071,6 +1041,7 @@ impl Crdt {
}
fn handle_protocol(
from_addr: SocketAddr,
request: Protocol,
obj: &Arc<RwLock<Self>>,
window: &SharedWindow,
@ -1080,10 +1051,14 @@ impl Crdt {
match request {
// TODO sigverify these
Protocol::RequestUpdates(v, from_rd) => {
let addr = from_rd.contact_info.ncp;
trace!("RequestUpdates {} from {}", v, addr);
trace!(
"RequestUpdates {} from {}, professing to be {}",
v,
from_addr,
from_rd.contact_info.ncp
);
let me = obj.read().unwrap();
if addr == me.table[&me.me].contact_info.ncp {
if from_rd.contact_info.ncp == me.table[&me.me].contact_info.ncp {
warn!(
"RequestUpdates ignored, I'm talking to myself: me={:x} remoteme={:x}",
me.debug_id(),
@ -1113,13 +1088,13 @@ impl Crdt {
v
);
None
} else if let Ok(r) = to_blob(rsp, addr, &blob_recycler) {
} else if let Ok(r) = to_blob(rsp, from_addr, &blob_recycler) {
trace!(
"sending updates me {:x} len {} to {:x} {}",
obj.read().unwrap().debug_id(),
len,
from_rd.debug_id(),
addr,
from_addr,
);
Some(r)
} else {
@ -1254,7 +1229,7 @@ impl Crdt {
fn is_valid_ip_internal(addr: IpAddr, cfg_test: bool) -> bool {
!(addr.is_unspecified() || addr.is_multicast() || (addr.is_loopback() && !cfg_test))
}
pub fn is_valid_ip(addr: IpAddr) -> bool {
fn is_valid_ip(addr: IpAddr) -> bool {
Self::is_valid_ip_internal(addr, cfg!(test) || cfg!(feature = "test"))
}
/// port must not be 0
@ -1264,20 +1239,17 @@ impl Crdt {
(addr.port() != 0) && Self::is_valid_ip(addr.ip())
}
pub fn spy_node(addr: IpAddr) -> (NodeInfo, UdpSocket, UdpSocket) {
let gossip_socket = udp_random_bind(8000, 10000, 5).unwrap();
let gossip_send_socket = udp_random_bind(8000, 10000, 5).unwrap();
let gossip_addr = SocketAddr::new(addr, gossip_socket.local_addr().unwrap().port());
pub fn spy_node() -> (NodeInfo, UdpSocket) {
let gossip_socket = bind_in_range(GOSSIP_PORT_RANGE).unwrap();
let pubkey = Keypair::new().pubkey();
let daddr = "0.0.0.0:0".parse().unwrap();
let node = NodeInfo::new(pubkey, gossip_addr, daddr, daddr, daddr, daddr);
(node, gossip_socket, gossip_send_socket)
let node = NodeInfo::new(pubkey, daddr, daddr, daddr, daddr, daddr);
(node, gossip_socket)
}
}
pub struct Sockets {
pub gossip: UdpSocket,
pub gossip_send: UdpSocket,
pub requests: UdpSocket,
pub replicate: UdpSocket,
pub transaction: UdpSocket,
@ -1287,12 +1259,12 @@ pub struct Sockets {
pub retransmit: UdpSocket,
}
pub struct TestNode {
pub struct Node {
pub data: NodeInfo,
pub sockets: Sockets,
}
impl TestNode {
impl Node {
pub fn new_localhost() -> Self {
let pubkey = Keypair::new().pubkey();
Self::new_localhost_with_pubkey(pubkey)
@ -1304,7 +1276,6 @@ impl TestNode {
let requests = UdpSocket::bind("127.0.0.1:0").unwrap();
let repair = UdpSocket::bind("127.0.0.1:0").unwrap();
let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap();
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
@ -1316,11 +1287,10 @@ impl TestNode {
transaction.local_addr().unwrap(),
repair.local_addr().unwrap(),
);
TestNode {
Node {
data,
sockets: Sockets {
gossip,
gossip_send,
requests,
replicate,
transaction,
@ -1331,21 +1301,21 @@ impl TestNode {
},
}
}
pub fn new_with_bind_addr(data: NodeInfo, bind_addr: SocketAddr) -> TestNode {
let mut local_gossip_addr = bind_addr;
local_gossip_addr.set_port(data.contact_info.ncp.port());
pub fn new_with_bind_addr(data: NodeInfo, bind_addr: SocketAddr) -> Node {
let mut gossip_addr = bind_addr;
gossip_addr.set_port(data.contact_info.ncp.port());
let mut local_replicate_addr = bind_addr;
local_replicate_addr.set_port(data.contact_info.tvu.port());
let mut replicate_addr = bind_addr;
replicate_addr.set_port(data.contact_info.tvu.port());
let mut local_requests_addr = bind_addr;
local_requests_addr.set_port(data.contact_info.rpu.port());
let mut requests_addr = bind_addr;
requests_addr.set_port(data.contact_info.rpu.port());
let mut local_transactions_addr = bind_addr;
local_transactions_addr.set_port(data.contact_info.tpu.port());
let mut transactions_addr = bind_addr;
transactions_addr.set_port(data.contact_info.tpu.port());
let mut local_repair_addr = bind_addr;
local_repair_addr.set_port(data.contact_info.tvu_window.port());
let mut repair_addr = bind_addr;
repair_addr.set_port(data.contact_info.tvu_window.port());
fn bind(addr: SocketAddr) -> UdpSocket {
match UdpSocket::bind(addr) {
@ -1356,25 +1326,23 @@ impl TestNode {
}
};
let transaction = bind(local_transactions_addr);
let gossip = bind(local_gossip_addr);
let replicate = bind(local_replicate_addr);
let repair = bind(local_repair_addr);
let requests = bind(local_requests_addr);
let transaction = bind(transactions_addr);
let gossip = bind(gossip_addr);
let replicate = bind(replicate_addr);
let repair = bind(repair_addr);
let requests = bind(requests_addr);
// Responses are sent from the same Udp port as requests are received
// from, in hopes that a NAT sitting in the middle will route the
// response Udp packet correctly back to the requester.
let respond = requests.try_clone().unwrap();
let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
TestNode {
Node {
data,
sockets: Sockets {
gossip,
gossip_send,
requests,
replicate,
transaction,
@ -1390,9 +1358,9 @@ impl TestNode {
ip: IpAddr,
port_range: (u16, u16),
ncp_port: u16,
) -> TestNode {
) -> Node {
fn bind(port_range: (u16, u16)) -> (u16, UdpSocket) {
match udp_random_bind(port_range.0, port_range.1, 5) {
match bind_in_range(port_range) {
Ok(socket) => (socket.local_addr().unwrap().port(), socket),
Err(err) => {
panic!("Failed to bind to {:?}", err);
@ -1425,7 +1393,6 @@ impl TestNode {
// response Udp packet correctly back to the requester.
let respond = requests.try_clone().unwrap();
let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
@ -1438,11 +1405,10 @@ impl TestNode {
SocketAddr::new(ip, repair_port),
);
TestNode {
Node {
data: node_info,
sockets: Sockets {
gossip,
gossip_send,
requests,
replicate,
transaction,
@ -1465,8 +1431,8 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) {
#[cfg(test)]
mod tests {
use crdt::{
parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, TestNode, GOSSIP_PURGE_MILLIS,
GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE,
Crdt, CrdtError, Node, NodeInfo, Protocol, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS,
MIN_TABLE_SIZE,
};
use entry::Entry;
use hash::{hash, Hash};
@ -1485,15 +1451,6 @@ mod tests {
use transaction::Vote;
use window::default_window;
#[test]
fn test_parse_port_or_addr() {
let p1 = parse_port_or_addr(Some("9000".to_string()));
assert_eq!(p1.port(), 9000);
let p2 = parse_port_or_addr(Some("127.0.0.1:7000".to_string()));
assert_eq!(p2.port(), 7000);
let p3 = parse_port_or_addr(None);
assert_eq!(p3.port(), 8000);
}
#[test]
fn test_bad_address() {
let d1 = NodeInfo::new(
@ -2154,13 +2111,38 @@ mod tests {
let obj = Arc::new(RwLock::new(crdt));
let request = Protocol::RequestUpdates(1, node.clone());
assert!(Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler).is_none());
assert!(
Crdt::handle_protocol(
node.contact_info.ncp,
request,
&obj,
&window,
&mut None,
&recycler
).is_none()
);
let request = Protocol::RequestUpdates(1, node_with_same_addr.clone());
assert!(Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler).is_none());
assert!(
Crdt::handle_protocol(
node.contact_info.ncp,
request,
&obj,
&window,
&mut None,
&recycler
).is_none()
);
let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone());
Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler);
Crdt::handle_protocol(
node.contact_info.ncp,
request,
&obj,
&window,
&mut None,
&recycler,
);
let me = obj.write().unwrap();
@ -2206,7 +2188,7 @@ mod tests {
fn new_with_external_ip_test_random() {
let sockaddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8080);
let node =
TestNode::new_with_external_ip(Keypair::new().pubkey(), sockaddr.ip(), (8100, 8200), 0);
Node::new_with_external_ip(Keypair::new().pubkey(), sockaddr.ip(), (8100, 8200), 0);
assert_eq!(
node.sockets.gossip.local_addr().unwrap().ip(),
@ -2244,12 +2226,8 @@ mod tests {
#[test]
fn new_with_external_ip_test_gossip() {
let sockaddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8080);
let node = TestNode::new_with_external_ip(
Keypair::new().pubkey(),
sockaddr.ip(),
(8100, 8200),
8050,
);
let node =
Node::new_with_external_ip(Keypair::new().pubkey(), sockaddr.ip(), (8100, 8200), 8050);
assert_eq!(
node.sockets.gossip.local_addr().unwrap().ip(),
sockaddr.ip()

View File

@ -157,11 +157,12 @@ impl Drop for Drone {
#[cfg(test)]
mod tests {
use bank::Bank;
use crdt::{get_ip_addr, TestNode};
use crdt::Node;
use drone::{Drone, DroneRequest, REQUEST_CAP, TIME_SLICE};
use fullnode::Fullnode;
use logger;
use mint::Mint;
use nat::get_ip_addr;
use service::Service;
use signature::{Keypair, KeypairUtil};
use std::fs::remove_dir_all;
@ -275,7 +276,7 @@ mod tests {
logger::setup();
let leader_keypair = Keypair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let alice = Mint::new(10_000_000);
let bank = Bank::new(&alice);

View File

@ -16,31 +16,26 @@ pub struct FetchStage {
impl FetchStage {
pub fn new(
socket: UdpSocket,
socket: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
packet_recycler: &PacketRecycler,
recycler: &PacketRecycler,
) -> (Self, PacketReceiver) {
Self::new_multi_socket(vec![socket], exit, packet_recycler)
Self::new_multi_socket(vec![socket], exit, recycler)
}
pub fn new_multi_socket(
sockets: Vec<UdpSocket>,
sockets: Vec<Arc<UdpSocket>>,
exit: Arc<AtomicBool>,
packet_recycler: &PacketRecycler,
recycler: &PacketRecycler,
) -> (Self, PacketReceiver) {
let (packet_sender, packet_receiver) = channel();
let (sender, receiver) = channel();
let thread_hdls: Vec<_> = sockets
.into_iter()
.map(|socket| {
streamer::receiver(
socket,
exit.clone(),
packet_recycler.clone(),
packet_sender.clone(),
)
streamer::receiver(socket, exit.clone(), recycler.clone(), sender.clone())
})
.collect();
(FetchStage { exit, thread_hdls }, packet_receiver)
(FetchStage { exit, thread_hdls }, receiver)
}
pub fn close(&self) {

View File

@ -2,7 +2,7 @@
use bank::Bank;
use broadcast_stage::BroadcastStage;
use crdt::{Crdt, NodeInfo, TestNode};
use crdt::{Crdt, Node, NodeInfo};
use drone::DRONE_PORT;
use entry::Entry;
use ledger::read_ledger;
@ -50,7 +50,7 @@ impl Config {
impl Fullnode {
pub fn new(
node: TestNode,
node: Node,
ledger_path: &str,
keypair: Keypair,
leader_addr: Option<SocketAddr>,
@ -165,7 +165,7 @@ impl Fullnode {
bank: Bank,
entry_height: u64,
ledger_tail: &[Entry],
mut node: TestNode,
mut node: Node,
leader_info: Option<&NodeInfo>,
exit: Arc<AtomicBool>,
ledger_path: Option<&str>,
@ -209,7 +209,6 @@ impl Fullnode {
window.clone(),
ledger_path,
node.sockets.gossip,
node.sockets.gossip_send,
exit.clone(),
).expect("Ncp::new");
thread_hdls.extend(ncp.thread_hdls());
@ -293,7 +292,7 @@ impl Service for Fullnode {
#[cfg(test)]
mod tests {
use bank::Bank;
use crdt::TestNode;
use crdt::Node;
use fullnode::Fullnode;
use mint::Mint;
use service::Service;
@ -304,7 +303,7 @@ mod tests {
#[test]
fn validator_exit() {
let keypair = Keypair::new();
let tn = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let tn = Node::new_localhost_with_pubkey(keypair.pubkey());
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
@ -318,7 +317,7 @@ mod tests {
let vals: Vec<Fullnode> = (0..2)
.map(|_| {
let keypair = Keypair::new();
let tn = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let tn = Node::new_localhost_with_pubkey(keypair.pubkey());
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));

View File

@ -2,10 +2,10 @@
extern crate reqwest;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use pnet_datalink as datalink;
use rand::{thread_rng, Rng};
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
/// A data type representing a public Udp socket
pub struct UdpSocketPair {
@ -27,19 +27,79 @@ pub fn get_public_ip_addr() -> Result<IpAddr, String> {
}
}
pub fn udp_random_bind(start: u16, end: u16, tries: u32) -> io::Result<UdpSocket> {
let mut count = 0;
loop {
count += 1;
pub fn parse_port_or_addr(optstr: Option<&str>, default_port: u16) -> SocketAddr {
let daddr: SocketAddr = format!("0.0.0.0:{}", default_port)
.parse()
.expect("default socket address");
if let Some(addrstr) = optstr {
if let Ok(port) = addrstr.parse() {
let mut addr = daddr;
addr.set_port(port);
addr
} else if let Ok(addr) = addrstr.parse() {
addr
} else {
daddr
}
} else {
daddr
}
}
pub fn get_ip_addr() -> Option<IpAddr> {
for iface in datalink::interfaces() {
for p in iface.ips {
if !p.ip().is_loopback() && !p.ip().is_multicast() {
match p.ip() {
IpAddr::V4(addr) => {
if !addr.is_link_local() {
return Some(p.ip());
}
}
IpAddr::V6(_addr) => {
// Select an ipv6 address if the config is selected
#[cfg(feature = "ipv6")]
{
return Some(p.ip());
}
}
}
}
}
}
None
}
pub fn bind_in_range(range: (u16, u16)) -> io::Result<UdpSocket> {
let (start, end) = range;
let mut tries_left = end - start;
loop {
let rand_port = thread_rng().gen_range(start, end);
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rand_port);
match UdpSocket::bind(addr) {
Result::Ok(val) => break Result::Ok(val),
Result::Err(err) => if err.kind() != io::ErrorKind::AddrInUse || count >= tries {
Result::Err(err) => if err.kind() != io::ErrorKind::AddrInUse || tries_left == 0 {
return Err(err);
},
}
tries_left -= 1;
}
}
#[cfg(test)]
mod tests {
use nat::parse_port_or_addr;
#[test]
fn test_parse_port_or_addr() {
let p1 = parse_port_or_addr(Some("9000"), 1);
assert_eq!(p1.port(), 9000);
let p2 = parse_port_or_addr(Some("127.0.0.1:7000"), 1);
assert_eq!(p2.port(), 7000);
let p2 = parse_port_or_addr(Some("hi there"), 1);
assert_eq!(p2.port(), 1);
let p3 = parse_port_or_addr(None, 1);
assert_eq!(p3.port(), 1);
}
}

View File

@ -22,27 +22,27 @@ impl Ncp {
crdt: &Arc<RwLock<Crdt>>,
window: SharedWindow,
ledger_path: Option<&str>,
gossip_listen_socket: UdpSocket,
gossip_send_socket: UdpSocket,
gossip_socket: UdpSocket,
exit: Arc<AtomicBool>,
) -> Result<Ncp> {
let blob_recycler = BlobRecycler::default();
let (request_sender, request_receiver) = channel();
let gossip_socket = Arc::new(gossip_socket);
trace!(
"Ncp: id: {:?}, listening on: {:?}",
&crdt.read().unwrap().me.as_ref()[..4],
gossip_listen_socket.local_addr().unwrap()
gossip_socket.local_addr().unwrap()
);
let t_receiver = streamer::blob_receiver(
gossip_socket.clone(),
exit.clone(),
blob_recycler.clone(),
gossip_listen_socket,
request_sender,
)?;
let (response_sender, response_receiver) = channel();
let t_responder = streamer::responder(
"ncp",
gossip_send_socket,
gossip_socket,
blob_recycler.clone(),
response_receiver,
);
@ -81,7 +81,7 @@ impl Service for Ncp {
#[cfg(test)]
mod tests {
use crdt::{Crdt, TestNode};
use crdt::{Crdt, Node};
use ncp::Ncp;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
@ -91,18 +91,11 @@ mod tests {
// test that stage will exit when flag is set
fn test_exit() {
let exit = Arc::new(AtomicBool::new(false));
let tn = TestNode::new_localhost();
let tn = Node::new_localhost();
let crdt = Crdt::new(tn.data.clone()).expect("Crdt::new");
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let d = Ncp::new(
&c,
w,
None,
tn.sockets.gossip,
tn.sockets.gossip_send,
exit.clone(),
).unwrap();
let d = Ncp::new(&c, w, None, tn.sockets.gossip, exit.clone()).unwrap();
d.close().expect("thread join");
}
}

View File

@ -84,7 +84,7 @@ impl ReplicateStage {
let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
let t_responder = responder(
"replicate_stage",
send,
Arc::new(send),
blob_recycler.clone(),
vote_blob_receiver,
);

View File

@ -47,7 +47,7 @@ fn retransmit(
/// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
fn retransmitter(
sock: UdpSocket,
sock: Arc<UdpSocket>,
crdt: Arc<RwLock<Crdt>>,
recycler: BlobRecycler,
r: BlobReceiver,
@ -81,7 +81,7 @@ impl RetransmitStage {
crdt: &Arc<RwLock<Crdt>>,
window: SharedWindow,
entry_height: u64,
retransmit_socket: UdpSocket,
retransmit_socket: Arc<UdpSocket>,
blob_recycler: &BlobRecycler,
fetch_stage_receiver: BlobReceiver,
) -> (Self, BlobReceiver) {

View File

@ -49,7 +49,7 @@ impl Rpu {
let packet_recycler = PacketRecycler::default();
let (packet_sender, packet_receiver) = channel();
let t_receiver = streamer::receiver(
requests_socket,
Arc::new(requests_socket),
exit,
packet_recycler.clone(),
packet_sender,
@ -64,8 +64,12 @@ impl Rpu {
blob_recycler.clone(),
);
let t_responder =
streamer::responder("rpu", respond_socket, blob_recycler.clone(), blob_receiver);
let t_responder = streamer::responder(
"rpu",
Arc::new(respond_socket),
blob_recycler.clone(),
blob_receiver,
);
let mut thread_hdls = vec![t_receiver, t_responder];
thread_hdls.extend(request_stage.thread_hdls().into_iter());

View File

@ -44,7 +44,7 @@ fn recv_loop(
}
pub fn receiver(
sock: UdpSocket,
sock: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
recycler: PacketRecycler,
packet_sender: PacketSender,
@ -90,7 +90,7 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<SharedPackets>, usize)>
pub fn responder(
name: &'static str,
sock: UdpSocket,
sock: Arc<UdpSocket>,
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
@ -120,9 +120,9 @@ fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Resu
}
pub fn blob_receiver(
sock: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
recycler: BlobRecycler,
sock: UdpSocket,
s: BlobSender,
) -> Result<JoinHandle<()>> {
//DOCUMENTED SIDE-EFFECT
@ -184,12 +184,17 @@ mod test {
let pack_recycler = PacketRecycler::default();
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader);
let t_receiver = receiver(
Arc::new(read),
exit.clone(),
pack_recycler.clone(),
s_reader,
);
let t_responder = {
let (s_responder, r_responder) = channel();
let t_responder = responder(
"streamer_send_test",
send,
Arc::new(send),
resp_recycler.clone(),
r_responder,
);

View File

@ -13,7 +13,6 @@ use result::{Error, Result};
use signature::{Keypair, Pubkey, Signature};
use std::collections::HashMap;
use std::io;
use std::net::IpAddr;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
@ -361,23 +360,13 @@ impl Drop for ThinClient {
}
}
pub fn poll_gossip_for_leader(
leader_ncp: SocketAddr,
timeout: Option<u64>,
addr: IpAddr,
) -> Result<NodeInfo> {
pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> Result<NodeInfo> {
let exit = Arc::new(AtomicBool::new(false));
let (node, gossip_socket, gossip_send_socket) = Crdt::spy_node(addr);
trace!("polling {:?} for leader", leader_ncp);
let (node, gossip_socket) = Crdt::spy_node();
let crdt = Arc::new(RwLock::new(Crdt::new(node).expect("Crdt::new")));
let window = Arc::new(RwLock::new(vec![]));
let ncp = Ncp::new(
&crdt.clone(),
window,
None,
gossip_socket,
gossip_send_socket,
exit.clone(),
).unwrap();
let ncp = Ncp::new(&crdt.clone(), window, None, gossip_socket, exit.clone()).unwrap();
let leader_entry_point = NodeInfo::new_entry_point(leader_ncp);
crdt.write().unwrap().insert(&leader_entry_point);
@ -401,7 +390,7 @@ mod tests {
use super::*;
use bank::Bank;
use budget::Budget;
use crdt::TestNode;
use crdt::Node;
use fullnode::Fullnode;
use ledger::LedgerWriter;
use logger;
@ -430,7 +419,7 @@ mod tests {
fn test_thin_client() {
logger::setup();
let leader_keypair = Keypair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let alice = Mint::new(10_000);
@ -479,7 +468,7 @@ mod tests {
fn test_bad_sig() {
logger::setup();
let leader_keypair = Keypair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let bob_pubkey = Keypair::new().pubkey();
@ -539,7 +528,7 @@ mod tests {
fn test_client_check_signature() {
logger::setup();
let leader_keypair = Keypair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let bob_pubkey = Keypair::new().pubkey();

View File

@ -65,7 +65,7 @@ impl Tpu {
let packet_recycler = PacketRecycler::default();
let (fetch_stage, packet_receiver) =
FetchStage::new(transactions_socket, exit, &packet_recycler);
FetchStage::new(Arc::new(transactions_socket), exit, &packet_recycler);
let (sigverify_stage, verified_receiver) =
SigVerifyStage::new(packet_receiver, sigverify_disabled);

View File

@ -83,7 +83,7 @@ impl Tvu {
) -> Self {
let blob_recycler = BlobRecycler::default();
let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket(
vec![replicate_socket, repair_socket],
vec![Arc::new(replicate_socket), Arc::new(repair_socket)],
exit.clone(),
&blob_recycler,
);
@ -94,7 +94,7 @@ impl Tvu {
&crdt,
window,
entry_height,
retransmit_socket,
Arc::new(retransmit_socket),
&blob_recycler,
blob_fetch_receiver,
);
@ -143,7 +143,7 @@ impl Service for Tvu {
pub mod tests {
use bank::Bank;
use bincode::serialize;
use crdt::{Crdt, TestNode};
use crdt::{Crdt, Node};
use entry::Entry;
use hash::{hash, Hash};
use logger;
@ -166,12 +166,11 @@ pub mod tests {
fn new_ncp(
crdt: Arc<RwLock<Crdt>>,
listen: UdpSocket,
gossip: UdpSocket,
exit: Arc<AtomicBool>,
) -> Result<(Ncp, SharedWindow)> {
let window = window::default_window();
let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let ncp = Ncp::new(&crdt, window.clone(), None, listen, send_sock, exit)?;
let ncp = Ncp::new(&crdt, window.clone(), None, gossip, exit)?;
Ok((ncp, window))
}
@ -179,10 +178,10 @@ pub mod tests {
#[test]
fn test_replicate() {
logger::setup();
let leader = TestNode::new_localhost();
let leader = Node::new_localhost();
let target1_keypair = Keypair::new();
let target1 = TestNode::new_localhost_with_pubkey(target1_keypair.pubkey());
let target2 = TestNode::new_localhost();
let target1 = Node::new_localhost_with_pubkey(target1_keypair.pubkey());
let target2 = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
//start crdt_leader
@ -207,9 +206,9 @@ pub mod tests {
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = streamer::blob_receiver(
Arc::new(target2.sockets.replicate),
exit.clone(),
recv_recycler.clone(),
target2.sockets.replicate,
s_reader,
).unwrap();
@ -217,7 +216,7 @@ pub mod tests {
let (s_responder, r_responder) = channel();
let t_responder = streamer::responder(
"test_replicate",
leader.sockets.requests,
Arc::new(leader.sockets.requests),
resp_recycler.clone(),
r_responder,
);

View File

@ -232,7 +232,7 @@ pub mod tests {
use super::*;
use bank::Bank;
use bincode::deserialize;
use crdt::{Crdt, NodeInfo, TestNode};
use crdt::{Crdt, Node, NodeInfo};
use entry::next_entry;
use hash::{hash, Hash};
use logger;
@ -253,7 +253,7 @@ pub mod tests {
let mint = Mint::new(1234);
let bank = Arc::new(Bank::new(&mint));
let node = TestNode::new_localhost();
let node = Node::new_localhost();
let mut crdt = Crdt::new(node.data.clone()).expect("Crdt::new");
crdt.set_leader(node.data.id);
let blob_recycler = BlobRecycler::default();

View File

@ -93,6 +93,8 @@ fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u6
cmp::min(consumed + WINDOW_SIZE - 1, highest_lost)
}
pub const MAX_REPAIR_BACKOFF: usize = 128;
fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool {
//exponential backoff
if *last != consumed {
@ -105,9 +107,9 @@ fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool {
// Experiment with capping repair request duration.
// Once nodes are too far behind they can spend many
// seconds without asking for repair
if *times > 128 {
if *times > MAX_REPAIR_BACKOFF {
// 50% chance that a request will fire between 64 - 128 tries
*times = 64;
*times = MAX_REPAIR_BACKOFF / 2;
}
//if we get lucky, make the request, which should exponentially get less likely
@ -126,6 +128,7 @@ fn repair_window(
) -> Result<()> {
//exponential backoff
if !repair_backoff(last, times, consumed) {
trace!("{:x} !repair_backoff() times = {}", debug_id, times);
return Ok(());
}
@ -691,7 +694,7 @@ pub fn window(
#[cfg(test)]
mod test {
use crdt::{Crdt, TestNode};
use crdt::{Crdt, Node};
use logger;
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
use std::collections::VecDeque;
@ -737,12 +740,17 @@ mod test {
let pack_recycler = PacketRecycler::default();
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader);
let t_receiver = receiver(
Arc::new(read),
exit.clone(),
pack_recycler.clone(),
s_reader,
);
let t_responder = {
let (s_responder, r_responder) = channel();
let t_responder = responder(
"streamer_send_test",
send,
Arc::new(send),
resp_recycler.clone(),
r_responder,
);
@ -790,7 +798,7 @@ mod test {
#[test]
pub fn window_send_test() {
logger::setup();
let tn = TestNode::new_localhost();
let tn = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
let mut crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new");
let me_id = crdt_me.my_data().id;
@ -800,9 +808,9 @@ mod test {
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = blob_receiver(
Arc::new(tn.sockets.gossip),
exit.clone(),
resp_recycler.clone(),
tn.sockets.gossip,
s_reader,
).unwrap();
let (s_window, r_window) = channel();
@ -821,7 +829,7 @@ mod test {
let (s_responder, r_responder) = channel();
let t_responder = responder(
"window_send_test",
tn.sockets.replicate,
Arc::new(tn.sockets.replicate),
resp_recycler.clone(),
r_responder,
);
@ -860,7 +868,7 @@ mod test {
#[test]
pub fn window_send_no_leader_test() {
logger::setup();
let tn = TestNode::new_localhost();
let tn = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
let crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new");
let me_id = crdt_me.my_data().id;
@ -869,9 +877,9 @@ mod test {
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = blob_receiver(
Arc::new(tn.sockets.gossip),
exit.clone(),
resp_recycler.clone(),
tn.sockets.gossip,
s_reader,
).unwrap();
let (s_window, _r_window) = channel();
@ -890,7 +898,7 @@ mod test {
let (s_responder, r_responder) = channel();
let t_responder = responder(
"window_send_test",
tn.sockets.replicate,
Arc::new(tn.sockets.replicate),
resp_recycler.clone(),
r_responder,
);
@ -922,7 +930,7 @@ mod test {
#[test]
pub fn window_send_late_leader_test() {
logger::setup();
let tn = TestNode::new_localhost();
let tn = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
let crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new");
let me_id = crdt_me.my_data().id;
@ -931,9 +939,9 @@ mod test {
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = blob_receiver(
Arc::new(tn.sockets.gossip),
exit.clone(),
resp_recycler.clone(),
tn.sockets.gossip,
s_reader,
).unwrap();
let (s_window, _r_window) = channel();
@ -952,7 +960,7 @@ mod test {
let (s_responder, r_responder) = channel();
let t_responder = responder(
"window_send_test",
tn.sockets.replicate,
Arc::new(tn.sockets.replicate),
resp_recycler.clone(),
r_responder,
);

View File

@ -81,7 +81,7 @@ impl WriteStage {
let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
let t_responder = responder(
"write_stage_vote_sender",
send,
Arc::new(send),
blob_recycler.clone(),
vote_blob_receiver,
);

View File

@ -4,7 +4,7 @@ extern crate rayon;
extern crate solana;
use rayon::iter::*;
use solana::crdt::{Crdt, TestNode};
use solana::crdt::{Crdt, Node};
use solana::logger;
use solana::ncp::Ncp;
use solana::packet::Blob;
@ -17,18 +17,11 @@ use std::thread::sleep;
use std::time::Duration;
fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, Ncp, UdpSocket) {
let tn = TestNode::new_localhost();
let tn = Node::new_localhost();
let crdt = Crdt::new(tn.data.clone()).expect("Crdt::new");
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let d = Ncp::new(
&c.clone(),
w,
None,
tn.sockets.gossip,
tn.sockets.gossip_send,
exit,
).unwrap();
let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit).unwrap();
(c, d, tn.sockets.replicate)
}

View File

@ -5,7 +5,7 @@ extern crate chrono;
extern crate serde_json;
extern crate solana;
use solana::crdt::{Crdt, NodeInfo, TestNode};
use solana::crdt::{Crdt, Node, NodeInfo};
use solana::entry::Entry;
use solana::fullnode::Fullnode;
use solana::hash::Hash;
@ -32,7 +32,7 @@ use std::time::{Duration, Instant};
fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
//lets spy on the network
let exit = Arc::new(AtomicBool::new(false));
let mut spy = TestNode::new_localhost();
let mut spy = Node::new_localhost();
let daddr = "0.0.0.0:0".parse().unwrap();
let me = spy.data.id.clone();
spy.data.contact_info.tvu = daddr;
@ -42,14 +42,7 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = default_window();
let ncp = Ncp::new(
&spy_ref,
spy_window,
None,
spy.sockets.gossip,
spy.sockets.gossip_send,
exit.clone(),
).unwrap();
let ncp = Ncp::new(&spy_ref, spy_window, None, spy.sockets.gossip, exit.clone()).unwrap();
//wait for the network to converge
let mut converged = false;
let mut rv = vec![];
@ -125,7 +118,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
let leader_keypair = Keypair::new();
let leader_pubkey = leader_keypair.pubkey().clone();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let bob_pubkey = Keypair::new().pubkey();
let mut ledger_paths = Vec::new();
@ -156,7 +149,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
// start up another validator from zero, converge and then check
// balances
let keypair = Keypair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let validator = Node::new_localhost_with_pubkey(keypair.pubkey());
let validator_data = validator.data.clone();
let validator = Fullnode::new(
validator,
@ -205,7 +198,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
trace!("test_multi_node_validator_catchup_from_zero");
let leader_keypair = Keypair::new();
let leader_pubkey = leader_keypair.pubkey().clone();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let bob_pubkey = Keypair::new().pubkey();
let mut ledger_paths = Vec::new();
@ -229,7 +222,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
let mut nodes = vec![server];
for _ in 0..N {
let keypair = Keypair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let validator = Node::new_localhost_with_pubkey(keypair.pubkey());
let ledger_path = tmp_copy_ledger(
&leader_ledger_path,
"multi_node_validator_catchup_from_zero_validator",
@ -270,7 +263,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
// start up another validator from zero, converge and then check everyone's
// balances
let keypair = Keypair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let validator = Node::new_localhost_with_pubkey(keypair.pubkey());
let val = Fullnode::new(
validator,
&zero_ledger_path,
@ -329,7 +322,7 @@ fn test_multi_node_basic() {
trace!("test_multi_node_basic");
let leader_keypair = Keypair::new();
let leader_pubkey = leader_keypair.pubkey().clone();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let bob_pubkey = Keypair::new().pubkey();
let mut ledger_paths = Vec::new();
@ -346,7 +339,7 @@ fn test_multi_node_basic() {
let mut nodes = vec![server];
for _ in 0..N {
let keypair = Keypair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let validator = Node::new_localhost_with_pubkey(keypair.pubkey());
let ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_basic");
ledger_paths.push(ledger_path.clone());
let val = Fullnode::new(
@ -390,7 +383,7 @@ fn test_multi_node_basic() {
fn test_boot_validator_from_file() -> result::Result<()> {
logger::setup();
let leader_keypair = Keypair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let bob_pubkey = Keypair::new().pubkey();
let (alice, leader_ledger_path) = genesis("boot_validator_from_file", 100_000);
let mut ledger_paths = Vec::new();
@ -406,7 +399,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
assert_eq!(leader_balance, 1000);
let keypair = Keypair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let validator = Node::new_localhost_with_pubkey(keypair.pubkey());
let validator_data = validator.data.clone();
let ledger_path = tmp_copy_ledger(&leader_ledger_path, "boot_validator_from_file");
ledger_paths.push(ledger_path.clone());
@ -432,7 +425,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
fn create_leader(ledger_path: &str) -> (NodeInfo, Fullnode) {
let leader_keypair = Keypair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let leader_fullnode = Fullnode::new(leader, &ledger_path, leader_keypair, None, false);
(leader_data, leader_fullnode)
@ -445,7 +438,10 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
// ledger (currently up to WINDOW_SIZE entries)
logger::setup();
let (alice, ledger_path) = genesis("leader_restart_validator_start_from_old_ledger", 100_000);
let (alice, ledger_path) = genesis(
"leader_restart_validator_start_from_old_ledger",
100_000 + 500 * solana::window::MAX_REPAIR_BACKOFF as i64,
);
let bob_pubkey = Keypair::new().pubkey();
let (leader_data, leader_fullnode) = create_leader(&ledger_path);
@ -476,7 +472,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
// start validator from old ledger
let keypair = Keypair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let validator = Node::new_localhost_with_pubkey(keypair.pubkey());
let validator_data = validator.data.clone();
let val_fullnode = Fullnode::new(
@ -492,15 +488,17 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
// send requests so the validator eventually sees a gap and requests a repair
let mut expected = 1500;
let mut client = mk_client(&validator_data);
for _ in 0..10 {
for _ in 0..solana::window::MAX_REPAIR_BACKOFF {
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(expected))
.unwrap();
assert_eq!(leader_balance, expected);
let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance));
if getbal == Some(leader_balance) {
break;
}
expected += 500;
}
let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(expected));
@ -530,7 +528,7 @@ fn test_multi_node_dynamic_network() {
let leader_keypair = Keypair::new();
let leader_pubkey = leader_keypair.pubkey().clone();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let bob_pubkey = Keypair::new().pubkey();
let (alice, leader_ledger_path) = genesis("multi_node_dynamic_network", 10_000_000);
@ -605,7 +603,7 @@ fn test_multi_node_dynamic_network() {
Builder::new()
.name("validator-launch-thread".to_string())
.spawn(move || {
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let validator = Node::new_localhost_with_pubkey(keypair.pubkey());
let rd = validator.data.clone();
info!("starting {} {:x}", keypair.pubkey(), rd.debug_id());
let val = Fullnode::new(