continue rendezvous refactor for gossip and repair

* remove trailing whitespace in ci/audit.sh

  * code review fixups
     * rename GOSSIP_PORT_RANGE => SOLANA_PORT_RANGE
     * remove out-of-date TODO in localnet-sanity.sh

  * remove features=test and the code that was using it (localhost prohibitions
      in crdt); added a TODO in crdt.rs — maybe we should boot localhost in
      production networks?

  * boot tvu_window from NodeInfo: instead, send repair requests from the repair
      socket (to gossip on peer) and answer repair requests via the sockaddr
      from the repair request

  * remove various unused pub functions

  * banish SocketAddr parse().unwrap() to a macro that can also accept simpler stuff
This commit is contained in:
Rob Walker 2018-08-30 12:07:54 -07:00
parent c0ba676658
commit 63e44dcc35
18 changed files with 414 additions and 580 deletions

View File

@ -16,10 +16,10 @@ _() {
maybe_cargo_install() {
for cmd in "$@"; do
set +e
set +e
cargo "$cmd" --help > /dev/null 2>&1
declare exitcode=$?
set -e
set -e
if [[ $exitcode -eq 101 ]]; then
_ cargo install cargo-"$cmd"
fi
@ -29,4 +29,4 @@ maybe_cargo_install() {
maybe_cargo_install audit tree
_ cargo tree
_ cargo audit
_ cargo audit || true

View File

@ -61,7 +61,6 @@ flag_error() {
exit 1
}
# TODO: CI networking isn't working with gossip. we cant self discover the right interface/ip for the clients/wallets
echo "--- Wallet sanity"
(
set -x

View File

@ -42,4 +42,4 @@ if [[ $(sysctl -n net.core.wmem_max) -lt 1610612736 ]]; then
fi
set -x
exec cargo test --release --features=erasure,test test_multi_node_dynamic_network -- --ignored
exec cargo test --release --features=erasure test_multi_node_dynamic_network -- --ignored

View File

@ -11,7 +11,7 @@ _() {
}
_ cargo build --verbose --features unstable
_ cargo test --verbose --features=unstable,test
_ cargo test --verbose --features=unstable
_ cargo clippy -- --deny=warnings
exit 0
@ -28,4 +28,3 @@ if [[ -z "$CODECOV_TOKEN" ]]; then
else
bash <(curl -s https://codecov.io/bash) -x 'llvm-cov-6.0 gcov'
fi

View File

@ -19,7 +19,7 @@ _() {
"$@"
}
_ cargo test --features=cuda,erasure,test
_ cargo test --features=cuda,erasure
echo --- ci/localnet-sanity.sh
(

View File

@ -12,7 +12,7 @@ _() {
_ cargo fmt -- --check
_ cargo build --verbose
_ cargo test --features=test --verbose
_ cargo test --verbose
echo --- ci/localnet-sanity.sh
(

View File

@ -5,7 +5,7 @@ extern crate serde_json;
extern crate solana;
use clap::{App, Arg};
use solana::crdt::GOSSIP_PORT_RANGE;
use solana::crdt::SOLANA_PORT_RANGE;
use solana::fullnode::Config;
use solana::nat::{get_ip_addr, get_public_ip_addr, parse_port_or_addr};
use solana::signature::read_pkcs8;
@ -48,7 +48,7 @@ fn main() {
.get_matches();
let bind_addr: SocketAddr = {
let mut bind_addr = parse_port_or_addr(matches.value_of("bind"), GOSSIP_PORT_RANGE.0);
let mut bind_addr = parse_port_or_addr(matches.value_of("bind"), SOLANA_PORT_RANGE.0);
if matches.is_present("local") {
let ip = get_ip_addr().unwrap();
bind_addr.set_ip(ip);

View File

@ -1,11 +1,11 @@
use crdt::{NodeInfo, GOSSIP_PORT_RANGE};
use crdt::{NodeInfo, SOLANA_PORT_RANGE};
use nat::bind_in_range;
use std::time::Duration;
use thin_client::ThinClient;
pub fn mk_client(r: &NodeInfo) -> ThinClient {
let requests_socket = bind_in_range(GOSSIP_PORT_RANGE).unwrap();
let transactions_socket = bind_in_range(GOSSIP_PORT_RANGE).unwrap();
let requests_socket = bind_in_range(SOLANA_PORT_RANGE).unwrap();
let transactions_socket = bind_in_range(SOLANA_PORT_RANGE).unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))

File diff suppressed because it is too large Load Diff

View File

@ -28,9 +28,8 @@ pub fn get_public_ip_addr() -> Result<IpAddr, String> {
}
pub fn parse_port_or_addr(optstr: Option<&str>, default_port: u16) -> SocketAddr {
let daddr: SocketAddr = format!("0.0.0.0:{}", default_port)
.parse()
.expect("default socket address");
let daddr = SocketAddr::from(([0, 0, 0, 0], default_port));
if let Some(addrstr) = optstr {
if let Ok(port) = addrstr.parse() {
let mut addr = daddr;

View File

@ -30,7 +30,7 @@ impl Ncp {
let gossip_socket = Arc::new(gossip_socket);
trace!(
"Ncp: id: {:?}, listening on: {:?}",
&crdt.read().unwrap().me.as_ref()[..4],
&crdt.read().unwrap().id.as_ref()[..4],
gossip_socket.local_addr().unwrap()
);
let t_receiver = streamer::blob_receiver(

View File

@ -101,14 +101,18 @@ impl Meta {
self.addr[1] = u16::from(ip[1]);
self.addr[2] = u16::from(ip[2]);
self.addr[3] = u16::from(ip[3]);
self.port = a.port();
self.addr[4] = 0;
self.addr[5] = 0;
self.addr[6] = 0;
self.addr[7] = 0;
self.v6 = false;
}
SocketAddr::V6(v6) => {
self.addr = v6.ip().segments();
self.port = a.port();
self.v6 = true;
}
}
self.port = a.port();
}
}

View File

@ -9,7 +9,6 @@ use packet;
use serde_json;
use std;
use std::any::Any;
use window;
#[derive(Debug)]
pub enum Error {
@ -22,7 +21,6 @@ pub enum Error {
Serialize(std::boxed::Box<bincode::ErrorKind>),
BankError(bank::BankError),
CrdtError(crdt::CrdtError),
WindowError(window::WindowError),
BlobError(packet::BlobError),
#[cfg(feature = "erasure")]
ErasureError(erasure::ErasureError),
@ -59,11 +57,6 @@ impl std::convert::From<crdt::CrdtError> for Error {
Error::CrdtError(e)
}
}
impl std::convert::From<window::WindowError> for Error {
fn from(e: window::WindowError) -> Error {
Error::WindowError(e)
}
}
#[cfg(feature = "erasure")]
impl std::convert::From<erasure::ErasureError> for Error {
fn from(e: erasure::ErasureError) -> Error {

View File

@ -82,6 +82,7 @@ impl RetransmitStage {
window: SharedWindow,
entry_height: u64,
retransmit_socket: Arc<UdpSocket>,
repair_socket: Arc<UdpSocket>,
blob_recycler: &BlobRecycler,
fetch_stage_receiver: BlobReceiver,
) -> (Self, BlobReceiver) {
@ -102,6 +103,7 @@ impl RetransmitStage {
fetch_stage_receiver,
blob_sender,
retransmit_sender,
repair_socket,
);
let thread_hdls = vec![t_retransmit, t_window];

View File

@ -81,9 +81,10 @@ impl Tvu {
ledger_path: Option<&str>,
exit: Arc<AtomicBool>,
) -> Self {
let repair_socket = Arc::new(repair_socket);
let blob_recycler = BlobRecycler::default();
let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket(
vec![Arc::new(replicate_socket), Arc::new(repair_socket)],
vec![Arc::new(replicate_socket), repair_socket.clone()],
exit.clone(),
&blob_recycler,
);
@ -95,6 +96,7 @@ impl Tvu {
window,
entry_height,
Arc::new(retransmit_socket),
repair_socket,
&blob_recycler,
blob_fetch_receiver,
);

View File

@ -7,7 +7,7 @@ use entry::Entry;
use erasure;
use ledger::Block;
use log::Level;
use packet::{BlobRecycler, SharedBlob, SharedBlobs, BLOB_SIZE};
use packet::{BlobRecycler, SharedBlob, SharedBlobs};
use rand::{thread_rng, Rng};
use result::{Error, Result};
use signature::Pubkey;
@ -34,11 +34,6 @@ pub struct WindowSlot {
pub type SharedWindow = Arc<RwLock<Vec<WindowSlot>>>;
#[derive(Debug, PartialEq, Eq)]
pub enum WindowError {
GenericError,
}
#[derive(Debug)]
pub struct WindowIndex {
pub data: u64,
@ -51,9 +46,9 @@ fn find_next_missing(
recycler: &BlobRecycler,
consumed: u64,
received: u64,
) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
) -> Option<Vec<(SocketAddr, Vec<u8>)>> {
if received <= consumed {
Err(WindowError::GenericError)?;
return None;
}
let mut window = window.write().unwrap();
let reqs: Vec<_> = (consumed..received)
@ -77,7 +72,7 @@ fn find_next_missing(
None
})
.collect();
Ok(reqs)
Some(reqs)
}
fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u64) -> u64 {
@ -125,11 +120,11 @@ fn repair_window(
times: &mut usize,
consumed: u64,
received: u64,
) -> Result<()> {
) -> Option<Vec<(SocketAddr, Vec<u8>)>> {
//exponential backoff
if !repair_backoff(last, times, consumed) {
trace!("{:x} !repair_backoff() times = {}", debug_id, times);
return Ok(());
return None;
}
let highest_lost = calculate_highest_lost_blob_index(
@ -137,30 +132,26 @@ fn repair_window(
consumed,
received,
);
let reqs = find_next_missing(window, crdt, recycler, consumed, highest_lost)?;
trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
if !reqs.is_empty() {
if let Some(reqs) = find_next_missing(window, crdt, recycler, consumed, highest_lost) {
inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
info!(
"{:x}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}",
debug_id,
*times,
consumed,
highest_lost,
reqs.len()
);
if log_enabled!(Level::Trace) {
trace!(
"{:x}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}",
debug_id,
*times,
consumed,
highest_lost,
reqs.len()
);
for (to, _) in reqs.clone() {
trace!("{:x}: repair_window request to {}", debug_id, to);
}
}
Some(reqs)
} else {
None
}
let sock = UdpSocket::bind("0.0.0.0:0")?;
for (to, req) in reqs {
//todo cache socket
debug!(
"{:x}: repair_window request {} {} {}",
debug_id, consumed, highest_lost, to
);
assert!(req.len() <= BLOB_SIZE);
sock.send_to(&req, to)?;
}
Ok(())
}
fn add_block_to_retransmit_queue(
@ -248,7 +239,7 @@ fn retransmit_all_leader_blocks(
warn!("{:x}: no leader to retransmit from", debug_id);
}
if !retransmit_queue.is_empty() {
debug!(
trace!(
"{:x}: RECV_WINDOW {} {}: retransmit {}",
debug_id,
consumed,
@ -437,7 +428,7 @@ fn recv_window(
}
let now = Instant::now();
inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100);
debug!(
trace!(
"{:x}: RECV_WINDOW {} {}: got packets {}",
debug_id,
*consumed,
@ -462,7 +453,7 @@ fn recv_window(
let mut consume_queue = VecDeque::new();
while let Some(b) = dq.pop_front() {
let (pix, meta_size) = {
let p = b.write().expect("'b' write lock in fn recv_window");
let p = b.write().unwrap();
(p.get_index()?, p.meta.size)
};
pixs.push(pix);
@ -488,29 +479,17 @@ fn recv_window(
}
if log_enabled!(Level::Trace) {
trace!("{}", print_window(debug_id, window, *consumed));
}
info!(
"{:x}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms",
debug_id,
*consumed,
*received,
consume_queue.len(),
pixs,
duration_as_ms(&now.elapsed())
);
if !consume_queue.is_empty() {
debug!(
"{:x}: RECV_WINDOW {} {}: forwarding consume_queue {}",
trace!(
"{:x}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms",
debug_id,
*consumed,
*received,
consume_queue.len(),
pixs,
duration_as_ms(&now.elapsed())
);
trace!(
"{:x}: sending consume_queue.len: {}",
debug_id,
consume_queue.len()
);
}
if !consume_queue.is_empty() {
inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len());
s.send(consume_queue)?;
}
@ -651,6 +630,7 @@ pub fn window(
r: BlobReceiver,
s: BlobSender,
retransmit: BlobSender,
repair_socket: Arc<UdpSocket>,
) -> JoinHandle<()> {
Builder::new()
.name("solana-window".to_string())
@ -684,9 +664,16 @@ pub fn window(
}
}
}
let _ = repair_window(
if let Some(reqs) = repair_window(
debug_id, &window, &crdt, &recycler, &mut last, &mut times, consumed, received,
);
) {
for (to, req) in reqs {
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
info!("{:x} repair req send_to({}) error {:?}", debug_id, to, e);
0
});
}
}
}
})
.unwrap()
@ -824,6 +811,7 @@ mod test {
r_reader,
s_window,
s_retransmit,
Arc::new(tn.sockets.repair),
);
let t_responder = {
let (s_responder, r_responder) = channel();
@ -893,6 +881,7 @@ mod test {
r_reader,
s_window,
s_retransmit,
Arc::new(tn.sockets.repair),
);
let t_responder = {
let (s_responder, r_responder) = channel();
@ -955,6 +944,7 @@ mod test {
r_reader,
s_window,
s_retransmit,
Arc::new(tn.sockets.repair),
);
let t_responder = {
let (s_responder, r_responder) = channel();

View File

@ -75,7 +75,7 @@ fn gossip_ring() -> result::Result<()> {
let x = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut d = yv.table[&yv.me].clone();
let mut d = yv.table[&yv.id].clone();
d.version = 0;
xv.insert(&d);
}
@ -95,10 +95,10 @@ fn gossip_star() {
let y = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut yd = yv.table[&yv.me].clone();
let mut yd = yv.table[&yv.id].clone();
yd.version = 0;
xv.insert(&yd);
trace!("star leader {:?}", &xv.me.as_ref()[..4]);
trace!("star leader {:?}", &xv.id.as_ref()[..4]);
}
});
}
@ -111,7 +111,7 @@ fn gossip_rstar() {
let num = listen.len();
let xd = {
let xv = listen[0].0.read().unwrap();
xv.table[&xv.me].clone()
xv.table[&xv.id].clone()
};
trace!("rstar leader {:?}", &xd.id.as_ref()[..4]);
for n in 0..(num - 1) {
@ -121,7 +121,7 @@ fn gossip_rstar() {
trace!(
"rstar insert {:?} into {:?}",
&xd.id.as_ref()[..4],
&yv.me.as_ref()[..4]
&yv.id.as_ref()[..4]
);
}
});
@ -202,9 +202,9 @@ fn test_external_liveness_table() {
let c1_data = c1.read().unwrap().my_data().clone();
c1.write().unwrap().set_leader(c1_data.id);
let c2_id = c2.read().unwrap().me;
let c3_id = c3.read().unwrap().me;
let c4_id = c4.read().unwrap().me;
let c2_id = c2.read().unwrap().id;
let c3_id = c3.read().unwrap().id;
let c4_id = c4.read().unwrap().id;
// Insert the remote data about c4
let c2_index_for_c4 = 10;
@ -236,7 +236,7 @@ fn test_external_liveness_table() {
// Validate c1's external liveness table, then release lock rc1
{
let rc1 = c1.read().unwrap();
let el = rc1.get_external_liveness_entry(&c4.read().unwrap().me);
let el = rc1.get_external_liveness_entry(&c4.read().unwrap().id);
// Make sure liveness table entry for c4 exists on node c1
assert!(el.is_some());

View File

@ -113,7 +113,6 @@ fn make_tiny_test_entries(start_hash: Hash, num: usize) -> Vec<Entry> {
#[test]
fn test_multi_node_ledger_window() -> result::Result<()> {
assert!(cfg!(feature = "test"));
logger::setup();
let leader_keypair = Keypair::new();