Refactor the "ReplicatedData" struct

Rename the "ReplicatedData" struct to the "NodeInfo" struct.
Also refactors and renames the members in this struct.
This commit is contained in:
OEM Configuration (temporary user) 2018-07-11 00:18:48 -07:00 committed by Greg Fitzgerald
parent 705720f086
commit 468ac9facd
11 changed files with 142 additions and 134 deletions

View File

@ -8,7 +8,7 @@ extern crate solana;
use bincode::serialize; use bincode::serialize;
use clap::{App, Arg}; use clap::{App, Arg};
use rayon::prelude::*; use rayon::prelude::*;
use solana::crdt::{Crdt, ReplicatedData}; use solana::crdt::{Crdt, NodeInfo};
use solana::drone::DroneRequest; use solana::drone::DroneRequest;
use solana::fullnode::Config; use solana::fullnode::Config;
use solana::hash::Hash; use solana::hash::Hash;
@ -38,7 +38,7 @@ fn sample_tx_count(
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
maxes: Arc<RwLock<Vec<(f64, u64)>>>, maxes: Arc<RwLock<Vec<(f64, u64)>>>,
first_count: u64, first_count: u64,
v: ReplicatedData, v: NodeInfo,
sample_period: u64, sample_period: u64,
) { ) {
let mut client = mk_client(&v); let mut client = mk_client(&v);
@ -79,7 +79,7 @@ fn generate_and_send_txs(
tx_clients: &Vec<ThinClient>, tx_clients: &Vec<ThinClient>,
id: &Mint, id: &Mint,
keypairs: &Vec<KeyPair>, keypairs: &Vec<KeyPair>,
leader: &ReplicatedData, leader: &NodeInfo,
txs: i64, txs: i64,
last_id: &mut Hash, last_id: &mut Hash,
threads: usize, threads: usize,
@ -185,12 +185,12 @@ fn main() {
) )
.get_matches(); .get_matches();
let leader: ReplicatedData; let leader: NodeInfo;
if let Some(l) = matches.value_of("leader") { if let Some(l) = matches.value_of("leader") {
leader = read_leader(l.to_string()).node_info; leader = read_leader(l.to_string()).node_info;
} else { } else {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
leader = ReplicatedData::new_leader(&server_addr); leader = NodeInfo::new_leader(&server_addr);
}; };
let id: Mint; let id: Mint;
@ -319,7 +319,7 @@ fn main() {
} }
} }
fn mk_client(r: &ReplicatedData) -> ThinClient { fn mk_client(r: &NodeInfo) -> ThinClient {
let requests_socket = udp_random_bind(8000, 10000, 5).unwrap(); let requests_socket = udp_random_bind(8000, 10000, 5).unwrap();
let transactions_socket = udp_random_bind(8000, 10000, 5).unwrap(); let transactions_socket = udp_random_bind(8000, 10000, 5).unwrap();
@ -335,11 +335,11 @@ fn mk_client(r: &ReplicatedData) -> ThinClient {
) )
} }
fn spy_node() -> (ReplicatedData, UdpSocket) { fn spy_node() -> (NodeInfo, UdpSocket) {
let gossip_socket_pair = udp_public_bind("gossip", 8000, 10000); let gossip_socket_pair = udp_public_bind("gossip", 8000, 10000);
let pubkey = KeyPair::new().pubkey(); let pubkey = KeyPair::new().pubkey();
let daddr = "0.0.0.0:0".parse().unwrap(); let daddr = "0.0.0.0:0".parse().unwrap();
let node = ReplicatedData::new( let node = NodeInfo::new(
pubkey, pubkey,
//gossip.local_addr().unwrap(), //gossip.local_addr().unwrap(),
gossip_socket_pair.addr, gossip_socket_pair.addr,
@ -352,11 +352,11 @@ fn spy_node() -> (ReplicatedData, UdpSocket) {
} }
fn converge( fn converge(
leader: &ReplicatedData, leader: &NodeInfo,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
num_nodes: usize, num_nodes: usize,
threads: &mut Vec<JoinHandle<()>>, threads: &mut Vec<JoinHandle<()>>,
) -> Vec<ReplicatedData> { ) -> Vec<NodeInfo> {
//lets spy on the network //lets spy on the network
let daddr = "0.0.0.0:0".parse().unwrap(); let daddr = "0.0.0.0:0".parse().unwrap();
let (spy, spy_gossip) = spy_node(); let (spy, spy_gossip) = spy_node();
@ -376,7 +376,7 @@ fn converge(
let mut rv = vec![]; let mut rv = vec![];
//wait for the network to converge, 30 seconds should be plenty //wait for the network to converge, 30 seconds should be plenty
for _ in 0..30 { for _ in 0..30 {
let v: Vec<ReplicatedData> = spy_ref let v: Vec<NodeInfo> = spy_ref
.read() .read()
.unwrap() .unwrap()
.table .table

View File

@ -9,7 +9,7 @@ extern crate tokio_io;
use bincode::deserialize; use bincode::deserialize;
use clap::{App, Arg}; use clap::{App, Arg};
use solana::crdt::ReplicatedData; use solana::crdt::NodeInfo;
use solana::drone::{Drone, DroneRequest}; use solana::drone::{Drone, DroneRequest};
use solana::fullnode::Config; use solana::fullnode::Config;
use solana::mint::Mint; use solana::mint::Mint;
@ -60,12 +60,12 @@ fn main() {
) )
.get_matches(); .get_matches();
let leader: ReplicatedData; let leader: NodeInfo;
if let Some(l) = matches.value_of("leader") { if let Some(l) = matches.value_of("leader") {
leader = read_leader(l.to_string()).node_info; leader = read_leader(l.to_string()).node_info;
} else { } else {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
leader = ReplicatedData::new_leader(&server_addr); leader = NodeInfo::new_leader(&server_addr);
}; };
let mint: Mint; let mint: Mint;

View File

@ -8,7 +8,7 @@ extern crate solana;
use atty::{is, Stream}; use atty::{is, Stream};
use clap::{App, Arg}; use clap::{App, Arg};
use solana::crdt::{ReplicatedData, TestNode}; use solana::crdt::{NodeInfo, TestNode};
use solana::fullnode::{Config, FullNode, InFile, OutFile}; use solana::fullnode::{Config, FullNode, InFile, OutFile};
use solana::service::Service; use solana::service::Service;
use solana::signature::{KeyPair, KeyPairUtil}; use solana::signature::{KeyPair, KeyPairUtil};
@ -52,7 +52,7 @@ fn main() -> () {
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
let mut keypair = KeyPair::new(); let mut keypair = KeyPair::new();
let mut repl_data = ReplicatedData::new_leader_with_pubkey(keypair.pubkey(), &bind_addr); let mut repl_data = NodeInfo::new_leader_with_pubkey(keypair.pubkey(), &bind_addr);
if let Some(l) = matches.value_of("identity") { if let Some(l) = matches.value_of("identity") {
let path = l.to_string(); let path = l.to_string();
if let Ok(file) = File::open(path.clone()) { if let Ok(file) = File::open(path.clone()) {
@ -82,7 +82,7 @@ fn main() -> () {
None, None,
) )
} else { } else {
node.data.current_leader_id = node.data.id.clone(); node.data.leader_id = node.data.id.clone();
let outfile = if let Some(o) = matches.value_of("output") { let outfile = if let Some(o) = matches.value_of("output") {
OutFile::Path(o.to_string()) OutFile::Path(o.to_string())

View File

@ -8,7 +8,7 @@ extern crate solana;
use bincode::serialize; use bincode::serialize;
use clap::{App, Arg, SubCommand}; use clap::{App, Arg, SubCommand};
use solana::crdt::ReplicatedData; use solana::crdt::NodeInfo;
use solana::drone::DroneRequest; use solana::drone::DroneRequest;
use solana::fullnode::Config; use solana::fullnode::Config;
use solana::mint::Mint; use solana::mint::Mint;
@ -56,7 +56,7 @@ impl error::Error for WalletError {
} }
struct WalletConfig { struct WalletConfig {
leader: ReplicatedData, leader: NodeInfo,
id: Mint, id: Mint,
drone_addr: SocketAddr, drone_addr: SocketAddr,
command: WalletCommand, command: WalletCommand,
@ -66,7 +66,7 @@ impl Default for WalletConfig {
fn default() -> WalletConfig { fn default() -> WalletConfig {
let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
WalletConfig { WalletConfig {
leader: ReplicatedData::new_leader(&default_addr.clone()), leader: NodeInfo::new_leader(&default_addr.clone()),
id: Mint::new(0), id: Mint::new(0),
drone_addr: default_addr.clone(), drone_addr: default_addr.clone(),
command: WalletCommand::Balance, command: WalletCommand::Balance,
@ -141,12 +141,12 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.subcommand(SubCommand::with_name("address").about("Get your public key")) .subcommand(SubCommand::with_name("address").about("Get your public key"))
.get_matches(); .get_matches();
let leader: ReplicatedData; let leader: NodeInfo;
if let Some(l) = matches.value_of("leader") { if let Some(l) = matches.value_of("leader") {
leader = read_leader(l.to_string()).node_info; leader = read_leader(l.to_string()).node_info;
} else { } else {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
leader = ReplicatedData::new_leader(&server_addr); leader = NodeInfo::new_leader(&server_addr);
}; };
let id: Mint; let id: Mint;
@ -298,7 +298,7 @@ fn read_mint(path: String) -> Result<Mint, Box<error::Error>> {
Ok(mint) Ok(mint)
} }
fn mk_client(r: &ReplicatedData) -> io::Result<ThinClient> { fn mk_client(r: &NodeInfo) -> io::Result<ThinClient> {
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket requests_socket

View File

@ -1,4 +1,4 @@
use crdt::{CrdtError, ReplicatedData}; use crdt::{CrdtError, NodeInfo};
use rand::distributions::{Distribution, Weighted, WeightedChoice}; use rand::distributions::{Distribution, Weighted, WeightedChoice};
use rand::thread_rng; use rand::thread_rng;
use result::Result; use result::Result;
@ -9,7 +9,7 @@ use std::collections::HashMap;
pub const DEFAULT_WEIGHT: u32 = 1; pub const DEFAULT_WEIGHT: u32 = 1;
pub trait ChooseGossipPeerStrategy { pub trait ChooseGossipPeerStrategy {
fn choose_peer<'a>(&self, options: Vec<&'a ReplicatedData>) -> Result<&'a ReplicatedData>; fn choose_peer<'a>(&self, options: Vec<&'a NodeInfo>) -> Result<&'a NodeInfo>;
} }
pub struct ChooseRandomPeerStrategy<'a> { pub struct ChooseRandomPeerStrategy<'a> {
@ -27,7 +27,7 @@ impl<'a, 'b> ChooseRandomPeerStrategy<'a> {
} }
impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> { impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> {
fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> { fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> {
if options.is_empty() { if options.is_empty() {
Err(CrdtError::TooSmall)?; Err(CrdtError::TooSmall)?;
} }
@ -172,7 +172,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
} }
impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> { impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> { fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> {
if options.len() < 1 { if options.len() < 1 {
Err(CrdtError::TooSmall)?; Err(CrdtError::TooSmall)?;
} }

View File

@ -108,26 +108,32 @@ pub struct ContactInfo {
/// destined to the replciate_addr /// destined to the replciate_addr
pub tvu_window: SocketAddr, pub tvu_window: SocketAddr,
/// if this struture changes update this value as well /// if this struture changes update this value as well
/// Always update `ReplicatedData` version too /// Always update `NodeInfo` version too
/// This separate version for addresses allows us to use the `Vote` /// This separate version for addresses allows us to use the `Vote`
/// as means of updating the `ReplicatedData` table without touching the /// as means of updating the `NodeInfo` table without touching the
/// addresses if they haven't changed. /// addresses if they haven't changed.
pub version: u64, pub version: u64,
} }
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct ReplicatedData { pub struct LedgerState {
/// last verified hash that was submitted to the leader
pub last_id: Hash,
/// last verified entry count, always increasing
pub entry_height: u64,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct NodeInfo {
pub id: PublicKey, pub id: PublicKey,
/// If any of the bits change, update increment this value /// If any of the bits change, update increment this value
pub version: u64, pub version: u64,
/// network addresses /// network addresses
pub contact_info: ContactInfo, pub contact_info: ContactInfo,
/// current leader identity /// current leader identity
pub current_leader_id: PublicKey, pub leader_id: PublicKey,
/// last verified hash that was submitted to the leader /// information about the state of the ledger
last_verified_id: Hash, ledger_state: LedgerState,
/// last verified count, always increasing
last_verified_height: u64,
} }
fn make_debug_id(buf: &[u8]) -> u64 { fn make_debug_id(buf: &[u8]) -> u64 {
@ -136,7 +142,7 @@ fn make_debug_id(buf: &[u8]) -> u64 {
.expect("rdr.read_u64 in fn debug_id") .expect("rdr.read_u64 in fn debug_id")
} }
impl ReplicatedData { impl NodeInfo {
pub fn new( pub fn new(
id: PublicKey, id: PublicKey,
ncp: SocketAddr, ncp: SocketAddr,
@ -144,8 +150,8 @@ impl ReplicatedData {
rpu: SocketAddr, rpu: SocketAddr,
tpu: SocketAddr, tpu: SocketAddr,
tvu_window: SocketAddr, tvu_window: SocketAddr,
) -> ReplicatedData { ) -> NodeInfo {
ReplicatedData { NodeInfo {
id, id,
version: 0, version: 0,
contact_info: ContactInfo { contact_info: ContactInfo {
@ -156,9 +162,11 @@ impl ReplicatedData {
tvu_window, tvu_window,
version: 0, version: 0,
}, },
current_leader_id: PublicKey::default(), leader_id: PublicKey::default(),
last_verified_id: Hash::default(), ledger_state: LedgerState {
last_verified_height: 0, last_id: Hash::default(),
entry_height: 0,
},
} }
} }
pub fn debug_id(&self) -> u64 { pub fn debug_id(&self) -> u64 {
@ -175,7 +183,7 @@ impl ReplicatedData {
let replicate_addr = Self::next_port(&bind_addr, 2); let replicate_addr = Self::next_port(&bind_addr, 2);
let requests_addr = Self::next_port(&bind_addr, 3); let requests_addr = Self::next_port(&bind_addr, 3);
let repair_addr = Self::next_port(&bind_addr, 4); let repair_addr = Self::next_port(&bind_addr, 4);
ReplicatedData::new( NodeInfo::new(
pubkey, pubkey,
gossip_addr, gossip_addr,
replicate_addr, replicate_addr,
@ -190,7 +198,7 @@ impl ReplicatedData {
} }
pub fn new_entry_point(gossip_addr: SocketAddr) -> Self { pub fn new_entry_point(gossip_addr: SocketAddr) -> Self {
let daddr: SocketAddr = "0.0.0.0:0".parse().unwrap(); let daddr: SocketAddr = "0.0.0.0:0".parse().unwrap();
ReplicatedData::new( NodeInfo::new(
PublicKey::default(), PublicKey::default(),
gossip_addr, gossip_addr,
daddr.clone(), daddr.clone(),
@ -201,9 +209,9 @@ impl ReplicatedData {
} }
} }
/// `Crdt` structure keeps a table of `ReplicatedData` structs /// `Crdt` structure keeps a table of `NodeInfo` structs
/// # Properties /// # Properties
/// * `table` - map of public id's to versioned and signed ReplicatedData structs /// * `table` - map of public id's to versioned and signed NodeInfo structs
/// * `local` - map of public id's to what `self.update_index` `self.table` was updated /// * `local` - map of public id's to what `self.update_index` `self.table` was updated
/// * `remote` - map of public id's to the `remote.update_index` was sent /// * `remote` - map of public id's to the `remote.update_index` was sent
/// * `update_index` - my update index /// * `update_index` - my update index
@ -214,7 +222,7 @@ impl ReplicatedData {
/// No attempt to keep track of timeouts or dropped requests is made, or should be. /// No attempt to keep track of timeouts or dropped requests is made, or should be.
pub struct Crdt { pub struct Crdt {
/// table of everyone in the network /// table of everyone in the network
pub table: HashMap<PublicKey, ReplicatedData>, pub table: HashMap<PublicKey, NodeInfo>,
/// Value of my update index when entry in table was updated. /// Value of my update index when entry in table was updated.
/// Nodes will ask for updates since `update_index`, and this node /// Nodes will ask for updates since `update_index`, and this node
/// should respond with all the identities that are greater then the /// should respond with all the identities that are greater then the
@ -238,17 +246,17 @@ enum Protocol {
/// this doesn't update the `remote` update index, but it allows the /// this doesn't update the `remote` update index, but it allows the
/// recepient of this request to add knowledge of this node to the network /// recepient of this request to add knowledge of this node to the network
/// (last update index i saw from you, my replicated data) /// (last update index i saw from you, my replicated data)
RequestUpdates(u64, ReplicatedData), RequestUpdates(u64, NodeInfo),
//TODO might need a since? //TODO might need a since?
/// from id, form's last update index, ReplicatedData /// from id, form's last update index, NodeInfo
ReceiveUpdates(PublicKey, u64, Vec<ReplicatedData>, Vec<(PublicKey, u64)>), ReceiveUpdates(PublicKey, u64, Vec<NodeInfo>, Vec<(PublicKey, u64)>),
/// ask for a missing index /// ask for a missing index
/// (my replicated data to keep alive, missing window index) /// (my replicated data to keep alive, missing window index)
RequestWindowIndex(ReplicatedData, u64), RequestWindowIndex(NodeInfo, u64),
} }
impl Crdt { impl Crdt {
pub fn new(me: ReplicatedData) -> Crdt { pub fn new(me: NodeInfo) -> Crdt {
assert_eq!(me.version, 0); assert_eq!(me.version, 0);
let mut g = Crdt { let mut g = Crdt {
table: HashMap::new(), table: HashMap::new(),
@ -266,11 +274,11 @@ impl Crdt {
pub fn debug_id(&self) -> u64 { pub fn debug_id(&self) -> u64 {
make_debug_id(&self.me) make_debug_id(&self.me)
} }
pub fn my_data(&self) -> &ReplicatedData { pub fn my_data(&self) -> &NodeInfo {
&self.table[&self.me] &self.table[&self.me]
} }
pub fn leader_data(&self) -> Option<&ReplicatedData> { pub fn leader_data(&self) -> Option<&NodeInfo> {
self.table.get(&(self.table[&self.me].current_leader_id)) self.table.get(&(self.table[&self.me].leader_id))
} }
pub fn set_leader(&mut self, key: PublicKey) -> () { pub fn set_leader(&mut self, key: PublicKey) -> () {
@ -279,9 +287,9 @@ impl Crdt {
"{:x}: LEADER_UPDATE TO {:x} from {:x}", "{:x}: LEADER_UPDATE TO {:x} from {:x}",
me.debug_id(), me.debug_id(),
make_debug_id(&key), make_debug_id(&key),
make_debug_id(&me.current_leader_id), make_debug_id(&me.leader_id),
); );
me.current_leader_id = key; me.leader_id = key;
me.version += 1; me.version += 1;
self.insert(&me); self.insert(&me);
} }
@ -321,7 +329,7 @@ impl Crdt {
} else { } else {
let mut data = self.table[pubkey].clone(); let mut data = self.table[pubkey].clone();
data.version = v.version; data.version = v.version;
data.last_verified_id = last_id; data.ledger_state.last_id = last_id;
debug!( debug!(
"{:x}: INSERTING VOTE! for {:x}", "{:x}: INSERTING VOTE! for {:x}",
self.debug_id(), self.debug_id(),
@ -349,7 +357,7 @@ impl Crdt {
self.insert_vote(&v.0, &v.1, v.2); self.insert_vote(&v.0, &v.1, v.2);
} }
} }
pub fn insert(&mut self, v: &ReplicatedData) { pub fn insert(&mut self, v: &NodeInfo) {
// TODO check that last_verified types are always increasing // TODO check that last_verified types are always increasing
//update the peer table //update the peer table
if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) { if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
@ -454,7 +462,7 @@ impl Crdt {
} }
pub fn index_blobs( pub fn index_blobs(
me: &ReplicatedData, me: &NodeInfo,
blobs: &Vec<SharedBlob>, blobs: &Vec<SharedBlob>,
receive_index: &mut u64, receive_index: &mut u64,
) -> Result<()> { ) -> Result<()> {
@ -472,12 +480,12 @@ impl Crdt {
} }
/// compute broadcast table /// compute broadcast table
/// # Remarks /// # Remarks
pub fn compute_broadcast_table(&self) -> Vec<ReplicatedData> { pub fn compute_broadcast_table(&self) -> Vec<NodeInfo> {
let live: Vec<_> = self.alive.iter().collect(); let live: Vec<_> = self.alive.iter().collect();
//thread_rng().shuffle(&mut live); //thread_rng().shuffle(&mut live);
let daddr = "0.0.0.0:0".parse().unwrap(); let daddr = "0.0.0.0:0".parse().unwrap();
let me = &self.table[&self.me]; let me = &self.table[&self.me];
let cloned_table: Vec<ReplicatedData> = live.iter() let cloned_table: Vec<NodeInfo> = live.iter()
.map(|x| &self.table[x.0]) .map(|x| &self.table[x.0])
.filter(|v| { .filter(|v| {
if me.id == v.id { if me.id == v.id {
@ -509,8 +517,8 @@ impl Crdt {
/// # Remarks /// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to` /// We need to avoid having obj locked while doing any io, such as the `send_to`
pub fn broadcast( pub fn broadcast(
me: &ReplicatedData, me: &NodeInfo,
broadcast_table: &Vec<ReplicatedData>, broadcast_table: &Vec<NodeInfo>,
window: &Window, window: &Window,
s: &UdpSocket, s: &UdpSocket,
transmit_index: &mut u64, transmit_index: &mut u64,
@ -540,7 +548,7 @@ impl Crdt {
.into_iter() .into_iter()
.map(|(b, v)| { .map(|(b, v)| {
// only leader should be broadcasting // only leader should be broadcasting
assert!(me.current_leader_id != v.id); assert!(me.leader_id != v.id);
let bl = b.unwrap(); let bl = b.unwrap();
let blob = bl.read().expect("blob read lock in streamer::broadcast"); let blob = bl.read().expect("blob read lock in streamer::broadcast");
//TODO profile this, may need multiple sockets for par_iter //TODO profile this, may need multiple sockets for par_iter
@ -580,7 +588,7 @@ impl Crdt {
/// # Remarks /// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to` /// We need to avoid having obj locked while doing any io, such as the `send_to`
pub fn retransmit(obj: &Arc<RwLock<Self>>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> { pub fn retransmit(obj: &Arc<RwLock<Self>>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> {
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = { let (me, table): (NodeInfo, Vec<NodeInfo>) = {
// copy to avoid locking during IO // copy to avoid locking during IO
let s = obj.read().expect("'obj' read lock in pub fn retransmit"); let s = obj.read().expect("'obj' read lock in pub fn retransmit");
(s.table[&s.me].clone(), s.table.values().cloned().collect()) (s.table[&s.me].clone(), s.table.values().cloned().collect())
@ -596,7 +604,7 @@ impl Crdt {
.filter(|v| { .filter(|v| {
if me.id == v.id { if me.id == v.id {
false false
} else if me.current_leader_id == v.id { } else if me.leader_id == v.id {
trace!("skip retransmit to leader {:?}", v.id); trace!("skip retransmit to leader {:?}", v.id);
false false
} else if v.contact_info.tvu == daddr { } else if v.contact_info.tvu == daddr {
@ -646,7 +654,7 @@ impl Crdt {
1.0 1.0
} }
fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) { fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<NodeInfo>) {
//trace!("get updates since {}", v); //trace!("get updates since {}", v);
let data = self.table let data = self.table
.values() .values()
@ -715,8 +723,8 @@ impl Crdt {
let mut me = self.my_data().clone(); let mut me = self.my_data().clone();
let leader = self.leader_data().ok_or(CrdtError::NoLeader)?.clone(); let leader = self.leader_data().ok_or(CrdtError::NoLeader)?.clone();
me.version += 1; me.version += 1;
me.last_verified_id = last_id; me.ledger_state.last_id = last_id;
me.last_verified_height = height; me.ledger_state.entry_height = height;
let vote = Vote { let vote = Vote {
version: me.version, version: me.version,
contact_info_version: me.contact_info.version, contact_info_version: me.contact_info.version,
@ -752,11 +760,11 @@ impl Crdt {
fn top_leader(&self) -> Option<PublicKey> { fn top_leader(&self) -> Option<PublicKey> {
let mut table = HashMap::new(); let mut table = HashMap::new();
let def = PublicKey::default(); let def = PublicKey::default();
let cur = self.table.values().filter(|x| x.current_leader_id != def); let cur = self.table.values().filter(|x| x.leader_id != def);
for v in cur { for v in cur {
let cnt = table.entry(&v.current_leader_id).or_insert(0); let cnt = table.entry(&v.leader_id).or_insert(0);
*cnt += 1; *cnt += 1;
trace!("leader {:x} {}", make_debug_id(&v.current_leader_id), *cnt); trace!("leader {:x} {}", make_debug_id(&v.leader_id), *cnt);
} }
let mut sorted: Vec<(&PublicKey, usize)> = table.into_iter().collect(); let mut sorted: Vec<(&PublicKey, usize)> = table.into_iter().collect();
let my_id = self.debug_id(); let my_id = self.debug_id();
@ -776,7 +784,7 @@ impl Crdt {
/// A t-shirt for the first person to actually use this bad behavior to attack the alpha testnet /// A t-shirt for the first person to actually use this bad behavior to attack the alpha testnet
fn update_leader(&mut self) { fn update_leader(&mut self) {
if let Some(leader_id) = self.top_leader() { if let Some(leader_id) = self.top_leader() {
if self.my_data().current_leader_id != leader_id { if self.my_data().leader_id != leader_id {
if self.table.get(&leader_id).is_some() { if self.table.get(&leader_id).is_some() {
self.set_leader(leader_id); self.set_leader(leader_id);
} }
@ -793,7 +801,7 @@ impl Crdt {
&mut self, &mut self,
from: PublicKey, from: PublicKey,
update_index: u64, update_index: u64,
data: &[ReplicatedData], data: &[NodeInfo],
external_liveness: &[(PublicKey, u64)], external_liveness: &[(PublicKey, u64)],
) { ) {
trace!("got updates {}", data.len()); trace!("got updates {}", data.len());
@ -857,8 +865,8 @@ impl Crdt {
} }
fn run_window_request( fn run_window_request(
window: &Window, window: &Window,
me: &ReplicatedData, me: &NodeInfo,
from: &ReplicatedData, from: &NodeInfo,
ix: u64, ix: u64,
blob_recycler: &BlobRecycler, blob_recycler: &BlobRecycler,
) -> Option<SharedBlob> { ) -> Option<SharedBlob> {
@ -877,7 +885,7 @@ impl Crdt {
// Allow retransmission of this response if the node // Allow retransmission of this response if the node
// is the leader and the number of repair requests equals // is the leader and the number of repair requests equals
// a power of two // a power of two
if me.current_leader_id == me.id if me.leader_id == me.id
&& (num_retransmits == 0 || num_retransmits.is_power_of_two()) && (num_retransmits == 0 || num_retransmits.is_power_of_two())
{ {
sender_id = me.id sender_id = me.id
@ -1079,7 +1087,7 @@ pub struct Sockets {
} }
pub struct TestNode { pub struct TestNode {
pub data: ReplicatedData, pub data: NodeInfo,
pub sockets: Sockets, pub sockets: Sockets,
} }
@ -1098,7 +1106,7 @@ impl TestNode {
let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
let data = ReplicatedData::new( let data = NodeInfo::new(
pubkey, pubkey,
gossip.local_addr().unwrap(), gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(), replicate.local_addr().unwrap(),
@ -1121,7 +1129,7 @@ impl TestNode {
}, },
} }
} }
pub fn new_with_bind_addr(data: ReplicatedData, bind_addr: SocketAddr) -> TestNode { pub fn new_with_bind_addr(data: NodeInfo, bind_addr: SocketAddr) -> TestNode {
let mut local_gossip_addr = bind_addr.clone(); let mut local_gossip_addr = bind_addr.clone();
local_gossip_addr.set_port(data.contact_info.ncp.port()); local_gossip_addr.set_port(data.contact_info.ncp.port());
@ -1171,7 +1179,7 @@ impl TestNode {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crdt::{ use crdt::{
parse_port_or_addr, Crdt, CrdtError, ReplicatedData, GOSSIP_PURGE_MILLIS, parse_port_or_addr, Crdt, CrdtError, NodeInfo, GOSSIP_PURGE_MILLIS,
GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE,
}; };
use hash::Hash; use hash::Hash;
@ -1198,7 +1206,7 @@ mod tests {
} }
#[test] #[test]
fn insert_test() { fn insert_test() {
let mut d = ReplicatedData::new( let mut d = NodeInfo::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),
@ -1218,11 +1226,11 @@ mod tests {
} }
#[test] #[test]
fn test_new_vote() { fn test_new_vote() {
let d = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
assert_eq!(d.version, 0); assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone()); let mut crdt = Crdt::new(d.clone());
assert_eq!(crdt.table[&d.id].version, 0); assert_eq!(crdt.table[&d.id].version, 0);
let leader = ReplicatedData::new_leader(&"127.0.0.2:1235".parse().unwrap()); let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap());
assert_ne!(d.id, leader.id); assert_ne!(d.id, leader.id);
assert_matches!( assert_matches!(
crdt.new_vote(0, Hash::default()).err(), crdt.new_vote(0, Hash::default()).err(),
@ -1245,7 +1253,7 @@ mod tests {
#[test] #[test]
fn test_insert_vote() { fn test_insert_vote() {
let d = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
assert_eq!(d.version, 0); assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone()); let mut crdt = Crdt::new(d.clone());
assert_eq!(crdt.table[&d.id].version, 0); assert_eq!(crdt.table[&d.id].version, 0);
@ -1277,10 +1285,10 @@ mod tests {
fn test_insert_vote_leader_liveness() { fn test_insert_vote_leader_liveness() {
logger::setup(); logger::setup();
// TODO: remove this test once leaders vote // TODO: remove this test once leaders vote
let d = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
assert_eq!(d.version, 0); assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone()); let mut crdt = Crdt::new(d.clone());
let leader = ReplicatedData::new_leader(&"127.0.0.2:1235".parse().unwrap()); let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap());
assert_ne!(d.id, leader.id); assert_ne!(d.id, leader.id);
crdt.insert(&leader); crdt.insert(&leader);
crdt.set_leader(leader.id); crdt.set_leader(leader.id);
@ -1300,7 +1308,7 @@ mod tests {
assert!(updated > live); assert!(updated > live);
} }
fn sorted(ls: &Vec<ReplicatedData>) -> Vec<ReplicatedData> { fn sorted(ls: &Vec<NodeInfo>) -> Vec<NodeInfo> {
let mut copy: Vec<_> = ls.iter().cloned().collect(); let mut copy: Vec<_> = ls.iter().cloned().collect();
copy.sort_by(|x, y| x.id.cmp(&y.id)); copy.sort_by(|x, y| x.id.cmp(&y.id));
copy copy
@ -1308,7 +1316,7 @@ mod tests {
#[test] #[test]
fn replicated_data_new_leader_with_pubkey() { fn replicated_data_new_leader_with_pubkey() {
let kp = KeyPair::new(); let kp = KeyPair::new();
let d1 = ReplicatedData::new_leader_with_pubkey( let d1 = NodeInfo::new_leader_with_pubkey(
kp.pubkey().clone(), kp.pubkey().clone(),
&"127.0.0.1:1234".parse().unwrap(), &"127.0.0.1:1234".parse().unwrap(),
); );
@ -1324,7 +1332,7 @@ mod tests {
} }
#[test] #[test]
fn update_test() { fn update_test() {
let d1 = ReplicatedData::new( let d1 = NodeInfo::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),
@ -1332,7 +1340,7 @@ mod tests {
"127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(),
); );
let d2 = ReplicatedData::new( let d2 = NodeInfo::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),
@ -1340,7 +1348,7 @@ mod tests {
"127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(),
); );
let d3 = ReplicatedData::new( let d3 = NodeInfo::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),
@ -1376,14 +1384,14 @@ mod tests {
sorted(&crdt2.table.values().map(|x| x.clone()).collect()), sorted(&crdt2.table.values().map(|x| x.clone()).collect()),
sorted(&crdt.table.values().map(|x| x.clone()).collect()) sorted(&crdt.table.values().map(|x| x.clone()).collect())
); );
let d4 = ReplicatedData::new_entry_point("127.0.0.4:1234".parse().unwrap()); let d4 = NodeInfo::new_entry_point("127.0.0.4:1234".parse().unwrap());
crdt.insert(&d4); crdt.insert(&d4);
let (_key, _ix, ups) = crdt.get_updates_since(0); let (_key, _ix, ups) = crdt.get_updates_since(0);
assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3])); assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3]));
} }
#[test] #[test]
fn window_index_request() { fn window_index_request() {
let me = ReplicatedData::new( let me = NodeInfo::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),
@ -1394,7 +1402,7 @@ mod tests {
let mut crdt = Crdt::new(me.clone()); let mut crdt = Crdt::new(me.clone());
let rv = crdt.window_index_request(0); let rv = crdt.window_index_request(0);
assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall))); assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
let nxt = ReplicatedData::new( let nxt = NodeInfo::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),
@ -1405,7 +1413,7 @@ mod tests {
crdt.insert(&nxt); crdt.insert(&nxt);
let rv = crdt.window_index_request(0); let rv = crdt.window_index_request(0);
assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall))); assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
let nxt = ReplicatedData::new( let nxt = NodeInfo::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
"127.0.0.2:1234".parse().unwrap(), "127.0.0.2:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),
@ -1418,7 +1426,7 @@ mod tests {
assert_eq!(nxt.contact_info.ncp, "127.0.0.2:1234".parse().unwrap()); assert_eq!(nxt.contact_info.ncp, "127.0.0.2:1234".parse().unwrap());
assert_eq!(rv.0, "127.0.0.2:1234".parse().unwrap()); assert_eq!(rv.0, "127.0.0.2:1234".parse().unwrap());
let nxt = ReplicatedData::new( let nxt = NodeInfo::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
"127.0.0.3:1234".parse().unwrap(), "127.0.0.3:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),
@ -1445,7 +1453,7 @@ mod tests {
/// test that gossip requests are eventually generated for all nodes /// test that gossip requests are eventually generated for all nodes
#[test] #[test]
fn gossip_request() { fn gossip_request() {
let me = ReplicatedData::new( let me = NodeInfo::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),
@ -1456,7 +1464,7 @@ mod tests {
let mut crdt = Crdt::new(me.clone()); let mut crdt = Crdt::new(me.clone());
let rv = crdt.gossip_request(); let rv = crdt.gossip_request();
assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall))); assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
let nxt1 = ReplicatedData::new( let nxt1 = NodeInfo::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
"127.0.0.2:1234".parse().unwrap(), "127.0.0.2:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),
@ -1470,7 +1478,7 @@ mod tests {
let rv = crdt.gossip_request().unwrap(); let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt1.contact_info.ncp); assert_eq!(rv.0, nxt1.contact_info.ncp);
let nxt2 = ReplicatedData::new_entry_point("127.0.0.3:1234".parse().unwrap()); let nxt2 = NodeInfo::new_entry_point("127.0.0.3:1234".parse().unwrap());
crdt.insert(&nxt2); crdt.insert(&nxt2);
// check that the service works // check that the service works
// and that it eventually produces a request for both nodes // and that it eventually produces a request for both nodes
@ -1511,9 +1519,9 @@ mod tests {
#[test] #[test]
fn purge_test() { fn purge_test() {
logger::setup(); logger::setup();
let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
let mut crdt = Crdt::new(me.clone()); let mut crdt = Crdt::new(me.clone());
let nxt = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap()); let nxt = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap());
assert_ne!(me.id, nxt.id); assert_ne!(me.id, nxt.id);
crdt.set_leader(me.id); crdt.set_leader(me.id);
crdt.insert(&nxt); crdt.insert(&nxt);
@ -1532,7 +1540,7 @@ mod tests {
let rv = crdt.gossip_request().unwrap(); let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt.contact_info.ncp); assert_eq!(rv.0, nxt.contact_info.ncp);
let nxt2 = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap()); let nxt2 = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap());
assert_ne!(me.id, nxt2.id); assert_ne!(me.id, nxt2.id);
assert_ne!(nxt.id, nxt2.id); assert_ne!(nxt.id, nxt2.id);
crdt.insert(&nxt2); crdt.insert(&nxt2);
@ -1555,7 +1563,7 @@ mod tests {
#[test] #[test]
fn run_window_request() { fn run_window_request() {
let window = default_window(); let window = default_window();
let me = ReplicatedData::new( let me = NodeInfo::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),
@ -1584,10 +1592,10 @@ mod tests {
fn run_window_request_with_backoff() { fn run_window_request_with_backoff() {
let window = default_window(); let window = default_window();
let mut me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); let mut me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
me.current_leader_id = me.id; me.leader_id = me.id;
let mock_peer = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); let mock_peer = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
let recycler = BlobRecycler::default(); let recycler = BlobRecycler::default();
@ -1620,25 +1628,25 @@ mod tests {
#[test] #[test]
fn test_update_leader() { fn test_update_leader() {
logger::setup(); logger::setup();
let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
let leader0 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); let leader0 = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
let leader1 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); let leader1 = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
let mut crdt = Crdt::new(me.clone()); let mut crdt = Crdt::new(me.clone());
assert_eq!(crdt.top_leader(), None); assert_eq!(crdt.top_leader(), None);
crdt.set_leader(leader0.id); crdt.set_leader(leader0.id);
assert_eq!(crdt.top_leader().unwrap(), leader0.id); assert_eq!(crdt.top_leader().unwrap(), leader0.id);
//add a bunch of nodes with a new leader //add a bunch of nodes with a new leader
for _ in 0..10 { for _ in 0..10 {
let mut dum = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap()); let mut dum = NodeInfo::new_entry_point("127.0.0.1:1234".parse().unwrap());
dum.id = KeyPair::new().pubkey(); dum.id = KeyPair::new().pubkey();
dum.current_leader_id = leader1.id; dum.leader_id = leader1.id;
crdt.insert(&dum); crdt.insert(&dum);
} }
assert_eq!(crdt.top_leader().unwrap(), leader1.id); assert_eq!(crdt.top_leader().unwrap(), leader1.id);
crdt.update_leader(); crdt.update_leader();
assert_eq!(crdt.my_data().current_leader_id, leader0.id); assert_eq!(crdt.my_data().leader_id, leader0.id);
crdt.insert(&leader1); crdt.insert(&leader1);
crdt.update_leader(); crdt.update_leader();
assert_eq!(crdt.my_data().current_leader_id, leader1.id); assert_eq!(crdt.my_data().leader_id, leader1.id);
} }
} }

View File

@ -523,7 +523,7 @@ mod test {
erasure::add_coding_blobs(blob_recycler, &mut blobs, offset as u64); erasure::add_coding_blobs(blob_recycler, &mut blobs, offset as u64);
let blobs_len = blobs.len(); let blobs_len = blobs.len();
let d = crdt::ReplicatedData::new( let d = crdt::NodeInfo::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),

View File

@ -1,7 +1,7 @@
//! The `fullnode` module hosts all the fullnode microservices. //! The `fullnode` module hosts all the fullnode microservices.
use bank::Bank; use bank::Bank;
use crdt::{Crdt, ReplicatedData, TestNode}; use crdt::{Crdt, NodeInfo, TestNode};
use entry::Entry; use entry::Entry;
use entry_writer; use entry_writer;
use ledger::Block; use ledger::Block;
@ -44,7 +44,7 @@ pub enum OutFile {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
/// Fullnode configuration to be stored in file /// Fullnode configuration to be stored in file
pub struct Config { pub struct Config {
pub node_info: ReplicatedData, pub node_info: NodeInfo,
pkcs8: Vec<u8>, pkcs8: Vec<u8>,
} }
@ -58,7 +58,7 @@ impl Config {
let keypair = let keypair =
KeyPair::from_pkcs8(Input::from(&pkcs8)).expect("from_pkcs8 in fullnode::Config new"); KeyPair::from_pkcs8(Input::from(&pkcs8)).expect("from_pkcs8 in fullnode::Config new");
let pubkey = keypair.pubkey(); let pubkey = keypair.pubkey();
let node_info = ReplicatedData::new_leader_with_pubkey(pubkey, bind_addr); let node_info = NodeInfo::new_leader_with_pubkey(pubkey, bind_addr);
Config { node_info, pkcs8 } Config { node_info, pkcs8 }
} }
pub fn keypair(&self) -> KeyPair { pub fn keypair(&self) -> KeyPair {
@ -104,7 +104,7 @@ impl FullNode {
if !leader { if !leader {
let testnet_addr = network_entry_for_validator.expect("validator requires entry"); let testnet_addr = network_entry_for_validator.expect("validator requires entry");
let network_entry_point = ReplicatedData::new_entry_point(testnet_addr); let network_entry_point = NodeInfo::new_entry_point(testnet_addr);
let keypair = keypair_for_validator.expect("validator requires keypair"); let keypair = keypair_for_validator.expect("validator requires keypair");
let server = FullNode::new_validator( let server = FullNode::new_validator(
keypair, keypair,
@ -121,7 +121,7 @@ impl FullNode {
); );
server server
} else { } else {
node.data.current_leader_id = node.data.id.clone(); node.data.leader_id = node.data.id.clone();
let outfile_for_leader: Box<Write + Send> = match outfile_for_leader { let outfile_for_leader: Box<Write + Send> = match outfile_for_leader {
Some(OutFile::Path(file)) => Box::new( Some(OutFile::Path(file)) => Box::new(
OpenOptions::new() OpenOptions::new()
@ -285,7 +285,7 @@ impl FullNode {
entry_height: u64, entry_height: u64,
ledger_tail: Option<Vec<Entry>>, ledger_tail: Option<Vec<Entry>>,
node: TestNode, node: TestNode,
entry_point: ReplicatedData, entry_point: NodeInfo,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Self { ) -> Self {
let bank = Arc::new(bank); let bank = Arc::new(bank);

View File

@ -1,7 +1,7 @@
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
//! //!
use counter::Counter; use counter::Counter;
use crdt::{Crdt, CrdtError, ReplicatedData}; use crdt::{Crdt, CrdtError, NodeInfo};
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
use erasure; use erasure;
use packet::{ use packet::{
@ -262,7 +262,7 @@ fn recv_window(
) -> Result<()> { ) -> Result<()> {
let timer = Duration::from_millis(200); let timer = Duration::from_millis(200);
let mut dq = r.recv_timeout(timer)?; let mut dq = r.recv_timeout(timer)?;
let maybe_leader: Option<ReplicatedData> = crdt.read() let maybe_leader: Option<NodeInfo> = crdt.read()
.expect("'crdt' read lock in fn recv_window") .expect("'crdt' read lock in fn recv_window")
.leader_data() .leader_data()
.cloned(); .cloned();
@ -574,8 +574,8 @@ pub fn window(
} }
fn broadcast( fn broadcast(
me: &ReplicatedData, me: &NodeInfo,
broadcast_table: &Vec<ReplicatedData>, broadcast_table: &Vec<NodeInfo>,
window: &Window, window: &Window,
recycler: &BlobRecycler, recycler: &BlobRecycler,
r: &BlobReceiver, r: &BlobReceiver,

View File

@ -31,8 +31,8 @@ fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, Ncp, UdpSocket) {
} }
/// Test that the network converges. /// Test that the network converges.
/// Run until every node in the network has a full ReplicatedData set. /// Run until every node in the network has a full NodeInfo set.
/// Check that nodes stop sending updates after all the ReplicatedData has been shared. /// Check that nodes stop sending updates after all the NodeInfo has been shared.
/// tests that actually use this function are below /// tests that actually use this function are below
fn run_gossip_topo<F>(topo: F) fn run_gossip_topo<F>(topo: F)
where where

View File

@ -5,7 +5,7 @@ extern crate serde_json;
extern crate solana; extern crate solana;
use solana::crdt::TestNode; use solana::crdt::TestNode;
use solana::crdt::{Crdt, ReplicatedData}; use solana::crdt::{Crdt, NodeInfo};
use solana::entry_writer::EntryWriter; use solana::entry_writer::EntryWriter;
use solana::fullnode::{FullNode, InFile, OutFile}; use solana::fullnode::{FullNode, InFile, OutFile};
use solana::logger; use solana::logger;
@ -21,7 +21,7 @@ use std::sync::{Arc, RwLock};
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
fn converge(leader: &ReplicatedData, num_nodes: usize) -> Vec<ReplicatedData> { fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
//lets spy on the network //lets spy on the network
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let mut spy = TestNode::new(); let mut spy = TestNode::new();
@ -46,7 +46,7 @@ fn converge(leader: &ReplicatedData, num_nodes: usize) -> Vec<ReplicatedData> {
let mut rv = vec![]; let mut rv = vec![];
for _ in 0..30 { for _ in 0..30 {
let num = spy_ref.read().unwrap().convergence(); let num = spy_ref.read().unwrap().convergence();
let mut v: Vec<ReplicatedData> = spy_ref let mut v: Vec<NodeInfo> = spy_ref
.read() .read()
.unwrap() .unwrap()
.table .table
@ -284,7 +284,7 @@ fn test_boot_validator_from_file() {
std::fs::remove_file(ledger_path).unwrap(); std::fs::remove_file(ledger_path).unwrap();
} }
fn create_leader(ledger_path: &str) -> (ReplicatedData, FullNode) { fn create_leader(ledger_path: &str) -> (NodeInfo, FullNode) {
let leader = TestNode::new(); let leader = TestNode::new();
let leader_data = leader.data.clone(); let leader_data = leader.data.clone();
let leader_fullnode = FullNode::new( let leader_fullnode = FullNode::new(
@ -399,7 +399,7 @@ fn test_multi_node_dynamic_network() {
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(1000)).unwrap(); send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(1000)).unwrap();
assert_eq!(leader_balance, 1000); assert_eq!(leader_balance, 1000);
let validators: Vec<(ReplicatedData, FullNode)> = (0..N) let validators: Vec<(NodeInfo, FullNode)> = (0..N)
.into_iter() .into_iter()
.map(|n| { .map(|n| {
let keypair = KeyPair::new(); let keypair = KeyPair::new();
@ -475,7 +475,7 @@ fn test_multi_node_dynamic_network() {
std::fs::remove_file(ledger_path).unwrap(); std::fs::remove_file(ledger_path).unwrap();
} }
fn mk_client(leader: &ReplicatedData) -> ThinClient { fn mk_client(leader: &NodeInfo) -> ThinClient {
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket requests_socket
.set_read_timeout(Some(Duration::new(1, 0))) .set_read_timeout(Some(Duration::new(1, 0)))
@ -514,7 +514,7 @@ fn retry_get_balance(
} }
fn send_tx_and_retry_get_balance( fn send_tx_and_retry_get_balance(
leader: &ReplicatedData, leader: &NodeInfo,
alice: &Mint, alice: &Mint,
bob_pubkey: &PublicKey, bob_pubkey: &PublicKey,
expected: Option<i64>, expected: Option<i64>,