From 95701114e3338a988c4d8c735615230bb00a67a7 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 8 Oct 2018 20:55:54 -0600 Subject: [PATCH] Crdt -> ClusterInfo --- src/bin/bench-tps.rs | 16 +- src/bin/fullnode-config.rs | 2 +- src/bin/fullnode.rs | 2 +- src/bin/replicator.rs | 2 +- src/broadcast_stage.rs | 65 ++--- src/budget_instruction.rs | 4 +- src/choose_gossip_peer_strategy.rs | 10 +- src/client.rs | 2 +- src/{crdt.rs => cluster_info.rs} | 372 +++++++++++++++-------------- src/drone.rs | 2 +- src/erasure.rs | 4 +- src/fullnode.rs | 45 ++-- src/lib.rs | 2 +- src/ncp.rs | 18 +- src/replicate_stage.rs | 14 +- src/replicator.rs | 14 +- src/result.rs | 10 +- src/retransmit_stage.rs | 29 ++- src/thin_client.rs | 28 ++- src/tpu.rs | 6 +- src/tvu.rs | 44 ++-- src/vote_stage.rs | 30 +-- src/wallet.rs | 4 +- src/window.rs | 30 +-- src/window_service.rs | 63 ++--- src/write_stage.rs | 95 ++++---- tests/data_replicator.rs | 14 +- tests/multinode.rs | 24 +- 28 files changed, 499 insertions(+), 452 deletions(-) rename src/{crdt.rs => cluster_info.rs} (85%) diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index e10bd19bd..5a65e559e 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -11,7 +11,7 @@ use clap::{App, Arg}; use influx_db_client as influxdb; use rayon::prelude::*; use solana::client::mk_client; -use solana::crdt::{Crdt, NodeInfo}; +use solana::cluster_info::{ClusterInfo, NodeInfo}; use solana::drone::DRONE_PORT; use solana::hash::Hash; use solana::logger; @@ -734,7 +734,7 @@ fn main() { total_tx_sent_count.load(Ordering::Relaxed), ); - // join the crdt client threads + // join the cluster_info client threads ncp.join().unwrap(); } @@ -744,11 +744,11 @@ fn converge( num_nodes: usize, ) -> (Vec, Option, Ncp) { //lets spy on the network - let (node, gossip_socket) = Crdt::spy_node(); - let mut spy_crdt = Crdt::new(node).expect("Crdt::new"); - spy_crdt.insert(&leader); - spy_crdt.set_leader(leader.id); - let spy_ref = Arc::new(RwLock::new(spy_crdt)); + let (node, gossip_socket) = ClusterInfo::spy_node(); + let mut spy_cluster_info = ClusterInfo::new(node).expect("ClusterInfo::new"); + spy_cluster_info.insert(&leader); + spy_cluster_info.set_leader(leader.id); + let spy_ref = Arc::new(RwLock::new(spy_cluster_info)); let window = Arc::new(RwLock::new(default_window())); let ncp = Ncp::new(&spy_ref, window, None, gossip_socket, exit_signal.clone()); let mut v: Vec = vec![]; @@ -763,7 +763,7 @@ fn converge( v = spy_ref .table .values() - .filter(|x| Crdt::is_valid_address(&x.contact_info.rpu)) + .filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpu)) .cloned() .collect(); diff --git a/src/bin/fullnode-config.rs b/src/bin/fullnode-config.rs index 343d1d5a5..9bd696191 100644 --- a/src/bin/fullnode-config.rs +++ b/src/bin/fullnode-config.rs @@ -5,7 +5,7 @@ extern crate serde_json; extern crate solana; use clap::{App, Arg}; -use solana::crdt::FULLNODE_PORT_RANGE; +use solana::cluster_info::FULLNODE_PORT_RANGE; use solana::fullnode::Config; use solana::logger; use solana::netutil::{get_ip_addr, get_public_ip_addr, parse_port_or_addr}; diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 49a3dc774..9703e1e98 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -9,7 +9,7 @@ extern crate solana; use clap::{App, Arg}; use solana::client::mk_client; -use solana::crdt::Node; +use solana::cluster_info::Node; use solana::drone::DRONE_PORT; use solana::fullnode::{Config, Fullnode, FullnodeReturnType}; use solana::logger; diff --git a/src/bin/replicator.rs b/src/bin/replicator.rs index 5c9c32739..f78653cd7 100644 --- a/src/bin/replicator.rs +++ b/src/bin/replicator.rs @@ -7,7 +7,7 @@ extern crate solana; use clap::{App, Arg}; use solana::chacha::chacha_cbc_encrypt_files; -use solana::crdt::Node; +use solana::cluster_info::Node; use solana::fullnode::Config; use solana::ledger::LEDGER_DATA_FILE; use solana::logger; diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index c638f2729..5bc9500d3 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -1,7 +1,7 @@ //! The `broadcast_stage` broadcasts data from a leader node to validators //! +use cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo}; use counter::Counter; -use crdt::{Crdt, CrdtError, NodeInfo}; use entry::Entry; #[cfg(feature = "erasure")] use erasure; @@ -27,7 +27,7 @@ pub enum BroadcastStageReturnType { } fn broadcast( - crdt: &Arc>, + cluster_info: &Arc>, leader_rotation_interval: u64, node_info: &NodeInfo, broadcast_table: &[NodeInfo], @@ -129,8 +129,8 @@ fn broadcast( *receive_index += blobs_len as u64; // Send blobs out from the window - Crdt::broadcast( - crdt, + ClusterInfo::broadcast( + cluster_info, leader_rotation_interval, &node_info, &broadcast_table, @@ -179,7 +179,7 @@ pub struct BroadcastStage { impl BroadcastStage { fn run( sock: &UdpSocket, - crdt: &Arc>, + cluster_info: &Arc>, window: &SharedWindow, entry_height: u64, receiver: &Receiver>, @@ -192,16 +192,16 @@ impl BroadcastStage { let me; let leader_rotation_interval; { - let rcrdt = crdt.read().unwrap(); - me = rcrdt.my_data().clone(); - leader_rotation_interval = rcrdt.get_leader_rotation_interval(); + let rcluster_info = cluster_info.read().unwrap(); + me = rcluster_info.my_data().clone(); + leader_rotation_interval = rcluster_info.get_leader_rotation_interval(); } loop { if transmit_index.data % (leader_rotation_interval as u64) == 0 { - let rcrdt = crdt.read().unwrap(); - let my_id = rcrdt.my_data().id; - match rcrdt.get_scheduled_leader(transmit_index.data) { + let rcluster_info = cluster_info.read().unwrap(); + let my_id = rcluster_info.my_data().id; + match rcluster_info.get_scheduled_leader(transmit_index.data) { Some(id) if id == my_id => (), // If the leader stays in power for the next // round as well, then we don't exit. Otherwise, exit. @@ -211,9 +211,9 @@ impl BroadcastStage { } } - let broadcast_table = crdt.read().unwrap().compute_broadcast_table(); + let broadcast_table = cluster_info.read().unwrap().compute_broadcast_table(); if let Err(e) = broadcast( - crdt, + cluster_info, leader_rotation_interval, &me, &broadcast_table, @@ -228,7 +228,7 @@ impl BroadcastStage { return BroadcastStageReturnType::ChannelDisconnected } Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? + Error::ClusterInfoError(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? _ => { inc_new_counter_info!("streamer-broadcaster-error", 1, 1); error!("broadcaster error: {:?}", e); @@ -239,11 +239,11 @@ impl BroadcastStage { } /// Service to broadcast messages from the leader to layer 1 nodes. - /// See `crdt` for network layer definitions. + /// See `cluster_info` for network layer definitions. /// # Arguments /// * `sock` - Socket to send from. /// * `exit` - Boolean to signal system exit. - /// * `crdt` - CRDT structure + /// * `cluster_info` - ClusterInfo structure /// * `window` - Cache of blobs that we have broadcast /// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. /// * `exit_sender` - Set to true when this stage exits, allows rest of Tpu to exit cleanly. Otherwise, @@ -255,7 +255,7 @@ impl BroadcastStage { /// completing the cycle. pub fn new( sock: UdpSocket, - crdt: Arc>, + cluster_info: Arc>, window: SharedWindow, entry_height: u64, receiver: Receiver>, @@ -265,7 +265,7 @@ impl BroadcastStage { .name("solana-broadcaster".to_string()) .spawn(move || { let _exit = Finalizer::new(exit_sender); - Self::run(&sock, &crdt, &window, entry_height, &receiver) + Self::run(&sock, &cluster_info, &window, entry_height, &receiver) }).unwrap(); BroadcastStage { thread_hdl } @@ -283,7 +283,7 @@ impl Service for BroadcastStage { #[cfg(test)] mod tests { use broadcast_stage::{BroadcastStage, BroadcastStageReturnType}; - use crdt::{Crdt, Node}; + use cluster_info::{ClusterInfo, Node}; use entry::Entry; use ledger::next_entries_mut; use mint::Mint; @@ -302,7 +302,7 @@ mod tests { broadcast_stage: BroadcastStage, shared_window: SharedWindow, entry_sender: Sender>, - crdt: Arc>, + cluster_info: Arc>, entries: Vec, } @@ -317,11 +317,12 @@ mod tests { let buddy_id = buddy_keypair.pubkey(); let broadcast_buddy = Node::new_localhost_with_pubkey(buddy_keypair.pubkey()); - // Fill the crdt with the buddy's info - let mut crdt = Crdt::new(leader_info.info.clone()).expect("Crdt::new"); - crdt.insert(&broadcast_buddy.info); - crdt.set_leader_rotation_interval(leader_rotation_interval); - let crdt = Arc::new(RwLock::new(crdt)); + // Fill the cluster_info with the buddy's info + let mut cluster_info = + ClusterInfo::new(leader_info.info.clone()).expect("ClusterInfo::new"); + cluster_info.insert(&broadcast_buddy.info); + cluster_info.set_leader_rotation_interval(leader_rotation_interval); + let cluster_info = Arc::new(RwLock::new(cluster_info)); // Make dummy initial entries let mint = Mint::new(10000); @@ -338,7 +339,7 @@ mod tests { // Start up the broadcast stage let broadcast_stage = BroadcastStage::new( leader_info.sockets.broadcast, - crdt.clone(), + cluster_info.clone(), shared_window.clone(), entry_height, entry_receiver, @@ -351,7 +352,7 @@ mod tests { broadcast_stage, shared_window, entry_sender, - crdt, + cluster_info, entries, } } @@ -372,9 +373,9 @@ mod tests { let leader_rotation_interval = 10; let broadcast_info = setup_dummy_broadcast_stage(leader_rotation_interval); { - let mut wcrdt = broadcast_info.crdt.write().unwrap(); + let mut wcluster_info = broadcast_info.cluster_info.write().unwrap(); // Set the leader for the next rotation to be myself - wcrdt.set_scheduled_leader(leader_rotation_interval, broadcast_info.my_id); + wcluster_info.set_scheduled_leader(leader_rotation_interval, broadcast_info.my_id); } let genesis_len = broadcast_info.entries.len() as u64; @@ -394,16 +395,16 @@ mod tests { broadcast_info.entry_sender.send(new_entry).unwrap(); } - // Set the scheduled next leader in the crdt to the other buddy on the network + // Set the scheduled next leader in the cluster_info to the other buddy on the network broadcast_info - .crdt + .cluster_info .write() .unwrap() .set_scheduled_leader(2 * leader_rotation_interval, broadcast_info.buddy_id); // Input another leader_rotation_interval dummy entries, which will take us // past the point of the leader rotation. The write_stage will see that - // it's no longer the leader after checking the crdt, and exit + // it's no longer the leader after checking the cluster_info, and exit for _ in 0..leader_rotation_interval { let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]); diff --git a/src/budget_instruction.rs b/src/budget_instruction.rs index d441c7208..0a5f6884f 100644 --- a/src/budget_instruction.rs +++ b/src/budget_instruction.rs @@ -12,9 +12,9 @@ pub struct Contract { pub struct Vote { /// We send some gossip specific membership information through the vote to shortcut /// liveness voting - /// The version of the CRDT struct that the last_id of this network voted with + /// The version of the ClusterInfo struct that the last_id of this network voted with pub version: u64, - /// The version of the CRDT struct that has the same network configuration as this one + /// The version of the ClusterInfo struct that has the same network configuration as this one pub contact_info_version: u64, // TODO: add signature of the state here as well } diff --git a/src/choose_gossip_peer_strategy.rs b/src/choose_gossip_peer_strategy.rs index 16c4226e4..7a79ab444 100644 --- a/src/choose_gossip_peer_strategy.rs +++ b/src/choose_gossip_peer_strategy.rs @@ -1,4 +1,4 @@ -use crdt::{CrdtError, NodeInfo}; +use cluster_info::{ClusterInfoError, NodeInfo}; use rand::distributions::{Distribution, Weighted, WeightedChoice}; use rand::thread_rng; use result::Result; @@ -29,7 +29,7 @@ impl<'a, 'b> ChooseRandomPeerStrategy<'a> { impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> { fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> { if options.is_empty() { - Err(CrdtError::NoPeers)?; + Err(ClusterInfoError::NoPeers)?; } let n = ((self.random)() as usize) % options.len(); @@ -87,8 +87,8 @@ impl<'a> ChooseWeightedPeerStrategy<'a> { fn calculate_weighted_remote_index(&self, peer_id: Pubkey) -> u32 { let mut last_seen_index = 0; // If the peer is not in our remote table, then we leave last_seen_index as zero. - // Only happens when a peer appears in our crdt.table but not in our crdt.remote, - // which means a validator was directly injected into our crdt.table + // Only happens when a peer appears in our cluster_info.table but not in our cluster_info.remote, + // which means a validator was directly injected into our cluster_info.table if let Some(index) = self.remote.get(&peer_id) { last_seen_index = *index; } @@ -174,7 +174,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> { impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> { fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> { if options.is_empty() { - Err(CrdtError::NoPeers)?; + Err(ClusterInfoError::NoPeers)?; } let mut weighted_peers = vec![]; diff --git a/src/client.rs b/src/client.rs index ab7780033..d4df59d80 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,4 @@ -use crdt::{NodeInfo, FULLNODE_PORT_RANGE}; +use cluster_info::{NodeInfo, FULLNODE_PORT_RANGE}; use netutil::bind_in_range; use std::time::Duration; use thin_client::ThinClient; diff --git a/src/crdt.rs b/src/cluster_info.rs similarity index 85% rename from src/crdt.rs rename to src/cluster_info.rs index 1f49e0de3..79ccc5275 100644 --- a/src/crdt.rs +++ b/src/cluster_info.rs @@ -1,4 +1,4 @@ -//! The `crdt` module defines a data structure that is shared by all the nodes in the network over +//! The `cluster_info` module defines a data structure that is shared by all the nodes in the network over //! a gossip control plane. The goal is to share small bits of off-chain information and detect and //! repair partitions. //! @@ -64,7 +64,7 @@ macro_rules! socketaddr_any { } #[derive(Debug, PartialEq, Eq)] -pub enum CrdtError { +pub enum ClusterInfoError { NoPeers, NoLeader, BadContactInfo, @@ -194,7 +194,7 @@ impl NodeInfo { } } -/// `Crdt` structure keeps a table of `NodeInfo` structs +/// `ClusterInfo` structure keeps a table of `NodeInfo` structs /// # Properties /// * `table` - map of public id's to versioned and signed NodeInfo structs /// * `local` - map of public id's to what `self.update_index` `self.table` was updated @@ -205,7 +205,7 @@ impl NodeInfo { /// * `gossip` - asynchronously ask nodes to send updates /// * `listen` - listen for requests and responses /// No attempt to keep track of timeouts or dropped requests is made, or should be. -pub struct Crdt { +pub struct ClusterInfo { /// table of everyone in the network pub table: HashMap, /// Value of my update index when entry in table was updated. @@ -247,12 +247,12 @@ enum Protocol { RequestWindowIndex(NodeInfo, u64), } -impl Crdt { - pub fn new(node_info: NodeInfo) -> Result { +impl ClusterInfo { + pub fn new(node_info: NodeInfo) -> Result { if node_info.version != 0 { - return Err(Error::CrdtError(CrdtError::BadNodeInfo)); + return Err(Error::ClusterInfoError(ClusterInfoError::BadNodeInfo)); } - let mut me = Crdt { + let mut me = ClusterInfo { table: HashMap::new(), local: HashMap::new(), remote: HashMap::new(), @@ -351,7 +351,7 @@ impl Crdt { .values() .into_iter() .filter(|x| x.id != me) - .filter(|x| Crdt::is_valid_address(&x.contact_info.rpu)) + .filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpu)) .cloned() .collect() } @@ -374,7 +374,7 @@ impl Crdt { } if *pubkey == self.my_data().leader_id { info!("{}: LEADER_VOTED! {}", self.id, pubkey); - inc_new_counter_info!("crdt-insert_vote-leader_voted", 1); + inc_new_counter_info!("cluster_info-insert_vote-leader_voted", 1); } if v.version <= self.table[pubkey].version { @@ -392,7 +392,7 @@ impl Crdt { } } pub fn insert_votes(&mut self, votes: &[(Pubkey, Vote, Hash)]) { - inc_new_counter_info!("crdt-vote-count", votes.len()); + inc_new_counter_info!("cluster_info-vote-count", votes.len()); if !votes.is_empty() { info!("{}: INSERTING VOTES {}", self.id, votes.len()); } @@ -409,7 +409,7 @@ impl Crdt { // we have stored ourselves trace!("{}: insert v.id: {} version: {}", self.id, v.id, v.version); if self.table.get(&v.id).is_none() { - inc_new_counter_info!("crdt-insert-new_entry", 1, 1); + inc_new_counter_info!("cluster_info-insert-new_entry", 1, 1); } self.update_index += 1; @@ -462,7 +462,7 @@ impl Crdt { } }).collect(); - inc_new_counter_info!("crdt-purge-count", dead_ids.len()); + inc_new_counter_info!("cluster_info-purge-count", dead_ids.len()); for id in &dead_ids { self.alive.remove(id); @@ -476,7 +476,7 @@ impl Crdt { } if *id == leader_id { info!("{}: PURGE LEADER {}", self.id, id,); - inc_new_counter_info!("crdt-purge-purged_leader", 1, 1); + inc_new_counter_info!("cluster_info-purge-purged_leader", 1, 1); self.set_leader(Pubkey::default()); } } @@ -516,7 +516,7 @@ impl Crdt { /// # Remarks /// We need to avoid having obj locked while doing any io, such as the `send_to` pub fn broadcast( - crdt: &Arc>, + cluster_info: &Arc>, leader_rotation_interval: u64, me: &NodeInfo, broadcast_table: &[NodeInfo], @@ -526,9 +526,9 @@ impl Crdt { received_index: u64, ) -> Result<()> { if broadcast_table.is_empty() { - debug!("{}:not enough peers in crdt table", me.id); - inc_new_counter_info!("crdt-broadcast-not_enough_peers_error", 1); - Err(CrdtError::NoPeers)?; + debug!("{}:not enough peers in cluster_info table", me.id); + inc_new_counter_info!("cluster_info-broadcast-not_enough_peers_error", 1); + Err(ClusterInfoError::NoPeers)?; } trace!( "{} transmit_index: {:?} received_index: {} broadcast_len: {}", @@ -563,7 +563,10 @@ impl Crdt { // so he can initiate repairs if necessary let entry_height = idx + 1; if entry_height % leader_rotation_interval == 0 { - let next_leader_id = crdt.read().unwrap().get_scheduled_leader(entry_height); + let next_leader_id = cluster_info + .read() + .unwrap() + .get_scheduled_leader(entry_height); if next_leader_id.is_some() && next_leader_id != Some(me.id) { let info_result = broadcast_table .iter() @@ -640,7 +643,7 @@ impl Crdt { } } inc_new_counter_info!( - "crdt-broadcast-max_idx", + "cluster_info-broadcast-max_idx", (transmit_index.data - old_transmit_index) as usize ); transmit_index.coding = transmit_index.data; @@ -699,7 +702,7 @@ impl Crdt { }).collect(); for e in errs { if let Err(e) = &e { - inc_new_counter_info!("crdt-retransmit-send_to_error", 1, 1); + inc_new_counter_info!("cluster_info-retransmit-send_to_error", 1, 1); error!("retransmit result {:?}", e); } e?; @@ -751,7 +754,7 @@ impl Crdt { .filter(|r| r.id != self.id && Self::is_valid_address(&r.contact_info.tvu)) .collect(); if valid.is_empty() { - Err(CrdtError::NoPeers)?; + Err(ClusterInfoError::NoPeers)?; } let n = thread_rng().gen::() % valid.len(); let addr = valid[n].contact_info.ncp; // send the request to the peer's gossip port @@ -783,8 +786,12 @@ impl Crdt { let choose_peer_result = choose_peer_strategy.choose_peer(options); - if let Err(Error::CrdtError(CrdtError::NoPeers)) = &choose_peer_result { - trace!("crdt too small for gossip {} {}", self.id, self.table.len()); + if let Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)) = &choose_peer_result { + trace!( + "cluster_info too small for gossip {} {}", + self.id, + self.table.len() + ); }; let v = choose_peer_result?; @@ -803,7 +810,10 @@ impl Crdt { pub fn new_vote(&mut self, last_id: Hash) -> Result<(Vote, SocketAddr)> { let mut me = self.my_data().clone(); - let leader = self.leader_data().ok_or(CrdtError::NoLeader)?.clone(); + let leader = self + .leader_data() + .ok_or(ClusterInfoError::NoLeader)? + .clone(); me.version += 1; me.ledger_state.last_id = last_id; let vote = Vote { @@ -879,7 +889,7 @@ impl Crdt { for v in data { insert_total += self.insert(&v); } - inc_new_counter_info!("crdt-update-count", insert_total); + inc_new_counter_info!("cluster_info-update-count", insert_total); for (pubkey, external_remote_index) in external_liveness { let remote_entry = if let Some(v) = self.remote.get(pubkey) { @@ -974,11 +984,11 @@ impl Crdt { outblob.meta.set_addr(from_addr); outblob.set_id(sender_id).expect("blob set_id"); } - inc_new_counter_info!("crdt-window-request-pass", 1); + inc_new_counter_info!("cluster_info-window-request-pass", 1); return Some(out); } else { - inc_new_counter_info!("crdt-window-request-outside", 1); + inc_new_counter_info!("cluster_info-window-request-outside", 1); trace!( "requested ix {} != blob_ix {}, outside window!", ix, @@ -990,7 +1000,7 @@ impl Crdt { if let Some(ledger_window) = ledger_window { if let Ok(entry) = ledger_window.get_entry(ix) { - inc_new_counter_info!("crdt-window-request-ledger", 1); + inc_new_counter_info!("cluster_info-window-request-ledger", 1); let out = entry.to_blob( Some(ix), @@ -1002,7 +1012,7 @@ impl Crdt { } } - inc_new_counter_info!("crdt-window-request-fail", 1); + inc_new_counter_info!("cluster_info-window-request-fail", 1); trace!( "{}: failed RequestWindowIndex {} {} {}", me.id, @@ -1023,10 +1033,10 @@ impl Crdt { ) -> Option { match deserialize(&blob.data[..blob.meta.size]) { Ok(request) => { - Crdt::handle_protocol(obj, &blob.meta.addr(), request, window, ledger_window) + ClusterInfo::handle_protocol(obj, &blob.meta.addr(), request, window, ledger_window) } Err(_) => { - warn!("deserialize crdt packet failed"); + warn!("deserialize cluster_info packet failed"); None } } @@ -1058,7 +1068,7 @@ impl Crdt { me.read().unwrap().id, from.id ); - inc_new_counter_info!("crdt-window-request-loopback", 1); + inc_new_counter_info!("cluster_info-window-request-loopback", 1); return None; } @@ -1066,7 +1076,7 @@ impl Crdt { // this may or may not be correct for everybody but it's better than leaving him with // an unspecified address in our table if from.contact_info.ncp.ip().is_unspecified() { - inc_new_counter_info!("crdt-window-request-updates-unspec-ncp", 1); + inc_new_counter_info!("cluster_info-window-request-updates-unspec-ncp", 1); from.contact_info.ncp = *from_addr; } @@ -1144,7 +1154,7 @@ impl Crdt { Protocol::RequestWindowIndex(from, ix) => { let now = Instant::now(); - //TODO this doesn't depend on CRDT module, could be moved + //TODO this doesn't depend on cluster_info module, could be moved //but we are using the listen thread to service these request //TODO verify from is signed @@ -1155,13 +1165,13 @@ impl Crdt { from.id, ix, ); - inc_new_counter_info!("crdt-window-request-address-eq", 1); + inc_new_counter_info!("cluster_info-window-request-address-eq", 1); return None; } me.write().unwrap().insert(&from); let me = me.read().unwrap().my_data().clone(); - inc_new_counter_info!("crdt-window-request-recv", 1); + inc_new_counter_info!("cluster_info-window-request-recv", 1); trace!("{}: received RequestWindowIndex {} {} ", me.id, from.id, ix,); let res = Self::run_window_request(&from, &from_addr, &window, ledger_window, &me, ix); @@ -1376,9 +1386,9 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) { #[cfg(test)] mod tests { use budget_instruction::Vote; - use crdt::{ - Crdt, CrdtError, Node, NodeInfo, Protocol, FULLNODE_PORT_RANGE, GOSSIP_PURGE_MILLIS, - GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, + use cluster_info::{ + ClusterInfo, ClusterInfoError, Node, NodeInfo, Protocol, FULLNODE_PORT_RANGE, + GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, }; use entry::Entry; use hash::{hash, Hash}; @@ -1401,83 +1411,83 @@ mod tests { fn insert_test() { let mut d = NodeInfo::new_localhost(Keypair::new().pubkey()); assert_eq!(d.version, 0); - let mut crdt = Crdt::new(d.clone()).unwrap(); - assert_eq!(crdt.table[&d.id].version, 0); - assert!(!crdt.alive.contains_key(&d.id)); + let mut cluster_info = ClusterInfo::new(d.clone()).unwrap(); + assert_eq!(cluster_info.table[&d.id].version, 0); + assert!(!cluster_info.alive.contains_key(&d.id)); d.version = 2; - crdt.insert(&d); - let liveness = crdt.alive[&d.id]; - assert_eq!(crdt.table[&d.id].version, 2); + cluster_info.insert(&d); + let liveness = cluster_info.alive[&d.id]; + assert_eq!(cluster_info.table[&d.id].version, 2); d.version = 1; - crdt.insert(&d); - assert_eq!(crdt.table[&d.id].version, 2); - assert_eq!(liveness, crdt.alive[&d.id]); + cluster_info.insert(&d); + assert_eq!(cluster_info.table[&d.id].version, 2); + assert_eq!(liveness, cluster_info.alive[&d.id]); // Ensure liveness will be updated for version 3 sleep(Duration::from_millis(1)); d.version = 3; - crdt.insert(&d); - assert_eq!(crdt.table[&d.id].version, 3); - assert!(liveness < crdt.alive[&d.id]); + cluster_info.insert(&d); + assert_eq!(cluster_info.table[&d.id].version, 3); + assert!(liveness < cluster_info.alive[&d.id]); } #[test] fn test_new_vote() { let d = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); assert_eq!(d.version, 0); - let mut crdt = Crdt::new(d.clone()).unwrap(); - assert_eq!(crdt.table[&d.id].version, 0); + let mut cluster_info = ClusterInfo::new(d.clone()).unwrap(); + assert_eq!(cluster_info.table[&d.id].version, 0); let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1235")); assert_ne!(d.id, leader.id); assert_matches!( - crdt.new_vote(Hash::default()).err(), - Some(Error::CrdtError(CrdtError::NoLeader)) + cluster_info.new_vote(Hash::default()).err(), + Some(Error::ClusterInfoError(ClusterInfoError::NoLeader)) ); - crdt.insert(&leader); + cluster_info.insert(&leader); assert_matches!( - crdt.new_vote(Hash::default()).err(), - Some(Error::CrdtError(CrdtError::NoLeader)) + cluster_info.new_vote(Hash::default()).err(), + Some(Error::ClusterInfoError(ClusterInfoError::NoLeader)) ); - crdt.set_leader(leader.id); - assert_eq!(crdt.table[&d.id].version, 1); + cluster_info.set_leader(leader.id); + assert_eq!(cluster_info.table[&d.id].version, 1); let v = Vote { version: 2, //version should increase when we vote contact_info_version: 0, }; - let expected = (v, crdt.table[&leader.id].contact_info.tpu); - assert_eq!(crdt.new_vote(Hash::default()).unwrap(), expected); + let expected = (v, cluster_info.table[&leader.id].contact_info.tpu); + assert_eq!(cluster_info.new_vote(Hash::default()).unwrap(), expected); } #[test] fn test_insert_vote() { let d = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); assert_eq!(d.version, 0); - let mut crdt = Crdt::new(d.clone()).unwrap(); - assert_eq!(crdt.table[&d.id].version, 0); + let mut cluster_info = ClusterInfo::new(d.clone()).unwrap(); + assert_eq!(cluster_info.table[&d.id].version, 0); let vote_same_version = Vote { version: d.version, contact_info_version: 0, }; - crdt.insert_vote(&d.id, &vote_same_version, Hash::default()); - assert_eq!(crdt.table[&d.id].version, 0); + cluster_info.insert_vote(&d.id, &vote_same_version, Hash::default()); + assert_eq!(cluster_info.table[&d.id].version, 0); let vote_new_version_new_addrs = Vote { version: d.version + 1, contact_info_version: 1, }; - crdt.insert_vote(&d.id, &vote_new_version_new_addrs, Hash::default()); + cluster_info.insert_vote(&d.id, &vote_new_version_new_addrs, Hash::default()); //should be dropped since the address is newer then we know - assert_eq!(crdt.table[&d.id].version, 0); + assert_eq!(cluster_info.table[&d.id].version, 0); let vote_new_version_old_addrs = Vote { version: d.version + 1, contact_info_version: 0, }; - crdt.insert_vote(&d.id, &vote_new_version_old_addrs, Hash::default()); + cluster_info.insert_vote(&d.id, &vote_new_version_old_addrs, Hash::default()); //should be accepted, since the update is for the same address field as the one we know - assert_eq!(crdt.table[&d.id].version, 1); + assert_eq!(cluster_info.table[&d.id].version, 1); } fn sorted(ls: &Vec) -> Vec { let mut copy: Vec<_> = ls.iter().cloned().collect(); @@ -1502,20 +1512,20 @@ mod tests { let d1 = NodeInfo::new_localhost(Keypair::new().pubkey()); let d2 = NodeInfo::new_localhost(Keypair::new().pubkey()); let d3 = NodeInfo::new_localhost(Keypair::new().pubkey()); - let mut crdt = Crdt::new(d1.clone()).expect("Crdt::new"); - let (key, ix, ups) = crdt.get_updates_since(0); + let mut cluster_info = ClusterInfo::new(d1.clone()).expect("ClusterInfo::new"); + let (key, ix, ups) = cluster_info.get_updates_since(0); assert_eq!(key, d1.id); assert_eq!(ix, 1); assert_eq!(ups.len(), 1); assert_eq!(sorted(&ups), sorted(&vec![d1.clone()])); - crdt.insert(&d2); - let (key, ix, ups) = crdt.get_updates_since(0); + cluster_info.insert(&d2); + let (key, ix, ups) = cluster_info.get_updates_since(0); assert_eq!(key, d1.id); assert_eq!(ix, 2); assert_eq!(ups.len(), 2); assert_eq!(sorted(&ups), sorted(&vec![d1.clone(), d2.clone()])); - crdt.insert(&d3); - let (key, ix, ups) = crdt.get_updates_since(0); + cluster_info.insert(&d3); + let (key, ix, ups) = cluster_info.get_updates_since(0); assert_eq!(key, d1.id); assert_eq!(ix, 3); assert_eq!(ups.len(), 3); @@ -1523,24 +1533,24 @@ mod tests { sorted(&ups), sorted(&vec![d1.clone(), d2.clone(), d3.clone()]) ); - let mut crdt2 = Crdt::new(d2.clone()).expect("Crdt::new"); - crdt2.apply_updates(key, ix, &ups, &vec![]); - assert_eq!(crdt2.table.values().len(), 3); + let mut cluster_info2 = ClusterInfo::new(d2.clone()).expect("ClusterInfo::new"); + cluster_info2.apply_updates(key, ix, &ups, &vec![]); + assert_eq!(cluster_info2.table.values().len(), 3); assert_eq!( - sorted(&crdt2.table.values().map(|x| x.clone()).collect()), - sorted(&crdt.table.values().map(|x| x.clone()).collect()) + sorted(&cluster_info2.table.values().map(|x| x.clone()).collect()), + sorted(&cluster_info.table.values().map(|x| x.clone()).collect()) ); let d4 = NodeInfo::new_entry_point(&socketaddr!("127.0.0.4:1234")); - crdt.insert(&d4); - let (_key, _ix, ups) = crdt.get_updates_since(0); + cluster_info.insert(&d4); + let (_key, _ix, ups) = cluster_info.get_updates_since(0); assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3])); } #[test] fn window_index_request() { let me = NodeInfo::new_localhost(Keypair::new().pubkey()); - let mut crdt = Crdt::new(me).expect("Crdt::new"); - let rv = crdt.window_index_request(0); - assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); + let mut cluster_info = ClusterInfo::new(me).expect("ClusterInfo::new"); + let rv = cluster_info.window_index_request(0); + assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers))); let ncp = socketaddr!([127, 0, 0, 1], 1234); let nxt = NodeInfo::new( @@ -1551,8 +1561,8 @@ mod tests { socketaddr!([127, 0, 0, 1], 1237), socketaddr!([127, 0, 0, 1], 1238), ); - crdt.insert(&nxt); - let rv = crdt.window_index_request(0).unwrap(); + cluster_info.insert(&nxt); + let rv = cluster_info.window_index_request(0).unwrap(); assert_eq!(nxt.contact_info.ncp, ncp); assert_eq!(rv.0, nxt.contact_info.ncp); @@ -1565,12 +1575,12 @@ mod tests { socketaddr!([127, 0, 0, 1], 1237), socketaddr!([127, 0, 0, 1], 1238), ); - crdt.insert(&nxt); + cluster_info.insert(&nxt); let mut one = false; let mut two = false; while !one || !two { //this randomly picks an option, so eventually it should pick both - let rv = crdt.window_index_request(0).unwrap(); + let rv = cluster_info.window_index_request(0).unwrap(); if rv.0 == ncp { one = true; } @@ -1592,41 +1602,41 @@ mod tests { socketaddr!("127.0.0.1:127"), ); - let mut crdt = Crdt::new(me).expect("Crdt::new"); + let mut cluster_info = ClusterInfo::new(me).expect("ClusterInfo::new"); let nxt1 = NodeInfo::new_unspecified(); // Filter out unspecified addresses - crdt.insert(&nxt1); //<--- attack! - let rv = crdt.gossip_request(); - assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); + cluster_info.insert(&nxt1); //<--- attack! + let rv = cluster_info.gossip_request(); + assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers))); let nxt2 = NodeInfo::new_multicast(); // Filter out multicast addresses - crdt.insert(&nxt2); //<--- attack! - let rv = crdt.gossip_request(); - assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); + cluster_info.insert(&nxt2); //<--- attack! + let rv = cluster_info.gossip_request(); + assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers))); } /// test that gossip requests are eventually generated for all nodes #[test] fn gossip_request() { let me = NodeInfo::new_localhost(Keypair::new().pubkey()); - let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); - let rv = crdt.gossip_request(); - assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); + let mut cluster_info = ClusterInfo::new(me.clone()).expect("ClusterInfo::new"); + let rv = cluster_info.gossip_request(); + assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers))); let nxt1 = NodeInfo::new_localhost(Keypair::new().pubkey()); - crdt.insert(&nxt1); + cluster_info.insert(&nxt1); - let rv = crdt.gossip_request().unwrap(); + let rv = cluster_info.gossip_request().unwrap(); assert_eq!(rv.0, nxt1.contact_info.ncp); let nxt2 = NodeInfo::new_entry_point(&socketaddr!("127.0.0.3:1234")); - crdt.insert(&nxt2); + cluster_info.insert(&nxt2); // check that the service works // and that it eventually produces a request for both nodes let (sender, reader) = channel(); let exit = Arc::new(AtomicBool::new(false)); - let obj = Arc::new(RwLock::new(crdt)); - let thread = Crdt::gossip(obj, sender, exit.clone()); + let obj = Arc::new(RwLock::new(cluster_info)); + let thread = ClusterInfo::gossip(obj, sender, exit.clone()); let mut one = false; let mut two = false; for _ in 0..30 { @@ -1660,67 +1670,67 @@ mod tests { fn purge_test() { logger::setup(); let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); - let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); + let mut cluster_info = ClusterInfo::new(me.clone()).expect("ClusterInfo::new"); let nxt = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234")); assert_ne!(me.id, nxt.id); - crdt.set_leader(me.id); - crdt.insert(&nxt); - let rv = crdt.gossip_request().unwrap(); + cluster_info.set_leader(me.id); + cluster_info.insert(&nxt); + let rv = cluster_info.gossip_request().unwrap(); assert_eq!(rv.0, nxt.contact_info.ncp); - let now = crdt.alive[&nxt.id]; - crdt.purge(now); - let rv = crdt.gossip_request().unwrap(); + let now = cluster_info.alive[&nxt.id]; + cluster_info.purge(now); + let rv = cluster_info.gossip_request().unwrap(); assert_eq!(rv.0, nxt.contact_info.ncp); - crdt.purge(now + GOSSIP_PURGE_MILLIS); - let rv = crdt.gossip_request().unwrap(); + cluster_info.purge(now + GOSSIP_PURGE_MILLIS); + let rv = cluster_info.gossip_request().unwrap(); assert_eq!(rv.0, nxt.contact_info.ncp); - crdt.purge(now + GOSSIP_PURGE_MILLIS + 1); - let rv = crdt.gossip_request().unwrap(); + cluster_info.purge(now + GOSSIP_PURGE_MILLIS + 1); + let rv = cluster_info.gossip_request().unwrap(); assert_eq!(rv.0, nxt.contact_info.ncp); let mut nxt2 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234")); assert_ne!(me.id, nxt2.id); assert_ne!(nxt.id, nxt2.id); - crdt.insert(&nxt2); - while now == crdt.alive[&nxt2.id] { + cluster_info.insert(&nxt2); + while now == cluster_info.alive[&nxt2.id] { sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); nxt2.version += 1; - crdt.insert(&nxt2); + cluster_info.insert(&nxt2); } - let len = crdt.table.len() as u64; + let len = cluster_info.table.len() as u64; assert!((MIN_TABLE_SIZE as u64) < len); - crdt.purge(now + GOSSIP_PURGE_MILLIS); - assert_eq!(len as usize, crdt.table.len()); + cluster_info.purge(now + GOSSIP_PURGE_MILLIS); + assert_eq!(len as usize, cluster_info.table.len()); trace!("purging"); - crdt.purge(now + GOSSIP_PURGE_MILLIS + 1); - assert_eq!(len as usize - 1, crdt.table.len()); - let rv = crdt.gossip_request().unwrap(); + cluster_info.purge(now + GOSSIP_PURGE_MILLIS + 1); + assert_eq!(len as usize - 1, cluster_info.table.len()); + let rv = cluster_info.gossip_request().unwrap(); assert_eq!(rv.0, nxt.contact_info.ncp); } #[test] fn purge_leader_test() { logger::setup(); let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); - let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); + let mut cluster_info = ClusterInfo::new(me.clone()).expect("ClusterInfo::new"); let nxt = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234")); assert_ne!(me.id, nxt.id); - crdt.insert(&nxt); - crdt.set_leader(nxt.id); - let now = crdt.alive[&nxt.id]; + cluster_info.insert(&nxt); + cluster_info.set_leader(nxt.id); + let now = cluster_info.alive[&nxt.id]; let mut nxt2 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234")); - crdt.insert(&nxt2); - while now == crdt.alive[&nxt2.id] { + cluster_info.insert(&nxt2); + while now == cluster_info.alive[&nxt2.id] { sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); nxt2.version = nxt2.version + 1; - crdt.insert(&nxt2); + cluster_info.insert(&nxt2); } - let len = crdt.table.len() as u64; - crdt.purge(now + GOSSIP_PURGE_MILLIS + 1); - assert_eq!(len as usize - 1, crdt.table.len()); - assert_eq!(crdt.my_data().leader_id, Pubkey::default()); - assert!(crdt.leader_data().is_none()); + let len = cluster_info.table.len() as u64; + cluster_info.purge(now + GOSSIP_PURGE_MILLIS + 1); + assert_eq!(len as usize - 1, cluster_info.table.len()); + assert_eq!(cluster_info.my_data().leader_id, Pubkey::default()); + assert!(cluster_info.leader_data().is_none()); } /// test window requests respond with the right blob, and do not overrun @@ -1736,18 +1746,21 @@ mod tests { socketaddr!("127.0.0.1:1237"), socketaddr!("127.0.0.1:1238"), ); - let rv = Crdt::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, 0); + let rv = + ClusterInfo::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, 0); assert!(rv.is_none()); let out = SharedBlob::default(); out.write().unwrap().meta.size = 200; window.write().unwrap()[0].data = Some(out); - let rv = Crdt::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, 0); + let rv = + ClusterInfo::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, 0); assert!(rv.is_some()); let v = rv.unwrap(); //test we copied the blob assert_eq!(v.read().unwrap().meta.size, 200); let len = window.read().unwrap().len() as u64; - let rv = Crdt::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, len); + let rv = + ClusterInfo::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, len); assert!(rv.is_none()); fn tmp_ledger(name: &str) -> String { @@ -1769,7 +1782,7 @@ mod tests { let ledger_path = tmp_ledger("run_window_request"); let mut ledger_window = LedgerWindow::open(&ledger_path).unwrap(); - let rv = Crdt::run_window_request( + let rv = ClusterInfo::run_window_request( &me, &socketaddr_any!(), &window, @@ -1793,8 +1806,14 @@ mod tests { let mock_peer = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); // Simulate handling a repair request from mock_peer - let rv = - Crdt::run_window_request(&mock_peer, &socketaddr_any!(), &window, &mut None, &me, 0); + let rv = ClusterInfo::run_window_request( + &mock_peer, + &socketaddr_any!(), + &window, + &mut None, + &me, + 0, + ); assert!(rv.is_none()); let blob = SharedBlob::default(); let blob_size = 200; @@ -1803,7 +1822,7 @@ mod tests { let num_requests: u32 = 64; for i in 0..num_requests { - let shared_blob = Crdt::run_window_request( + let shared_blob = ClusterInfo::run_window_request( &mock_peer, &socketaddr_any!(), &window, @@ -1831,23 +1850,23 @@ mod tests { let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let leader0 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let leader1 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); - let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); - assert_eq!(crdt.top_leader(), None); - crdt.set_leader(leader0.id); - assert_eq!(crdt.top_leader().unwrap(), leader0.id); + let mut cluster_info = ClusterInfo::new(me.clone()).expect("ClusterInfo::new"); + assert_eq!(cluster_info.top_leader(), None); + cluster_info.set_leader(leader0.id); + assert_eq!(cluster_info.top_leader().unwrap(), leader0.id); //add a bunch of nodes with a new leader for _ in 0..10 { let mut dum = NodeInfo::new_entry_point(&socketaddr!("127.0.0.1:1234")); dum.id = Keypair::new().pubkey(); dum.leader_id = leader1.id; - crdt.insert(&dum); + cluster_info.insert(&dum); } - assert_eq!(crdt.top_leader().unwrap(), leader1.id); - crdt.update_leader(); - assert_eq!(crdt.my_data().leader_id, leader0.id); - crdt.insert(&leader1); - crdt.update_leader(); - assert_eq!(crdt.my_data().leader_id, leader1.id); + assert_eq!(cluster_info.top_leader().unwrap(), leader1.id); + cluster_info.update_leader(); + assert_eq!(cluster_info.my_data().leader_id, leader0.id); + cluster_info.insert(&leader1); + cluster_info.update_leader(); + assert_eq!(cluster_info.my_data().leader_id, leader1.id); } #[test] @@ -1870,11 +1889,14 @@ mod tests { socketaddr_any!(), ); leader3.ledger_state.last_id = hash(b"3"); - let mut crdt = Crdt::new(leader0.clone()).expect("Crdt::new"); - crdt.insert(&leader1); - crdt.insert(&leader2); - crdt.insert(&leader3); - assert_eq!(crdt.valid_last_ids(), vec![leader0.ledger_state.last_id]); + let mut cluster_info = ClusterInfo::new(leader0.clone()).expect("ClusterInfo::new"); + cluster_info.insert(&leader1); + cluster_info.insert(&leader2); + cluster_info.insert(&leader3); + assert_eq!( + cluster_info.valid_last_ids(), + vec![leader0.ledger_state.last_id] + ); } /// Validates the node that sent Protocol::ReceiveUpdates gets its @@ -1890,25 +1912,25 @@ mod tests { assert_ne!(node.id, node_with_same_addr.id); let node_with_diff_addr = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:4321")); - let crdt = Crdt::new(node.clone()).expect("Crdt::new"); - assert_eq!(crdt.alive.len(), 0); + let cluster_info = ClusterInfo::new(node.clone()).expect("ClusterInfo::new"); + assert_eq!(cluster_info.alive.len(), 0); - let obj = Arc::new(RwLock::new(crdt)); + let obj = Arc::new(RwLock::new(cluster_info)); let request = Protocol::RequestUpdates(1, node.clone()); assert!( - Crdt::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None,) + ClusterInfo::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None,) .is_none() ); let request = Protocol::RequestUpdates(1, node_with_same_addr.clone()); assert!( - Crdt::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None,) + ClusterInfo::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None,) .is_none() ); let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone()); - Crdt::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None); + ClusterInfo::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None); let me = obj.write().unwrap(); @@ -1924,24 +1946,24 @@ mod tests { fn test_is_valid_address() { assert!(cfg!(test)); let bad_address_port = socketaddr!("127.0.0.1:0"); - assert!(!Crdt::is_valid_address(&bad_address_port)); + assert!(!ClusterInfo::is_valid_address(&bad_address_port)); let bad_address_unspecified = socketaddr!(0, 1234); - assert!(!Crdt::is_valid_address(&bad_address_unspecified)); + assert!(!ClusterInfo::is_valid_address(&bad_address_unspecified)); let bad_address_multicast = socketaddr!([224, 254, 0, 0], 1234); - assert!(!Crdt::is_valid_address(&bad_address_multicast)); + assert!(!ClusterInfo::is_valid_address(&bad_address_multicast)); let loopback = socketaddr!("127.0.0.1:1234"); - assert!(Crdt::is_valid_address(&loopback)); - // assert!(!Crdt::is_valid_ip_internal(loopback.ip(), false)); + assert!(ClusterInfo::is_valid_address(&loopback)); + // assert!(!ClusterInfo::is_valid_ip_internal(loopback.ip(), false)); } #[test] fn test_default_leader() { logger::setup(); let node_info = NodeInfo::new_localhost(Keypair::new().pubkey()); - let mut crdt = Crdt::new(node_info).unwrap(); + let mut cluster_info = ClusterInfo::new(node_info).unwrap(); let network_entry_point = NodeInfo::new_entry_point(&socketaddr!("127.0.0.1:1239")); - crdt.insert(&network_entry_point); - assert!(crdt.leader_data().is_none()); + cluster_info.insert(&network_entry_point); + assert!(cluster_info.leader_data().is_none()); } #[test] diff --git a/src/drone.rs b/src/drone.rs index 233070796..c57a761e4 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -224,7 +224,7 @@ pub fn run_local_drone(mint_keypair: Keypair, network: SocketAddr, sender: Sende #[cfg(test)] mod tests { use bank::Bank; - use crdt::Node; + use cluster_info::Node; use drone::{Drone, DroneRequest, REQUEST_CAP, TIME_SLICE}; use fullnode::Fullnode; use logger; diff --git a/src/erasure.rs b/src/erasure.rs index 0206b6299..37bb1fe2d 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -591,7 +591,7 @@ pub fn recover(id: &Pubkey, window: &mut [WindowSlot], start_idx: u64, start: us #[cfg(test)] mod test { - use crdt; + use cluster_info; use erasure; use logger; use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE}; @@ -726,7 +726,7 @@ mod test { blobs.push(b_); } - let d = crdt::NodeInfo::new_localhost(Keypair::new().pubkey()); + let d = cluster_info::NodeInfo::new_localhost(Keypair::new().pubkey()); assert!(index_blobs(&d, &blobs, &mut (offset as u64)).is_ok()); for b in blobs { let idx = b.read().unwrap().get_index().unwrap() as usize % WINDOW_SIZE; diff --git a/src/fullnode.rs b/src/fullnode.rs index 9951989af..66eecf450 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -2,7 +2,7 @@ use bank::Bank; use broadcast_stage::BroadcastStage; -use crdt::{Crdt, Node, NodeInfo}; +use cluster_info::{ClusterInfo, Node, NodeInfo}; use drone::DRONE_PORT; use entry::Entry; use ledger::read_ledger; @@ -80,7 +80,7 @@ pub struct Fullnode { rpc_service: JsonRpcService, ncp: Ncp, bank: Arc, - crdt: Arc>, + cluster_info: Arc>, ledger_path: String, sigverify_disabled: bool, shared_window: window::SharedWindow, @@ -270,14 +270,14 @@ impl Fullnode { let window = window::new_window_from_entries(ledger_tail, entry_height, &node.info); let shared_window = Arc::new(RwLock::new(window)); - let mut crdt = Crdt::new(node.info).expect("Crdt::new"); + let mut cluster_info = ClusterInfo::new(node.info).expect("ClusterInfo::new"); if let Some(interval) = leader_rotation_interval { - crdt.set_leader_rotation_interval(interval); + cluster_info.set_leader_rotation_interval(interval); } - let crdt = Arc::new(RwLock::new(crdt)); + let cluster_info = Arc::new(RwLock::new(cluster_info)); let ncp = Ncp::new( - &crdt, + &cluster_info, shared_window.clone(), Some(ledger_path), node.sockets.gossip, @@ -289,13 +289,13 @@ impl Fullnode { match leader_info { Some(leader_info) => { // Start in validator mode. - // TODO: let Crdt get that data from the network? - crdt.write().unwrap().insert(leader_info); + // TODO: let ClusterInfo get that data from the network? + cluster_info.write().unwrap().insert(leader_info); let tvu = Tvu::new( keypair.clone(), &bank, entry_height, - crdt.clone(), + cluster_info.clone(), shared_window.clone(), node.sockets .replicate @@ -320,7 +320,7 @@ impl Fullnode { let (tpu, entry_receiver, tpu_exit) = Tpu::new( keypair.clone(), &bank, - &crdt, + &cluster_info, Default::default(), node.sockets .transaction @@ -337,7 +337,7 @@ impl Fullnode { .broadcast .try_clone() .expect("Failed to clone broadcast socket"), - crdt.clone(), + cluster_info.clone(), shared_window.clone(), entry_height, entry_receiver, @@ -350,7 +350,7 @@ impl Fullnode { Fullnode { keypair, - crdt, + cluster_info, shared_window, bank, sigverify_disabled, @@ -377,13 +377,13 @@ impl Fullnode { self.bank = Arc::new(bank); { - let mut wcrdt = self.crdt.write().unwrap(); - let scheduled_leader = wcrdt.get_scheduled_leader(entry_height); + let mut wcluster_info = self.cluster_info.write().unwrap(); + let scheduled_leader = wcluster_info.get_scheduled_leader(entry_height); match scheduled_leader { //TODO: Handle the case where we don't know who the next //scheduled leader is None => (), - Some(leader_id) => wcrdt.set_leader(leader_id), + Some(leader_id) => wcluster_info.set_leader(leader_id), } } @@ -407,7 +407,7 @@ impl Fullnode { self.keypair.clone(), &self.bank, entry_height, - self.crdt.clone(), + self.cluster_info.clone(), self.shared_window.clone(), self.replicate_socket .iter() @@ -427,11 +427,14 @@ impl Fullnode { } fn validator_to_leader(&mut self, entry_height: u64) { - self.crdt.write().unwrap().set_leader(self.keypair.pubkey()); + self.cluster_info + .write() + .unwrap() + .set_leader(self.keypair.pubkey()); let (tpu, blob_receiver, tpu_exit) = Tpu::new( self.keypair.clone(), &self.bank, - &self.crdt, + &self.cluster_info, Default::default(), self.transaction_sockets .iter() @@ -446,7 +449,7 @@ impl Fullnode { self.broadcast_socket .try_clone() .expect("Failed to clone broadcast socket"), - self.crdt.clone(), + self.cluster_info.clone(), self.shared_window.clone(), entry_height, blob_receiver, @@ -498,7 +501,7 @@ impl Fullnode { // TODO: only used for testing, get rid of this once we have actual // leader scheduling pub fn set_scheduled_leader(&self, leader_id: Pubkey, entry_height: u64) { - self.crdt + self.cluster_info .write() .unwrap() .set_scheduled_leader(entry_height, leader_id); @@ -549,7 +552,7 @@ impl Service for Fullnode { #[cfg(test)] mod tests { use bank::Bank; - use crdt::Node; + use cluster_info::Node; use fullnode::{Fullnode, NodeRole, TvuReturnType}; use ledger::genesis; use packet::make_consecutive_blobs; diff --git a/src/lib.rs b/src/lib.rs index 3fe00be7a..7cc4086a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,7 +21,7 @@ pub mod chacha; pub mod choose_gossip_peer_strategy; pub mod client; #[macro_use] -pub mod crdt; +pub mod cluster_info; pub mod bpf_verifier; pub mod budget_program; pub mod drone; diff --git a/src/ncp.rs b/src/ncp.rs index 5ef2a9ecb..4546454a5 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -1,6 +1,6 @@ //! The `ncp` module implements the network control plane. -use crdt::Crdt; +use cluster_info::ClusterInfo; use service::Service; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; @@ -17,7 +17,7 @@ pub struct Ncp { impl Ncp { pub fn new( - crdt: &Arc>, + cluster_info: &Arc>, window: SharedWindow, ledger_path: Option<&str>, gossip_socket: UdpSocket, @@ -27,22 +27,22 @@ impl Ncp { let gossip_socket = Arc::new(gossip_socket); trace!( "Ncp: id: {:?}, listening on: {:?}", - &crdt.read().unwrap().id.as_ref()[..4], + &cluster_info.read().unwrap().id.as_ref()[..4], gossip_socket.local_addr().unwrap() ); let t_receiver = streamer::blob_receiver(gossip_socket.clone(), exit.clone(), request_sender); let (response_sender, response_receiver) = channel(); let t_responder = streamer::responder("ncp", gossip_socket, response_receiver); - let t_listen = Crdt::listen( - crdt.clone(), + let t_listen = ClusterInfo::listen( + cluster_info.clone(), window, ledger_path, request_receiver, response_sender.clone(), exit.clone(), ); - let t_gossip = Crdt::gossip(crdt.clone(), response_sender, exit.clone()); + let t_gossip = ClusterInfo::gossip(cluster_info.clone(), response_sender, exit.clone()); let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; Ncp { exit, thread_hdls } } @@ -66,7 +66,7 @@ impl Service for Ncp { #[cfg(test)] mod tests { - use crdt::{Crdt, Node}; + use cluster_info::{ClusterInfo, Node}; use ncp::Ncp; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; @@ -77,8 +77,8 @@ mod tests { fn test_exit() { let exit = Arc::new(AtomicBool::new(false)); let tn = Node::new_localhost(); - let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new"); - let c = Arc::new(RwLock::new(crdt)); + let cluster_info = ClusterInfo::new(tn.info.clone()).expect("ClusterInfo::new"); + let c = Arc::new(RwLock::new(cluster_info)); let w = Arc::new(RwLock::new(vec![])); let d = Ncp::new(&c, w, None, tn.sockets.gossip, exit.clone()); d.close().expect("thread join"); diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 10d4bf84f..0b0dc7efe 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -1,8 +1,8 @@ //! The `replicate_stage` replicates transactions broadcast by the leader. use bank::Bank; +use cluster_info::ClusterInfo; use counter::Counter; -use crdt::Crdt; use entry::EntryReceiver; use ledger::{Block, LedgerWriter}; use log::Level; @@ -46,7 +46,7 @@ impl ReplicateStage { /// Process entry blobs, already in order fn replicate_requests( bank: &Arc, - crdt: &Arc>, + cluster_info: &Arc>, window_receiver: &EntryReceiver, ledger_writer: Option<&mut LedgerWriter>, keypair: &Arc, @@ -62,12 +62,12 @@ impl ReplicateStage { let res = bank.process_entries(&entries); if let Some(sender) = vote_blob_sender { - send_validator_vote(bank, keypair, crdt, sender)?; + send_validator_vote(bank, keypair, cluster_info, sender)?; } { - let mut wcrdt = crdt.write().unwrap(); - wcrdt.insert_votes(&entries.votes()); + let mut wcluster_info = cluster_info.write().unwrap(); + wcluster_info.insert_votes(&entries.votes()); } inc_new_counter_info!( @@ -87,7 +87,7 @@ impl ReplicateStage { pub fn new( keypair: Arc, bank: Arc, - crdt: Arc>, + cluster_info: Arc>, window_receiver: EntryReceiver, ledger_path: Option<&str>, exit: Arc, @@ -116,7 +116,7 @@ impl ReplicateStage { if let Err(e) = Self::replicate_requests( &bank, - &crdt, + &cluster_info, &window_receiver, ledger_writer.as_mut(), &keypair, diff --git a/src/replicator.rs b/src/replicator.rs index fa0c03d92..2a8f62938 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -1,5 +1,5 @@ use blob_fetch_stage::BlobFetchStage; -use crdt::{Crdt, Node, NodeInfo}; +use cluster_info::{ClusterInfo, Node, NodeInfo}; use hash::{Hash, Hasher}; use ncp::Ncp; use service::Service; @@ -77,12 +77,14 @@ impl Replicator { let window = window::new_window_from_entries(&[], entry_height, &node.info); let shared_window = Arc::new(RwLock::new(window)); - let crdt = Arc::new(RwLock::new(Crdt::new(node.info).expect("Crdt::new"))); + let cluster_info = Arc::new(RwLock::new( + ClusterInfo::new(node.info).expect("ClusterInfo::new"), + )); let leader_info = network_addr.map(|i| NodeInfo::new_entry_point(&i)); if let Some(leader_info) = leader_info.as_ref() { - crdt.write().unwrap().insert(leader_info); + cluster_info.write().unwrap().insert(leader_info); } else { panic!("No leader info!"); } @@ -98,7 +100,7 @@ impl Replicator { // todo: pull blobs off the retransmit_receiver and recycle them? let (retransmit_sender, retransmit_receiver) = channel(); let t_window = window_service( - crdt.clone(), + cluster_info.clone(), shared_window.clone(), entry_height, max_entry_height, @@ -112,7 +114,7 @@ impl Replicator { let store_ledger_stage = StoreLedgerStage::new(entry_window_receiver, ledger_path); let ncp = Ncp::new( - &crdt, + &cluster_info, shared_window.clone(), ledger_path, node.sockets.gossip, @@ -147,7 +149,7 @@ impl Replicator { #[cfg(test)] mod tests { use client::mk_client; - use crdt::Node; + use cluster_info::Node; use fullnode::Fullnode; use hash::Hash; use ledger::{genesis, read_ledger, tmp_ledger_path}; diff --git a/src/result.rs b/src/result.rs index 386e39559..e5a5ed303 100644 --- a/src/result.rs +++ b/src/result.rs @@ -2,7 +2,7 @@ use bank; use bincode; -use crdt; +use cluster_info; #[cfg(feature = "erasure")] use erasure; use packet; @@ -20,7 +20,7 @@ pub enum Error { RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), Serialize(std::boxed::Box), BankError(bank::BankError), - CrdtError(crdt::CrdtError), + ClusterInfoError(cluster_info::ClusterInfoError), BlobError(packet::BlobError), #[cfg(feature = "erasure")] ErasureError(erasure::ErasureError), @@ -52,9 +52,9 @@ impl std::convert::From for Error { Error::BankError(e) } } -impl std::convert::From for Error { - fn from(e: crdt::CrdtError) -> Error { - Error::CrdtError(e) +impl std::convert::From for Error { + fn from(e: cluster_info::ClusterInfoError) -> Error { + Error::ClusterInfoError(e) } } #[cfg(feature = "erasure")] diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index c1469cf5c..5cad16e78 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -1,7 +1,7 @@ //! The `retransmit_stage` retransmits blobs between validators +use cluster_info::ClusterInfo; use counter::Counter; -use crdt::Crdt; use entry::Entry; use log::Level; use result::{Error, Result}; @@ -22,33 +22,41 @@ pub enum RetransmitStageReturnType { LeaderRotation(u64), } -fn retransmit(crdt: &Arc>, r: &BlobReceiver, sock: &UdpSocket) -> Result<()> { +fn retransmit( + cluster_info: &Arc>, + r: &BlobReceiver, + sock: &UdpSocket, +) -> Result<()> { let timer = Duration::new(1, 0); let mut dq = r.recv_timeout(timer)?; while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq); } for b in &mut dq { - Crdt::retransmit(&crdt, b, sock)?; + ClusterInfo::retransmit(&cluster_info, b, sock)?; } Ok(()) } /// Service to retransmit messages from the leader to layer 1 nodes. -/// See `crdt` for network layer definitions. +/// See `cluster_info` for network layer definitions. /// # Arguments /// * `sock` - Socket to read from. Read timeout is set to 1. /// * `exit` - Boolean to signal system exit. -/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip. +/// * `cluster_info` - This structure needs to be updated and populated by the bank and via gossip. /// * `recycler` - Blob recycler. /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. -fn retransmitter(sock: Arc, crdt: Arc>, r: BlobReceiver) -> JoinHandle<()> { +fn retransmitter( + sock: Arc, + cluster_info: Arc>, + r: BlobReceiver, +) -> JoinHandle<()> { Builder::new() .name("solana-retransmitter".to_string()) .spawn(move || { trace!("retransmitter started"); loop { - if let Err(e) = retransmit(&crdt, &r, &sock) { + if let Err(e) = retransmit(&cluster_info, &r, &sock) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), @@ -69,7 +77,7 @@ pub struct RetransmitStage { impl RetransmitStage { pub fn new( - crdt: &Arc>, + cluster_info: &Arc>, window: SharedWindow, entry_height: u64, retransmit_socket: Arc, @@ -78,11 +86,12 @@ impl RetransmitStage { ) -> (Self, Receiver>) { let (retransmit_sender, retransmit_receiver) = channel(); - let t_retransmit = retransmitter(retransmit_socket, crdt.clone(), retransmit_receiver); + let t_retransmit = + retransmitter(retransmit_socket, cluster_info.clone(), retransmit_receiver); let (entry_sender, entry_receiver) = channel(); let done = Arc::new(AtomicBool::new(false)); let t_window = window_service( - crdt.clone(), + cluster_info.clone(), window, entry_height, 0, diff --git a/src/thin_client.rs b/src/thin_client.rs index a386c6e9d..0115c13a4 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -5,7 +5,7 @@ use bank::Bank; use bincode::{deserialize, serialize}; -use crdt::{Crdt, CrdtError, NodeInfo}; +use cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo}; use hash::Hash; use log::Level; use ncp::Ncp; @@ -373,14 +373,22 @@ impl Drop for ThinClient { pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option) -> Result { let exit = Arc::new(AtomicBool::new(false)); - let (node, gossip_socket) = Crdt::spy_node(); + let (node, gossip_socket) = ClusterInfo::spy_node(); let my_addr = gossip_socket.local_addr().unwrap(); - let crdt = Arc::new(RwLock::new(Crdt::new(node).expect("Crdt::new"))); + let cluster_info = Arc::new(RwLock::new( + ClusterInfo::new(node).expect("ClusterInfo::new"), + )); let window = Arc::new(RwLock::new(vec![])); - let ncp = Ncp::new(&crdt.clone(), window, None, gossip_socket, exit.clone()); + let ncp = Ncp::new( + &cluster_info.clone(), + window, + None, + gossip_socket, + exit.clone(), + ); let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp); - crdt.write().unwrap().insert(&leader_entry_point); + cluster_info.write().unwrap().insert(&leader_entry_point); sleep(Duration::from_millis(100)); @@ -395,17 +403,17 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option) -> R loop { trace!("polling {:?} for leader from {:?}", leader_ncp, my_addr); - if let Some(l) = crdt.read().unwrap().leader_data() { + if let Some(l) = cluster_info.read().unwrap().leader_data() { leader = Some(l.clone()); break; } if log_enabled!(Level::Trace) { - trace!("{}", crdt.read().unwrap().node_info_trace()); + trace!("{}", cluster_info.read().unwrap().node_info_trace()); } if now.elapsed() > deadline { - return Err(Error::CrdtError(CrdtError::NoLeader)); + return Err(Error::ClusterInfoError(ClusterInfoError::NoLeader)); } sleep(Duration::from_millis(100)); @@ -414,7 +422,7 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option) -> R ncp.close()?; if log_enabled!(Level::Trace) { - trace!("{}", crdt.read().unwrap().node_info_trace()); + trace!("{}", cluster_info.read().unwrap().node_info_trace()); } Ok(leader.unwrap().clone()) @@ -424,7 +432,7 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option) -> R mod tests { use super::*; use bank::Bank; - use crdt::Node; + use cluster_info::Node; use fullnode::Fullnode; use ledger::LedgerWriter; use logger; diff --git a/src/tpu.rs b/src/tpu.rs index 95d164705..1ce81b35f 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -27,7 +27,7 @@ use bank::Bank; use banking_stage::{BankingStage, Config}; -use crdt::Crdt; +use cluster_info::ClusterInfo; use entry::Entry; use fetch_stage::FetchStage; use service::Service; @@ -56,7 +56,7 @@ impl Tpu { pub fn new( keypair: Arc, bank: &Arc, - crdt: &Arc>, + cluster_info: &Arc>, tick_duration: Config, transactions_sockets: Vec, ledger_path: &str, @@ -76,7 +76,7 @@ impl Tpu { let (write_stage, entry_forwarder) = WriteStage::new( keypair, bank.clone(), - crdt.clone(), + cluster_info.clone(), ledger_path, entry_receiver, entry_height, diff --git a/src/tvu.rs b/src/tvu.rs index 6383aa4d2..522e7f50f 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -38,7 +38,7 @@ use bank::Bank; use blob_fetch_stage::BlobFetchStage; -use crdt::Crdt; +use cluster_info::ClusterInfo; use replicate_stage::ReplicateStage; use retransmit_stage::{RetransmitStage, RetransmitStageReturnType}; use service::Service; @@ -67,7 +67,7 @@ impl Tvu { /// # Arguments /// * `bank` - The bank state. /// * `entry_height` - Initial ledger height, passed to replicate stage - /// * `crdt` - The crdt state. + /// * `cluster_info` - The cluster_info state. /// * `window` - The window state. /// * `replicate_socket` - my replicate socket /// * `repair_socket` - my repair socket @@ -78,7 +78,7 @@ impl Tvu { keypair: Arc, bank: &Arc, entry_height: u64, - crdt: Arc>, + cluster_info: Arc>, window: SharedWindow, replicate_sockets: Vec, repair_socket: UdpSocket, @@ -97,7 +97,7 @@ impl Tvu { //the packets coming out of blob_receiver need to be sent to the GPU and verified //then sent to the window, which does the erasure coding reconstruction let (retransmit_stage, blob_window_receiver) = RetransmitStage::new( - &crdt, + &cluster_info, window, entry_height, Arc::new(retransmit_socket), @@ -108,7 +108,7 @@ impl Tvu { let replicate_stage = ReplicateStage::new( keypair, bank.clone(), - crdt, + cluster_info, blob_window_receiver, ledger_path, exit.clone(), @@ -151,7 +151,7 @@ impl Service for Tvu { pub mod tests { use bank::Bank; use bincode::serialize; - use crdt::{Crdt, Node}; + use cluster_info::{ClusterInfo, Node}; use entry::Entry; use hash::{hash, Hash}; use logger; @@ -172,12 +172,12 @@ pub mod tests { use window::{self, SharedWindow}; fn new_ncp( - crdt: Arc>, + cluster_info: Arc>, gossip: UdpSocket, exit: Arc, ) -> (Ncp, SharedWindow) { let window = Arc::new(RwLock::new(window::default_window())); - let ncp = Ncp::new(&crdt, window.clone(), None, gossip, exit); + let ncp = Ncp::new(&cluster_info, window.clone(), None, gossip, exit); (ncp, window) } @@ -191,19 +191,19 @@ pub mod tests { let target2 = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); - //start crdt_leader - let mut crdt_l = Crdt::new(leader.info.clone()).expect("Crdt::new"); - crdt_l.set_leader(leader.info.id); + //start cluster_info_l + let mut cluster_info_l = ClusterInfo::new(leader.info.clone()).expect("ClusterInfo::new"); + cluster_info_l.set_leader(leader.info.id); - let cref_l = Arc::new(RwLock::new(crdt_l)); + let cref_l = Arc::new(RwLock::new(cluster_info_l)); let dr_l = new_ncp(cref_l, leader.sockets.gossip, exit.clone()); - //start crdt2 - let mut crdt2 = Crdt::new(target2.info.clone()).expect("Crdt::new"); - crdt2.insert(&leader.info); - crdt2.set_leader(leader.info.id); + //start cluster_info2 + let mut cluster_info2 = ClusterInfo::new(target2.info.clone()).expect("ClusterInfo::new"); + cluster_info2.insert(&leader.info); + cluster_info2.set_leader(leader.info.id); let leader_id = leader.info.id; - let cref2 = Arc::new(RwLock::new(crdt2)); + let cref2 = Arc::new(RwLock::new(cluster_info2)); let dr_2 = new_ncp(cref2, target2.sockets.gossip, exit.clone()); // setup some blob services to send blobs into the socket @@ -232,11 +232,11 @@ pub mod tests { let replicate_addr = target1.info.contact_info.tvu; let bank = Arc::new(Bank::new(&mint)); - //start crdt1 - let mut crdt1 = Crdt::new(target1.info.clone()).expect("Crdt::new"); - crdt1.insert(&leader.info); - crdt1.set_leader(leader.info.id); - let cref1 = Arc::new(RwLock::new(crdt1)); + //start cluster_info1 + let mut cluster_info1 = ClusterInfo::new(target1.info.clone()).expect("ClusterInfo::new"); + cluster_info1.insert(&leader.info); + cluster_info1.set_leader(leader.info.id); + let cref1 = Arc::new(RwLock::new(cluster_info1)); let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()); let tvu = Tvu::new( diff --git a/src/vote_stage.rs b/src/vote_stage.rs index 2285de693..6e9fb1050 100644 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -3,8 +3,8 @@ use bank::Bank; use bincode::serialize; use budget_transaction::BudgetTransaction; +use cluster_info::ClusterInfo; use counter::Counter; -use crdt::Crdt; use hash::Hash; use influx_db_client as influxdb; use log::Level; @@ -30,14 +30,14 @@ enum VoteError { pub fn create_new_signed_vote_blob( last_id: &Hash, keypair: &Keypair, - crdt: &Arc>, + cluster_info: &Arc>, ) -> Result { let shared_blob = SharedBlob::default(); let (vote, addr) = { - let mut wcrdt = crdt.write().unwrap(); + let mut wcluster_info = cluster_info.write().unwrap(); //TODO: doesn't seem like there is a synchronous call to get height and id debug!("voting on {:?}", &last_id.as_ref()[..8]); - wcrdt.new_vote(*last_id) + wcluster_info.new_vote(*last_id) }?; let tx = Transaction::budget_new_vote(&keypair, vote, *last_id, 0); { @@ -107,14 +107,14 @@ pub fn send_leader_vote( id: &Pubkey, keypair: &Keypair, bank: &Arc, - crdt: &Arc>, + cluster_info: &Arc>, vote_blob_sender: &BlobSender, last_vote: &mut u64, last_valid_validator_timestamp: &mut u64, ) -> Result<()> { let now = timing::timestamp(); if now - *last_vote > VOTE_TIMEOUT_MS { - let ids: Vec<_> = crdt.read().unwrap().valid_last_ids(); + let ids: Vec<_> = cluster_info.read().unwrap().valid_last_ids(); if let Ok((last_id, super_majority_timestamp)) = get_last_id_to_vote_on( id, &ids, @@ -123,7 +123,7 @@ pub fn send_leader_vote( last_vote, last_valid_validator_timestamp, ) { - if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, crdt) { + if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, cluster_info) { vote_blob_sender.send(vec![shared_blob])?; let finality_ms = now - super_majority_timestamp; @@ -147,11 +147,11 @@ pub fn send_leader_vote( pub fn send_validator_vote( bank: &Arc, keypair: &Arc, - crdt: &Arc>, + cluster_info: &Arc>, vote_blob_sender: &BlobSender, ) -> Result<()> { let last_id = bank.last_id(); - if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, crdt) { + if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, cluster_info) { inc_new_counter_info!("replicate-vote_sent", 1); vote_blob_sender.send(vec![shared_blob])?; @@ -165,7 +165,7 @@ pub mod tests { use bank::Bank; use bincode::deserialize; use budget_instruction::Vote; - use crdt::{Crdt, NodeInfo}; + use cluster_info::{ClusterInfo, NodeInfo}; use entry::next_entry; use hash::{hash, Hash}; use logger; @@ -193,14 +193,14 @@ pub mod tests { // Create a leader let leader_data = NodeInfo::new_with_socketaddr(&"127.0.0.1:1234".parse().unwrap()); let leader_pubkey = leader_data.id.clone(); - let mut leader_crdt = Crdt::new(leader_data).unwrap(); + let mut leader_cluster_info = ClusterInfo::new(leader_data).unwrap(); // give the leader some tokens let give_leader_tokens_tx = Transaction::system_new(&mint.keypair(), leader_pubkey.clone(), 100, entry.id); bank.process_transaction(&give_leader_tokens_tx).unwrap(); - leader_crdt.set_leader(leader_pubkey); + leader_cluster_info.set_leader(leader_pubkey); // Insert 7 agreeing validators / 3 disagreeing // and votes for new last_id @@ -217,12 +217,12 @@ pub mod tests { validator.ledger_state.last_id = entry.id; } - leader_crdt.insert(&validator); + leader_cluster_info.insert(&validator); trace!("validator id: {:?}", validator.id); - leader_crdt.insert_vote(&validator.id, &vote, entry.id); + leader_cluster_info.insert_vote(&validator.id, &vote, entry.id); } - let leader = Arc::new(RwLock::new(leader_crdt)); + let leader = Arc::new(RwLock::new(leader_cluster_info)); let (vote_blob_sender, vote_blob_receiver) = channel(); let mut last_vote: u64 = timing::timestamp() - VOTE_TIMEOUT_MS - 1; let mut last_valid_validator_timestamp = 0; diff --git a/src/wallet.rs b/src/wallet.rs index 8609dbeda..8f5ec07d4 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -4,7 +4,7 @@ use budget_program::BudgetState; use budget_transaction::BudgetTransaction; use chrono::prelude::*; use clap::ArgMatches; -use crdt::NodeInfo; +use cluster_info::NodeInfo; use drone::DroneRequest; use fullnode::Config; use hash::Hash; @@ -613,7 +613,7 @@ mod tests { use super::*; use bank::Bank; use clap::{App, Arg, SubCommand}; - use crdt::Node; + use cluster_info::Node; use drone::run_local_drone; use fullnode::Fullnode; use ledger::LedgerWriter; diff --git a/src/window.rs b/src/window.rs index 3f46caa89..cceed96c9 100644 --- a/src/window.rs +++ b/src/window.rs @@ -1,7 +1,7 @@ //! The `window` module defines data structure for storing the tail of the ledger. //! +use cluster_info::{ClusterInfo, NodeInfo}; use counter::Counter; -use crdt::{Crdt, NodeInfo}; use entry::Entry; #[cfg(feature = "erasure")] use erasure; @@ -53,7 +53,7 @@ pub trait WindowUtil { fn repair( &mut self, - crdt: &Arc>, + cluster_info: &Arc>, id: &Pubkey, times: usize, consumed: u64, @@ -67,7 +67,7 @@ pub trait WindowUtil { fn process_blob( &mut self, id: &Pubkey, - crdt: &Arc>, + cluster_info: &Arc>, blob: SharedBlob, pix: u64, consume_queue: &mut Vec, @@ -95,20 +95,20 @@ impl WindowUtil for Window { fn repair( &mut self, - crdt: &Arc>, + cluster_info: &Arc>, id: &Pubkey, times: usize, consumed: u64, received: u64, max_entry_height: u64, ) -> Vec<(SocketAddr, Vec)> { - let rcrdt = crdt.read().unwrap(); - let leader_rotation_interval = rcrdt.get_leader_rotation_interval(); + let rcluster_info = cluster_info.read().unwrap(); + let leader_rotation_interval = rcluster_info.get_leader_rotation_interval(); // Calculate the next leader rotation height and check if we are the leader let next_leader_rotation = consumed + leader_rotation_interval - (consumed % leader_rotation_interval); - let is_next_leader = rcrdt.get_scheduled_leader(next_leader_rotation) == Some(*id); - let num_peers = rcrdt.table.len() as u64; + let is_next_leader = rcluster_info.get_scheduled_leader(next_leader_rotation) == Some(*id); + let num_peers = rcluster_info.table.len() as u64; let max_repair = if max_entry_height == 0 { calculate_max_repair(num_peers, consumed, received, times, is_next_leader) @@ -119,10 +119,10 @@ impl WindowUtil for Window { let idxs = self.clear_slots(consumed, max_repair); let reqs: Vec<_> = idxs .into_iter() - .filter_map(|pix| rcrdt.window_index_request(pix).ok()) + .filter_map(|pix| rcluster_info.window_index_request(pix).ok()) .collect(); - drop(rcrdt); + drop(rcluster_info); inc_new_counter_info!("streamer-repair_window-repair", reqs.len()); @@ -196,7 +196,7 @@ impl WindowUtil for Window { fn process_blob( &mut self, id: &Pubkey, - crdt: &Arc>, + cluster_info: &Arc>, blob: SharedBlob, pix: u64, consume_queue: &mut Vec, @@ -259,9 +259,9 @@ impl WindowUtil for Window { // push all contiguous blobs into consumed queue, increment consumed loop { if *consumed != 0 && *consumed % (leader_rotation_interval as u64) == 0 { - let rcrdt = crdt.read().unwrap(); - let my_id = rcrdt.my_data().id; - match rcrdt.get_scheduled_leader(*consumed) { + let rcluster_info = cluster_info.read().unwrap(); + let my_id = rcluster_info.my_data().id; + match rcluster_info.get_scheduled_leader(*consumed) { // If we are the next leader, exit Some(id) if id == my_id => { break; @@ -388,7 +388,7 @@ pub fn index_blobs( } /// Initialize a rebroadcast window with most recent Entry blobs -/// * `crdt` - gossip instance, used to set blob ids +/// * `cluster_info` - gossip instance, used to set blob ids /// * `blobs` - up to WINDOW_SIZE most recent blobs /// * `entry_height` - current entry height pub fn initialized_window( diff --git a/src/window_service.rs b/src/window_service.rs index 137962b9d..83afac0c4 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -1,7 +1,7 @@ //! The `window_service` provides a thread for maintaining a window (tail of the ledger). //! +use cluster_info::{ClusterInfo, NodeInfo}; use counter::Counter; -use crdt::{Crdt, NodeInfo}; use entry::EntrySender; use log::Level; use packet::SharedBlob; @@ -136,7 +136,7 @@ fn retransmit_all_leader_blocks( fn recv_window( window: &SharedWindow, id: &Pubkey, - crdt: &Arc>, + cluster_info: &Arc>, consumed: &mut u64, received: &mut u64, max_ix: u64, @@ -149,9 +149,9 @@ fn recv_window( ) -> Result<()> { let timer = Duration::from_millis(200); let mut dq = r.recv_timeout(timer)?; - let maybe_leader: Option = crdt + let maybe_leader: Option = cluster_info .read() - .expect("'crdt' read lock in fn recv_window") + .expect("'cluster_info' read lock in fn recv_window") .leader_data() .cloned(); let leader_unknown = maybe_leader.is_none(); @@ -204,7 +204,7 @@ fn recv_window( window.write().unwrap().process_blob( id, - crdt, + cluster_info, b, pix, &mut consume_queue, @@ -239,7 +239,7 @@ fn recv_window( } pub fn window_service( - crdt: Arc>, + cluster_info: Arc>, window: SharedWindow, entry_height: u64, max_entry_height: u64, @@ -259,20 +259,20 @@ pub fn window_service( let id; let leader_rotation_interval; { - let rcrdt = crdt.read().unwrap(); - id = rcrdt.id; - leader_rotation_interval = rcrdt.get_leader_rotation_interval(); + let rcluster_info = cluster_info.read().unwrap(); + id = rcluster_info.id; + leader_rotation_interval = rcluster_info.get_leader_rotation_interval(); } let mut pending_retransmits = false; trace!("{}: RECV_WINDOW started", id); loop { if consumed != 0 && consumed % (leader_rotation_interval as u64) == 0 { - match crdt.read().unwrap().get_scheduled_leader(consumed) { + match cluster_info.read().unwrap().get_scheduled_leader(consumed) { // If we are the next leader, exit Some(next_leader_id) if id == next_leader_id => { return Some(WindowServiceReturnType::LeaderRotation(consumed)); } - // TODO: Figure out where to set the new leader in the crdt for + // TODO: Figure out where to set the new leader in the cluster_info for // validator -> validator transition (once we have real leader scheduling, // this decision will be clearer). Also make sure new blobs to window actually // originate from new leader @@ -283,7 +283,7 @@ pub fn window_service( if let Err(e) = recv_window( &window, &id, - &crdt, + &cluster_info, &mut consumed, &mut received, max_entry_height, @@ -322,7 +322,7 @@ pub fn window_service( trace!("{} let's repair! times = {}", id, times); let mut window = window.write().unwrap(); - let reqs = window.repair(&crdt, &id, times, consumed, received, max_entry_height); + let reqs = window.repair(&cluster_info, &id, times, consumed, received, max_entry_height); for (to, req) in reqs { repair_socket.send_to(&req, to).unwrap_or_else(|e| { info!("{} repair req send_to({}) error {:?}", id, to, e); @@ -336,7 +336,7 @@ pub fn window_service( #[cfg(test)] mod test { - use crdt::{Crdt, Node}; + use cluster_info::{ClusterInfo, Node}; use entry::Entry; use hash::Hash; use logger; @@ -371,10 +371,10 @@ mod test { logger::setup(); let tn = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); - let mut crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new"); - let me_id = crdt_me.my_data().id; - crdt_me.set_leader(me_id); - let subs = Arc::new(RwLock::new(crdt_me)); + let mut cluster_info_me = ClusterInfo::new(tn.info.clone()).expect("ClusterInfo::new"); + let me_id = cluster_info_me.my_data().id; + cluster_info_me.set_leader(me_id); + let subs = Arc::new(RwLock::new(cluster_info_me)); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); @@ -429,9 +429,9 @@ mod test { logger::setup(); let tn = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); - let crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new"); - let me_id = crdt_me.my_data().id; - let subs = Arc::new(RwLock::new(crdt_me)); + let cluster_info_me = ClusterInfo::new(tn.info.clone()).expect("ClusterInfo::new"); + let me_id = cluster_info_me.my_data().id; + let subs = Arc::new(RwLock::new(cluster_info_me)); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); @@ -485,9 +485,9 @@ mod test { logger::setup(); let tn = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); - let crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new"); - let me_id = crdt_me.my_data().id; - let subs = Arc::new(RwLock::new(crdt_me)); + let cluster_info_me = ClusterInfo::new(tn.info.clone()).expect("ClusterInfo::new"); + let me_id = cluster_info_me.my_data().id; + let subs = Arc::new(RwLock::new(cluster_info_me)); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); @@ -595,20 +595,21 @@ mod test { let my_leader_begin_epoch = 2; let tn = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); - let mut crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new"); - let me_id = crdt_me.my_data().id; + let mut cluster_info_me = ClusterInfo::new(tn.info.clone()).expect("ClusterInfo::new"); + let me_id = cluster_info_me.my_data().id; // Set myself in an upcoming epoch, but set the old_leader_id as the // leader for all epochs before that let old_leader_id = Keypair::new().pubkey(); - crdt_me.set_leader(me_id); - crdt_me.set_leader_rotation_interval(leader_rotation_interval); + cluster_info_me.set_leader(me_id); + cluster_info_me.set_leader_rotation_interval(leader_rotation_interval); for i in 0..my_leader_begin_epoch { - crdt_me.set_scheduled_leader(leader_rotation_interval * i, old_leader_id); + cluster_info_me.set_scheduled_leader(leader_rotation_interval * i, old_leader_id); } - crdt_me.set_scheduled_leader(my_leader_begin_epoch * leader_rotation_interval, me_id); + cluster_info_me + .set_scheduled_leader(my_leader_begin_epoch * leader_rotation_interval, me_id); - let subs = Arc::new(RwLock::new(crdt_me)); + let subs = Arc::new(RwLock::new(cluster_info_me)); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); diff --git a/src/write_stage.rs b/src/write_stage.rs index 6f03962d1..4e03c29af 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -3,8 +3,8 @@ //! stdout, and then sends the Entry to its output channel. use bank::Bank; +use cluster_info::ClusterInfo; use counter::Counter; -use crdt::Crdt; use entry::Entry; use ledger::{Block, LedgerWriter}; use log::Level; @@ -38,7 +38,7 @@ impl WriteStage { // fit before we hit the entry height for leader rotation. Also return a boolean // reflecting whether we actually hit an entry height for leader rotation. fn find_leader_rotation_index( - crdt: &Arc>, + cluster_info: &Arc>, leader_rotation_interval: u64, entry_height: u64, mut new_entries: Vec, @@ -51,9 +51,9 @@ impl WriteStage { loop { if (entry_height + i as u64) % leader_rotation_interval == 0 { - let rcrdt = crdt.read().unwrap(); - let my_id = rcrdt.my_data().id; - let next_leader = rcrdt.get_scheduled_leader(entry_height + i as u64); + let rcluster_info = cluster_info.read().unwrap(); + let my_id = rcluster_info.my_data().id; + let next_leader = rcluster_info.get_scheduled_leader(entry_height + i as u64); if next_leader != Some(my_id) { is_leader_rotation = true; break; @@ -86,7 +86,7 @@ impl WriteStage { /// Process any Entry items that have been published by the RecordStage. /// continuosly send entries out pub fn write_and_send_entries( - crdt: &Arc>, + cluster_info: &Arc>, ledger_writer: &mut LedgerWriter, entry_sender: &Sender>, entry_receiver: &Receiver>, @@ -103,7 +103,7 @@ impl WriteStage { // Find out how many more entries we can squeeze in until the next leader // rotation let (new_entries, is_leader_rotation) = Self::find_leader_rotation_index( - crdt, + cluster_info, leader_rotation_interval, *entry_height + num_new_entries as u64, received_entries, @@ -127,17 +127,17 @@ impl WriteStage { info!("write_stage entries: {}", num_new_entries); let mut entries_send_total = 0; - let mut crdt_votes_total = 0; + let mut cluster_info_votes_total = 0; let start = Instant::now(); for entries in ventries { for e in &entries { num_txs += e.transactions.len(); } - let crdt_votes_start = Instant::now(); + let cluster_info_votes_start = Instant::now(); let votes = &entries.votes(); - crdt.write().unwrap().insert_votes(&votes); - crdt_votes_total += duration_as_ms(&crdt_votes_start.elapsed()); + cluster_info.write().unwrap().insert_votes(&votes); + cluster_info_votes_total += duration_as_ms(&cluster_info_votes_start.elapsed()); ledger_writer.write_entries(entries.clone())?; // Once the entries have been written to the ledger, then we can @@ -165,11 +165,11 @@ impl WriteStage { "write_stage-time_ms", duration_as_ms(&now.elapsed()) as usize ); - info!("done write_stage txs: {} time {} ms txs/s: {} entries_send_total: {} crdt_votes_total: {}", + info!("done write_stage txs: {} time {} ms txs/s: {} entries_send_total: {} cluster_info_votes_total: {}", num_txs, duration_as_ms(&start.elapsed()), num_txs as f32 / duration_as_s(&start.elapsed()), entries_send_total, - crdt_votes_total); + cluster_info_votes_total); Ok(()) } @@ -178,7 +178,7 @@ impl WriteStage { pub fn new( keypair: Arc, bank: Arc, - crdt: Arc>, + cluster_info: Arc>, ledger_path: &str, entry_receiver: Receiver>, entry_height: u64, @@ -201,9 +201,9 @@ impl WriteStage { let id; let leader_rotation_interval; { - let rcrdt = crdt.read().unwrap(); - id = rcrdt.id; - leader_rotation_interval = rcrdt.get_leader_rotation_interval(); + let rcluster_info = cluster_info.read().unwrap(); + id = rcluster_info.id; + leader_rotation_interval = rcluster_info.get_leader_rotation_interval(); } let mut entry_height = entry_height; loop { @@ -212,10 +212,10 @@ impl WriteStage { // n * leader_rotation_interval for some "n". Once we've forwarded // that last block, check for the next scheduled leader. if entry_height % (leader_rotation_interval as u64) == 0 { - let rcrdt = crdt.read().unwrap(); - let my_id = rcrdt.my_data().id; - let scheduled_leader = rcrdt.get_scheduled_leader(entry_height); - drop(rcrdt); + let rcluster_info = cluster_info.read().unwrap(); + let my_id = rcluster_info.my_data().id; + let scheduled_leader = rcluster_info.get_scheduled_leader(entry_height); + drop(rcluster_info); match scheduled_leader { Some(id) if id == my_id => (), // If the leader stays in power for the next @@ -230,7 +230,7 @@ impl WriteStage { } if let Err(e) = Self::write_and_send_entries( - &crdt, + &cluster_info, &mut ledger_writer, &entry_sender, &entry_receiver, @@ -255,7 +255,7 @@ impl WriteStage { &id, &keypair, &bank, - &crdt, + &cluster_info, &vote_blob_sender, &mut last_vote, &mut last_valid_validator_timestamp, @@ -292,7 +292,7 @@ impl Service for WriteStage { #[cfg(test)] mod tests { use bank::Bank; - use crdt::{Crdt, Node}; + use cluster_info::{ClusterInfo, Node}; use entry::Entry; use hash::Hash; use ledger::{genesis, next_entries_mut, read_ledger}; @@ -309,7 +309,7 @@ mod tests { write_stage: WriteStage, entry_sender: Sender>, _write_stage_entry_receiver: Receiver>, - crdt: Arc>, + cluster_info: Arc>, bank: Arc, leader_ledger_path: String, ledger_tail: Vec, @@ -331,9 +331,9 @@ mod tests { let my_id = leader_keypair.pubkey(); let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); - let mut crdt = Crdt::new(leader_info.info).expect("Crdt::new"); - crdt.set_leader_rotation_interval(leader_rotation_interval); - let crdt = Arc::new(RwLock::new(crdt)); + let mut cluster_info = ClusterInfo::new(leader_info.info).expect("ClusterInfo::new"); + cluster_info.set_leader_rotation_interval(leader_rotation_interval); + let cluster_info = Arc::new(RwLock::new(cluster_info)); let bank = Bank::new_default(true); let bank = Arc::new(bank); @@ -349,7 +349,7 @@ mod tests { let (write_stage, _write_stage_entry_receiver) = WriteStage::new( leader_keypair, bank.clone(), - crdt.clone(), + cluster_info.clone(), &leader_ledger_path, entry_receiver, entry_height, @@ -362,7 +362,7 @@ mod tests { // Need to keep this alive, otherwise the write_stage will detect ChannelClosed // and shut down _write_stage_entry_receiver, - crdt, + cluster_info, bank, leader_ledger_path, ledger_tail, @@ -375,8 +375,8 @@ mod tests { let write_stage_info = setup_dummy_write_stage(leader_rotation_interval); { - let mut wcrdt = write_stage_info.crdt.write().unwrap(); - wcrdt.set_scheduled_leader(leader_rotation_interval, write_stage_info.my_id); + let mut wcluster_info = write_stage_info.cluster_info.write().unwrap(); + wcluster_info.set_scheduled_leader(leader_rotation_interval, write_stage_info.my_id); } let mut last_id = write_stage_info @@ -396,14 +396,15 @@ mod tests { write_stage_info.entry_sender.send(new_entry).unwrap(); } - // Set the scheduled next leader in the crdt to some other node + // Set the scheduled next leader in the cluster_info to some other node let leader2_keypair = Keypair::new(); let leader2_info = Node::new_localhost_with_pubkey(leader2_keypair.pubkey()); { - let mut wcrdt = write_stage_info.crdt.write().unwrap(); - wcrdt.insert(&leader2_info.info); - wcrdt.set_scheduled_leader(2 * leader_rotation_interval, leader2_keypair.pubkey()); + let mut wcluster_info = write_stage_info.cluster_info.write().unwrap(); + wcluster_info.insert(&leader2_info.info); + wcluster_info + .set_scheduled_leader(2 * leader_rotation_interval, leader2_keypair.pubkey()); } // Input another leader_rotation_interval dummy entries one at a time, @@ -440,13 +441,13 @@ mod tests { // time during which a leader is in power let num_epochs = 3; - let mut crdt = Crdt::new(leader_info.info).expect("Crdt::new"); - crdt.set_leader_rotation_interval(leader_rotation_interval as u64); + let mut cluster_info = ClusterInfo::new(leader_info.info).expect("ClusterInfo::new"); + cluster_info.set_leader_rotation_interval(leader_rotation_interval as u64); for i in 0..num_epochs { - crdt.set_scheduled_leader(i * leader_rotation_interval, my_id) + cluster_info.set_scheduled_leader(i * leader_rotation_interval, my_id) } - let crdt = Arc::new(RwLock::new(crdt)); + let cluster_info = Arc::new(RwLock::new(cluster_info)); let entry = Entry::new(&Hash::default(), 0, vec![]); // A vector that is completely within a certain epoch should return that @@ -454,7 +455,7 @@ mod tests { let mut len = leader_rotation_interval as usize - 1; let mut input = vec![entry.clone(); len]; let mut result = WriteStage::find_leader_rotation_index( - &crdt, + &cluster_info, leader_rotation_interval, (num_epochs - 1) * leader_rotation_interval, input.clone(), @@ -467,7 +468,7 @@ mod tests { len = leader_rotation_interval as usize - 1; input = vec![entry.clone(); len]; result = WriteStage::find_leader_rotation_index( - &crdt, + &cluster_info, leader_rotation_interval, (num_epochs * leader_rotation_interval) - 1, input.clone(), @@ -482,7 +483,7 @@ mod tests { len = 1; let mut input = vec![entry.clone(); len]; result = WriteStage::find_leader_rotation_index( - &crdt, + &cluster_info, leader_rotation_interval, leader_rotation_interval - 1, input.clone(), @@ -495,7 +496,7 @@ mod tests { len = leader_rotation_interval as usize; input = vec![entry.clone(); len]; result = WriteStage::find_leader_rotation_index( - &crdt, + &cluster_info, leader_rotation_interval, leader_rotation_interval - 1, input.clone(), @@ -508,7 +509,7 @@ mod tests { len = (num_epochs - 1) as usize * leader_rotation_interval as usize; input = vec![entry.clone(); len]; result = WriteStage::find_leader_rotation_index( - &crdt, + &cluster_info, leader_rotation_interval, leader_rotation_interval - 1, input.clone(), @@ -522,7 +523,7 @@ mod tests { len = (num_epochs - 1) as usize * leader_rotation_interval as usize + 1; input = vec![entry.clone(); len]; result = WriteStage::find_leader_rotation_index( - &crdt, + &cluster_info, leader_rotation_interval, leader_rotation_interval - 1, input.clone(), @@ -535,7 +536,7 @@ mod tests { len = leader_rotation_interval as usize; input = vec![entry.clone(); len]; result = WriteStage::find_leader_rotation_index( - &crdt, + &cluster_info, leader_rotation_interval, num_epochs * leader_rotation_interval, input.clone(), diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index a6f52f38b..9ab7143ac 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -4,7 +4,7 @@ extern crate rayon; extern crate solana; use rayon::iter::*; -use solana::crdt::{Crdt, Node}; +use solana::cluster_info::{ClusterInfo, Node}; use solana::logger; use solana::ncp::Ncp; use solana::packet::{Blob, SharedBlob}; @@ -16,10 +16,10 @@ use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; -fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { +fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { let mut tn = Node::new_localhost(); - let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new"); - let c = Arc::new(RwLock::new(crdt)); + let cluster_info = ClusterInfo::new(tn.info.clone()).expect("ClusterInfo::new"); + let c = Arc::new(RwLock::new(cluster_info)); let w = Arc::new(RwLock::new(vec![])); let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit); (c, d, tn.sockets.replicate.pop().unwrap()) @@ -31,7 +31,7 @@ fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { /// tests that actually use this function are below fn run_gossip_topo(topo: F) where - F: Fn(&Vec<(Arc>, Ncp, UdpSocket)>) -> (), + F: Fn(&Vec<(Arc>, Ncp, UdpSocket)>) -> (), { let num: usize = 5; let exit = Arc::new(AtomicBool::new(false)); @@ -128,7 +128,7 @@ fn gossip_rstar() { } #[test] -pub fn crdt_retransmit() -> result::Result<()> { +pub fn cluster_info_retransmit() -> result::Result<()> { logger::setup(); let exit = Arc::new(AtomicBool::new(false)); trace!("c1:"); @@ -161,7 +161,7 @@ pub fn crdt_retransmit() -> result::Result<()> { assert!(done); let b = SharedBlob::default(); b.write().unwrap().meta.size = 10; - Crdt::retransmit(&c1, &b, &tn1)?; + ClusterInfo::retransmit(&c1, &b, &tn1)?; let res: Vec<_> = [tn1, tn2, tn3] .into_par_iter() .map(|s| { diff --git a/tests/multinode.rs b/tests/multinode.rs index 57cee633c..ce9ff85f8 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -6,7 +6,7 @@ extern crate serde_json; extern crate solana; extern crate solana_program_interface; -use solana::crdt::{Crdt, Node, NodeInfo}; +use solana::cluster_info::{ClusterInfo, Node, NodeInfo}; use solana::entry::Entry; use solana::fullnode::{Fullnode, FullnodeReturnType}; use solana::hash::Hash; @@ -31,26 +31,26 @@ use std::thread::sleep; use std::thread::Builder; use std::time::{Duration, Instant}; -fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc>, Pubkey) { +fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc>, Pubkey) { let exit = Arc::new(AtomicBool::new(false)); let mut spy = Node::new_localhost(); let me = spy.info.id.clone(); spy.info.contact_info.tvu = spy.sockets.replicate[0].local_addr().unwrap(); spy.info.contact_info.rpu = spy.sockets.transaction[0].local_addr().unwrap(); - let mut spy_crdt = Crdt::new(spy.info).expect("Crdt::new"); - spy_crdt.insert(&leader); - spy_crdt.set_leader(leader.id); - let spy_crdt_ref = Arc::new(RwLock::new(spy_crdt)); + let mut spy_cluster_info = ClusterInfo::new(spy.info).expect("ClusterInfo::new"); + spy_cluster_info.insert(&leader); + spy_cluster_info.set_leader(leader.id); + let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info)); let spy_window = Arc::new(RwLock::new(default_window())); let ncp = Ncp::new( - &spy_crdt_ref, + &spy_cluster_info_ref, spy_window, None, spy.sockets.gossip, exit.clone(), ); - (ncp, spy_crdt_ref, me) + (ncp, spy_cluster_info_ref, me) } fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { @@ -971,10 +971,10 @@ fn test_leader_validator_basic() { _ => panic!("Expected reason for exit to be leader rotation"), } - // TODO: We ignore this test for now b/c there's a chance here that the crdt + // TODO: We ignore this test for now b/c there's a chance here that the cluster_info // in the new leader calls the dummy sequence of update_leader -> top_leader() // (see the TODOs in those functions) during gossip and sets the leader back - // to the old leader, which causes a panic from an assertion failure in crdt broadcast(), + // to the old leader, which causes a panic from an assertion failure in cluster_info broadcast(), // specifically: assert!(me.leader_id != v.id). We can enable this test once we have real // leader scheduling @@ -1014,8 +1014,8 @@ fn mk_client(leader: &NodeInfo) -> ThinClient { .set_read_timeout(Some(Duration::new(1, 0))) .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - assert!(Crdt::is_valid_address(&leader.contact_info.rpu)); - assert!(Crdt::is_valid_address(&leader.contact_info.tpu)); + assert!(ClusterInfo::is_valid_address(&leader.contact_info.rpu)); + assert!(ClusterInfo::is_valid_address(&leader.contact_info.tpu)); ThinClient::new( leader.contact_info.rpu, requests_socket,