diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs old mode 100755 new mode 100644 index d9627031f..2eeb58aeb --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -51,17 +51,20 @@ fn sample_tx_count( now = Instant::now(); let sample = tx_count - initial_tx_count; initial_tx_count = tx_count; - println!("{}: Transactions processed {}", v.transactions_addr, sample); + println!( + "{}: Transactions processed {}", + v.addrs.transactions, sample + ); let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let tps = (sample * 1_000_000_000) as f64 / ns as f64; if tps > max_tps { max_tps = tps; } - println!("{}: {:.2} tps", v.transactions_addr, tps); + println!("{}: {:.2} tps", v.addrs.transactions, tps); total = tx_count - first_count; println!( "{}: Total Transactions processed {}", - v.transactions_addr, total + v.addrs.transactions, total ); sleep(Duration::new(sample_period, 0)); @@ -113,7 +116,7 @@ fn generate_and_send_txs( println!( "Transferring 1 unit {} times... to {:?}", txs.len(), - leader.transactions_addr + leader.addrs.transactions ); for tx in txs { client.transfer_signed(tx.clone()).unwrap(); @@ -212,7 +215,7 @@ fn main() { time_sec = s.to_string().parse().expect("integer"); } - let mut drone_addr = leader.transactions_addr.clone(); + let mut drone_addr = leader.addrs.transactions.clone(); drone_addr.set_port(9900); let signal = Arc::new(AtomicBool::new(false)); @@ -327,9 +330,9 @@ fn mk_client(r: &ReplicatedData) -> ThinClient { .unwrap(); ThinClient::new( - r.requests_addr, + r.addrs.requests, requests_socket, - r.transactions_addr, + r.addrs.transactions, transactions_socket, ) } @@ -381,7 +384,7 @@ fn converge( .table .values() .into_iter() - .filter(|x| x.requests_addr != daddr) + .filter(|x| x.addrs.requests != daddr) .cloned() .collect(); if v.len() >= num_nodes { diff --git a/src/bin/drone.rs b/src/bin/drone.rs index 2e00375fd..7a299ed38 100644 --- a/src/bin/drone.rs +++ b/src/bin/drone.rs @@ -94,8 +94,8 @@ fn main() { let drone = Arc::new(Mutex::new(Drone::new( mint_keypair, drone_addr, - leader.transactions_addr, - leader.requests_addr, + leader.addrs.transactions, + leader.addrs.requests, time_slice, request_cap, ))); diff --git a/src/bin/wallet.rs b/src/bin/wallet.rs index 1f93eba55..6431682c9 100644 --- a/src/bin/wallet.rs +++ b/src/bin/wallet.rs @@ -156,7 +156,7 @@ fn parse_args() -> Result> { exit(1); }; - let mut drone_addr = leader.transactions_addr.clone(); + let mut drone_addr = leader.addrs.transactions.clone(); drone_addr.set_port(9900); let command = match matches.subcommand() { @@ -305,9 +305,9 @@ fn mk_client(r: &ReplicatedData) -> io::Result { .unwrap(); Ok(ThinClient::new( - r.requests_addr, + r.addrs.requests, requests_socket, - r.transactions_addr, + r.addrs.transactions, transactions_socket, )) } diff --git a/src/crdt.rs b/src/crdt.rs index e7338c427..492af809f 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -19,10 +19,11 @@ use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerSt use hash::Hash; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; use pnet_datalink as datalink; +use rand::{thread_rng, Rng}; use rayon::prelude::*; use result::{Error, Result}; use ring::rand::{SecureRandom, SystemRandom}; -use signature::{KeyPair, KeyPairUtil, PublicKey, Signature}; +use signature::{KeyPair, KeyPairUtil, PublicKey}; use std; use std::collections::HashMap; use std::collections::VecDeque; @@ -36,8 +37,8 @@ use streamer::{BlobReceiver, BlobSender, Window}; use timing::timestamp; /// milliseconds we sleep for between gossip requests -const GOSSIP_SLEEP_MILLIS: u64 = 100; -//const GOSSIP_MIN_PURGE_MILLIS: u64 = 15000; +const GOSSIP_SLEEP_MILLIS: u64 = 200; +const GOSSIP_PURGE_MILLIS: u64 = 15000; /// minimum membership table size before we start purging dead nodes const MIN_TABLE_SIZE: usize = 2; @@ -45,6 +46,7 @@ const MIN_TABLE_SIZE: usize = 2; #[derive(Debug, PartialEq, Eq)] pub enum CrdtError { TooSmall, + NoLeader, } pub fn parse_port_or_addr(optstr: Option) -> SocketAddr { @@ -90,28 +92,39 @@ pub fn get_ip_addr() -> Option { /// Structure to be replicated by the network #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -pub struct ReplicatedData { - pub id: PublicKey, - sig: Signature, - /// should always be increasing - pub version: u64, - /// address to connect to for gossip - pub gossip_addr: SocketAddr, +pub struct Addrs { + /// gossip address + pub gossip: SocketAddr, /// address to connect to for replication - pub replicate_addr: SocketAddr, + pub replicate: SocketAddr, /// address to connect to when this node is leader - pub requests_addr: SocketAddr, + pub requests: SocketAddr, /// transactions address - pub transactions_addr: SocketAddr, + pub transactions: SocketAddr, /// repair address, we use this to jump ahead of the packets /// destined to the replciate_addr - pub repair_addr: SocketAddr, + pub repair: SocketAddr, + /// if this struture changes update this value as well + /// Always update `ReplicatedData` version too + /// This separate version for addresses allows us to use the `Vote` + /// as means of updating the `ReplicatedData` table without touching the + /// addresses if they haven't changed. + pub version: u64, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct ReplicatedData { + pub id: PublicKey, + /// If any of the bits change, update increment this value + pub version: u64, + /// network addresses + pub addrs: Addrs, /// current leader identity pub current_leader_id: PublicKey, /// last verified hash that was submitted to the leader - last_verified_hash: Hash, + last_verified_id: Hash, /// last verified count, always increasing - last_verified_count: u64, + last_verified_height: u64, } fn make_debug_id(buf: &[u8]) -> u64 { @@ -123,24 +136,26 @@ fn make_debug_id(buf: &[u8]) -> u64 { impl ReplicatedData { pub fn new( id: PublicKey, - gossip_addr: SocketAddr, - replicate_addr: SocketAddr, - requests_addr: SocketAddr, - transactions_addr: SocketAddr, - repair_addr: SocketAddr, + gossip: SocketAddr, + replicate: SocketAddr, + requests: SocketAddr, + transactions: SocketAddr, + repair: SocketAddr, ) -> ReplicatedData { ReplicatedData { id, - sig: Signature::default(), version: 0, - gossip_addr, - replicate_addr, - requests_addr, - transactions_addr, - repair_addr, + addrs: Addrs { + gossip, + replicate, + requests, + transactions, + repair, + version: 0, + }, current_leader_id: PublicKey::default(), - last_verified_hash: Hash::default(), - last_verified_count: 0, + last_verified_id: Hash::default(), + last_verified_height: 0, } } pub fn debug_id(&self) -> u64 { @@ -151,14 +166,12 @@ impl ReplicatedData { nxt_addr.set_port(addr.port() + nxt); nxt_addr } - - pub fn new_leader(bind_addr: &SocketAddr) -> Self { + pub fn new_leader_with_pubkey(pubkey: PublicKey, bind_addr: &SocketAddr) -> Self { let transactions_addr = bind_addr.clone(); let gossip_addr = Self::next_port(&bind_addr, 1); let replicate_addr = Self::next_port(&bind_addr, 2); let requests_addr = Self::next_port(&bind_addr, 3); let repair_addr = Self::next_port(&bind_addr, 4); - let pubkey = KeyPair::new().pubkey(); ReplicatedData::new( pubkey, gossip_addr, @@ -168,6 +181,10 @@ impl ReplicatedData { repair_addr, ) } + pub fn new_leader(bind_addr: &SocketAddr) -> Self { + let keypair = KeyPair::new(); + Self::new_leader_with_pubkey(keypair.pubkey(), bind_addr) + } pub fn new_entry_point(gossip_addr: SocketAddr) -> Self { let daddr: SocketAddr = "0.0.0.0:0".parse().unwrap(); ReplicatedData::new( @@ -193,6 +210,7 @@ impl ReplicatedData { /// * `listen` - listen for requests and responses /// No attempt to keep track of timeouts or dropped requests is made, or should be. pub struct Crdt { + /// table of everyone in the network pub table: HashMap, /// Value of my update index when entry in table was updated. /// Nodes will ask for updates since `update_index`, and this node @@ -202,9 +220,12 @@ pub struct Crdt { /// The value of the remote update index that I have last seen /// This Node will ask external nodes for updates since the value in this list pub remote: HashMap, + /// last time the public key had sent us a message pub alive: HashMap, pub update_index: u64, pub me: PublicKey, + /// last time we heard from anyone getting a message fro this public key + /// these are rumers and shouldn't be trusted directly external_liveness: HashMap>, } // TODO These messages should be signed, and go through the gpu pipeline for spam filtering @@ -213,11 +234,13 @@ enum Protocol { /// forward your own latest data structure when requesting an update /// this doesn't update the `remote` update index, but it allows the /// recepient of this request to add knowledge of this node to the network + /// (last update index i saw from you, my replicated data) RequestUpdates(u64, ReplicatedData), //TODO might need a since? /// from id, form's last update index, ReplicatedData ReceiveUpdates(PublicKey, u64, Vec, Vec<(PublicKey, u64)>), /// ask for a missing index + /// (my replicated data to keep alive, missing window index) RequestWindowIndex(ReplicatedData, u64), } @@ -249,8 +272,8 @@ impl Crdt { pub fn set_leader(&mut self, key: PublicKey) -> () { let mut me = self.my_data().clone(); - info!( - "{:x}: setting leader to {:x} from {:x}", + warn!( + "{:x}: LEADER_UPDATE TO {:x} from {:x}", me.debug_id(), make_debug_id(&key), make_debug_id(&me.current_leader_id), @@ -266,32 +289,43 @@ impl Crdt { pub fn insert(&mut self, v: &ReplicatedData) { // TODO check that last_verified types are always increasing + //update the peer table if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) { //somehow we signed a message for our own identity with a higher version that // we have stored ourselves trace!( - "me: {:x} v.id: {:x} version: {}", + "{:x}: insert v.id: {:x} version: {}", self.debug_id(), v.debug_id(), v.version ); + self.update_index += 1; let _ = self.table.insert(v.id.clone(), v.clone()); let _ = self.local.insert(v.id, self.update_index); } else { trace!( - "INSERT FAILED me: {:x} data: {:?} new.version: {} me.version: {}", + "{:x}: INSERT FAILED data: {:?} new.version: {} me.version: {}", self.debug_id(), v.debug_id(), v.version, self.table[&v.id].version ); } - //update the liveness table - let now = timestamp(); - *self.alive.entry(v.id).or_insert(now) = now; + self.update_liveness(v.id); } + fn update_liveness(&mut self, id: PublicKey) { + //update the liveness table + let now = timestamp(); + trace!( + "{:x} updating liveness {:x} to {}", + self.debug_id(), + make_debug_id(&id), + now + ); + *self.alive.entry(id).or_insert(now) = now; + } /// purge old validators /// TODO: we need a robust membership protocol /// http://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf @@ -300,18 +334,33 @@ impl Crdt { if self.table.len() <= MIN_TABLE_SIZE { return; } + if self.leader_data().is_none() { + return; + } + let leader_id = self.leader_data().unwrap().id; - //wait for 4x as long as it would randomly take to reach our node - //assuming everyone is waiting the same amount of time as this node - let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4; - //let limit = std::cmp::max(limit, GOSSIP_MIN_PURGE_MILLIS); - + let limit = GOSSIP_PURGE_MILLIS; let dead_ids: Vec = self.alive .iter() .filter_map(|(&k, v)| { if k != self.me && (now - v) > limit { - info!("purge {:x} {}", make_debug_id(&k), now - v); - Some(k) + if leader_id == k { + info!( + "{:x}: PURGE LEADER {:x} {}", + self.debug_id(), + make_debug_id(&k), + now - v + ); + Some(k) + } else { + info!( + "{:x}: purge {:x} {}", + self.debug_id(), + make_debug_id(&k), + now - v + ); + Some(k) + } } else { trace!( "purge skipped {:x} {} {}", @@ -337,20 +386,10 @@ impl Crdt { } pub fn index_blobs( - obj: &Arc>, + me: &ReplicatedData, blobs: &Vec, receive_index: &mut u64, ) -> Result<()> { - let me: ReplicatedData = { - let robj = obj.read().expect("'obj' read lock in crdt::index_blobs"); - debug!( - "{:x}: broadcast table {}", - robj.debug_id(), - robj.table.len() - ); - robj.table[&robj.me].clone() - }; - // enumerate all the blobs, those are the indices for (i, b) in blobs.iter().enumerate() { // only leader should be broadcasting @@ -362,45 +401,48 @@ impl Crdt { Ok(()) } + /// compute broadcast table + /// # Remarks + pub fn compute_broadcast_table(&self) -> Vec { + let mut live: Vec<_> = self.alive.iter().collect(); + thread_rng().shuffle(&mut live); + let daddr = "0.0.0.0:0".parse().unwrap(); + let me = &self.table[&self.me]; + let cloned_table: Vec = live.iter() + .map(|x| &self.table[x.0]) + .filter(|v| { + if me.id == v.id { + //filter myself + false + } else if v.addrs.replicate == daddr { + trace!("broadcast skip not listening {:x}", v.debug_id()); + false + } else { + trace!("broadcast node {}", v.addrs.replicate); + true + } + }) + .cloned() + .collect(); + cloned_table + } /// broadcast messages from the leader to layer 1 nodes /// # Remarks /// We need to avoid having obj locked while doing any io, such as the `send_to` pub fn broadcast( - obj: &Arc>, + me: &ReplicatedData, + broadcast_table: &Vec, window: &Window, s: &UdpSocket, transmit_index: &mut u64, received_index: u64, ) -> Result<()> { - let (me, table): (ReplicatedData, Vec) = { - // copy to avoid locking during IO - let robj = obj.read().expect("'obj' read lock in pub fn broadcast"); - trace!("broadcast table {}", robj.table.len()); - let cloned_table: Vec = robj.table.values().cloned().collect(); - (robj.table[&robj.me].clone(), cloned_table) - }; - let daddr = "0.0.0.0:0".parse().unwrap(); - let nodes: Vec<&ReplicatedData> = table - .iter() - .filter(|v| { - if me.id == v.id { - //filter myself - false - } else if v.replicate_addr == daddr { - trace!("broadcast skip not listening {:x}", v.debug_id()); - false - } else { - trace!("broadcast node {}", v.replicate_addr); - true - } - }) - .collect(); - if nodes.len() < 1 { - warn!("crdt too small"); + if broadcast_table.len() < 1 { + warn!("not enough peers in crdt table"); Err(CrdtError::TooSmall)?; } - trace!("broadcast nodes {}", nodes.len()); + trace!("broadcast nodes {}", broadcast_table.len()); // enumerate all the blobs in the window, those are the indices // transmit them to nodes, starting from a different node @@ -411,7 +453,8 @@ impl Crdt { let k = is % window_l.len(); assert!(window_l[k].is_some()); - orders.push((window_l[k].clone(), nodes[is % nodes.len()])); + let pos = is % broadcast_table.len(); + orders.push((window_l[k].clone(), &broadcast_table[pos])); } trace!("broadcast orders table {}", orders.len()); @@ -427,12 +470,12 @@ impl Crdt { "broadcast idx: {} sz: {} to {} coding: {}", blob.get_index().unwrap(), blob.meta.size, - v.replicate_addr, + v.addrs.replicate, blob.is_coding() ); assert!(blob.meta.size < BLOB_SIZE); - let e = s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr); - trace!("done broadcast {} to {}", blob.meta.size, v.replicate_addr); + let e = s.send_to(&blob.data[..blob.meta.size], &v.addrs.replicate); + trace!("done broadcast {} to {}", blob.meta.size, v.addrs.replicate); e }) .collect(); @@ -470,8 +513,8 @@ impl Crdt { } else if me.current_leader_id == v.id { trace!("skip retransmit to leader {:?}", v.id); false - } else if v.replicate_addr == daddr { - trace!("retransmit skip not listening {:x}", v.debug_id()); + } else if v.addrs.replicate == daddr { + trace!("skip nodes that are not listening {:?}", v.id); false } else { true @@ -482,7 +525,7 @@ impl Crdt { let errs: Vec<_> = orders .par_iter() .map(|v| { - info!( + debug!( "{:x}: retransmit blob {} to {:x}", me.debug_id(), rblob.get_index().unwrap(), @@ -490,7 +533,7 @@ impl Crdt { ); //TODO profile this, may need multiple sockets for par_iter assert!(rblob.meta.size < BLOB_SIZE); - s.send_to(&rblob.data[..rblob.meta.size], &v.replicate_addr) + s.send_to(&rblob.data[..rblob.meta.size], &v.addrs.replicate) }) .collect(); for e in errs { @@ -538,13 +581,13 @@ impl Crdt { let daddr = "0.0.0.0:0".parse().unwrap(); let valid: Vec<_> = self.table .values() - .filter(|r| r.id != self.me && r.repair_addr != daddr) + .filter(|r| r.id != self.me && r.addrs.repair != daddr) .collect(); if valid.is_empty() { Err(CrdtError::TooSmall)?; } let n = (Self::random() as usize) % valid.len(); - let addr = valid[n].gossip_addr.clone(); + let addr = valid[n].addrs.gossip.clone(); let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix); let out = serialize(&req)?; Ok((addr, out)) @@ -581,10 +624,10 @@ impl Crdt { "created gossip request from {:x} to {:x} {}", self.debug_id(), v.debug_id(), - v.gossip_addr + v.addrs.gossip ); - Ok((v.gossip_addr, req)) + Ok((v.addrs.gossip, req)) } /// At random pick a node and try to get updated changes from them @@ -702,10 +745,10 @@ impl Crdt { .spawn(move || loop { let start = timestamp(); let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler); - obj.write().unwrap().purge(timestamp()); if exit.load(Ordering::Relaxed) { return; } + obj.write().unwrap().purge(timestamp()); //TODO: possibly tune this parameter //we saw a deadlock passing an obj.read().unwrap().timeout into sleep let _ = obj.write().unwrap().update_leader(); @@ -753,7 +796,7 @@ impl Crdt { let sz = wblob.meta.size; outblob.meta.size = sz; outblob.data[..sz].copy_from_slice(&wblob.data[..sz]); - outblob.meta.set_addr(&from.repair_addr); + outblob.meta.set_addr(&from.addrs.repair); outblob.set_id(sender_id).expect("blob set_id"); } @@ -789,7 +832,7 @@ impl Crdt { // TODO sigverify these Ok(Protocol::RequestUpdates(v, from_rd)) => { trace!("RequestUpdates {}", v); - let addr = from_rd.gossip_addr; + let addr = from_rd.addrs.gossip; let me = obj.read().unwrap(); // only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from` let (from, ups, data) = me.get_updates_since(v); @@ -849,7 +892,7 @@ impl Crdt { from.debug_id(), ix, ); - assert_ne!(from.repair_addr, me.repair_addr); + assert_ne!(from.addrs.repair, me.addrs.repair); Self::run_window_request(&window, &me, &from, ix, blob_recycler) } Err(_) => { @@ -890,6 +933,7 @@ impl Crdt { response_sender: BlobSender, exit: Arc, ) -> JoinHandle<()> { + let debug_id = obj.read().unwrap().debug_id(); Builder::new() .name("solana-listen".to_string()) .spawn(move || loop { @@ -900,15 +944,16 @@ impl Crdt { &requests_receiver, &response_sender, ); - if e.is_err() { - info!( - "run_listen timeout, table size: {}", - obj.read().unwrap().table.len() - ); - } if exit.load(Ordering::Relaxed) { return; } + if e.is_err() { + info!( + "{:x}: run_listen timeout, table size: {}", + debug_id, + obj.read().unwrap().table.len() + ); + } }) .unwrap() } @@ -932,7 +977,11 @@ pub struct TestNode { } impl TestNode { - pub fn new() -> TestNode { + pub fn new() -> Self { + let pubkey = KeyPair::new().pubkey(); + Self::new_with_pubkey(pubkey) + } + pub fn new_with_pubkey(pubkey: PublicKey) -> Self { let transaction = UdpSocket::bind("0.0.0.0:0").unwrap(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -942,7 +991,6 @@ impl TestNode { let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); - let pubkey = KeyPair::new().pubkey(); let data = ReplicatedData::new( pubkey, gossip.local_addr().unwrap(), @@ -968,19 +1016,19 @@ impl TestNode { } pub fn new_with_bind_addr(data: ReplicatedData, bind_addr: SocketAddr) -> TestNode { let mut local_gossip_addr = bind_addr.clone(); - local_gossip_addr.set_port(data.gossip_addr.port()); + local_gossip_addr.set_port(data.addrs.gossip.port()); let mut local_replicate_addr = bind_addr.clone(); - local_replicate_addr.set_port(data.replicate_addr.port()); + local_replicate_addr.set_port(data.addrs.replicate.port()); let mut local_requests_addr = bind_addr.clone(); - local_requests_addr.set_port(data.requests_addr.port()); + local_requests_addr.set_port(data.addrs.requests.port()); let mut local_transactions_addr = bind_addr.clone(); - local_transactions_addr.set_port(data.transactions_addr.port()); + local_transactions_addr.set_port(data.addrs.transactions.port()); let mut local_repair_addr = bind_addr.clone(); - local_repair_addr.set_port(data.repair_addr.port()); + local_repair_addr.set_port(data.addrs.repair.port()); let transaction = UdpSocket::bind(local_transactions_addr).unwrap(); let gossip = UdpSocket::bind(local_gossip_addr).unwrap(); @@ -1016,7 +1064,8 @@ impl TestNode { #[cfg(test)] mod tests { use crdt::{ - parse_port_or_addr, Crdt, CrdtError, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, + parse_port_or_addr, Crdt, CrdtError, ReplicatedData, GOSSIP_PURGE_MILLIS, + GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, }; use logger; use packet::BlobRecycler; @@ -1064,13 +1113,18 @@ mod tests { copy } #[test] - fn replicated_data_new_leader() { - let d1 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); - assert_eq!(d1.gossip_addr, "127.0.0.1:1235".parse().unwrap()); - assert_eq!(d1.replicate_addr, "127.0.0.1:1236".parse().unwrap()); - assert_eq!(d1.requests_addr, "127.0.0.1:1237".parse().unwrap()); - assert_eq!(d1.transactions_addr, "127.0.0.1:1234".parse().unwrap()); - assert_eq!(d1.repair_addr, "127.0.0.1:1238".parse().unwrap()); + fn replicated_data_new_leader_with_pubkey() { + let kp = KeyPair::new(); + let d1 = ReplicatedData::new_leader_with_pubkey( + kp.pubkey().clone(), + &"127.0.0.1:1234".parse().unwrap(), + ); + assert_eq!(d1.id, kp.pubkey()); + assert_eq!(d1.addrs.gossip, "127.0.0.1:1235".parse().unwrap()); + assert_eq!(d1.addrs.replicate, "127.0.0.1:1236".parse().unwrap()); + assert_eq!(d1.addrs.requests, "127.0.0.1:1237".parse().unwrap()); + assert_eq!(d1.addrs.transactions, "127.0.0.1:1234".parse().unwrap()); + assert_eq!(d1.addrs.repair, "127.0.0.1:1238".parse().unwrap()); } #[test] fn update_test() { @@ -1165,7 +1219,7 @@ mod tests { ); crdt.insert(&nxt); let rv = crdt.window_index_request(0).unwrap(); - assert_eq!(nxt.gossip_addr, "127.0.0.2:1234".parse().unwrap()); + assert_eq!(nxt.addrs.gossip, "127.0.0.2:1234".parse().unwrap()); assert_eq!(rv.0, "127.0.0.2:1234".parse().unwrap()); let nxt = ReplicatedData::new( @@ -1218,7 +1272,7 @@ mod tests { crdt.insert(&nxt1); let rv = crdt.gossip_request().unwrap(); - assert_eq!(rv.0, nxt1.gossip_addr); + assert_eq!(rv.0, nxt1.addrs.gossip); let nxt2 = ReplicatedData::new_entry_point("127.0.0.3:1234".parse().unwrap()); crdt.insert(&nxt2); @@ -1239,9 +1293,9 @@ mod tests { } assert!(rv.len() > 0); for i in rv.iter() { - if i.read().unwrap().meta.addr() == nxt1.gossip_addr { + if i.read().unwrap().meta.addr() == nxt1.addrs.gossip { one = true; - } else if i.read().unwrap().meta.addr() == nxt2.gossip_addr { + } else if i.read().unwrap().meta.addr() == nxt2.addrs.gossip { two = true; } else { //unexpected request @@ -1260,26 +1314,27 @@ mod tests { #[test] fn purge_test() { + logger::setup(); let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); let mut crdt = Crdt::new(me.clone()); let nxt = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap()); assert_ne!(me.id, nxt.id); + crdt.set_leader(me.id); crdt.insert(&nxt); let rv = crdt.gossip_request().unwrap(); - assert_eq!(rv.0, nxt.gossip_addr); + assert_eq!(rv.0, nxt.addrs.gossip); let now = crdt.alive[&nxt.id]; - let len = crdt.table.len() as u64; crdt.purge(now); let rv = crdt.gossip_request().unwrap(); - assert_eq!(rv.0, nxt.gossip_addr); + assert_eq!(rv.0, nxt.addrs.gossip); - crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4); + crdt.purge(now + GOSSIP_PURGE_MILLIS); let rv = crdt.gossip_request().unwrap(); - assert_eq!(rv.0, nxt.gossip_addr); + assert_eq!(rv.0, nxt.addrs.gossip); - crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1); + crdt.purge(now + GOSSIP_PURGE_MILLIS + 1); let rv = crdt.gossip_request().unwrap(); - assert_eq!(rv.0, nxt.gossip_addr); + assert_eq!(rv.0, nxt.addrs.gossip); let nxt2 = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap()); assert_ne!(me.id, nxt2.id); @@ -1291,12 +1346,13 @@ mod tests { } let len = crdt.table.len() as u64; assert!((MIN_TABLE_SIZE as u64) < len); - crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4); + crdt.purge(now + GOSSIP_PURGE_MILLIS); assert_eq!(len as usize, crdt.table.len()); - crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1); + trace!("purging"); + crdt.purge(now + GOSSIP_PURGE_MILLIS + 1); + assert_eq!(len as usize - 1, crdt.table.len()); let rv = crdt.gossip_request().unwrap(); - assert_eq!(rv.0, nxt.gossip_addr); - assert_eq!(2, crdt.table.len()); + assert_eq!(rv.0, nxt.addrs.gossip); } /// test window requests respond with the right blob, and do not overrun diff --git a/src/drone.rs b/src/drone.rs index 6c67587f9..70de99f0f 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -287,8 +287,8 @@ mod tests { let mut drone = Drone::new( alice.keypair(), addr, - leader_data.transactions_addr, - leader_data.requests_addr, + leader_data.addrs.transactions, + leader_data.addrs.requests, None, Some(150_000), ); @@ -312,9 +312,9 @@ mod tests { UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket"); let mut client = ThinClient::new( - leader_data.requests_addr, + leader_data.addrs.requests, requests_socket, - leader_data.transactions_addr, + leader_data.addrs.transactions, transactions_socket, ); diff --git a/src/fullnode.rs b/src/fullnode.rs index 26ba6eae5..040f889f9 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -67,9 +67,9 @@ impl FullNode { let local_requests_addr = node.sockets.requests.local_addr().unwrap(); info!( "starting... local gossip address: {} (advertising {})", - local_gossip_addr, node.data.gossip_addr + local_gossip_addr, node.data.addrs.gossip ); - let requests_addr = node.data.requests_addr.clone(); + let requests_addr = node.data.addrs.requests.clone(); if !leader { let testnet_addr = network_entry_for_validator.expect("validator requires entry"); diff --git a/src/streamer.rs b/src/streamer.rs index e45ba0861..f3f7d0532 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -227,7 +227,7 @@ fn repair_window( let sock = UdpSocket::bind("0.0.0.0:0")?; for (to, req) in reqs { //todo cache socket - info!( + debug!( "{:x} repair_window request {} {} {}", debug_id, *consumed, *received, to ); @@ -257,7 +257,7 @@ fn recv_window( while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq) } - info!( + debug!( "{:x}: RECV_WINDOW {} {}: got packets {}", debug_id, *consumed, @@ -302,7 +302,7 @@ fn recv_window( warn!("{:x}: no leader to retransmit from", debug_id); } if !retransmitq.is_empty() { - info!( + debug!( "{:x}: RECV_WINDOW {} {}: retransmit {}", debug_id, *consumed, @@ -416,7 +416,7 @@ fn recv_window( print_window(locked_window, *consumed); trace!("sending contq.len: {}", contq.len()); if !contq.is_empty() { - info!( + debug!( "{:x}: RECV_WINDOW {} {}: forwarding contq {}", debug_id, *consumed, @@ -475,6 +475,7 @@ pub fn initialized_window( { let mut win = window.write().unwrap(); + let me = crdt.read().unwrap().my_data().clone(); assert!(blobs.len() <= win.len()); debug!( @@ -485,7 +486,7 @@ pub fn initialized_window( // Index the blobs let mut received = entry_height - blobs.len() as u64; - Crdt::index_blobs(crdt, &blobs, &mut received).expect("index blobs for initial window"); + Crdt::index_blobs(&me, &blobs, &mut received).expect("index blobs for initial window"); // populate the window, offset by implied index for b in blobs { @@ -552,8 +553,8 @@ pub fn window( } fn broadcast( - debug_id: u64, - crdt: &Arc>, + me: &ReplicatedData, + broadcast_table: &Vec, window: &Window, recycler: &BlobRecycler, r: &BlobReceiver, @@ -561,6 +562,7 @@ fn broadcast( transmit_index: &mut u64, receive_index: &mut u64, ) -> Result<()> { + let debug_id = me.debug_id(); let timer = Duration::new(1, 0); let mut dq = r.recv_timeout(timer)?; while let Ok(mut nq) = r.try_recv() { @@ -585,7 +587,7 @@ fn broadcast( debug!("{:x} broadcast blobs.len: {}", debug_id, blobs_len); // Index the blobs - Crdt::index_blobs(crdt, &blobs, receive_index)?; + Crdt::index_blobs(&me, &blobs, receive_index)?; // keep the cache of blobs that are broadcast { let mut win = window.write().unwrap(); @@ -625,7 +627,14 @@ fn broadcast( *receive_index += blobs_len as u64; // Send blobs out from the window - Crdt::broadcast(crdt, &window, &sock, transmit_index, *receive_index)?; + Crdt::broadcast( + &me, + &broadcast_table, + &window, + &sock, + transmit_index, + *receive_index, + )?; } Ok(()) } @@ -652,11 +661,12 @@ pub fn broadcaster( .spawn(move || { let mut transmit_index = entry_height; let mut receive_index = entry_height; - let debug_id = crdt.read().unwrap().debug_id(); + let me = crdt.read().unwrap().my_data().clone(); loop { + let broadcast_table = crdt.read().unwrap().compute_broadcast_table(); if let Err(e) = broadcast( - debug_id, - &crdt, + &me, + &broadcast_table, &window, &recycler, &r, @@ -955,7 +965,6 @@ mod test { s_window, s_retransmit, ); - let t_responder = { let (s_responder, r_responder) = channel(); let t_responder = responder(tn.sockets.replicate, resp_recycler.clone(), r_responder); @@ -969,7 +978,7 @@ mod test { w.set_id(me_id).unwrap(); assert_eq!(i, w.get_index().unwrap()); w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&tn.data.gossip_addr); + w.meta.set_addr(&tn.data.addrs.gossip); } msgs.push_back(b); } diff --git a/src/thin_client.rs b/src/thin_client.rs index 3782ca025..e48d2be71 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -302,9 +302,9 @@ mod tests { let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader_data.requests_addr, + leader_data.addrs.requests, requests_socket, - leader_data.transactions_addr, + leader_data.addrs.transactions, transactions_socket, ); let last_id = client.get_last_id(); @@ -344,9 +344,9 @@ mod tests { .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader_data.requests_addr, + leader_data.addrs.requests, requests_socket, - leader_data.transactions_addr, + leader_data.addrs.transactions, transactions_socket, ); let last_id = client.get_last_id(); @@ -396,9 +396,9 @@ mod tests { .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader_data.requests_addr, + leader_data.addrs.requests, requests_socket, - leader_data.transactions_addr, + leader_data.addrs.transactions, transactions_socket, ); let last_id = client.get_last_id(); diff --git a/src/tvu.rs b/src/tvu.rs index 17612fb0e..c23649615 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -198,7 +198,7 @@ pub mod tests { let starting_balance = 10_000; let mint = Mint::new(starting_balance); - let replicate_addr = target1.data.replicate_addr; + let replicate_addr = target1.data.addrs.replicate; let bank = Arc::new(Bank::new(&mint)); //start crdt1 diff --git a/tests/multinode.rs b/tests/multinode.rs index 46de1166b..e4eab68fe 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -29,8 +29,8 @@ fn converge(leader: &ReplicatedData, num_nodes: usize) -> Vec { let mut spy = TestNode::new(); let daddr = "0.0.0.0:0".parse().unwrap(); let me = spy.data.id.clone(); - spy.data.replicate_addr = daddr; - spy.data.requests_addr = daddr; + spy.data.addrs.replicate = daddr; + spy.data.addrs.requests = daddr; let mut spy_crdt = Crdt::new(spy.data); spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); @@ -55,7 +55,7 @@ fn converge(leader: &ReplicatedData, num_nodes: usize) -> Vec { .values() .into_iter() .filter(|x| x.id != me) - .filter(|x| x.requests_addr != daddr) + .filter(|x| x.addrs.requests != daddr) .cloned() .collect(); if num >= num_nodes as u64 && v.len() >= num_nodes { @@ -110,7 +110,7 @@ fn test_multi_node_validator_catchup_from_zero() { validator, false, InFile::Path(ledger_path.clone()), - Some(leader_data.gossip_addr), + Some(leader_data.addrs.gossip), None, exit.clone(), ); @@ -143,7 +143,7 @@ fn test_multi_node_validator_catchup_from_zero() { TestNode::new(), false, InFile::Path(ledger_path.clone()), - Some(leader_data.gossip_addr), + Some(leader_data.addrs.gossip), None, exit.clone(), ); @@ -211,7 +211,7 @@ fn test_multi_node_basic() { validator, false, InFile::Path(ledger_path.clone()), - Some(leader_data.gossip_addr), + Some(leader_data.addrs.gossip), None, exit.clone(), ); @@ -272,7 +272,7 @@ fn test_boot_validator_from_file() { validator, false, InFile::Path(ledger_path.clone()), - Some(leader_data.gossip_addr), + Some(leader_data.addrs.gossip), None, exit.clone(), ); @@ -356,7 +356,7 @@ fn test_leader_restart_validator_start_from_old_ledger() { validator, false, InFile::Path(stale_ledger_path.clone()), - Some(leader_data.gossip_addr), + Some(leader_data.addrs.gossip), None, exit.clone(), ); @@ -416,7 +416,7 @@ fn test_multi_node_dynamic_network() { validator, false, InFile::Path(ledger_path.clone()), - Some(leader_data.gossip_addr), + Some(leader_data.addrs.gossip), Some(OutFile::Path(ledger_path.clone())), exit.clone(), ); @@ -473,7 +473,7 @@ fn test_multi_node_dynamic_network() { validator, false, InFile::Path(ledger_path.clone()), - Some(leader_data.gossip_addr), + Some(leader_data.addrs.gossip), Some(OutFile::Path(ledger_path.clone())), exit.clone(), ); @@ -509,12 +509,12 @@ fn mk_client(leader: &ReplicatedData) -> ThinClient { .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let daddr = "0.0.0.0:0".parse().unwrap(); - assert!(leader.requests_addr != daddr); - assert!(leader.transactions_addr != daddr); + assert!(leader.addrs.requests != daddr); + assert!(leader.addrs.transactions != daddr); ThinClient::new( - leader.requests_addr, + leader.addrs.requests, requests_socket, - leader.transactions_addr, + leader.addrs.transactions, transactions_socket, ) }