Crdt -> ClusterInfo

This commit is contained in:
Greg Fitzgerald 2018-10-08 20:55:54 -06:00
parent a99d17c3ac
commit 95701114e3
28 changed files with 499 additions and 452 deletions

View File

@ -11,7 +11,7 @@ use clap::{App, Arg};
use influx_db_client as influxdb; use influx_db_client as influxdb;
use rayon::prelude::*; use rayon::prelude::*;
use solana::client::mk_client; use solana::client::mk_client;
use solana::crdt::{Crdt, NodeInfo}; use solana::cluster_info::{ClusterInfo, NodeInfo};
use solana::drone::DRONE_PORT; use solana::drone::DRONE_PORT;
use solana::hash::Hash; use solana::hash::Hash;
use solana::logger; use solana::logger;
@ -734,7 +734,7 @@ fn main() {
total_tx_sent_count.load(Ordering::Relaxed), total_tx_sent_count.load(Ordering::Relaxed),
); );
// join the crdt client threads // join the cluster_info client threads
ncp.join().unwrap(); ncp.join().unwrap();
} }
@ -744,11 +744,11 @@ fn converge(
num_nodes: usize, num_nodes: usize,
) -> (Vec<NodeInfo>, Option<NodeInfo>, Ncp) { ) -> (Vec<NodeInfo>, Option<NodeInfo>, Ncp) {
//lets spy on the network //lets spy on the network
let (node, gossip_socket) = Crdt::spy_node(); let (node, gossip_socket) = ClusterInfo::spy_node();
let mut spy_crdt = Crdt::new(node).expect("Crdt::new"); let mut spy_cluster_info = ClusterInfo::new(node).expect("ClusterInfo::new");
spy_crdt.insert(&leader); spy_cluster_info.insert(&leader);
spy_crdt.set_leader(leader.id); spy_cluster_info.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_ref = Arc::new(RwLock::new(spy_cluster_info));
let window = Arc::new(RwLock::new(default_window())); let window = Arc::new(RwLock::new(default_window()));
let ncp = Ncp::new(&spy_ref, window, None, gossip_socket, exit_signal.clone()); let ncp = Ncp::new(&spy_ref, window, None, gossip_socket, exit_signal.clone());
let mut v: Vec<NodeInfo> = vec![]; let mut v: Vec<NodeInfo> = vec![];
@ -763,7 +763,7 @@ fn converge(
v = spy_ref v = spy_ref
.table .table
.values() .values()
.filter(|x| Crdt::is_valid_address(&x.contact_info.rpu)) .filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpu))
.cloned() .cloned()
.collect(); .collect();

View File

@ -5,7 +5,7 @@ extern crate serde_json;
extern crate solana; extern crate solana;
use clap::{App, Arg}; use clap::{App, Arg};
use solana::crdt::FULLNODE_PORT_RANGE; use solana::cluster_info::FULLNODE_PORT_RANGE;
use solana::fullnode::Config; use solana::fullnode::Config;
use solana::logger; use solana::logger;
use solana::netutil::{get_ip_addr, get_public_ip_addr, parse_port_or_addr}; use solana::netutil::{get_ip_addr, get_public_ip_addr, parse_port_or_addr};

View File

@ -9,7 +9,7 @@ extern crate solana;
use clap::{App, Arg}; use clap::{App, Arg};
use solana::client::mk_client; use solana::client::mk_client;
use solana::crdt::Node; use solana::cluster_info::Node;
use solana::drone::DRONE_PORT; use solana::drone::DRONE_PORT;
use solana::fullnode::{Config, Fullnode, FullnodeReturnType}; use solana::fullnode::{Config, Fullnode, FullnodeReturnType};
use solana::logger; use solana::logger;

View File

@ -7,7 +7,7 @@ extern crate solana;
use clap::{App, Arg}; use clap::{App, Arg};
use solana::chacha::chacha_cbc_encrypt_files; use solana::chacha::chacha_cbc_encrypt_files;
use solana::crdt::Node; use solana::cluster_info::Node;
use solana::fullnode::Config; use solana::fullnode::Config;
use solana::ledger::LEDGER_DATA_FILE; use solana::ledger::LEDGER_DATA_FILE;
use solana::logger; use solana::logger;

View File

@ -1,7 +1,7 @@
//! The `broadcast_stage` broadcasts data from a leader node to validators //! The `broadcast_stage` broadcasts data from a leader node to validators
//! //!
use cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo};
use counter::Counter; use counter::Counter;
use crdt::{Crdt, CrdtError, NodeInfo};
use entry::Entry; use entry::Entry;
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
use erasure; use erasure;
@ -27,7 +27,7 @@ pub enum BroadcastStageReturnType {
} }
fn broadcast( fn broadcast(
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
leader_rotation_interval: u64, leader_rotation_interval: u64,
node_info: &NodeInfo, node_info: &NodeInfo,
broadcast_table: &[NodeInfo], broadcast_table: &[NodeInfo],
@ -129,8 +129,8 @@ fn broadcast(
*receive_index += blobs_len as u64; *receive_index += blobs_len as u64;
// Send blobs out from the window // Send blobs out from the window
Crdt::broadcast( ClusterInfo::broadcast(
crdt, cluster_info,
leader_rotation_interval, leader_rotation_interval,
&node_info, &node_info,
&broadcast_table, &broadcast_table,
@ -179,7 +179,7 @@ pub struct BroadcastStage {
impl BroadcastStage { impl BroadcastStage {
fn run( fn run(
sock: &UdpSocket, sock: &UdpSocket,
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
window: &SharedWindow, window: &SharedWindow,
entry_height: u64, entry_height: u64,
receiver: &Receiver<Vec<Entry>>, receiver: &Receiver<Vec<Entry>>,
@ -192,16 +192,16 @@ impl BroadcastStage {
let me; let me;
let leader_rotation_interval; let leader_rotation_interval;
{ {
let rcrdt = crdt.read().unwrap(); let rcluster_info = cluster_info.read().unwrap();
me = rcrdt.my_data().clone(); me = rcluster_info.my_data().clone();
leader_rotation_interval = rcrdt.get_leader_rotation_interval(); leader_rotation_interval = rcluster_info.get_leader_rotation_interval();
} }
loop { loop {
if transmit_index.data % (leader_rotation_interval as u64) == 0 { if transmit_index.data % (leader_rotation_interval as u64) == 0 {
let rcrdt = crdt.read().unwrap(); let rcluster_info = cluster_info.read().unwrap();
let my_id = rcrdt.my_data().id; let my_id = rcluster_info.my_data().id;
match rcrdt.get_scheduled_leader(transmit_index.data) { match rcluster_info.get_scheduled_leader(transmit_index.data) {
Some(id) if id == my_id => (), Some(id) if id == my_id => (),
// If the leader stays in power for the next // If the leader stays in power for the next
// round as well, then we don't exit. Otherwise, exit. // round as well, then we don't exit. Otherwise, exit.
@ -211,9 +211,9 @@ impl BroadcastStage {
} }
} }
let broadcast_table = crdt.read().unwrap().compute_broadcast_table(); let broadcast_table = cluster_info.read().unwrap().compute_broadcast_table();
if let Err(e) = broadcast( if let Err(e) = broadcast(
crdt, cluster_info,
leader_rotation_interval, leader_rotation_interval,
&me, &me,
&broadcast_table, &broadcast_table,
@ -228,7 +228,7 @@ impl BroadcastStage {
return BroadcastStageReturnType::ChannelDisconnected return BroadcastStageReturnType::ChannelDisconnected
} }
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? Error::ClusterInfoError(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
_ => { _ => {
inc_new_counter_info!("streamer-broadcaster-error", 1, 1); inc_new_counter_info!("streamer-broadcaster-error", 1, 1);
error!("broadcaster error: {:?}", e); error!("broadcaster error: {:?}", e);
@ -239,11 +239,11 @@ impl BroadcastStage {
} }
/// Service to broadcast messages from the leader to layer 1 nodes. /// Service to broadcast messages from the leader to layer 1 nodes.
/// See `crdt` for network layer definitions. /// See `cluster_info` for network layer definitions.
/// # Arguments /// # Arguments
/// * `sock` - Socket to send from. /// * `sock` - Socket to send from.
/// * `exit` - Boolean to signal system exit. /// * `exit` - Boolean to signal system exit.
/// * `crdt` - CRDT structure /// * `cluster_info` - ClusterInfo structure
/// * `window` - Cache of blobs that we have broadcast /// * `window` - Cache of blobs that we have broadcast
/// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. /// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
/// * `exit_sender` - Set to true when this stage exits, allows rest of Tpu to exit cleanly. Otherwise, /// * `exit_sender` - Set to true when this stage exits, allows rest of Tpu to exit cleanly. Otherwise,
@ -255,7 +255,7 @@ impl BroadcastStage {
/// completing the cycle. /// completing the cycle.
pub fn new( pub fn new(
sock: UdpSocket, sock: UdpSocket,
crdt: Arc<RwLock<Crdt>>, cluster_info: Arc<RwLock<ClusterInfo>>,
window: SharedWindow, window: SharedWindow,
entry_height: u64, entry_height: u64,
receiver: Receiver<Vec<Entry>>, receiver: Receiver<Vec<Entry>>,
@ -265,7 +265,7 @@ impl BroadcastStage {
.name("solana-broadcaster".to_string()) .name("solana-broadcaster".to_string())
.spawn(move || { .spawn(move || {
let _exit = Finalizer::new(exit_sender); let _exit = Finalizer::new(exit_sender);
Self::run(&sock, &crdt, &window, entry_height, &receiver) Self::run(&sock, &cluster_info, &window, entry_height, &receiver)
}).unwrap(); }).unwrap();
BroadcastStage { thread_hdl } BroadcastStage { thread_hdl }
@ -283,7 +283,7 @@ impl Service for BroadcastStage {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use broadcast_stage::{BroadcastStage, BroadcastStageReturnType}; use broadcast_stage::{BroadcastStage, BroadcastStageReturnType};
use crdt::{Crdt, Node}; use cluster_info::{ClusterInfo, Node};
use entry::Entry; use entry::Entry;
use ledger::next_entries_mut; use ledger::next_entries_mut;
use mint::Mint; use mint::Mint;
@ -302,7 +302,7 @@ mod tests {
broadcast_stage: BroadcastStage, broadcast_stage: BroadcastStage,
shared_window: SharedWindow, shared_window: SharedWindow,
entry_sender: Sender<Vec<Entry>>, entry_sender: Sender<Vec<Entry>>,
crdt: Arc<RwLock<Crdt>>, cluster_info: Arc<RwLock<ClusterInfo>>,
entries: Vec<Entry>, entries: Vec<Entry>,
} }
@ -317,11 +317,12 @@ mod tests {
let buddy_id = buddy_keypair.pubkey(); let buddy_id = buddy_keypair.pubkey();
let broadcast_buddy = Node::new_localhost_with_pubkey(buddy_keypair.pubkey()); let broadcast_buddy = Node::new_localhost_with_pubkey(buddy_keypair.pubkey());
// Fill the crdt with the buddy's info // Fill the cluster_info with the buddy's info
let mut crdt = Crdt::new(leader_info.info.clone()).expect("Crdt::new"); let mut cluster_info =
crdt.insert(&broadcast_buddy.info); ClusterInfo::new(leader_info.info.clone()).expect("ClusterInfo::new");
crdt.set_leader_rotation_interval(leader_rotation_interval); cluster_info.insert(&broadcast_buddy.info);
let crdt = Arc::new(RwLock::new(crdt)); cluster_info.set_leader_rotation_interval(leader_rotation_interval);
let cluster_info = Arc::new(RwLock::new(cluster_info));
// Make dummy initial entries // Make dummy initial entries
let mint = Mint::new(10000); let mint = Mint::new(10000);
@ -338,7 +339,7 @@ mod tests {
// Start up the broadcast stage // Start up the broadcast stage
let broadcast_stage = BroadcastStage::new( let broadcast_stage = BroadcastStage::new(
leader_info.sockets.broadcast, leader_info.sockets.broadcast,
crdt.clone(), cluster_info.clone(),
shared_window.clone(), shared_window.clone(),
entry_height, entry_height,
entry_receiver, entry_receiver,
@ -351,7 +352,7 @@ mod tests {
broadcast_stage, broadcast_stage,
shared_window, shared_window,
entry_sender, entry_sender,
crdt, cluster_info,
entries, entries,
} }
} }
@ -372,9 +373,9 @@ mod tests {
let leader_rotation_interval = 10; let leader_rotation_interval = 10;
let broadcast_info = setup_dummy_broadcast_stage(leader_rotation_interval); let broadcast_info = setup_dummy_broadcast_stage(leader_rotation_interval);
{ {
let mut wcrdt = broadcast_info.crdt.write().unwrap(); let mut wcluster_info = broadcast_info.cluster_info.write().unwrap();
// Set the leader for the next rotation to be myself // Set the leader for the next rotation to be myself
wcrdt.set_scheduled_leader(leader_rotation_interval, broadcast_info.my_id); wcluster_info.set_scheduled_leader(leader_rotation_interval, broadcast_info.my_id);
} }
let genesis_len = broadcast_info.entries.len() as u64; let genesis_len = broadcast_info.entries.len() as u64;
@ -394,16 +395,16 @@ mod tests {
broadcast_info.entry_sender.send(new_entry).unwrap(); broadcast_info.entry_sender.send(new_entry).unwrap();
} }
// Set the scheduled next leader in the crdt to the other buddy on the network // Set the scheduled next leader in the cluster_info to the other buddy on the network
broadcast_info broadcast_info
.crdt .cluster_info
.write() .write()
.unwrap() .unwrap()
.set_scheduled_leader(2 * leader_rotation_interval, broadcast_info.buddy_id); .set_scheduled_leader(2 * leader_rotation_interval, broadcast_info.buddy_id);
// Input another leader_rotation_interval dummy entries, which will take us // Input another leader_rotation_interval dummy entries, which will take us
// past the point of the leader rotation. The write_stage will see that // past the point of the leader rotation. The write_stage will see that
// it's no longer the leader after checking the crdt, and exit // it's no longer the leader after checking the cluster_info, and exit
for _ in 0..leader_rotation_interval { for _ in 0..leader_rotation_interval {
let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]); let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);

View File

@ -12,9 +12,9 @@ pub struct Contract {
pub struct Vote { pub struct Vote {
/// We send some gossip specific membership information through the vote to shortcut /// We send some gossip specific membership information through the vote to shortcut
/// liveness voting /// liveness voting
/// The version of the CRDT struct that the last_id of this network voted with /// The version of the ClusterInfo struct that the last_id of this network voted with
pub version: u64, pub version: u64,
/// The version of the CRDT struct that has the same network configuration as this one /// The version of the ClusterInfo struct that has the same network configuration as this one
pub contact_info_version: u64, pub contact_info_version: u64,
// TODO: add signature of the state here as well // TODO: add signature of the state here as well
} }

View File

@ -1,4 +1,4 @@
use crdt::{CrdtError, NodeInfo}; use cluster_info::{ClusterInfoError, NodeInfo};
use rand::distributions::{Distribution, Weighted, WeightedChoice}; use rand::distributions::{Distribution, Weighted, WeightedChoice};
use rand::thread_rng; use rand::thread_rng;
use result::Result; use result::Result;
@ -29,7 +29,7 @@ impl<'a, 'b> ChooseRandomPeerStrategy<'a> {
impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> { impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> {
fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> { fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> {
if options.is_empty() { if options.is_empty() {
Err(CrdtError::NoPeers)?; Err(ClusterInfoError::NoPeers)?;
} }
let n = ((self.random)() as usize) % options.len(); let n = ((self.random)() as usize) % options.len();
@ -87,8 +87,8 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
fn calculate_weighted_remote_index(&self, peer_id: Pubkey) -> u32 { fn calculate_weighted_remote_index(&self, peer_id: Pubkey) -> u32 {
let mut last_seen_index = 0; let mut last_seen_index = 0;
// If the peer is not in our remote table, then we leave last_seen_index as zero. // If the peer is not in our remote table, then we leave last_seen_index as zero.
// Only happens when a peer appears in our crdt.table but not in our crdt.remote, // Only happens when a peer appears in our cluster_info.table but not in our cluster_info.remote,
// which means a validator was directly injected into our crdt.table // which means a validator was directly injected into our cluster_info.table
if let Some(index) = self.remote.get(&peer_id) { if let Some(index) = self.remote.get(&peer_id) {
last_seen_index = *index; last_seen_index = *index;
} }
@ -174,7 +174,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> { impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> { fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> {
if options.is_empty() { if options.is_empty() {
Err(CrdtError::NoPeers)?; Err(ClusterInfoError::NoPeers)?;
} }
let mut weighted_peers = vec![]; let mut weighted_peers = vec![];

View File

@ -1,4 +1,4 @@
use crdt::{NodeInfo, FULLNODE_PORT_RANGE}; use cluster_info::{NodeInfo, FULLNODE_PORT_RANGE};
use netutil::bind_in_range; use netutil::bind_in_range;
use std::time::Duration; use std::time::Duration;
use thin_client::ThinClient; use thin_client::ThinClient;

View File

@ -1,4 +1,4 @@
//! The `crdt` module defines a data structure that is shared by all the nodes in the network over //! The `cluster_info` module defines a data structure that is shared by all the nodes in the network over
//! a gossip control plane. The goal is to share small bits of off-chain information and detect and //! a gossip control plane. The goal is to share small bits of off-chain information and detect and
//! repair partitions. //! repair partitions.
//! //!
@ -64,7 +64,7 @@ macro_rules! socketaddr_any {
} }
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum CrdtError { pub enum ClusterInfoError {
NoPeers, NoPeers,
NoLeader, NoLeader,
BadContactInfo, BadContactInfo,
@ -194,7 +194,7 @@ impl NodeInfo {
} }
} }
/// `Crdt` structure keeps a table of `NodeInfo` structs /// `ClusterInfo` structure keeps a table of `NodeInfo` structs
/// # Properties /// # Properties
/// * `table` - map of public id's to versioned and signed NodeInfo structs /// * `table` - map of public id's to versioned and signed NodeInfo structs
/// * `local` - map of public id's to what `self.update_index` `self.table` was updated /// * `local` - map of public id's to what `self.update_index` `self.table` was updated
@ -205,7 +205,7 @@ impl NodeInfo {
/// * `gossip` - asynchronously ask nodes to send updates /// * `gossip` - asynchronously ask nodes to send updates
/// * `listen` - listen for requests and responses /// * `listen` - listen for requests and responses
/// No attempt to keep track of timeouts or dropped requests is made, or should be. /// No attempt to keep track of timeouts or dropped requests is made, or should be.
pub struct Crdt { pub struct ClusterInfo {
/// table of everyone in the network /// table of everyone in the network
pub table: HashMap<Pubkey, NodeInfo>, pub table: HashMap<Pubkey, NodeInfo>,
/// Value of my update index when entry in table was updated. /// Value of my update index when entry in table was updated.
@ -247,12 +247,12 @@ enum Protocol {
RequestWindowIndex(NodeInfo, u64), RequestWindowIndex(NodeInfo, u64),
} }
impl Crdt { impl ClusterInfo {
pub fn new(node_info: NodeInfo) -> Result<Crdt> { pub fn new(node_info: NodeInfo) -> Result<ClusterInfo> {
if node_info.version != 0 { if node_info.version != 0 {
return Err(Error::CrdtError(CrdtError::BadNodeInfo)); return Err(Error::ClusterInfoError(ClusterInfoError::BadNodeInfo));
} }
let mut me = Crdt { let mut me = ClusterInfo {
table: HashMap::new(), table: HashMap::new(),
local: HashMap::new(), local: HashMap::new(),
remote: HashMap::new(), remote: HashMap::new(),
@ -351,7 +351,7 @@ impl Crdt {
.values() .values()
.into_iter() .into_iter()
.filter(|x| x.id != me) .filter(|x| x.id != me)
.filter(|x| Crdt::is_valid_address(&x.contact_info.rpu)) .filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpu))
.cloned() .cloned()
.collect() .collect()
} }
@ -374,7 +374,7 @@ impl Crdt {
} }
if *pubkey == self.my_data().leader_id { if *pubkey == self.my_data().leader_id {
info!("{}: LEADER_VOTED! {}", self.id, pubkey); info!("{}: LEADER_VOTED! {}", self.id, pubkey);
inc_new_counter_info!("crdt-insert_vote-leader_voted", 1); inc_new_counter_info!("cluster_info-insert_vote-leader_voted", 1);
} }
if v.version <= self.table[pubkey].version { if v.version <= self.table[pubkey].version {
@ -392,7 +392,7 @@ impl Crdt {
} }
} }
pub fn insert_votes(&mut self, votes: &[(Pubkey, Vote, Hash)]) { pub fn insert_votes(&mut self, votes: &[(Pubkey, Vote, Hash)]) {
inc_new_counter_info!("crdt-vote-count", votes.len()); inc_new_counter_info!("cluster_info-vote-count", votes.len());
if !votes.is_empty() { if !votes.is_empty() {
info!("{}: INSERTING VOTES {}", self.id, votes.len()); info!("{}: INSERTING VOTES {}", self.id, votes.len());
} }
@ -409,7 +409,7 @@ impl Crdt {
// we have stored ourselves // we have stored ourselves
trace!("{}: insert v.id: {} version: {}", self.id, v.id, v.version); trace!("{}: insert v.id: {} version: {}", self.id, v.id, v.version);
if self.table.get(&v.id).is_none() { if self.table.get(&v.id).is_none() {
inc_new_counter_info!("crdt-insert-new_entry", 1, 1); inc_new_counter_info!("cluster_info-insert-new_entry", 1, 1);
} }
self.update_index += 1; self.update_index += 1;
@ -462,7 +462,7 @@ impl Crdt {
} }
}).collect(); }).collect();
inc_new_counter_info!("crdt-purge-count", dead_ids.len()); inc_new_counter_info!("cluster_info-purge-count", dead_ids.len());
for id in &dead_ids { for id in &dead_ids {
self.alive.remove(id); self.alive.remove(id);
@ -476,7 +476,7 @@ impl Crdt {
} }
if *id == leader_id { if *id == leader_id {
info!("{}: PURGE LEADER {}", self.id, id,); info!("{}: PURGE LEADER {}", self.id, id,);
inc_new_counter_info!("crdt-purge-purged_leader", 1, 1); inc_new_counter_info!("cluster_info-purge-purged_leader", 1, 1);
self.set_leader(Pubkey::default()); self.set_leader(Pubkey::default());
} }
} }
@ -516,7 +516,7 @@ impl Crdt {
/// # Remarks /// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to` /// We need to avoid having obj locked while doing any io, such as the `send_to`
pub fn broadcast( pub fn broadcast(
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
leader_rotation_interval: u64, leader_rotation_interval: u64,
me: &NodeInfo, me: &NodeInfo,
broadcast_table: &[NodeInfo], broadcast_table: &[NodeInfo],
@ -526,9 +526,9 @@ impl Crdt {
received_index: u64, received_index: u64,
) -> Result<()> { ) -> Result<()> {
if broadcast_table.is_empty() { if broadcast_table.is_empty() {
debug!("{}:not enough peers in crdt table", me.id); debug!("{}:not enough peers in cluster_info table", me.id);
inc_new_counter_info!("crdt-broadcast-not_enough_peers_error", 1); inc_new_counter_info!("cluster_info-broadcast-not_enough_peers_error", 1);
Err(CrdtError::NoPeers)?; Err(ClusterInfoError::NoPeers)?;
} }
trace!( trace!(
"{} transmit_index: {:?} received_index: {} broadcast_len: {}", "{} transmit_index: {:?} received_index: {} broadcast_len: {}",
@ -563,7 +563,10 @@ impl Crdt {
// so he can initiate repairs if necessary // so he can initiate repairs if necessary
let entry_height = idx + 1; let entry_height = idx + 1;
if entry_height % leader_rotation_interval == 0 { if entry_height % leader_rotation_interval == 0 {
let next_leader_id = crdt.read().unwrap().get_scheduled_leader(entry_height); let next_leader_id = cluster_info
.read()
.unwrap()
.get_scheduled_leader(entry_height);
if next_leader_id.is_some() && next_leader_id != Some(me.id) { if next_leader_id.is_some() && next_leader_id != Some(me.id) {
let info_result = broadcast_table let info_result = broadcast_table
.iter() .iter()
@ -640,7 +643,7 @@ impl Crdt {
} }
} }
inc_new_counter_info!( inc_new_counter_info!(
"crdt-broadcast-max_idx", "cluster_info-broadcast-max_idx",
(transmit_index.data - old_transmit_index) as usize (transmit_index.data - old_transmit_index) as usize
); );
transmit_index.coding = transmit_index.data; transmit_index.coding = transmit_index.data;
@ -699,7 +702,7 @@ impl Crdt {
}).collect(); }).collect();
for e in errs { for e in errs {
if let Err(e) = &e { if let Err(e) = &e {
inc_new_counter_info!("crdt-retransmit-send_to_error", 1, 1); inc_new_counter_info!("cluster_info-retransmit-send_to_error", 1, 1);
error!("retransmit result {:?}", e); error!("retransmit result {:?}", e);
} }
e?; e?;
@ -751,7 +754,7 @@ impl Crdt {
.filter(|r| r.id != self.id && Self::is_valid_address(&r.contact_info.tvu)) .filter(|r| r.id != self.id && Self::is_valid_address(&r.contact_info.tvu))
.collect(); .collect();
if valid.is_empty() { if valid.is_empty() {
Err(CrdtError::NoPeers)?; Err(ClusterInfoError::NoPeers)?;
} }
let n = thread_rng().gen::<usize>() % valid.len(); let n = thread_rng().gen::<usize>() % valid.len();
let addr = valid[n].contact_info.ncp; // send the request to the peer's gossip port let addr = valid[n].contact_info.ncp; // send the request to the peer's gossip port
@ -783,8 +786,12 @@ impl Crdt {
let choose_peer_result = choose_peer_strategy.choose_peer(options); let choose_peer_result = choose_peer_strategy.choose_peer(options);
if let Err(Error::CrdtError(CrdtError::NoPeers)) = &choose_peer_result { if let Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)) = &choose_peer_result {
trace!("crdt too small for gossip {} {}", self.id, self.table.len()); trace!(
"cluster_info too small for gossip {} {}",
self.id,
self.table.len()
);
}; };
let v = choose_peer_result?; let v = choose_peer_result?;
@ -803,7 +810,10 @@ impl Crdt {
pub fn new_vote(&mut self, last_id: Hash) -> Result<(Vote, SocketAddr)> { pub fn new_vote(&mut self, last_id: Hash) -> Result<(Vote, SocketAddr)> {
let mut me = self.my_data().clone(); let mut me = self.my_data().clone();
let leader = self.leader_data().ok_or(CrdtError::NoLeader)?.clone(); let leader = self
.leader_data()
.ok_or(ClusterInfoError::NoLeader)?
.clone();
me.version += 1; me.version += 1;
me.ledger_state.last_id = last_id; me.ledger_state.last_id = last_id;
let vote = Vote { let vote = Vote {
@ -879,7 +889,7 @@ impl Crdt {
for v in data { for v in data {
insert_total += self.insert(&v); insert_total += self.insert(&v);
} }
inc_new_counter_info!("crdt-update-count", insert_total); inc_new_counter_info!("cluster_info-update-count", insert_total);
for (pubkey, external_remote_index) in external_liveness { for (pubkey, external_remote_index) in external_liveness {
let remote_entry = if let Some(v) = self.remote.get(pubkey) { let remote_entry = if let Some(v) = self.remote.get(pubkey) {
@ -974,11 +984,11 @@ impl Crdt {
outblob.meta.set_addr(from_addr); outblob.meta.set_addr(from_addr);
outblob.set_id(sender_id).expect("blob set_id"); outblob.set_id(sender_id).expect("blob set_id");
} }
inc_new_counter_info!("crdt-window-request-pass", 1); inc_new_counter_info!("cluster_info-window-request-pass", 1);
return Some(out); return Some(out);
} else { } else {
inc_new_counter_info!("crdt-window-request-outside", 1); inc_new_counter_info!("cluster_info-window-request-outside", 1);
trace!( trace!(
"requested ix {} != blob_ix {}, outside window!", "requested ix {} != blob_ix {}, outside window!",
ix, ix,
@ -990,7 +1000,7 @@ impl Crdt {
if let Some(ledger_window) = ledger_window { if let Some(ledger_window) = ledger_window {
if let Ok(entry) = ledger_window.get_entry(ix) { if let Ok(entry) = ledger_window.get_entry(ix) {
inc_new_counter_info!("crdt-window-request-ledger", 1); inc_new_counter_info!("cluster_info-window-request-ledger", 1);
let out = entry.to_blob( let out = entry.to_blob(
Some(ix), Some(ix),
@ -1002,7 +1012,7 @@ impl Crdt {
} }
} }
inc_new_counter_info!("crdt-window-request-fail", 1); inc_new_counter_info!("cluster_info-window-request-fail", 1);
trace!( trace!(
"{}: failed RequestWindowIndex {} {} {}", "{}: failed RequestWindowIndex {} {} {}",
me.id, me.id,
@ -1023,10 +1033,10 @@ impl Crdt {
) -> Option<SharedBlob> { ) -> Option<SharedBlob> {
match deserialize(&blob.data[..blob.meta.size]) { match deserialize(&blob.data[..blob.meta.size]) {
Ok(request) => { Ok(request) => {
Crdt::handle_protocol(obj, &blob.meta.addr(), request, window, ledger_window) ClusterInfo::handle_protocol(obj, &blob.meta.addr(), request, window, ledger_window)
} }
Err(_) => { Err(_) => {
warn!("deserialize crdt packet failed"); warn!("deserialize cluster_info packet failed");
None None
} }
} }
@ -1058,7 +1068,7 @@ impl Crdt {
me.read().unwrap().id, me.read().unwrap().id,
from.id from.id
); );
inc_new_counter_info!("crdt-window-request-loopback", 1); inc_new_counter_info!("cluster_info-window-request-loopback", 1);
return None; return None;
} }
@ -1066,7 +1076,7 @@ impl Crdt {
// this may or may not be correct for everybody but it's better than leaving him with // this may or may not be correct for everybody but it's better than leaving him with
// an unspecified address in our table // an unspecified address in our table
if from.contact_info.ncp.ip().is_unspecified() { if from.contact_info.ncp.ip().is_unspecified() {
inc_new_counter_info!("crdt-window-request-updates-unspec-ncp", 1); inc_new_counter_info!("cluster_info-window-request-updates-unspec-ncp", 1);
from.contact_info.ncp = *from_addr; from.contact_info.ncp = *from_addr;
} }
@ -1144,7 +1154,7 @@ impl Crdt {
Protocol::RequestWindowIndex(from, ix) => { Protocol::RequestWindowIndex(from, ix) => {
let now = Instant::now(); let now = Instant::now();
//TODO this doesn't depend on CRDT module, could be moved //TODO this doesn't depend on cluster_info module, could be moved
//but we are using the listen thread to service these request //but we are using the listen thread to service these request
//TODO verify from is signed //TODO verify from is signed
@ -1155,13 +1165,13 @@ impl Crdt {
from.id, from.id,
ix, ix,
); );
inc_new_counter_info!("crdt-window-request-address-eq", 1); inc_new_counter_info!("cluster_info-window-request-address-eq", 1);
return None; return None;
} }
me.write().unwrap().insert(&from); me.write().unwrap().insert(&from);
let me = me.read().unwrap().my_data().clone(); let me = me.read().unwrap().my_data().clone();
inc_new_counter_info!("crdt-window-request-recv", 1); inc_new_counter_info!("cluster_info-window-request-recv", 1);
trace!("{}: received RequestWindowIndex {} {} ", me.id, from.id, ix,); trace!("{}: received RequestWindowIndex {} {} ", me.id, from.id, ix,);
let res = let res =
Self::run_window_request(&from, &from_addr, &window, ledger_window, &me, ix); Self::run_window_request(&from, &from_addr, &window, ledger_window, &me, ix);
@ -1376,9 +1386,9 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use budget_instruction::Vote; use budget_instruction::Vote;
use crdt::{ use cluster_info::{
Crdt, CrdtError, Node, NodeInfo, Protocol, FULLNODE_PORT_RANGE, GOSSIP_PURGE_MILLIS, ClusterInfo, ClusterInfoError, Node, NodeInfo, Protocol, FULLNODE_PORT_RANGE,
GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE,
}; };
use entry::Entry; use entry::Entry;
use hash::{hash, Hash}; use hash::{hash, Hash};
@ -1401,83 +1411,83 @@ mod tests {
fn insert_test() { fn insert_test() {
let mut d = NodeInfo::new_localhost(Keypair::new().pubkey()); let mut d = NodeInfo::new_localhost(Keypair::new().pubkey());
assert_eq!(d.version, 0); assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone()).unwrap(); let mut cluster_info = ClusterInfo::new(d.clone()).unwrap();
assert_eq!(crdt.table[&d.id].version, 0); assert_eq!(cluster_info.table[&d.id].version, 0);
assert!(!crdt.alive.contains_key(&d.id)); assert!(!cluster_info.alive.contains_key(&d.id));
d.version = 2; d.version = 2;
crdt.insert(&d); cluster_info.insert(&d);
let liveness = crdt.alive[&d.id]; let liveness = cluster_info.alive[&d.id];
assert_eq!(crdt.table[&d.id].version, 2); assert_eq!(cluster_info.table[&d.id].version, 2);
d.version = 1; d.version = 1;
crdt.insert(&d); cluster_info.insert(&d);
assert_eq!(crdt.table[&d.id].version, 2); assert_eq!(cluster_info.table[&d.id].version, 2);
assert_eq!(liveness, crdt.alive[&d.id]); assert_eq!(liveness, cluster_info.alive[&d.id]);
// Ensure liveness will be updated for version 3 // Ensure liveness will be updated for version 3
sleep(Duration::from_millis(1)); sleep(Duration::from_millis(1));
d.version = 3; d.version = 3;
crdt.insert(&d); cluster_info.insert(&d);
assert_eq!(crdt.table[&d.id].version, 3); assert_eq!(cluster_info.table[&d.id].version, 3);
assert!(liveness < crdt.alive[&d.id]); assert!(liveness < cluster_info.alive[&d.id]);
} }
#[test] #[test]
fn test_new_vote() { fn test_new_vote() {
let d = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let d = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
assert_eq!(d.version, 0); assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone()).unwrap(); let mut cluster_info = ClusterInfo::new(d.clone()).unwrap();
assert_eq!(crdt.table[&d.id].version, 0); assert_eq!(cluster_info.table[&d.id].version, 0);
let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1235")); let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1235"));
assert_ne!(d.id, leader.id); assert_ne!(d.id, leader.id);
assert_matches!( assert_matches!(
crdt.new_vote(Hash::default()).err(), cluster_info.new_vote(Hash::default()).err(),
Some(Error::CrdtError(CrdtError::NoLeader)) Some(Error::ClusterInfoError(ClusterInfoError::NoLeader))
); );
crdt.insert(&leader); cluster_info.insert(&leader);
assert_matches!( assert_matches!(
crdt.new_vote(Hash::default()).err(), cluster_info.new_vote(Hash::default()).err(),
Some(Error::CrdtError(CrdtError::NoLeader)) Some(Error::ClusterInfoError(ClusterInfoError::NoLeader))
); );
crdt.set_leader(leader.id); cluster_info.set_leader(leader.id);
assert_eq!(crdt.table[&d.id].version, 1); assert_eq!(cluster_info.table[&d.id].version, 1);
let v = Vote { let v = Vote {
version: 2, //version should increase when we vote version: 2, //version should increase when we vote
contact_info_version: 0, contact_info_version: 0,
}; };
let expected = (v, crdt.table[&leader.id].contact_info.tpu); let expected = (v, cluster_info.table[&leader.id].contact_info.tpu);
assert_eq!(crdt.new_vote(Hash::default()).unwrap(), expected); assert_eq!(cluster_info.new_vote(Hash::default()).unwrap(), expected);
} }
#[test] #[test]
fn test_insert_vote() { fn test_insert_vote() {
let d = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let d = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
assert_eq!(d.version, 0); assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone()).unwrap(); let mut cluster_info = ClusterInfo::new(d.clone()).unwrap();
assert_eq!(crdt.table[&d.id].version, 0); assert_eq!(cluster_info.table[&d.id].version, 0);
let vote_same_version = Vote { let vote_same_version = Vote {
version: d.version, version: d.version,
contact_info_version: 0, contact_info_version: 0,
}; };
crdt.insert_vote(&d.id, &vote_same_version, Hash::default()); cluster_info.insert_vote(&d.id, &vote_same_version, Hash::default());
assert_eq!(crdt.table[&d.id].version, 0); assert_eq!(cluster_info.table[&d.id].version, 0);
let vote_new_version_new_addrs = Vote { let vote_new_version_new_addrs = Vote {
version: d.version + 1, version: d.version + 1,
contact_info_version: 1, contact_info_version: 1,
}; };
crdt.insert_vote(&d.id, &vote_new_version_new_addrs, Hash::default()); cluster_info.insert_vote(&d.id, &vote_new_version_new_addrs, Hash::default());
//should be dropped since the address is newer then we know //should be dropped since the address is newer then we know
assert_eq!(crdt.table[&d.id].version, 0); assert_eq!(cluster_info.table[&d.id].version, 0);
let vote_new_version_old_addrs = Vote { let vote_new_version_old_addrs = Vote {
version: d.version + 1, version: d.version + 1,
contact_info_version: 0, contact_info_version: 0,
}; };
crdt.insert_vote(&d.id, &vote_new_version_old_addrs, Hash::default()); cluster_info.insert_vote(&d.id, &vote_new_version_old_addrs, Hash::default());
//should be accepted, since the update is for the same address field as the one we know //should be accepted, since the update is for the same address field as the one we know
assert_eq!(crdt.table[&d.id].version, 1); assert_eq!(cluster_info.table[&d.id].version, 1);
} }
fn sorted(ls: &Vec<NodeInfo>) -> Vec<NodeInfo> { fn sorted(ls: &Vec<NodeInfo>) -> Vec<NodeInfo> {
let mut copy: Vec<_> = ls.iter().cloned().collect(); let mut copy: Vec<_> = ls.iter().cloned().collect();
@ -1502,20 +1512,20 @@ mod tests {
let d1 = NodeInfo::new_localhost(Keypair::new().pubkey()); let d1 = NodeInfo::new_localhost(Keypair::new().pubkey());
let d2 = NodeInfo::new_localhost(Keypair::new().pubkey()); let d2 = NodeInfo::new_localhost(Keypair::new().pubkey());
let d3 = NodeInfo::new_localhost(Keypair::new().pubkey()); let d3 = NodeInfo::new_localhost(Keypair::new().pubkey());
let mut crdt = Crdt::new(d1.clone()).expect("Crdt::new"); let mut cluster_info = ClusterInfo::new(d1.clone()).expect("ClusterInfo::new");
let (key, ix, ups) = crdt.get_updates_since(0); let (key, ix, ups) = cluster_info.get_updates_since(0);
assert_eq!(key, d1.id); assert_eq!(key, d1.id);
assert_eq!(ix, 1); assert_eq!(ix, 1);
assert_eq!(ups.len(), 1); assert_eq!(ups.len(), 1);
assert_eq!(sorted(&ups), sorted(&vec![d1.clone()])); assert_eq!(sorted(&ups), sorted(&vec![d1.clone()]));
crdt.insert(&d2); cluster_info.insert(&d2);
let (key, ix, ups) = crdt.get_updates_since(0); let (key, ix, ups) = cluster_info.get_updates_since(0);
assert_eq!(key, d1.id); assert_eq!(key, d1.id);
assert_eq!(ix, 2); assert_eq!(ix, 2);
assert_eq!(ups.len(), 2); assert_eq!(ups.len(), 2);
assert_eq!(sorted(&ups), sorted(&vec![d1.clone(), d2.clone()])); assert_eq!(sorted(&ups), sorted(&vec![d1.clone(), d2.clone()]));
crdt.insert(&d3); cluster_info.insert(&d3);
let (key, ix, ups) = crdt.get_updates_since(0); let (key, ix, ups) = cluster_info.get_updates_since(0);
assert_eq!(key, d1.id); assert_eq!(key, d1.id);
assert_eq!(ix, 3); assert_eq!(ix, 3);
assert_eq!(ups.len(), 3); assert_eq!(ups.len(), 3);
@ -1523,24 +1533,24 @@ mod tests {
sorted(&ups), sorted(&ups),
sorted(&vec![d1.clone(), d2.clone(), d3.clone()]) sorted(&vec![d1.clone(), d2.clone(), d3.clone()])
); );
let mut crdt2 = Crdt::new(d2.clone()).expect("Crdt::new"); let mut cluster_info2 = ClusterInfo::new(d2.clone()).expect("ClusterInfo::new");
crdt2.apply_updates(key, ix, &ups, &vec![]); cluster_info2.apply_updates(key, ix, &ups, &vec![]);
assert_eq!(crdt2.table.values().len(), 3); assert_eq!(cluster_info2.table.values().len(), 3);
assert_eq!( assert_eq!(
sorted(&crdt2.table.values().map(|x| x.clone()).collect()), sorted(&cluster_info2.table.values().map(|x| x.clone()).collect()),
sorted(&crdt.table.values().map(|x| x.clone()).collect()) sorted(&cluster_info.table.values().map(|x| x.clone()).collect())
); );
let d4 = NodeInfo::new_entry_point(&socketaddr!("127.0.0.4:1234")); let d4 = NodeInfo::new_entry_point(&socketaddr!("127.0.0.4:1234"));
crdt.insert(&d4); cluster_info.insert(&d4);
let (_key, _ix, ups) = crdt.get_updates_since(0); let (_key, _ix, ups) = cluster_info.get_updates_since(0);
assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3])); assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3]));
} }
#[test] #[test]
fn window_index_request() { fn window_index_request() {
let me = NodeInfo::new_localhost(Keypair::new().pubkey()); let me = NodeInfo::new_localhost(Keypair::new().pubkey());
let mut crdt = Crdt::new(me).expect("Crdt::new"); let mut cluster_info = ClusterInfo::new(me).expect("ClusterInfo::new");
let rv = crdt.window_index_request(0); let rv = cluster_info.window_index_request(0);
assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
let ncp = socketaddr!([127, 0, 0, 1], 1234); let ncp = socketaddr!([127, 0, 0, 1], 1234);
let nxt = NodeInfo::new( let nxt = NodeInfo::new(
@ -1551,8 +1561,8 @@ mod tests {
socketaddr!([127, 0, 0, 1], 1237), socketaddr!([127, 0, 0, 1], 1237),
socketaddr!([127, 0, 0, 1], 1238), socketaddr!([127, 0, 0, 1], 1238),
); );
crdt.insert(&nxt); cluster_info.insert(&nxt);
let rv = crdt.window_index_request(0).unwrap(); let rv = cluster_info.window_index_request(0).unwrap();
assert_eq!(nxt.contact_info.ncp, ncp); assert_eq!(nxt.contact_info.ncp, ncp);
assert_eq!(rv.0, nxt.contact_info.ncp); assert_eq!(rv.0, nxt.contact_info.ncp);
@ -1565,12 +1575,12 @@ mod tests {
socketaddr!([127, 0, 0, 1], 1237), socketaddr!([127, 0, 0, 1], 1237),
socketaddr!([127, 0, 0, 1], 1238), socketaddr!([127, 0, 0, 1], 1238),
); );
crdt.insert(&nxt); cluster_info.insert(&nxt);
let mut one = false; let mut one = false;
let mut two = false; let mut two = false;
while !one || !two { while !one || !two {
//this randomly picks an option, so eventually it should pick both //this randomly picks an option, so eventually it should pick both
let rv = crdt.window_index_request(0).unwrap(); let rv = cluster_info.window_index_request(0).unwrap();
if rv.0 == ncp { if rv.0 == ncp {
one = true; one = true;
} }
@ -1592,41 +1602,41 @@ mod tests {
socketaddr!("127.0.0.1:127"), socketaddr!("127.0.0.1:127"),
); );
let mut crdt = Crdt::new(me).expect("Crdt::new"); let mut cluster_info = ClusterInfo::new(me).expect("ClusterInfo::new");
let nxt1 = NodeInfo::new_unspecified(); let nxt1 = NodeInfo::new_unspecified();
// Filter out unspecified addresses // Filter out unspecified addresses
crdt.insert(&nxt1); //<--- attack! cluster_info.insert(&nxt1); //<--- attack!
let rv = crdt.gossip_request(); let rv = cluster_info.gossip_request();
assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
let nxt2 = NodeInfo::new_multicast(); let nxt2 = NodeInfo::new_multicast();
// Filter out multicast addresses // Filter out multicast addresses
crdt.insert(&nxt2); //<--- attack! cluster_info.insert(&nxt2); //<--- attack!
let rv = crdt.gossip_request(); let rv = cluster_info.gossip_request();
assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
} }
/// test that gossip requests are eventually generated for all nodes /// test that gossip requests are eventually generated for all nodes
#[test] #[test]
fn gossip_request() { fn gossip_request() {
let me = NodeInfo::new_localhost(Keypair::new().pubkey()); let me = NodeInfo::new_localhost(Keypair::new().pubkey());
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); let mut cluster_info = ClusterInfo::new(me.clone()).expect("ClusterInfo::new");
let rv = crdt.gossip_request(); let rv = cluster_info.gossip_request();
assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
let nxt1 = NodeInfo::new_localhost(Keypair::new().pubkey()); let nxt1 = NodeInfo::new_localhost(Keypair::new().pubkey());
crdt.insert(&nxt1); cluster_info.insert(&nxt1);
let rv = crdt.gossip_request().unwrap(); let rv = cluster_info.gossip_request().unwrap();
assert_eq!(rv.0, nxt1.contact_info.ncp); assert_eq!(rv.0, nxt1.contact_info.ncp);
let nxt2 = NodeInfo::new_entry_point(&socketaddr!("127.0.0.3:1234")); let nxt2 = NodeInfo::new_entry_point(&socketaddr!("127.0.0.3:1234"));
crdt.insert(&nxt2); cluster_info.insert(&nxt2);
// check that the service works // check that the service works
// and that it eventually produces a request for both nodes // and that it eventually produces a request for both nodes
let (sender, reader) = channel(); let (sender, reader) = channel();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let obj = Arc::new(RwLock::new(crdt)); let obj = Arc::new(RwLock::new(cluster_info));
let thread = Crdt::gossip(obj, sender, exit.clone()); let thread = ClusterInfo::gossip(obj, sender, exit.clone());
let mut one = false; let mut one = false;
let mut two = false; let mut two = false;
for _ in 0..30 { for _ in 0..30 {
@ -1660,67 +1670,67 @@ mod tests {
fn purge_test() { fn purge_test() {
logger::setup(); logger::setup();
let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); let mut cluster_info = ClusterInfo::new(me.clone()).expect("ClusterInfo::new");
let nxt = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234")); let nxt = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234"));
assert_ne!(me.id, nxt.id); assert_ne!(me.id, nxt.id);
crdt.set_leader(me.id); cluster_info.set_leader(me.id);
crdt.insert(&nxt); cluster_info.insert(&nxt);
let rv = crdt.gossip_request().unwrap(); let rv = cluster_info.gossip_request().unwrap();
assert_eq!(rv.0, nxt.contact_info.ncp); assert_eq!(rv.0, nxt.contact_info.ncp);
let now = crdt.alive[&nxt.id]; let now = cluster_info.alive[&nxt.id];
crdt.purge(now); cluster_info.purge(now);
let rv = crdt.gossip_request().unwrap(); let rv = cluster_info.gossip_request().unwrap();
assert_eq!(rv.0, nxt.contact_info.ncp); assert_eq!(rv.0, nxt.contact_info.ncp);
crdt.purge(now + GOSSIP_PURGE_MILLIS); cluster_info.purge(now + GOSSIP_PURGE_MILLIS);
let rv = crdt.gossip_request().unwrap(); let rv = cluster_info.gossip_request().unwrap();
assert_eq!(rv.0, nxt.contact_info.ncp); assert_eq!(rv.0, nxt.contact_info.ncp);
crdt.purge(now + GOSSIP_PURGE_MILLIS + 1); cluster_info.purge(now + GOSSIP_PURGE_MILLIS + 1);
let rv = crdt.gossip_request().unwrap(); let rv = cluster_info.gossip_request().unwrap();
assert_eq!(rv.0, nxt.contact_info.ncp); assert_eq!(rv.0, nxt.contact_info.ncp);
let mut nxt2 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234")); let mut nxt2 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234"));
assert_ne!(me.id, nxt2.id); assert_ne!(me.id, nxt2.id);
assert_ne!(nxt.id, nxt2.id); assert_ne!(nxt.id, nxt2.id);
crdt.insert(&nxt2); cluster_info.insert(&nxt2);
while now == crdt.alive[&nxt2.id] { while now == cluster_info.alive[&nxt2.id] {
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
nxt2.version += 1; nxt2.version += 1;
crdt.insert(&nxt2); cluster_info.insert(&nxt2);
} }
let len = crdt.table.len() as u64; let len = cluster_info.table.len() as u64;
assert!((MIN_TABLE_SIZE as u64) < len); assert!((MIN_TABLE_SIZE as u64) < len);
crdt.purge(now + GOSSIP_PURGE_MILLIS); cluster_info.purge(now + GOSSIP_PURGE_MILLIS);
assert_eq!(len as usize, crdt.table.len()); assert_eq!(len as usize, cluster_info.table.len());
trace!("purging"); trace!("purging");
crdt.purge(now + GOSSIP_PURGE_MILLIS + 1); cluster_info.purge(now + GOSSIP_PURGE_MILLIS + 1);
assert_eq!(len as usize - 1, crdt.table.len()); assert_eq!(len as usize - 1, cluster_info.table.len());
let rv = crdt.gossip_request().unwrap(); let rv = cluster_info.gossip_request().unwrap();
assert_eq!(rv.0, nxt.contact_info.ncp); assert_eq!(rv.0, nxt.contact_info.ncp);
} }
#[test] #[test]
fn purge_leader_test() { fn purge_leader_test() {
logger::setup(); logger::setup();
let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); let mut cluster_info = ClusterInfo::new(me.clone()).expect("ClusterInfo::new");
let nxt = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234")); let nxt = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234"));
assert_ne!(me.id, nxt.id); assert_ne!(me.id, nxt.id);
crdt.insert(&nxt); cluster_info.insert(&nxt);
crdt.set_leader(nxt.id); cluster_info.set_leader(nxt.id);
let now = crdt.alive[&nxt.id]; let now = cluster_info.alive[&nxt.id];
let mut nxt2 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234")); let mut nxt2 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1234"));
crdt.insert(&nxt2); cluster_info.insert(&nxt2);
while now == crdt.alive[&nxt2.id] { while now == cluster_info.alive[&nxt2.id] {
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
nxt2.version = nxt2.version + 1; nxt2.version = nxt2.version + 1;
crdt.insert(&nxt2); cluster_info.insert(&nxt2);
} }
let len = crdt.table.len() as u64; let len = cluster_info.table.len() as u64;
crdt.purge(now + GOSSIP_PURGE_MILLIS + 1); cluster_info.purge(now + GOSSIP_PURGE_MILLIS + 1);
assert_eq!(len as usize - 1, crdt.table.len()); assert_eq!(len as usize - 1, cluster_info.table.len());
assert_eq!(crdt.my_data().leader_id, Pubkey::default()); assert_eq!(cluster_info.my_data().leader_id, Pubkey::default());
assert!(crdt.leader_data().is_none()); assert!(cluster_info.leader_data().is_none());
} }
/// test window requests respond with the right blob, and do not overrun /// test window requests respond with the right blob, and do not overrun
@ -1736,18 +1746,21 @@ mod tests {
socketaddr!("127.0.0.1:1237"), socketaddr!("127.0.0.1:1237"),
socketaddr!("127.0.0.1:1238"), socketaddr!("127.0.0.1:1238"),
); );
let rv = Crdt::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, 0); let rv =
ClusterInfo::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, 0);
assert!(rv.is_none()); assert!(rv.is_none());
let out = SharedBlob::default(); let out = SharedBlob::default();
out.write().unwrap().meta.size = 200; out.write().unwrap().meta.size = 200;
window.write().unwrap()[0].data = Some(out); window.write().unwrap()[0].data = Some(out);
let rv = Crdt::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, 0); let rv =
ClusterInfo::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, 0);
assert!(rv.is_some()); assert!(rv.is_some());
let v = rv.unwrap(); let v = rv.unwrap();
//test we copied the blob //test we copied the blob
assert_eq!(v.read().unwrap().meta.size, 200); assert_eq!(v.read().unwrap().meta.size, 200);
let len = window.read().unwrap().len() as u64; let len = window.read().unwrap().len() as u64;
let rv = Crdt::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, len); let rv =
ClusterInfo::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, len);
assert!(rv.is_none()); assert!(rv.is_none());
fn tmp_ledger(name: &str) -> String { fn tmp_ledger(name: &str) -> String {
@ -1769,7 +1782,7 @@ mod tests {
let ledger_path = tmp_ledger("run_window_request"); let ledger_path = tmp_ledger("run_window_request");
let mut ledger_window = LedgerWindow::open(&ledger_path).unwrap(); let mut ledger_window = LedgerWindow::open(&ledger_path).unwrap();
let rv = Crdt::run_window_request( let rv = ClusterInfo::run_window_request(
&me, &me,
&socketaddr_any!(), &socketaddr_any!(),
&window, &window,
@ -1793,8 +1806,14 @@ mod tests {
let mock_peer = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let mock_peer = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
// Simulate handling a repair request from mock_peer // Simulate handling a repair request from mock_peer
let rv = let rv = ClusterInfo::run_window_request(
Crdt::run_window_request(&mock_peer, &socketaddr_any!(), &window, &mut None, &me, 0); &mock_peer,
&socketaddr_any!(),
&window,
&mut None,
&me,
0,
);
assert!(rv.is_none()); assert!(rv.is_none());
let blob = SharedBlob::default(); let blob = SharedBlob::default();
let blob_size = 200; let blob_size = 200;
@ -1803,7 +1822,7 @@ mod tests {
let num_requests: u32 = 64; let num_requests: u32 = 64;
for i in 0..num_requests { for i in 0..num_requests {
let shared_blob = Crdt::run_window_request( let shared_blob = ClusterInfo::run_window_request(
&mock_peer, &mock_peer,
&socketaddr_any!(), &socketaddr_any!(),
&window, &window,
@ -1831,23 +1850,23 @@ mod tests {
let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let leader0 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let leader0 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let leader1 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let leader1 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); let mut cluster_info = ClusterInfo::new(me.clone()).expect("ClusterInfo::new");
assert_eq!(crdt.top_leader(), None); assert_eq!(cluster_info.top_leader(), None);
crdt.set_leader(leader0.id); cluster_info.set_leader(leader0.id);
assert_eq!(crdt.top_leader().unwrap(), leader0.id); assert_eq!(cluster_info.top_leader().unwrap(), leader0.id);
//add a bunch of nodes with a new leader //add a bunch of nodes with a new leader
for _ in 0..10 { for _ in 0..10 {
let mut dum = NodeInfo::new_entry_point(&socketaddr!("127.0.0.1:1234")); let mut dum = NodeInfo::new_entry_point(&socketaddr!("127.0.0.1:1234"));
dum.id = Keypair::new().pubkey(); dum.id = Keypair::new().pubkey();
dum.leader_id = leader1.id; dum.leader_id = leader1.id;
crdt.insert(&dum); cluster_info.insert(&dum);
} }
assert_eq!(crdt.top_leader().unwrap(), leader1.id); assert_eq!(cluster_info.top_leader().unwrap(), leader1.id);
crdt.update_leader(); cluster_info.update_leader();
assert_eq!(crdt.my_data().leader_id, leader0.id); assert_eq!(cluster_info.my_data().leader_id, leader0.id);
crdt.insert(&leader1); cluster_info.insert(&leader1);
crdt.update_leader(); cluster_info.update_leader();
assert_eq!(crdt.my_data().leader_id, leader1.id); assert_eq!(cluster_info.my_data().leader_id, leader1.id);
} }
#[test] #[test]
@ -1870,11 +1889,14 @@ mod tests {
socketaddr_any!(), socketaddr_any!(),
); );
leader3.ledger_state.last_id = hash(b"3"); leader3.ledger_state.last_id = hash(b"3");
let mut crdt = Crdt::new(leader0.clone()).expect("Crdt::new"); let mut cluster_info = ClusterInfo::new(leader0.clone()).expect("ClusterInfo::new");
crdt.insert(&leader1); cluster_info.insert(&leader1);
crdt.insert(&leader2); cluster_info.insert(&leader2);
crdt.insert(&leader3); cluster_info.insert(&leader3);
assert_eq!(crdt.valid_last_ids(), vec![leader0.ledger_state.last_id]); assert_eq!(
cluster_info.valid_last_ids(),
vec![leader0.ledger_state.last_id]
);
} }
/// Validates the node that sent Protocol::ReceiveUpdates gets its /// Validates the node that sent Protocol::ReceiveUpdates gets its
@ -1890,25 +1912,25 @@ mod tests {
assert_ne!(node.id, node_with_same_addr.id); assert_ne!(node.id, node_with_same_addr.id);
let node_with_diff_addr = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:4321")); let node_with_diff_addr = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:4321"));
let crdt = Crdt::new(node.clone()).expect("Crdt::new"); let cluster_info = ClusterInfo::new(node.clone()).expect("ClusterInfo::new");
assert_eq!(crdt.alive.len(), 0); assert_eq!(cluster_info.alive.len(), 0);
let obj = Arc::new(RwLock::new(crdt)); let obj = Arc::new(RwLock::new(cluster_info));
let request = Protocol::RequestUpdates(1, node.clone()); let request = Protocol::RequestUpdates(1, node.clone());
assert!( assert!(
Crdt::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None,) ClusterInfo::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None,)
.is_none() .is_none()
); );
let request = Protocol::RequestUpdates(1, node_with_same_addr.clone()); let request = Protocol::RequestUpdates(1, node_with_same_addr.clone());
assert!( assert!(
Crdt::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None,) ClusterInfo::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None,)
.is_none() .is_none()
); );
let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone()); let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone());
Crdt::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None); ClusterInfo::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None);
let me = obj.write().unwrap(); let me = obj.write().unwrap();
@ -1924,24 +1946,24 @@ mod tests {
fn test_is_valid_address() { fn test_is_valid_address() {
assert!(cfg!(test)); assert!(cfg!(test));
let bad_address_port = socketaddr!("127.0.0.1:0"); let bad_address_port = socketaddr!("127.0.0.1:0");
assert!(!Crdt::is_valid_address(&bad_address_port)); assert!(!ClusterInfo::is_valid_address(&bad_address_port));
let bad_address_unspecified = socketaddr!(0, 1234); let bad_address_unspecified = socketaddr!(0, 1234);
assert!(!Crdt::is_valid_address(&bad_address_unspecified)); assert!(!ClusterInfo::is_valid_address(&bad_address_unspecified));
let bad_address_multicast = socketaddr!([224, 254, 0, 0], 1234); let bad_address_multicast = socketaddr!([224, 254, 0, 0], 1234);
assert!(!Crdt::is_valid_address(&bad_address_multicast)); assert!(!ClusterInfo::is_valid_address(&bad_address_multicast));
let loopback = socketaddr!("127.0.0.1:1234"); let loopback = socketaddr!("127.0.0.1:1234");
assert!(Crdt::is_valid_address(&loopback)); assert!(ClusterInfo::is_valid_address(&loopback));
// assert!(!Crdt::is_valid_ip_internal(loopback.ip(), false)); // assert!(!ClusterInfo::is_valid_ip_internal(loopback.ip(), false));
} }
#[test] #[test]
fn test_default_leader() { fn test_default_leader() {
logger::setup(); logger::setup();
let node_info = NodeInfo::new_localhost(Keypair::new().pubkey()); let node_info = NodeInfo::new_localhost(Keypair::new().pubkey());
let mut crdt = Crdt::new(node_info).unwrap(); let mut cluster_info = ClusterInfo::new(node_info).unwrap();
let network_entry_point = NodeInfo::new_entry_point(&socketaddr!("127.0.0.1:1239")); let network_entry_point = NodeInfo::new_entry_point(&socketaddr!("127.0.0.1:1239"));
crdt.insert(&network_entry_point); cluster_info.insert(&network_entry_point);
assert!(crdt.leader_data().is_none()); assert!(cluster_info.leader_data().is_none());
} }
#[test] #[test]

View File

@ -224,7 +224,7 @@ pub fn run_local_drone(mint_keypair: Keypair, network: SocketAddr, sender: Sende
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use bank::Bank; use bank::Bank;
use crdt::Node; use cluster_info::Node;
use drone::{Drone, DroneRequest, REQUEST_CAP, TIME_SLICE}; use drone::{Drone, DroneRequest, REQUEST_CAP, TIME_SLICE};
use fullnode::Fullnode; use fullnode::Fullnode;
use logger; use logger;

View File

@ -591,7 +591,7 @@ pub fn recover(id: &Pubkey, window: &mut [WindowSlot], start_idx: u64, start: us
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crdt; use cluster_info;
use erasure; use erasure;
use logger; use logger;
use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE}; use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE};
@ -726,7 +726,7 @@ mod test {
blobs.push(b_); blobs.push(b_);
} }
let d = crdt::NodeInfo::new_localhost(Keypair::new().pubkey()); let d = cluster_info::NodeInfo::new_localhost(Keypair::new().pubkey());
assert!(index_blobs(&d, &blobs, &mut (offset as u64)).is_ok()); assert!(index_blobs(&d, &blobs, &mut (offset as u64)).is_ok());
for b in blobs { for b in blobs {
let idx = b.read().unwrap().get_index().unwrap() as usize % WINDOW_SIZE; let idx = b.read().unwrap().get_index().unwrap() as usize % WINDOW_SIZE;

View File

@ -2,7 +2,7 @@
use bank::Bank; use bank::Bank;
use broadcast_stage::BroadcastStage; use broadcast_stage::BroadcastStage;
use crdt::{Crdt, Node, NodeInfo}; use cluster_info::{ClusterInfo, Node, NodeInfo};
use drone::DRONE_PORT; use drone::DRONE_PORT;
use entry::Entry; use entry::Entry;
use ledger::read_ledger; use ledger::read_ledger;
@ -80,7 +80,7 @@ pub struct Fullnode {
rpc_service: JsonRpcService, rpc_service: JsonRpcService,
ncp: Ncp, ncp: Ncp,
bank: Arc<Bank>, bank: Arc<Bank>,
crdt: Arc<RwLock<Crdt>>, cluster_info: Arc<RwLock<ClusterInfo>>,
ledger_path: String, ledger_path: String,
sigverify_disabled: bool, sigverify_disabled: bool,
shared_window: window::SharedWindow, shared_window: window::SharedWindow,
@ -270,14 +270,14 @@ impl Fullnode {
let window = window::new_window_from_entries(ledger_tail, entry_height, &node.info); let window = window::new_window_from_entries(ledger_tail, entry_height, &node.info);
let shared_window = Arc::new(RwLock::new(window)); let shared_window = Arc::new(RwLock::new(window));
let mut crdt = Crdt::new(node.info).expect("Crdt::new"); let mut cluster_info = ClusterInfo::new(node.info).expect("ClusterInfo::new");
if let Some(interval) = leader_rotation_interval { if let Some(interval) = leader_rotation_interval {
crdt.set_leader_rotation_interval(interval); cluster_info.set_leader_rotation_interval(interval);
} }
let crdt = Arc::new(RwLock::new(crdt)); let cluster_info = Arc::new(RwLock::new(cluster_info));
let ncp = Ncp::new( let ncp = Ncp::new(
&crdt, &cluster_info,
shared_window.clone(), shared_window.clone(),
Some(ledger_path), Some(ledger_path),
node.sockets.gossip, node.sockets.gossip,
@ -289,13 +289,13 @@ impl Fullnode {
match leader_info { match leader_info {
Some(leader_info) => { Some(leader_info) => {
// Start in validator mode. // Start in validator mode.
// TODO: let Crdt get that data from the network? // TODO: let ClusterInfo get that data from the network?
crdt.write().unwrap().insert(leader_info); cluster_info.write().unwrap().insert(leader_info);
let tvu = Tvu::new( let tvu = Tvu::new(
keypair.clone(), keypair.clone(),
&bank, &bank,
entry_height, entry_height,
crdt.clone(), cluster_info.clone(),
shared_window.clone(), shared_window.clone(),
node.sockets node.sockets
.replicate .replicate
@ -320,7 +320,7 @@ impl Fullnode {
let (tpu, entry_receiver, tpu_exit) = Tpu::new( let (tpu, entry_receiver, tpu_exit) = Tpu::new(
keypair.clone(), keypair.clone(),
&bank, &bank,
&crdt, &cluster_info,
Default::default(), Default::default(),
node.sockets node.sockets
.transaction .transaction
@ -337,7 +337,7 @@ impl Fullnode {
.broadcast .broadcast
.try_clone() .try_clone()
.expect("Failed to clone broadcast socket"), .expect("Failed to clone broadcast socket"),
crdt.clone(), cluster_info.clone(),
shared_window.clone(), shared_window.clone(),
entry_height, entry_height,
entry_receiver, entry_receiver,
@ -350,7 +350,7 @@ impl Fullnode {
Fullnode { Fullnode {
keypair, keypair,
crdt, cluster_info,
shared_window, shared_window,
bank, bank,
sigverify_disabled, sigverify_disabled,
@ -377,13 +377,13 @@ impl Fullnode {
self.bank = Arc::new(bank); self.bank = Arc::new(bank);
{ {
let mut wcrdt = self.crdt.write().unwrap(); let mut wcluster_info = self.cluster_info.write().unwrap();
let scheduled_leader = wcrdt.get_scheduled_leader(entry_height); let scheduled_leader = wcluster_info.get_scheduled_leader(entry_height);
match scheduled_leader { match scheduled_leader {
//TODO: Handle the case where we don't know who the next //TODO: Handle the case where we don't know who the next
//scheduled leader is //scheduled leader is
None => (), None => (),
Some(leader_id) => wcrdt.set_leader(leader_id), Some(leader_id) => wcluster_info.set_leader(leader_id),
} }
} }
@ -407,7 +407,7 @@ impl Fullnode {
self.keypair.clone(), self.keypair.clone(),
&self.bank, &self.bank,
entry_height, entry_height,
self.crdt.clone(), self.cluster_info.clone(),
self.shared_window.clone(), self.shared_window.clone(),
self.replicate_socket self.replicate_socket
.iter() .iter()
@ -427,11 +427,14 @@ impl Fullnode {
} }
fn validator_to_leader(&mut self, entry_height: u64) { fn validator_to_leader(&mut self, entry_height: u64) {
self.crdt.write().unwrap().set_leader(self.keypair.pubkey()); self.cluster_info
.write()
.unwrap()
.set_leader(self.keypair.pubkey());
let (tpu, blob_receiver, tpu_exit) = Tpu::new( let (tpu, blob_receiver, tpu_exit) = Tpu::new(
self.keypair.clone(), self.keypair.clone(),
&self.bank, &self.bank,
&self.crdt, &self.cluster_info,
Default::default(), Default::default(),
self.transaction_sockets self.transaction_sockets
.iter() .iter()
@ -446,7 +449,7 @@ impl Fullnode {
self.broadcast_socket self.broadcast_socket
.try_clone() .try_clone()
.expect("Failed to clone broadcast socket"), .expect("Failed to clone broadcast socket"),
self.crdt.clone(), self.cluster_info.clone(),
self.shared_window.clone(), self.shared_window.clone(),
entry_height, entry_height,
blob_receiver, blob_receiver,
@ -498,7 +501,7 @@ impl Fullnode {
// TODO: only used for testing, get rid of this once we have actual // TODO: only used for testing, get rid of this once we have actual
// leader scheduling // leader scheduling
pub fn set_scheduled_leader(&self, leader_id: Pubkey, entry_height: u64) { pub fn set_scheduled_leader(&self, leader_id: Pubkey, entry_height: u64) {
self.crdt self.cluster_info
.write() .write()
.unwrap() .unwrap()
.set_scheduled_leader(entry_height, leader_id); .set_scheduled_leader(entry_height, leader_id);
@ -549,7 +552,7 @@ impl Service for Fullnode {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use bank::Bank; use bank::Bank;
use crdt::Node; use cluster_info::Node;
use fullnode::{Fullnode, NodeRole, TvuReturnType}; use fullnode::{Fullnode, NodeRole, TvuReturnType};
use ledger::genesis; use ledger::genesis;
use packet::make_consecutive_blobs; use packet::make_consecutive_blobs;

View File

@ -21,7 +21,7 @@ pub mod chacha;
pub mod choose_gossip_peer_strategy; pub mod choose_gossip_peer_strategy;
pub mod client; pub mod client;
#[macro_use] #[macro_use]
pub mod crdt; pub mod cluster_info;
pub mod bpf_verifier; pub mod bpf_verifier;
pub mod budget_program; pub mod budget_program;
pub mod drone; pub mod drone;

View File

@ -1,6 +1,6 @@
//! The `ncp` module implements the network control plane. //! The `ncp` module implements the network control plane.
use crdt::Crdt; use cluster_info::ClusterInfo;
use service::Service; use service::Service;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
@ -17,7 +17,7 @@ pub struct Ncp {
impl Ncp { impl Ncp {
pub fn new( pub fn new(
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
window: SharedWindow, window: SharedWindow,
ledger_path: Option<&str>, ledger_path: Option<&str>,
gossip_socket: UdpSocket, gossip_socket: UdpSocket,
@ -27,22 +27,22 @@ impl Ncp {
let gossip_socket = Arc::new(gossip_socket); let gossip_socket = Arc::new(gossip_socket);
trace!( trace!(
"Ncp: id: {:?}, listening on: {:?}", "Ncp: id: {:?}, listening on: {:?}",
&crdt.read().unwrap().id.as_ref()[..4], &cluster_info.read().unwrap().id.as_ref()[..4],
gossip_socket.local_addr().unwrap() gossip_socket.local_addr().unwrap()
); );
let t_receiver = let t_receiver =
streamer::blob_receiver(gossip_socket.clone(), exit.clone(), request_sender); streamer::blob_receiver(gossip_socket.clone(), exit.clone(), request_sender);
let (response_sender, response_receiver) = channel(); let (response_sender, response_receiver) = channel();
let t_responder = streamer::responder("ncp", gossip_socket, response_receiver); let t_responder = streamer::responder("ncp", gossip_socket, response_receiver);
let t_listen = Crdt::listen( let t_listen = ClusterInfo::listen(
crdt.clone(), cluster_info.clone(),
window, window,
ledger_path, ledger_path,
request_receiver, request_receiver,
response_sender.clone(), response_sender.clone(),
exit.clone(), exit.clone(),
); );
let t_gossip = Crdt::gossip(crdt.clone(), response_sender, exit.clone()); let t_gossip = ClusterInfo::gossip(cluster_info.clone(), response_sender, exit.clone());
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
Ncp { exit, thread_hdls } Ncp { exit, thread_hdls }
} }
@ -66,7 +66,7 @@ impl Service for Ncp {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crdt::{Crdt, Node}; use cluster_info::{ClusterInfo, Node};
use ncp::Ncp; use ncp::Ncp;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -77,8 +77,8 @@ mod tests {
fn test_exit() { fn test_exit() {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let tn = Node::new_localhost(); let tn = Node::new_localhost();
let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new"); let cluster_info = ClusterInfo::new(tn.info.clone()).expect("ClusterInfo::new");
let c = Arc::new(RwLock::new(crdt)); let c = Arc::new(RwLock::new(cluster_info));
let w = Arc::new(RwLock::new(vec![])); let w = Arc::new(RwLock::new(vec![]));
let d = Ncp::new(&c, w, None, tn.sockets.gossip, exit.clone()); let d = Ncp::new(&c, w, None, tn.sockets.gossip, exit.clone());
d.close().expect("thread join"); d.close().expect("thread join");

View File

@ -1,8 +1,8 @@
//! The `replicate_stage` replicates transactions broadcast by the leader. //! The `replicate_stage` replicates transactions broadcast by the leader.
use bank::Bank; use bank::Bank;
use cluster_info::ClusterInfo;
use counter::Counter; use counter::Counter;
use crdt::Crdt;
use entry::EntryReceiver; use entry::EntryReceiver;
use ledger::{Block, LedgerWriter}; use ledger::{Block, LedgerWriter};
use log::Level; use log::Level;
@ -46,7 +46,7 @@ impl ReplicateStage {
/// Process entry blobs, already in order /// Process entry blobs, already in order
fn replicate_requests( fn replicate_requests(
bank: &Arc<Bank>, bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
window_receiver: &EntryReceiver, window_receiver: &EntryReceiver,
ledger_writer: Option<&mut LedgerWriter>, ledger_writer: Option<&mut LedgerWriter>,
keypair: &Arc<Keypair>, keypair: &Arc<Keypair>,
@ -62,12 +62,12 @@ impl ReplicateStage {
let res = bank.process_entries(&entries); let res = bank.process_entries(&entries);
if let Some(sender) = vote_blob_sender { if let Some(sender) = vote_blob_sender {
send_validator_vote(bank, keypair, crdt, sender)?; send_validator_vote(bank, keypair, cluster_info, sender)?;
} }
{ {
let mut wcrdt = crdt.write().unwrap(); let mut wcluster_info = cluster_info.write().unwrap();
wcrdt.insert_votes(&entries.votes()); wcluster_info.insert_votes(&entries.votes());
} }
inc_new_counter_info!( inc_new_counter_info!(
@ -87,7 +87,7 @@ impl ReplicateStage {
pub fn new( pub fn new(
keypair: Arc<Keypair>, keypair: Arc<Keypair>,
bank: Arc<Bank>, bank: Arc<Bank>,
crdt: Arc<RwLock<Crdt>>, cluster_info: Arc<RwLock<ClusterInfo>>,
window_receiver: EntryReceiver, window_receiver: EntryReceiver,
ledger_path: Option<&str>, ledger_path: Option<&str>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
@ -116,7 +116,7 @@ impl ReplicateStage {
if let Err(e) = Self::replicate_requests( if let Err(e) = Self::replicate_requests(
&bank, &bank,
&crdt, &cluster_info,
&window_receiver, &window_receiver,
ledger_writer.as_mut(), ledger_writer.as_mut(),
&keypair, &keypair,

View File

@ -1,5 +1,5 @@
use blob_fetch_stage::BlobFetchStage; use blob_fetch_stage::BlobFetchStage;
use crdt::{Crdt, Node, NodeInfo}; use cluster_info::{ClusterInfo, Node, NodeInfo};
use hash::{Hash, Hasher}; use hash::{Hash, Hasher};
use ncp::Ncp; use ncp::Ncp;
use service::Service; use service::Service;
@ -77,12 +77,14 @@ impl Replicator {
let window = window::new_window_from_entries(&[], entry_height, &node.info); let window = window::new_window_from_entries(&[], entry_height, &node.info);
let shared_window = Arc::new(RwLock::new(window)); let shared_window = Arc::new(RwLock::new(window));
let crdt = Arc::new(RwLock::new(Crdt::new(node.info).expect("Crdt::new"))); let cluster_info = Arc::new(RwLock::new(
ClusterInfo::new(node.info).expect("ClusterInfo::new"),
));
let leader_info = network_addr.map(|i| NodeInfo::new_entry_point(&i)); let leader_info = network_addr.map(|i| NodeInfo::new_entry_point(&i));
if let Some(leader_info) = leader_info.as_ref() { if let Some(leader_info) = leader_info.as_ref() {
crdt.write().unwrap().insert(leader_info); cluster_info.write().unwrap().insert(leader_info);
} else { } else {
panic!("No leader info!"); panic!("No leader info!");
} }
@ -98,7 +100,7 @@ impl Replicator {
// todo: pull blobs off the retransmit_receiver and recycle them? // todo: pull blobs off the retransmit_receiver and recycle them?
let (retransmit_sender, retransmit_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel();
let t_window = window_service( let t_window = window_service(
crdt.clone(), cluster_info.clone(),
shared_window.clone(), shared_window.clone(),
entry_height, entry_height,
max_entry_height, max_entry_height,
@ -112,7 +114,7 @@ impl Replicator {
let store_ledger_stage = StoreLedgerStage::new(entry_window_receiver, ledger_path); let store_ledger_stage = StoreLedgerStage::new(entry_window_receiver, ledger_path);
let ncp = Ncp::new( let ncp = Ncp::new(
&crdt, &cluster_info,
shared_window.clone(), shared_window.clone(),
ledger_path, ledger_path,
node.sockets.gossip, node.sockets.gossip,
@ -147,7 +149,7 @@ impl Replicator {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use client::mk_client; use client::mk_client;
use crdt::Node; use cluster_info::Node;
use fullnode::Fullnode; use fullnode::Fullnode;
use hash::Hash; use hash::Hash;
use ledger::{genesis, read_ledger, tmp_ledger_path}; use ledger::{genesis, read_ledger, tmp_ledger_path};

View File

@ -2,7 +2,7 @@
use bank; use bank;
use bincode; use bincode;
use crdt; use cluster_info;
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
use erasure; use erasure;
use packet; use packet;
@ -20,7 +20,7 @@ pub enum Error {
RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), RecvTimeoutError(std::sync::mpsc::RecvTimeoutError),
Serialize(std::boxed::Box<bincode::ErrorKind>), Serialize(std::boxed::Box<bincode::ErrorKind>),
BankError(bank::BankError), BankError(bank::BankError),
CrdtError(crdt::CrdtError), ClusterInfoError(cluster_info::ClusterInfoError),
BlobError(packet::BlobError), BlobError(packet::BlobError),
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
ErasureError(erasure::ErasureError), ErasureError(erasure::ErasureError),
@ -52,9 +52,9 @@ impl std::convert::From<bank::BankError> for Error {
Error::BankError(e) Error::BankError(e)
} }
} }
impl std::convert::From<crdt::CrdtError> for Error { impl std::convert::From<cluster_info::ClusterInfoError> for Error {
fn from(e: crdt::CrdtError) -> Error { fn from(e: cluster_info::ClusterInfoError) -> Error {
Error::CrdtError(e) Error::ClusterInfoError(e)
} }
} }
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]

View File

@ -1,7 +1,7 @@
//! The `retransmit_stage` retransmits blobs between validators //! The `retransmit_stage` retransmits blobs between validators
use cluster_info::ClusterInfo;
use counter::Counter; use counter::Counter;
use crdt::Crdt;
use entry::Entry; use entry::Entry;
use log::Level; use log::Level;
use result::{Error, Result}; use result::{Error, Result};
@ -22,33 +22,41 @@ pub enum RetransmitStageReturnType {
LeaderRotation(u64), LeaderRotation(u64),
} }
fn retransmit(crdt: &Arc<RwLock<Crdt>>, r: &BlobReceiver, sock: &UdpSocket) -> Result<()> { fn retransmit(
cluster_info: &Arc<RwLock<ClusterInfo>>,
r: &BlobReceiver,
sock: &UdpSocket,
) -> Result<()> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?; let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() { while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq); dq.append(&mut nq);
} }
for b in &mut dq { for b in &mut dq {
Crdt::retransmit(&crdt, b, sock)?; ClusterInfo::retransmit(&cluster_info, b, sock)?;
} }
Ok(()) Ok(())
} }
/// Service to retransmit messages from the leader to layer 1 nodes. /// Service to retransmit messages from the leader to layer 1 nodes.
/// See `crdt` for network layer definitions. /// See `cluster_info` for network layer definitions.
/// # Arguments /// # Arguments
/// * `sock` - Socket to read from. Read timeout is set to 1. /// * `sock` - Socket to read from. Read timeout is set to 1.
/// * `exit` - Boolean to signal system exit. /// * `exit` - Boolean to signal system exit.
/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip. /// * `cluster_info` - This structure needs to be updated and populated by the bank and via gossip.
/// * `recycler` - Blob recycler. /// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
fn retransmitter(sock: Arc<UdpSocket>, crdt: Arc<RwLock<Crdt>>, r: BlobReceiver) -> JoinHandle<()> { fn retransmitter(
sock: Arc<UdpSocket>,
cluster_info: Arc<RwLock<ClusterInfo>>,
r: BlobReceiver,
) -> JoinHandle<()> {
Builder::new() Builder::new()
.name("solana-retransmitter".to_string()) .name("solana-retransmitter".to_string())
.spawn(move || { .spawn(move || {
trace!("retransmitter started"); trace!("retransmitter started");
loop { loop {
if let Err(e) = retransmit(&crdt, &r, &sock) { if let Err(e) = retransmit(&cluster_info, &r, &sock) {
match e { match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@ -69,7 +77,7 @@ pub struct RetransmitStage {
impl RetransmitStage { impl RetransmitStage {
pub fn new( pub fn new(
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
window: SharedWindow, window: SharedWindow,
entry_height: u64, entry_height: u64,
retransmit_socket: Arc<UdpSocket>, retransmit_socket: Arc<UdpSocket>,
@ -78,11 +86,12 @@ impl RetransmitStage {
) -> (Self, Receiver<Vec<Entry>>) { ) -> (Self, Receiver<Vec<Entry>>) {
let (retransmit_sender, retransmit_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel();
let t_retransmit = retransmitter(retransmit_socket, crdt.clone(), retransmit_receiver); let t_retransmit =
retransmitter(retransmit_socket, cluster_info.clone(), retransmit_receiver);
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let done = Arc::new(AtomicBool::new(false)); let done = Arc::new(AtomicBool::new(false));
let t_window = window_service( let t_window = window_service(
crdt.clone(), cluster_info.clone(),
window, window,
entry_height, entry_height,
0, 0,

View File

@ -5,7 +5,7 @@
use bank::Bank; use bank::Bank;
use bincode::{deserialize, serialize}; use bincode::{deserialize, serialize};
use crdt::{Crdt, CrdtError, NodeInfo}; use cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo};
use hash::Hash; use hash::Hash;
use log::Level; use log::Level;
use ncp::Ncp; use ncp::Ncp;
@ -373,14 +373,22 @@ impl Drop for ThinClient {
pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> Result<NodeInfo> { pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> Result<NodeInfo> {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let (node, gossip_socket) = Crdt::spy_node(); let (node, gossip_socket) = ClusterInfo::spy_node();
let my_addr = gossip_socket.local_addr().unwrap(); let my_addr = gossip_socket.local_addr().unwrap();
let crdt = Arc::new(RwLock::new(Crdt::new(node).expect("Crdt::new"))); let cluster_info = Arc::new(RwLock::new(
ClusterInfo::new(node).expect("ClusterInfo::new"),
));
let window = Arc::new(RwLock::new(vec![])); let window = Arc::new(RwLock::new(vec![]));
let ncp = Ncp::new(&crdt.clone(), window, None, gossip_socket, exit.clone()); let ncp = Ncp::new(
&cluster_info.clone(),
window,
None,
gossip_socket,
exit.clone(),
);
let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp); let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp);
crdt.write().unwrap().insert(&leader_entry_point); cluster_info.write().unwrap().insert(&leader_entry_point);
sleep(Duration::from_millis(100)); sleep(Duration::from_millis(100));
@ -395,17 +403,17 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> R
loop { loop {
trace!("polling {:?} for leader from {:?}", leader_ncp, my_addr); trace!("polling {:?} for leader from {:?}", leader_ncp, my_addr);
if let Some(l) = crdt.read().unwrap().leader_data() { if let Some(l) = cluster_info.read().unwrap().leader_data() {
leader = Some(l.clone()); leader = Some(l.clone());
break; break;
} }
if log_enabled!(Level::Trace) { if log_enabled!(Level::Trace) {
trace!("{}", crdt.read().unwrap().node_info_trace()); trace!("{}", cluster_info.read().unwrap().node_info_trace());
} }
if now.elapsed() > deadline { if now.elapsed() > deadline {
return Err(Error::CrdtError(CrdtError::NoLeader)); return Err(Error::ClusterInfoError(ClusterInfoError::NoLeader));
} }
sleep(Duration::from_millis(100)); sleep(Duration::from_millis(100));
@ -414,7 +422,7 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> R
ncp.close()?; ncp.close()?;
if log_enabled!(Level::Trace) { if log_enabled!(Level::Trace) {
trace!("{}", crdt.read().unwrap().node_info_trace()); trace!("{}", cluster_info.read().unwrap().node_info_trace());
} }
Ok(leader.unwrap().clone()) Ok(leader.unwrap().clone())
@ -424,7 +432,7 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> R
mod tests { mod tests {
use super::*; use super::*;
use bank::Bank; use bank::Bank;
use crdt::Node; use cluster_info::Node;
use fullnode::Fullnode; use fullnode::Fullnode;
use ledger::LedgerWriter; use ledger::LedgerWriter;
use logger; use logger;

View File

@ -27,7 +27,7 @@
use bank::Bank; use bank::Bank;
use banking_stage::{BankingStage, Config}; use banking_stage::{BankingStage, Config};
use crdt::Crdt; use cluster_info::ClusterInfo;
use entry::Entry; use entry::Entry;
use fetch_stage::FetchStage; use fetch_stage::FetchStage;
use service::Service; use service::Service;
@ -56,7 +56,7 @@ impl Tpu {
pub fn new( pub fn new(
keypair: Arc<Keypair>, keypair: Arc<Keypair>,
bank: &Arc<Bank>, bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
tick_duration: Config, tick_duration: Config,
transactions_sockets: Vec<UdpSocket>, transactions_sockets: Vec<UdpSocket>,
ledger_path: &str, ledger_path: &str,
@ -76,7 +76,7 @@ impl Tpu {
let (write_stage, entry_forwarder) = WriteStage::new( let (write_stage, entry_forwarder) = WriteStage::new(
keypair, keypair,
bank.clone(), bank.clone(),
crdt.clone(), cluster_info.clone(),
ledger_path, ledger_path,
entry_receiver, entry_receiver,
entry_height, entry_height,

View File

@ -38,7 +38,7 @@
use bank::Bank; use bank::Bank;
use blob_fetch_stage::BlobFetchStage; use blob_fetch_stage::BlobFetchStage;
use crdt::Crdt; use cluster_info::ClusterInfo;
use replicate_stage::ReplicateStage; use replicate_stage::ReplicateStage;
use retransmit_stage::{RetransmitStage, RetransmitStageReturnType}; use retransmit_stage::{RetransmitStage, RetransmitStageReturnType};
use service::Service; use service::Service;
@ -67,7 +67,7 @@ impl Tvu {
/// # Arguments /// # Arguments
/// * `bank` - The bank state. /// * `bank` - The bank state.
/// * `entry_height` - Initial ledger height, passed to replicate stage /// * `entry_height` - Initial ledger height, passed to replicate stage
/// * `crdt` - The crdt state. /// * `cluster_info` - The cluster_info state.
/// * `window` - The window state. /// * `window` - The window state.
/// * `replicate_socket` - my replicate socket /// * `replicate_socket` - my replicate socket
/// * `repair_socket` - my repair socket /// * `repair_socket` - my repair socket
@ -78,7 +78,7 @@ impl Tvu {
keypair: Arc<Keypair>, keypair: Arc<Keypair>,
bank: &Arc<Bank>, bank: &Arc<Bank>,
entry_height: u64, entry_height: u64,
crdt: Arc<RwLock<Crdt>>, cluster_info: Arc<RwLock<ClusterInfo>>,
window: SharedWindow, window: SharedWindow,
replicate_sockets: Vec<UdpSocket>, replicate_sockets: Vec<UdpSocket>,
repair_socket: UdpSocket, repair_socket: UdpSocket,
@ -97,7 +97,7 @@ impl Tvu {
//the packets coming out of blob_receiver need to be sent to the GPU and verified //the packets coming out of blob_receiver need to be sent to the GPU and verified
//then sent to the window, which does the erasure coding reconstruction //then sent to the window, which does the erasure coding reconstruction
let (retransmit_stage, blob_window_receiver) = RetransmitStage::new( let (retransmit_stage, blob_window_receiver) = RetransmitStage::new(
&crdt, &cluster_info,
window, window,
entry_height, entry_height,
Arc::new(retransmit_socket), Arc::new(retransmit_socket),
@ -108,7 +108,7 @@ impl Tvu {
let replicate_stage = ReplicateStage::new( let replicate_stage = ReplicateStage::new(
keypair, keypair,
bank.clone(), bank.clone(),
crdt, cluster_info,
blob_window_receiver, blob_window_receiver,
ledger_path, ledger_path,
exit.clone(), exit.clone(),
@ -151,7 +151,7 @@ impl Service for Tvu {
pub mod tests { pub mod tests {
use bank::Bank; use bank::Bank;
use bincode::serialize; use bincode::serialize;
use crdt::{Crdt, Node}; use cluster_info::{ClusterInfo, Node};
use entry::Entry; use entry::Entry;
use hash::{hash, Hash}; use hash::{hash, Hash};
use logger; use logger;
@ -172,12 +172,12 @@ pub mod tests {
use window::{self, SharedWindow}; use window::{self, SharedWindow};
fn new_ncp( fn new_ncp(
crdt: Arc<RwLock<Crdt>>, cluster_info: Arc<RwLock<ClusterInfo>>,
gossip: UdpSocket, gossip: UdpSocket,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> (Ncp, SharedWindow) { ) -> (Ncp, SharedWindow) {
let window = Arc::new(RwLock::new(window::default_window())); let window = Arc::new(RwLock::new(window::default_window()));
let ncp = Ncp::new(&crdt, window.clone(), None, gossip, exit); let ncp = Ncp::new(&cluster_info, window.clone(), None, gossip, exit);
(ncp, window) (ncp, window)
} }
@ -191,19 +191,19 @@ pub mod tests {
let target2 = Node::new_localhost(); let target2 = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
//start crdt_leader //start cluster_info_l
let mut crdt_l = Crdt::new(leader.info.clone()).expect("Crdt::new"); let mut cluster_info_l = ClusterInfo::new(leader.info.clone()).expect("ClusterInfo::new");
crdt_l.set_leader(leader.info.id); cluster_info_l.set_leader(leader.info.id);
let cref_l = Arc::new(RwLock::new(crdt_l)); let cref_l = Arc::new(RwLock::new(cluster_info_l));
let dr_l = new_ncp(cref_l, leader.sockets.gossip, exit.clone()); let dr_l = new_ncp(cref_l, leader.sockets.gossip, exit.clone());
//start crdt2 //start cluster_info2
let mut crdt2 = Crdt::new(target2.info.clone()).expect("Crdt::new"); let mut cluster_info2 = ClusterInfo::new(target2.info.clone()).expect("ClusterInfo::new");
crdt2.insert(&leader.info); cluster_info2.insert(&leader.info);
crdt2.set_leader(leader.info.id); cluster_info2.set_leader(leader.info.id);
let leader_id = leader.info.id; let leader_id = leader.info.id;
let cref2 = Arc::new(RwLock::new(crdt2)); let cref2 = Arc::new(RwLock::new(cluster_info2));
let dr_2 = new_ncp(cref2, target2.sockets.gossip, exit.clone()); let dr_2 = new_ncp(cref2, target2.sockets.gossip, exit.clone());
// setup some blob services to send blobs into the socket // setup some blob services to send blobs into the socket
@ -232,11 +232,11 @@ pub mod tests {
let replicate_addr = target1.info.contact_info.tvu; let replicate_addr = target1.info.contact_info.tvu;
let bank = Arc::new(Bank::new(&mint)); let bank = Arc::new(Bank::new(&mint));
//start crdt1 //start cluster_info1
let mut crdt1 = Crdt::new(target1.info.clone()).expect("Crdt::new"); let mut cluster_info1 = ClusterInfo::new(target1.info.clone()).expect("ClusterInfo::new");
crdt1.insert(&leader.info); cluster_info1.insert(&leader.info);
crdt1.set_leader(leader.info.id); cluster_info1.set_leader(leader.info.id);
let cref1 = Arc::new(RwLock::new(crdt1)); let cref1 = Arc::new(RwLock::new(cluster_info1));
let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()); let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone());
let tvu = Tvu::new( let tvu = Tvu::new(

View File

@ -3,8 +3,8 @@
use bank::Bank; use bank::Bank;
use bincode::serialize; use bincode::serialize;
use budget_transaction::BudgetTransaction; use budget_transaction::BudgetTransaction;
use cluster_info::ClusterInfo;
use counter::Counter; use counter::Counter;
use crdt::Crdt;
use hash::Hash; use hash::Hash;
use influx_db_client as influxdb; use influx_db_client as influxdb;
use log::Level; use log::Level;
@ -30,14 +30,14 @@ enum VoteError {
pub fn create_new_signed_vote_blob( pub fn create_new_signed_vote_blob(
last_id: &Hash, last_id: &Hash,
keypair: &Keypair, keypair: &Keypair,
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
) -> Result<SharedBlob> { ) -> Result<SharedBlob> {
let shared_blob = SharedBlob::default(); let shared_blob = SharedBlob::default();
let (vote, addr) = { let (vote, addr) = {
let mut wcrdt = crdt.write().unwrap(); let mut wcluster_info = cluster_info.write().unwrap();
//TODO: doesn't seem like there is a synchronous call to get height and id //TODO: doesn't seem like there is a synchronous call to get height and id
debug!("voting on {:?}", &last_id.as_ref()[..8]); debug!("voting on {:?}", &last_id.as_ref()[..8]);
wcrdt.new_vote(*last_id) wcluster_info.new_vote(*last_id)
}?; }?;
let tx = Transaction::budget_new_vote(&keypair, vote, *last_id, 0); let tx = Transaction::budget_new_vote(&keypair, vote, *last_id, 0);
{ {
@ -107,14 +107,14 @@ pub fn send_leader_vote(
id: &Pubkey, id: &Pubkey,
keypair: &Keypair, keypair: &Keypair,
bank: &Arc<Bank>, bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
vote_blob_sender: &BlobSender, vote_blob_sender: &BlobSender,
last_vote: &mut u64, last_vote: &mut u64,
last_valid_validator_timestamp: &mut u64, last_valid_validator_timestamp: &mut u64,
) -> Result<()> { ) -> Result<()> {
let now = timing::timestamp(); let now = timing::timestamp();
if now - *last_vote > VOTE_TIMEOUT_MS { if now - *last_vote > VOTE_TIMEOUT_MS {
let ids: Vec<_> = crdt.read().unwrap().valid_last_ids(); let ids: Vec<_> = cluster_info.read().unwrap().valid_last_ids();
if let Ok((last_id, super_majority_timestamp)) = get_last_id_to_vote_on( if let Ok((last_id, super_majority_timestamp)) = get_last_id_to_vote_on(
id, id,
&ids, &ids,
@ -123,7 +123,7 @@ pub fn send_leader_vote(
last_vote, last_vote,
last_valid_validator_timestamp, last_valid_validator_timestamp,
) { ) {
if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, crdt) { if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, cluster_info) {
vote_blob_sender.send(vec![shared_blob])?; vote_blob_sender.send(vec![shared_blob])?;
let finality_ms = now - super_majority_timestamp; let finality_ms = now - super_majority_timestamp;
@ -147,11 +147,11 @@ pub fn send_leader_vote(
pub fn send_validator_vote( pub fn send_validator_vote(
bank: &Arc<Bank>, bank: &Arc<Bank>,
keypair: &Arc<Keypair>, keypair: &Arc<Keypair>,
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
vote_blob_sender: &BlobSender, vote_blob_sender: &BlobSender,
) -> Result<()> { ) -> Result<()> {
let last_id = bank.last_id(); let last_id = bank.last_id();
if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, crdt) { if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, cluster_info) {
inc_new_counter_info!("replicate-vote_sent", 1); inc_new_counter_info!("replicate-vote_sent", 1);
vote_blob_sender.send(vec![shared_blob])?; vote_blob_sender.send(vec![shared_blob])?;
@ -165,7 +165,7 @@ pub mod tests {
use bank::Bank; use bank::Bank;
use bincode::deserialize; use bincode::deserialize;
use budget_instruction::Vote; use budget_instruction::Vote;
use crdt::{Crdt, NodeInfo}; use cluster_info::{ClusterInfo, NodeInfo};
use entry::next_entry; use entry::next_entry;
use hash::{hash, Hash}; use hash::{hash, Hash};
use logger; use logger;
@ -193,14 +193,14 @@ pub mod tests {
// Create a leader // Create a leader
let leader_data = NodeInfo::new_with_socketaddr(&"127.0.0.1:1234".parse().unwrap()); let leader_data = NodeInfo::new_with_socketaddr(&"127.0.0.1:1234".parse().unwrap());
let leader_pubkey = leader_data.id.clone(); let leader_pubkey = leader_data.id.clone();
let mut leader_crdt = Crdt::new(leader_data).unwrap(); let mut leader_cluster_info = ClusterInfo::new(leader_data).unwrap();
// give the leader some tokens // give the leader some tokens
let give_leader_tokens_tx = let give_leader_tokens_tx =
Transaction::system_new(&mint.keypair(), leader_pubkey.clone(), 100, entry.id); Transaction::system_new(&mint.keypair(), leader_pubkey.clone(), 100, entry.id);
bank.process_transaction(&give_leader_tokens_tx).unwrap(); bank.process_transaction(&give_leader_tokens_tx).unwrap();
leader_crdt.set_leader(leader_pubkey); leader_cluster_info.set_leader(leader_pubkey);
// Insert 7 agreeing validators / 3 disagreeing // Insert 7 agreeing validators / 3 disagreeing
// and votes for new last_id // and votes for new last_id
@ -217,12 +217,12 @@ pub mod tests {
validator.ledger_state.last_id = entry.id; validator.ledger_state.last_id = entry.id;
} }
leader_crdt.insert(&validator); leader_cluster_info.insert(&validator);
trace!("validator id: {:?}", validator.id); trace!("validator id: {:?}", validator.id);
leader_crdt.insert_vote(&validator.id, &vote, entry.id); leader_cluster_info.insert_vote(&validator.id, &vote, entry.id);
} }
let leader = Arc::new(RwLock::new(leader_crdt)); let leader = Arc::new(RwLock::new(leader_cluster_info));
let (vote_blob_sender, vote_blob_receiver) = channel(); let (vote_blob_sender, vote_blob_receiver) = channel();
let mut last_vote: u64 = timing::timestamp() - VOTE_TIMEOUT_MS - 1; let mut last_vote: u64 = timing::timestamp() - VOTE_TIMEOUT_MS - 1;
let mut last_valid_validator_timestamp = 0; let mut last_valid_validator_timestamp = 0;

View File

@ -4,7 +4,7 @@ use budget_program::BudgetState;
use budget_transaction::BudgetTransaction; use budget_transaction::BudgetTransaction;
use chrono::prelude::*; use chrono::prelude::*;
use clap::ArgMatches; use clap::ArgMatches;
use crdt::NodeInfo; use cluster_info::NodeInfo;
use drone::DroneRequest; use drone::DroneRequest;
use fullnode::Config; use fullnode::Config;
use hash::Hash; use hash::Hash;
@ -613,7 +613,7 @@ mod tests {
use super::*; use super::*;
use bank::Bank; use bank::Bank;
use clap::{App, Arg, SubCommand}; use clap::{App, Arg, SubCommand};
use crdt::Node; use cluster_info::Node;
use drone::run_local_drone; use drone::run_local_drone;
use fullnode::Fullnode; use fullnode::Fullnode;
use ledger::LedgerWriter; use ledger::LedgerWriter;

View File

@ -1,7 +1,7 @@
//! The `window` module defines data structure for storing the tail of the ledger. //! The `window` module defines data structure for storing the tail of the ledger.
//! //!
use cluster_info::{ClusterInfo, NodeInfo};
use counter::Counter; use counter::Counter;
use crdt::{Crdt, NodeInfo};
use entry::Entry; use entry::Entry;
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
use erasure; use erasure;
@ -53,7 +53,7 @@ pub trait WindowUtil {
fn repair( fn repair(
&mut self, &mut self,
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
id: &Pubkey, id: &Pubkey,
times: usize, times: usize,
consumed: u64, consumed: u64,
@ -67,7 +67,7 @@ pub trait WindowUtil {
fn process_blob( fn process_blob(
&mut self, &mut self,
id: &Pubkey, id: &Pubkey,
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
blob: SharedBlob, blob: SharedBlob,
pix: u64, pix: u64,
consume_queue: &mut Vec<Entry>, consume_queue: &mut Vec<Entry>,
@ -95,20 +95,20 @@ impl WindowUtil for Window {
fn repair( fn repair(
&mut self, &mut self,
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
id: &Pubkey, id: &Pubkey,
times: usize, times: usize,
consumed: u64, consumed: u64,
received: u64, received: u64,
max_entry_height: u64, max_entry_height: u64,
) -> Vec<(SocketAddr, Vec<u8>)> { ) -> Vec<(SocketAddr, Vec<u8>)> {
let rcrdt = crdt.read().unwrap(); let rcluster_info = cluster_info.read().unwrap();
let leader_rotation_interval = rcrdt.get_leader_rotation_interval(); let leader_rotation_interval = rcluster_info.get_leader_rotation_interval();
// Calculate the next leader rotation height and check if we are the leader // Calculate the next leader rotation height and check if we are the leader
let next_leader_rotation = let next_leader_rotation =
consumed + leader_rotation_interval - (consumed % leader_rotation_interval); consumed + leader_rotation_interval - (consumed % leader_rotation_interval);
let is_next_leader = rcrdt.get_scheduled_leader(next_leader_rotation) == Some(*id); let is_next_leader = rcluster_info.get_scheduled_leader(next_leader_rotation) == Some(*id);
let num_peers = rcrdt.table.len() as u64; let num_peers = rcluster_info.table.len() as u64;
let max_repair = if max_entry_height == 0 { let max_repair = if max_entry_height == 0 {
calculate_max_repair(num_peers, consumed, received, times, is_next_leader) calculate_max_repair(num_peers, consumed, received, times, is_next_leader)
@ -119,10 +119,10 @@ impl WindowUtil for Window {
let idxs = self.clear_slots(consumed, max_repair); let idxs = self.clear_slots(consumed, max_repair);
let reqs: Vec<_> = idxs let reqs: Vec<_> = idxs
.into_iter() .into_iter()
.filter_map(|pix| rcrdt.window_index_request(pix).ok()) .filter_map(|pix| rcluster_info.window_index_request(pix).ok())
.collect(); .collect();
drop(rcrdt); drop(rcluster_info);
inc_new_counter_info!("streamer-repair_window-repair", reqs.len()); inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
@ -196,7 +196,7 @@ impl WindowUtil for Window {
fn process_blob( fn process_blob(
&mut self, &mut self,
id: &Pubkey, id: &Pubkey,
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
blob: SharedBlob, blob: SharedBlob,
pix: u64, pix: u64,
consume_queue: &mut Vec<Entry>, consume_queue: &mut Vec<Entry>,
@ -259,9 +259,9 @@ impl WindowUtil for Window {
// push all contiguous blobs into consumed queue, increment consumed // push all contiguous blobs into consumed queue, increment consumed
loop { loop {
if *consumed != 0 && *consumed % (leader_rotation_interval as u64) == 0 { if *consumed != 0 && *consumed % (leader_rotation_interval as u64) == 0 {
let rcrdt = crdt.read().unwrap(); let rcluster_info = cluster_info.read().unwrap();
let my_id = rcrdt.my_data().id; let my_id = rcluster_info.my_data().id;
match rcrdt.get_scheduled_leader(*consumed) { match rcluster_info.get_scheduled_leader(*consumed) {
// If we are the next leader, exit // If we are the next leader, exit
Some(id) if id == my_id => { Some(id) if id == my_id => {
break; break;
@ -388,7 +388,7 @@ pub fn index_blobs(
} }
/// Initialize a rebroadcast window with most recent Entry blobs /// Initialize a rebroadcast window with most recent Entry blobs
/// * `crdt` - gossip instance, used to set blob ids /// * `cluster_info` - gossip instance, used to set blob ids
/// * `blobs` - up to WINDOW_SIZE most recent blobs /// * `blobs` - up to WINDOW_SIZE most recent blobs
/// * `entry_height` - current entry height /// * `entry_height` - current entry height
pub fn initialized_window( pub fn initialized_window(

View File

@ -1,7 +1,7 @@
//! The `window_service` provides a thread for maintaining a window (tail of the ledger). //! The `window_service` provides a thread for maintaining a window (tail of the ledger).
//! //!
use cluster_info::{ClusterInfo, NodeInfo};
use counter::Counter; use counter::Counter;
use crdt::{Crdt, NodeInfo};
use entry::EntrySender; use entry::EntrySender;
use log::Level; use log::Level;
use packet::SharedBlob; use packet::SharedBlob;
@ -136,7 +136,7 @@ fn retransmit_all_leader_blocks(
fn recv_window( fn recv_window(
window: &SharedWindow, window: &SharedWindow,
id: &Pubkey, id: &Pubkey,
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
consumed: &mut u64, consumed: &mut u64,
received: &mut u64, received: &mut u64,
max_ix: u64, max_ix: u64,
@ -149,9 +149,9 @@ fn recv_window(
) -> Result<()> { ) -> Result<()> {
let timer = Duration::from_millis(200); let timer = Duration::from_millis(200);
let mut dq = r.recv_timeout(timer)?; let mut dq = r.recv_timeout(timer)?;
let maybe_leader: Option<NodeInfo> = crdt let maybe_leader: Option<NodeInfo> = cluster_info
.read() .read()
.expect("'crdt' read lock in fn recv_window") .expect("'cluster_info' read lock in fn recv_window")
.leader_data() .leader_data()
.cloned(); .cloned();
let leader_unknown = maybe_leader.is_none(); let leader_unknown = maybe_leader.is_none();
@ -204,7 +204,7 @@ fn recv_window(
window.write().unwrap().process_blob( window.write().unwrap().process_blob(
id, id,
crdt, cluster_info,
b, b,
pix, pix,
&mut consume_queue, &mut consume_queue,
@ -239,7 +239,7 @@ fn recv_window(
} }
pub fn window_service( pub fn window_service(
crdt: Arc<RwLock<Crdt>>, cluster_info: Arc<RwLock<ClusterInfo>>,
window: SharedWindow, window: SharedWindow,
entry_height: u64, entry_height: u64,
max_entry_height: u64, max_entry_height: u64,
@ -259,20 +259,20 @@ pub fn window_service(
let id; let id;
let leader_rotation_interval; let leader_rotation_interval;
{ {
let rcrdt = crdt.read().unwrap(); let rcluster_info = cluster_info.read().unwrap();
id = rcrdt.id; id = rcluster_info.id;
leader_rotation_interval = rcrdt.get_leader_rotation_interval(); leader_rotation_interval = rcluster_info.get_leader_rotation_interval();
} }
let mut pending_retransmits = false; let mut pending_retransmits = false;
trace!("{}: RECV_WINDOW started", id); trace!("{}: RECV_WINDOW started", id);
loop { loop {
if consumed != 0 && consumed % (leader_rotation_interval as u64) == 0 { if consumed != 0 && consumed % (leader_rotation_interval as u64) == 0 {
match crdt.read().unwrap().get_scheduled_leader(consumed) { match cluster_info.read().unwrap().get_scheduled_leader(consumed) {
// If we are the next leader, exit // If we are the next leader, exit
Some(next_leader_id) if id == next_leader_id => { Some(next_leader_id) if id == next_leader_id => {
return Some(WindowServiceReturnType::LeaderRotation(consumed)); return Some(WindowServiceReturnType::LeaderRotation(consumed));
} }
// TODO: Figure out where to set the new leader in the crdt for // TODO: Figure out where to set the new leader in the cluster_info for
// validator -> validator transition (once we have real leader scheduling, // validator -> validator transition (once we have real leader scheduling,
// this decision will be clearer). Also make sure new blobs to window actually // this decision will be clearer). Also make sure new blobs to window actually
// originate from new leader // originate from new leader
@ -283,7 +283,7 @@ pub fn window_service(
if let Err(e) = recv_window( if let Err(e) = recv_window(
&window, &window,
&id, &id,
&crdt, &cluster_info,
&mut consumed, &mut consumed,
&mut received, &mut received,
max_entry_height, max_entry_height,
@ -322,7 +322,7 @@ pub fn window_service(
trace!("{} let's repair! times = {}", id, times); trace!("{} let's repair! times = {}", id, times);
let mut window = window.write().unwrap(); let mut window = window.write().unwrap();
let reqs = window.repair(&crdt, &id, times, consumed, received, max_entry_height); let reqs = window.repair(&cluster_info, &id, times, consumed, received, max_entry_height);
for (to, req) in reqs { for (to, req) in reqs {
repair_socket.send_to(&req, to).unwrap_or_else(|e| { repair_socket.send_to(&req, to).unwrap_or_else(|e| {
info!("{} repair req send_to({}) error {:?}", id, to, e); info!("{} repair req send_to({}) error {:?}", id, to, e);
@ -336,7 +336,7 @@ pub fn window_service(
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crdt::{Crdt, Node}; use cluster_info::{ClusterInfo, Node};
use entry::Entry; use entry::Entry;
use hash::Hash; use hash::Hash;
use logger; use logger;
@ -371,10 +371,10 @@ mod test {
logger::setup(); logger::setup();
let tn = Node::new_localhost(); let tn = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let mut crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new"); let mut cluster_info_me = ClusterInfo::new(tn.info.clone()).expect("ClusterInfo::new");
let me_id = crdt_me.my_data().id; let me_id = cluster_info_me.my_data().id;
crdt_me.set_leader(me_id); cluster_info_me.set_leader(me_id);
let subs = Arc::new(RwLock::new(crdt_me)); let subs = Arc::new(RwLock::new(cluster_info_me));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
@ -429,9 +429,9 @@ mod test {
logger::setup(); logger::setup();
let tn = Node::new_localhost(); let tn = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new"); let cluster_info_me = ClusterInfo::new(tn.info.clone()).expect("ClusterInfo::new");
let me_id = crdt_me.my_data().id; let me_id = cluster_info_me.my_data().id;
let subs = Arc::new(RwLock::new(crdt_me)); let subs = Arc::new(RwLock::new(cluster_info_me));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
@ -485,9 +485,9 @@ mod test {
logger::setup(); logger::setup();
let tn = Node::new_localhost(); let tn = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new"); let cluster_info_me = ClusterInfo::new(tn.info.clone()).expect("ClusterInfo::new");
let me_id = crdt_me.my_data().id; let me_id = cluster_info_me.my_data().id;
let subs = Arc::new(RwLock::new(crdt_me)); let subs = Arc::new(RwLock::new(cluster_info_me));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
@ -595,20 +595,21 @@ mod test {
let my_leader_begin_epoch = 2; let my_leader_begin_epoch = 2;
let tn = Node::new_localhost(); let tn = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let mut crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new"); let mut cluster_info_me = ClusterInfo::new(tn.info.clone()).expect("ClusterInfo::new");
let me_id = crdt_me.my_data().id; let me_id = cluster_info_me.my_data().id;
// Set myself in an upcoming epoch, but set the old_leader_id as the // Set myself in an upcoming epoch, but set the old_leader_id as the
// leader for all epochs before that // leader for all epochs before that
let old_leader_id = Keypair::new().pubkey(); let old_leader_id = Keypair::new().pubkey();
crdt_me.set_leader(me_id); cluster_info_me.set_leader(me_id);
crdt_me.set_leader_rotation_interval(leader_rotation_interval); cluster_info_me.set_leader_rotation_interval(leader_rotation_interval);
for i in 0..my_leader_begin_epoch { for i in 0..my_leader_begin_epoch {
crdt_me.set_scheduled_leader(leader_rotation_interval * i, old_leader_id); cluster_info_me.set_scheduled_leader(leader_rotation_interval * i, old_leader_id);
} }
crdt_me.set_scheduled_leader(my_leader_begin_epoch * leader_rotation_interval, me_id); cluster_info_me
.set_scheduled_leader(my_leader_begin_epoch * leader_rotation_interval, me_id);
let subs = Arc::new(RwLock::new(crdt_me)); let subs = Arc::new(RwLock::new(cluster_info_me));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);

View File

@ -3,8 +3,8 @@
//! stdout, and then sends the Entry to its output channel. //! stdout, and then sends the Entry to its output channel.
use bank::Bank; use bank::Bank;
use cluster_info::ClusterInfo;
use counter::Counter; use counter::Counter;
use crdt::Crdt;
use entry::Entry; use entry::Entry;
use ledger::{Block, LedgerWriter}; use ledger::{Block, LedgerWriter};
use log::Level; use log::Level;
@ -38,7 +38,7 @@ impl WriteStage {
// fit before we hit the entry height for leader rotation. Also return a boolean // fit before we hit the entry height for leader rotation. Also return a boolean
// reflecting whether we actually hit an entry height for leader rotation. // reflecting whether we actually hit an entry height for leader rotation.
fn find_leader_rotation_index( fn find_leader_rotation_index(
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
leader_rotation_interval: u64, leader_rotation_interval: u64,
entry_height: u64, entry_height: u64,
mut new_entries: Vec<Entry>, mut new_entries: Vec<Entry>,
@ -51,9 +51,9 @@ impl WriteStage {
loop { loop {
if (entry_height + i as u64) % leader_rotation_interval == 0 { if (entry_height + i as u64) % leader_rotation_interval == 0 {
let rcrdt = crdt.read().unwrap(); let rcluster_info = cluster_info.read().unwrap();
let my_id = rcrdt.my_data().id; let my_id = rcluster_info.my_data().id;
let next_leader = rcrdt.get_scheduled_leader(entry_height + i as u64); let next_leader = rcluster_info.get_scheduled_leader(entry_height + i as u64);
if next_leader != Some(my_id) { if next_leader != Some(my_id) {
is_leader_rotation = true; is_leader_rotation = true;
break; break;
@ -86,7 +86,7 @@ impl WriteStage {
/// Process any Entry items that have been published by the RecordStage. /// Process any Entry items that have been published by the RecordStage.
/// continuosly send entries out /// continuosly send entries out
pub fn write_and_send_entries( pub fn write_and_send_entries(
crdt: &Arc<RwLock<Crdt>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
ledger_writer: &mut LedgerWriter, ledger_writer: &mut LedgerWriter,
entry_sender: &Sender<Vec<Entry>>, entry_sender: &Sender<Vec<Entry>>,
entry_receiver: &Receiver<Vec<Entry>>, entry_receiver: &Receiver<Vec<Entry>>,
@ -103,7 +103,7 @@ impl WriteStage {
// Find out how many more entries we can squeeze in until the next leader // Find out how many more entries we can squeeze in until the next leader
// rotation // rotation
let (new_entries, is_leader_rotation) = Self::find_leader_rotation_index( let (new_entries, is_leader_rotation) = Self::find_leader_rotation_index(
crdt, cluster_info,
leader_rotation_interval, leader_rotation_interval,
*entry_height + num_new_entries as u64, *entry_height + num_new_entries as u64,
received_entries, received_entries,
@ -127,17 +127,17 @@ impl WriteStage {
info!("write_stage entries: {}", num_new_entries); info!("write_stage entries: {}", num_new_entries);
let mut entries_send_total = 0; let mut entries_send_total = 0;
let mut crdt_votes_total = 0; let mut cluster_info_votes_total = 0;
let start = Instant::now(); let start = Instant::now();
for entries in ventries { for entries in ventries {
for e in &entries { for e in &entries {
num_txs += e.transactions.len(); num_txs += e.transactions.len();
} }
let crdt_votes_start = Instant::now(); let cluster_info_votes_start = Instant::now();
let votes = &entries.votes(); let votes = &entries.votes();
crdt.write().unwrap().insert_votes(&votes); cluster_info.write().unwrap().insert_votes(&votes);
crdt_votes_total += duration_as_ms(&crdt_votes_start.elapsed()); cluster_info_votes_total += duration_as_ms(&cluster_info_votes_start.elapsed());
ledger_writer.write_entries(entries.clone())?; ledger_writer.write_entries(entries.clone())?;
// Once the entries have been written to the ledger, then we can // Once the entries have been written to the ledger, then we can
@ -165,11 +165,11 @@ impl WriteStage {
"write_stage-time_ms", "write_stage-time_ms",
duration_as_ms(&now.elapsed()) as usize duration_as_ms(&now.elapsed()) as usize
); );
info!("done write_stage txs: {} time {} ms txs/s: {} entries_send_total: {} crdt_votes_total: {}", info!("done write_stage txs: {} time {} ms txs/s: {} entries_send_total: {} cluster_info_votes_total: {}",
num_txs, duration_as_ms(&start.elapsed()), num_txs, duration_as_ms(&start.elapsed()),
num_txs as f32 / duration_as_s(&start.elapsed()), num_txs as f32 / duration_as_s(&start.elapsed()),
entries_send_total, entries_send_total,
crdt_votes_total); cluster_info_votes_total);
Ok(()) Ok(())
} }
@ -178,7 +178,7 @@ impl WriteStage {
pub fn new( pub fn new(
keypair: Arc<Keypair>, keypair: Arc<Keypair>,
bank: Arc<Bank>, bank: Arc<Bank>,
crdt: Arc<RwLock<Crdt>>, cluster_info: Arc<RwLock<ClusterInfo>>,
ledger_path: &str, ledger_path: &str,
entry_receiver: Receiver<Vec<Entry>>, entry_receiver: Receiver<Vec<Entry>>,
entry_height: u64, entry_height: u64,
@ -201,9 +201,9 @@ impl WriteStage {
let id; let id;
let leader_rotation_interval; let leader_rotation_interval;
{ {
let rcrdt = crdt.read().unwrap(); let rcluster_info = cluster_info.read().unwrap();
id = rcrdt.id; id = rcluster_info.id;
leader_rotation_interval = rcrdt.get_leader_rotation_interval(); leader_rotation_interval = rcluster_info.get_leader_rotation_interval();
} }
let mut entry_height = entry_height; let mut entry_height = entry_height;
loop { loop {
@ -212,10 +212,10 @@ impl WriteStage {
// n * leader_rotation_interval for some "n". Once we've forwarded // n * leader_rotation_interval for some "n". Once we've forwarded
// that last block, check for the next scheduled leader. // that last block, check for the next scheduled leader.
if entry_height % (leader_rotation_interval as u64) == 0 { if entry_height % (leader_rotation_interval as u64) == 0 {
let rcrdt = crdt.read().unwrap(); let rcluster_info = cluster_info.read().unwrap();
let my_id = rcrdt.my_data().id; let my_id = rcluster_info.my_data().id;
let scheduled_leader = rcrdt.get_scheduled_leader(entry_height); let scheduled_leader = rcluster_info.get_scheduled_leader(entry_height);
drop(rcrdt); drop(rcluster_info);
match scheduled_leader { match scheduled_leader {
Some(id) if id == my_id => (), Some(id) if id == my_id => (),
// If the leader stays in power for the next // If the leader stays in power for the next
@ -230,7 +230,7 @@ impl WriteStage {
} }
if let Err(e) = Self::write_and_send_entries( if let Err(e) = Self::write_and_send_entries(
&crdt, &cluster_info,
&mut ledger_writer, &mut ledger_writer,
&entry_sender, &entry_sender,
&entry_receiver, &entry_receiver,
@ -255,7 +255,7 @@ impl WriteStage {
&id, &id,
&keypair, &keypair,
&bank, &bank,
&crdt, &cluster_info,
&vote_blob_sender, &vote_blob_sender,
&mut last_vote, &mut last_vote,
&mut last_valid_validator_timestamp, &mut last_valid_validator_timestamp,
@ -292,7 +292,7 @@ impl Service for WriteStage {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use bank::Bank; use bank::Bank;
use crdt::{Crdt, Node}; use cluster_info::{ClusterInfo, Node};
use entry::Entry; use entry::Entry;
use hash::Hash; use hash::Hash;
use ledger::{genesis, next_entries_mut, read_ledger}; use ledger::{genesis, next_entries_mut, read_ledger};
@ -309,7 +309,7 @@ mod tests {
write_stage: WriteStage, write_stage: WriteStage,
entry_sender: Sender<Vec<Entry>>, entry_sender: Sender<Vec<Entry>>,
_write_stage_entry_receiver: Receiver<Vec<Entry>>, _write_stage_entry_receiver: Receiver<Vec<Entry>>,
crdt: Arc<RwLock<Crdt>>, cluster_info: Arc<RwLock<ClusterInfo>>,
bank: Arc<Bank>, bank: Arc<Bank>,
leader_ledger_path: String, leader_ledger_path: String,
ledger_tail: Vec<Entry>, ledger_tail: Vec<Entry>,
@ -331,9 +331,9 @@ mod tests {
let my_id = leader_keypair.pubkey(); let my_id = leader_keypair.pubkey();
let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let mut crdt = Crdt::new(leader_info.info).expect("Crdt::new"); let mut cluster_info = ClusterInfo::new(leader_info.info).expect("ClusterInfo::new");
crdt.set_leader_rotation_interval(leader_rotation_interval); cluster_info.set_leader_rotation_interval(leader_rotation_interval);
let crdt = Arc::new(RwLock::new(crdt)); let cluster_info = Arc::new(RwLock::new(cluster_info));
let bank = Bank::new_default(true); let bank = Bank::new_default(true);
let bank = Arc::new(bank); let bank = Arc::new(bank);
@ -349,7 +349,7 @@ mod tests {
let (write_stage, _write_stage_entry_receiver) = WriteStage::new( let (write_stage, _write_stage_entry_receiver) = WriteStage::new(
leader_keypair, leader_keypair,
bank.clone(), bank.clone(),
crdt.clone(), cluster_info.clone(),
&leader_ledger_path, &leader_ledger_path,
entry_receiver, entry_receiver,
entry_height, entry_height,
@ -362,7 +362,7 @@ mod tests {
// Need to keep this alive, otherwise the write_stage will detect ChannelClosed // Need to keep this alive, otherwise the write_stage will detect ChannelClosed
// and shut down // and shut down
_write_stage_entry_receiver, _write_stage_entry_receiver,
crdt, cluster_info,
bank, bank,
leader_ledger_path, leader_ledger_path,
ledger_tail, ledger_tail,
@ -375,8 +375,8 @@ mod tests {
let write_stage_info = setup_dummy_write_stage(leader_rotation_interval); let write_stage_info = setup_dummy_write_stage(leader_rotation_interval);
{ {
let mut wcrdt = write_stage_info.crdt.write().unwrap(); let mut wcluster_info = write_stage_info.cluster_info.write().unwrap();
wcrdt.set_scheduled_leader(leader_rotation_interval, write_stage_info.my_id); wcluster_info.set_scheduled_leader(leader_rotation_interval, write_stage_info.my_id);
} }
let mut last_id = write_stage_info let mut last_id = write_stage_info
@ -396,14 +396,15 @@ mod tests {
write_stage_info.entry_sender.send(new_entry).unwrap(); write_stage_info.entry_sender.send(new_entry).unwrap();
} }
// Set the scheduled next leader in the crdt to some other node // Set the scheduled next leader in the cluster_info to some other node
let leader2_keypair = Keypair::new(); let leader2_keypair = Keypair::new();
let leader2_info = Node::new_localhost_with_pubkey(leader2_keypair.pubkey()); let leader2_info = Node::new_localhost_with_pubkey(leader2_keypair.pubkey());
{ {
let mut wcrdt = write_stage_info.crdt.write().unwrap(); let mut wcluster_info = write_stage_info.cluster_info.write().unwrap();
wcrdt.insert(&leader2_info.info); wcluster_info.insert(&leader2_info.info);
wcrdt.set_scheduled_leader(2 * leader_rotation_interval, leader2_keypair.pubkey()); wcluster_info
.set_scheduled_leader(2 * leader_rotation_interval, leader2_keypair.pubkey());
} }
// Input another leader_rotation_interval dummy entries one at a time, // Input another leader_rotation_interval dummy entries one at a time,
@ -440,13 +441,13 @@ mod tests {
// time during which a leader is in power // time during which a leader is in power
let num_epochs = 3; let num_epochs = 3;
let mut crdt = Crdt::new(leader_info.info).expect("Crdt::new"); let mut cluster_info = ClusterInfo::new(leader_info.info).expect("ClusterInfo::new");
crdt.set_leader_rotation_interval(leader_rotation_interval as u64); cluster_info.set_leader_rotation_interval(leader_rotation_interval as u64);
for i in 0..num_epochs { for i in 0..num_epochs {
crdt.set_scheduled_leader(i * leader_rotation_interval, my_id) cluster_info.set_scheduled_leader(i * leader_rotation_interval, my_id)
} }
let crdt = Arc::new(RwLock::new(crdt)); let cluster_info = Arc::new(RwLock::new(cluster_info));
let entry = Entry::new(&Hash::default(), 0, vec![]); let entry = Entry::new(&Hash::default(), 0, vec![]);
// A vector that is completely within a certain epoch should return that // A vector that is completely within a certain epoch should return that
@ -454,7 +455,7 @@ mod tests {
let mut len = leader_rotation_interval as usize - 1; let mut len = leader_rotation_interval as usize - 1;
let mut input = vec![entry.clone(); len]; let mut input = vec![entry.clone(); len];
let mut result = WriteStage::find_leader_rotation_index( let mut result = WriteStage::find_leader_rotation_index(
&crdt, &cluster_info,
leader_rotation_interval, leader_rotation_interval,
(num_epochs - 1) * leader_rotation_interval, (num_epochs - 1) * leader_rotation_interval,
input.clone(), input.clone(),
@ -467,7 +468,7 @@ mod tests {
len = leader_rotation_interval as usize - 1; len = leader_rotation_interval as usize - 1;
input = vec![entry.clone(); len]; input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index( result = WriteStage::find_leader_rotation_index(
&crdt, &cluster_info,
leader_rotation_interval, leader_rotation_interval,
(num_epochs * leader_rotation_interval) - 1, (num_epochs * leader_rotation_interval) - 1,
input.clone(), input.clone(),
@ -482,7 +483,7 @@ mod tests {
len = 1; len = 1;
let mut input = vec![entry.clone(); len]; let mut input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index( result = WriteStage::find_leader_rotation_index(
&crdt, &cluster_info,
leader_rotation_interval, leader_rotation_interval,
leader_rotation_interval - 1, leader_rotation_interval - 1,
input.clone(), input.clone(),
@ -495,7 +496,7 @@ mod tests {
len = leader_rotation_interval as usize; len = leader_rotation_interval as usize;
input = vec![entry.clone(); len]; input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index( result = WriteStage::find_leader_rotation_index(
&crdt, &cluster_info,
leader_rotation_interval, leader_rotation_interval,
leader_rotation_interval - 1, leader_rotation_interval - 1,
input.clone(), input.clone(),
@ -508,7 +509,7 @@ mod tests {
len = (num_epochs - 1) as usize * leader_rotation_interval as usize; len = (num_epochs - 1) as usize * leader_rotation_interval as usize;
input = vec![entry.clone(); len]; input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index( result = WriteStage::find_leader_rotation_index(
&crdt, &cluster_info,
leader_rotation_interval, leader_rotation_interval,
leader_rotation_interval - 1, leader_rotation_interval - 1,
input.clone(), input.clone(),
@ -522,7 +523,7 @@ mod tests {
len = (num_epochs - 1) as usize * leader_rotation_interval as usize + 1; len = (num_epochs - 1) as usize * leader_rotation_interval as usize + 1;
input = vec![entry.clone(); len]; input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index( result = WriteStage::find_leader_rotation_index(
&crdt, &cluster_info,
leader_rotation_interval, leader_rotation_interval,
leader_rotation_interval - 1, leader_rotation_interval - 1,
input.clone(), input.clone(),
@ -535,7 +536,7 @@ mod tests {
len = leader_rotation_interval as usize; len = leader_rotation_interval as usize;
input = vec![entry.clone(); len]; input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index( result = WriteStage::find_leader_rotation_index(
&crdt, &cluster_info,
leader_rotation_interval, leader_rotation_interval,
num_epochs * leader_rotation_interval, num_epochs * leader_rotation_interval,
input.clone(), input.clone(),

View File

@ -4,7 +4,7 @@ extern crate rayon;
extern crate solana; extern crate solana;
use rayon::iter::*; use rayon::iter::*;
use solana::crdt::{Crdt, Node}; use solana::cluster_info::{ClusterInfo, Node};
use solana::logger; use solana::logger;
use solana::ncp::Ncp; use solana::ncp::Ncp;
use solana::packet::{Blob, SharedBlob}; use solana::packet::{Blob, SharedBlob};
@ -16,10 +16,10 @@ use std::sync::{Arc, RwLock};
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, Ncp, UdpSocket) { fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<ClusterInfo>>, Ncp, UdpSocket) {
let mut tn = Node::new_localhost(); let mut tn = Node::new_localhost();
let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new"); let cluster_info = ClusterInfo::new(tn.info.clone()).expect("ClusterInfo::new");
let c = Arc::new(RwLock::new(crdt)); let c = Arc::new(RwLock::new(cluster_info));
let w = Arc::new(RwLock::new(vec![])); let w = Arc::new(RwLock::new(vec![]));
let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit); let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit);
(c, d, tn.sockets.replicate.pop().unwrap()) (c, d, tn.sockets.replicate.pop().unwrap())
@ -31,7 +31,7 @@ fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, Ncp, UdpSocket) {
/// tests that actually use this function are below /// tests that actually use this function are below
fn run_gossip_topo<F>(topo: F) fn run_gossip_topo<F>(topo: F)
where where
F: Fn(&Vec<(Arc<RwLock<Crdt>>, Ncp, UdpSocket)>) -> (), F: Fn(&Vec<(Arc<RwLock<ClusterInfo>>, Ncp, UdpSocket)>) -> (),
{ {
let num: usize = 5; let num: usize = 5;
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
@ -128,7 +128,7 @@ fn gossip_rstar() {
} }
#[test] #[test]
pub fn crdt_retransmit() -> result::Result<()> { pub fn cluster_info_retransmit() -> result::Result<()> {
logger::setup(); logger::setup();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
trace!("c1:"); trace!("c1:");
@ -161,7 +161,7 @@ pub fn crdt_retransmit() -> result::Result<()> {
assert!(done); assert!(done);
let b = SharedBlob::default(); let b = SharedBlob::default();
b.write().unwrap().meta.size = 10; b.write().unwrap().meta.size = 10;
Crdt::retransmit(&c1, &b, &tn1)?; ClusterInfo::retransmit(&c1, &b, &tn1)?;
let res: Vec<_> = [tn1, tn2, tn3] let res: Vec<_> = [tn1, tn2, tn3]
.into_par_iter() .into_par_iter()
.map(|s| { .map(|s| {

View File

@ -6,7 +6,7 @@ extern crate serde_json;
extern crate solana; extern crate solana;
extern crate solana_program_interface; extern crate solana_program_interface;
use solana::crdt::{Crdt, Node, NodeInfo}; use solana::cluster_info::{ClusterInfo, Node, NodeInfo};
use solana::entry::Entry; use solana::entry::Entry;
use solana::fullnode::{Fullnode, FullnodeReturnType}; use solana::fullnode::{Fullnode, FullnodeReturnType};
use solana::hash::Hash; use solana::hash::Hash;
@ -31,26 +31,26 @@ use std::thread::sleep;
use std::thread::Builder; use std::thread::Builder;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<Crdt>>, Pubkey) { fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<ClusterInfo>>, Pubkey) {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let mut spy = Node::new_localhost(); let mut spy = Node::new_localhost();
let me = spy.info.id.clone(); let me = spy.info.id.clone();
spy.info.contact_info.tvu = spy.sockets.replicate[0].local_addr().unwrap(); spy.info.contact_info.tvu = spy.sockets.replicate[0].local_addr().unwrap();
spy.info.contact_info.rpu = spy.sockets.transaction[0].local_addr().unwrap(); spy.info.contact_info.rpu = spy.sockets.transaction[0].local_addr().unwrap();
let mut spy_crdt = Crdt::new(spy.info).expect("Crdt::new"); let mut spy_cluster_info = ClusterInfo::new(spy.info).expect("ClusterInfo::new");
spy_crdt.insert(&leader); spy_cluster_info.insert(&leader);
spy_crdt.set_leader(leader.id); spy_cluster_info.set_leader(leader.id);
let spy_crdt_ref = Arc::new(RwLock::new(spy_crdt)); let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info));
let spy_window = Arc::new(RwLock::new(default_window())); let spy_window = Arc::new(RwLock::new(default_window()));
let ncp = Ncp::new( let ncp = Ncp::new(
&spy_crdt_ref, &spy_cluster_info_ref,
spy_window, spy_window,
None, None,
spy.sockets.gossip, spy.sockets.gossip,
exit.clone(), exit.clone(),
); );
(ncp, spy_crdt_ref, me) (ncp, spy_cluster_info_ref, me)
} }
fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> { fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
@ -971,10 +971,10 @@ fn test_leader_validator_basic() {
_ => panic!("Expected reason for exit to be leader rotation"), _ => panic!("Expected reason for exit to be leader rotation"),
} }
// TODO: We ignore this test for now b/c there's a chance here that the crdt // TODO: We ignore this test for now b/c there's a chance here that the cluster_info
// in the new leader calls the dummy sequence of update_leader -> top_leader() // in the new leader calls the dummy sequence of update_leader -> top_leader()
// (see the TODOs in those functions) during gossip and sets the leader back // (see the TODOs in those functions) during gossip and sets the leader back
// to the old leader, which causes a panic from an assertion failure in crdt broadcast(), // to the old leader, which causes a panic from an assertion failure in cluster_info broadcast(),
// specifically: assert!(me.leader_id != v.id). We can enable this test once we have real // specifically: assert!(me.leader_id != v.id). We can enable this test once we have real
// leader scheduling // leader scheduling
@ -1014,8 +1014,8 @@ fn mk_client(leader: &NodeInfo) -> ThinClient {
.set_read_timeout(Some(Duration::new(1, 0))) .set_read_timeout(Some(Duration::new(1, 0)))
.unwrap(); .unwrap();
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
assert!(Crdt::is_valid_address(&leader.contact_info.rpu)); assert!(ClusterInfo::is_valid_address(&leader.contact_info.rpu));
assert!(Crdt::is_valid_address(&leader.contact_info.tpu)); assert!(ClusterInfo::is_valid_address(&leader.contact_info.tpu));
ThinClient::new( ThinClient::new(
leader.contact_info.rpu, leader.contact_info.rpu,
requests_socket, requests_socket,