Add scalable gossip library (#1546)

* Cluster Replicated Data Store

Separate the data storage and merge strategy from the network IO boundary.
Implement an eager push overlay for transporting recent messages.

Simulation shows fast convergence with 20k nodes.
anatoly yakovenko, 2018-11-15 13:23:26 -08:00 (committed by GitHub)
commit a41254e18c, parent 4a3230904e
31 changed files with 2821 additions and 1698 deletions

Cargo.toml

@@ -70,6 +70,7 @@ unstable = []
atty = "0.2"
bincode = "1.0.0"
bs58 = "0.2.0"
+ bv = { version = "0.10.0", features = ["serde"] }
byteorder = "1.2.1"
bytes = "0.4"
chrono = { version = "0.4.0", features = ["serde"] }
@@ -88,6 +89,7 @@ solana-jsonrpc-pubsub = "0.3.0"
solana-jsonrpc-ws-server = "0.3.0"
ipnetwork = "0.12.7"
itertools = "0.7.8"
+ indexmap = "1.0"
libc = "0.2.43"
libloading = "0.5.0"
log = "0.4.2"


@@ -69,7 +69,7 @@ fn sample_tx_count(
let mut max_tps = 0.0;
let mut total;
- let log_prefix = format!("{:21}:", v.contact_info.tpu.to_string());
+ let log_prefix = format!("{:21}:", v.tpu.to_string());
loop {
let tx_count = client.transaction_count();
@@ -106,7 +106,7 @@ fn sample_tx_count(
tps: max_tps,
tx: total,
};
- maxes.write().unwrap().push((v.contact_info.tpu, stats));
+ maxes.write().unwrap().push((v.tpu, stats));
break;
}
}
@@ -257,7 +257,7 @@ fn do_tx_transfers(
println!(
"Transferring 1 unit {} times... to {}",
txs0.len(),
- leader.contact_info.tpu
+ leader.tpu
);
let tx_len = txs0.len();
let transfer_start = Instant::now();
@@ -377,7 +377,7 @@ fn fund_keys(client: &mut ThinClient, source: &Keypair, dests: &[Keypair], token
}
fn airdrop_tokens(client: &mut ThinClient, leader: &NodeInfo, id: &Keypair, tx_count: u64) {
- let mut drone_addr = leader.contact_info.tpu;
+ let mut drone_addr = leader.tpu;
drone_addr.set_port(DRONE_PORT);
let starting_balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
@@ -638,7 +638,7 @@ fn main() {
let leader = leader.unwrap();
- println!("leader RPC is at {} {}", leader.contact_info.rpc, leader.id);
+ println!("leader RPC is at {} {}", leader.rpc, leader.id);
let mut client = mk_client(&leader);
let mut barrier_client = mk_client(&leader);
@@ -804,7 +804,7 @@ fn converge(
//lets spy on the network
let (node, gossip_socket) = ClusterInfo::spy_node();
let mut spy_cluster_info = ClusterInfo::new(node).expect("ClusterInfo::new");
- spy_cluster_info.insert(&leader);
+ spy_cluster_info.insert_info(leader.clone());
spy_cluster_info.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_cluster_info));
let window = Arc::new(RwLock::new(default_window()));
@@ -818,13 +818,7 @@ fn converge(
println!("{}", spy_ref.node_info_trace());
if spy_ref.leader_data().is_some() {
- v = spy_ref
-     .table
-     .values()
-     .filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpc))
-     .cloned()
-     .collect();
+ v = spy_ref.rpc_peers();
if v.len() >= num_nodes {
println!("CONVERGED!");
break;


@@ -67,7 +67,7 @@ fn main() {
if let Ok(file) = File::open(path.clone()) {
let parse: serde_json::Result<Config> = serde_json::from_reader(file);
if let Ok(data) = parse {
- (data.keypair(), data.node_info.contact_info.ncp)
+ (data.keypair(), data.node_info.ncp)
} else {
eprintln!("failed to parse {}", path);
exit(1);


@@ -63,7 +63,7 @@ fn main() {
if let Ok(file) = File::open(path.clone()) {
let parse: serde_json::Result<Config> = serde_json::from_reader(file);
if let Ok(data) = parse {
- (data.keypair(), data.node_info.contact_info.ncp)
+ (data.keypair(), data.node_info.ncp)
} else {
eprintln!("failed to parse {}", path);
exit(1);
@@ -129,7 +129,7 @@ fn main() {
let mut client = mk_client(&leader_info);
- let mut drone_addr = leader_info.contact_info.tpu;
+ let mut drone_addr = leader_info.tpu;
drone_addr.set_port(DRONE_PORT);
let airdrop_amount = 5;
if let Err(e) = request_airdrop(&drone_addr, &keypair.pubkey(), airdrop_amount) {

src/bloom.rs (new file, 103 lines)

@@ -0,0 +1,103 @@
//! Simple Bloom Filter
use bv::BitVec;
use rand::{self, Rng};
use std::cmp;
use std::marker::PhantomData;
/// Generate a stable hash of `self` for each `hash_index`.
/// A best effort should be made to make each hash unique.
pub trait BloomHashIndex {
fn hash(&self, hash_index: u64) -> u64;
}
#[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)]
pub struct Bloom<T: BloomHashIndex> {
pub keys: Vec<u64>,
pub bits: BitVec<u8>,
_phantom: PhantomData<T>,
}
impl<T: BloomHashIndex> Bloom<T> {
/// Create a filter of optimal size for `num` items given the target `false_rate`.
/// The keys are randomized for picking data out of a collision-resistant hash of size
/// `keysize` bytes.
/// See https://hur.st/bloomfilter/ for the sizing math.
pub fn random(num: usize, false_rate: f64, max_bits: usize) -> Self {
let min_num_bits = ((num as f64 * false_rate.log(2f64))
/ (1f64 / 2f64.powf(2f64.log(2f64))).log(2f64)).ceil()
as usize;
let num_bits = cmp::max(1, cmp::min(min_num_bits, max_bits));
let num_keys = ((num_bits as f64 / num as f64) * 2f64.log(2f64)).round() as usize;
let keys: Vec<u64> = (0..num_keys).map(|_| rand::thread_rng().gen()).collect();
let bits = BitVec::new_fill(false, num_bits as u64);
Bloom {
keys,
bits,
_phantom: Default::default(),
}
}
fn pos(&self, key: &T, k: u64) -> u64 {
key.hash(k) % self.bits.len()
}
pub fn add(&mut self, key: &T) {
for k in &self.keys {
let pos = self.pos(key, *k);
self.bits.set(pos, true);
}
}
pub fn contains(&mut self, key: &T) -> bool {
for k in &self.keys {
let pos = self.pos(key, *k);
if !self.bits.get(pos) {
return false;
}
}
true
}
}
#[cfg(test)]
mod test {
use super::*;
use hash::{hash, Hash};
#[test]
fn test_bloom_filter() {
//empty
let bloom: Bloom<Hash> = Bloom::random(0, 0.1, 100);
assert_eq!(bloom.keys.len(), 0);
assert_eq!(bloom.bits.len(), 1);
//normal
let bloom: Bloom<Hash> = Bloom::random(10, 0.1, 100);
assert_eq!(bloom.keys.len(), 3);
assert_eq!(bloom.bits.len(), 34);
//saturated
let bloom: Bloom<Hash> = Bloom::random(100, 0.1, 100);
assert_eq!(bloom.keys.len(), 1);
assert_eq!(bloom.bits.len(), 100);
}
#[test]
fn test_add_contains() {
let mut bloom: Bloom<Hash> = Bloom::random(100, 0.1, 100);
let key = hash(b"hello");
assert!(!bloom.contains(&key));
bloom.add(&key);
assert!(bloom.contains(&key));
let key = hash(b"world");
assert!(!bloom.contains(&key));
bloom.add(&key);
assert!(bloom.contains(&key));
}
#[test]
fn test_random() {
let mut b1: Bloom<Hash> = Bloom::random(10, 0.1, 100);
let mut b2: Bloom<Hash> = Bloom::random(10, 0.1, 100);
b1.keys.sort();
b2.keys.sort();
assert_ne!(b1.keys, b2.keys);
}
}
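
A minimal usage sketch of the filter above (illustrative, not part of this commit; `hash::hash` is the crate's own helper, exactly as used in the unit tests):

// Size the filter for ~1000 expected items at a 10% false-positive rate,
// capped at 4 KiB worth of bits, then insert and query a value.
use bloom::Bloom;
use hash::{hash, Hash};

fn bloom_usage_sketch() {
    let mut filter: Bloom<Hash> = Bloom::random(1_000, 0.1, 4 * 1024 * 8);
    let item = hash(b"some value");
    assert!(!filter.contains(&item)); // almost certainly absent before insertion
    filter.add(&item);
    assert!(filter.contains(&item)); // no false negatives once added
}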


@@ -13,6 +13,7 @@ use packet::{index_blobs, SharedBlobs};
use rayon::prelude::*;
use result::{Error, Result};
use service::Service;
+ use solana_sdk::pubkey::Pubkey;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
@@ -32,6 +33,7 @@ pub enum BroadcastStageReturnType {
fn broadcast(
max_tick_height: Option<u64>,
tick_height: &mut u64,
+ leader_id: Pubkey,
node_info: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
@@ -140,6 +142,7 @@ fn broadcast(
// Send blobs out from the window
ClusterInfo::broadcast(
Some(*tick_height) == max_tick_height,
+ leader_id,
&node_info,
&broadcast_table,
&window,
@@ -211,10 +214,12 @@ impl BroadcastStage {
let me = cluster_info.read().unwrap().my_data().clone();
let mut tick_height_ = tick_height;
loop {
- let broadcast_table = cluster_info.read().unwrap().compute_broadcast_table();
+ let broadcast_table = cluster_info.read().unwrap().tpu_peers();
+ let leader_id = cluster_info.read().unwrap().leader_id();
if let Err(e) = broadcast(
max_tick_height,
&mut tick_height_,
+ leader_id,
&me,
&broadcast_table,
&window,

src/choose_gossip_peer_strategy.rs (deleted file, 334 lines)

@@ -1,334 +0,0 @@
use cluster_info::{ClusterInfoError, NodeInfo};
use rand::distributions::{Distribution, Weighted, WeightedChoice};
use rand::thread_rng;
use result::Result;
use solana_sdk::pubkey::Pubkey;
use std;
use std::collections::HashMap;
pub const DEFAULT_WEIGHT: u32 = 1;
pub trait ChooseGossipPeerStrategy {
fn choose_peer<'a>(&self, options: Vec<&'a NodeInfo>) -> Result<&'a NodeInfo>;
}
pub struct ChooseRandomPeerStrategy<'a> {
random: &'a Fn() -> u64,
}
// Given a source of randomness "random", this strategy will randomly pick a validator
// from the input options. This strategy works in isolation, but doesn't leverage any
// rumors from the rest of the gossip network to make more informed decisions about
// which validators have more/less updates
impl<'a, 'b> ChooseRandomPeerStrategy<'a> {
pub fn new(random: &'a Fn() -> u64) -> Self {
ChooseRandomPeerStrategy { random }
}
}
impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> {
fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> {
if options.is_empty() {
Err(ClusterInfoError::NoPeers)?;
}
let n = ((self.random)() as usize) % options.len();
Ok(options[n])
}
}
// This strategy uses rumors accumulated from the rest of the network to weight
// the importance of communicating with a particular validator based on cumulative network
// perception of the number of updates the validator has to offer. A validator is randomly
// picked based on a weighted sample from the pool of viable choices. The "weight", w, of a
// particular validator "v" is calculated as follows:
//
// w = [Sum for all i in I_v: (rumor_v(i) - observed(v)) * stake(i)] /
// [Sum for all i in I_v: Sum(stake(i))]
//
// where I_v is the set of all validators that returned a rumor about the update_index of
// validator "v", stake(i) is the size of the stake of validator "i", observed(v) is the
// observed update_index from the last direct communication validator "v", and
// rumor_v(i) is the rumored update_index of validator "v" propagated by fellow validator "i".
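// Worked example: with informants i1 (stake 3, rumor_v(i1) = 10) and
// i2 (stake 1, rumor_v(i2) = 6), and observed(v) = 4:
// w = ((10 - 4) * 3 + (6 - 4) * 1) / (3 + 1) = 20 / 4 = 5.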
// This could be a problem if there are validators with large stakes lying about their
// observed updates. There could also be a problem in network partitions, or even just
// when certain validators are disproportionately active, where we hear more rumors about
// certain clusters of nodes that then propagate more rumors about each other. Hopefully
// this can be resolved with a good baseline DEFAULT_WEIGHT, or by implementing lockout
// periods for very active validators in the future.
pub struct ChooseWeightedPeerStrategy<'a> {
// The map of last directly observed update_index for each active validator.
// This is how we get observed(v) from the formula above.
remote: &'a HashMap<Pubkey, u64>,
// The map of rumored update_index for each active validator. Using the formula above,
// to find rumor_v(i), we would first look up "v" in the outer map, then look up
// "i" in the inner map, i.e. look up external_liveness[v][i]
external_liveness: &'a HashMap<Pubkey, HashMap<Pubkey, u64>>,
// A function returning the size of the stake for a particular validator, corresponds
// to stake(i) in the formula above.
get_stake: &'a Fn(Pubkey) -> f64,
}
impl<'a> ChooseWeightedPeerStrategy<'a> {
pub fn new(
remote: &'a HashMap<Pubkey, u64>,
external_liveness: &'a HashMap<Pubkey, HashMap<Pubkey, u64>>,
get_stake: &'a Fn(Pubkey) -> f64,
) -> Self {
ChooseWeightedPeerStrategy {
remote,
external_liveness,
get_stake,
}
}
fn calculate_weighted_remote_index(&self, peer_id: Pubkey) -> u32 {
let mut last_seen_index = 0;
// If the peer is not in our remote table, then we leave last_seen_index as zero.
// Only happens when a peer appears in our cluster_info.table but not in our cluster_info.remote,
// which means a validator was directly injected into our cluster_info.table
if let Some(index) = self.remote.get(&peer_id) {
last_seen_index = *index;
}
let liveness_entry = self.external_liveness.get(&peer_id);
if liveness_entry.is_none() {
return DEFAULT_WEIGHT;
}
let votes = liveness_entry.unwrap();
if votes.is_empty() {
return DEFAULT_WEIGHT;
}
// Calculate the weighted average of the rumors
let mut relevant_votes = vec![];
let total_stake = votes.iter().fold(0.0, |total_stake, (&id, &vote)| {
let stake = (self.get_stake)(id);
// If the total stake is going to overflow u64, pick
// the larger of either the current total_stake, or the
// new stake, this way we are guaranteed to get at least u64/2
// sample of stake in our weighted calculation
if std::f64::MAX - total_stake < stake {
if stake > total_stake {
relevant_votes = vec![(stake, vote)];
stake
} else {
total_stake
}
} else {
relevant_votes.push((stake, vote));
total_stake + stake
}
});
let weighted_vote = relevant_votes.iter().fold(0.0, |sum, &(stake, vote)| {
if vote < last_seen_index {
// This should never happen because we maintain the invariant that the indexes
// in the external_liveness table are always greater than the corresponding
// indexes in the remote table, if the index exists in the remote table at all.
// Case 1: Attempt to insert bigger index into the "external_liveness" table
// happens after an insertion into the "remote" table. In this case,
// (see apply_updates()) function, we prevent the insertion if the entry
// in the remote table >= the attempted insertion into the "external" liveness
// table.
// Case 2: Bigger index in the "external_liveness" table inserted before
// a smaller insertion into the "remote" table. We clear the corresponding
// "external_liveness" table entry on all insertions into the "remote" table
// See apply_updates() function.
warn!("weighted peer index was smaller than local entry in remote table");
return sum;
}
let vote_difference = (vote - last_seen_index) as f64;
let new_weight = vote_difference * (stake / total_stake);
if std::f64::MAX - sum < new_weight {
return f64::max(new_weight, sum);
}
sum + new_weight
});
// Return u32 b/c the weighted sampling API from rand::distributions
// only takes u32 for weights
if weighted_vote >= f64::from(std::u32::MAX) {
return std::u32::MAX;
}
// If the weighted rumors we've heard about aren't any greater than
// what we've directly learned from the last time we communicated with the
// peer (i.e. weighted_vote == 0), then return a weight of 1.
// Otherwise, return the calculated weight.
weighted_vote as u32 + DEFAULT_WEIGHT
}
}
impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> {
if options.is_empty() {
Err(ClusterInfoError::NoPeers)?;
}
let mut weighted_peers = vec![];
for peer in options {
let weight = self.calculate_weighted_remote_index(peer.id);
weighted_peers.push(Weighted { weight, item: peer });
}
let mut rng = thread_rng();
Ok(WeightedChoice::new(&mut weighted_peers).sample(&mut rng))
}
}
#[cfg(test)]
mod tests {
use choose_gossip_peer_strategy::{ChooseWeightedPeerStrategy, DEFAULT_WEIGHT};
use logger;
use signature::{Keypair, KeypairUtil};
use solana_sdk::pubkey::Pubkey;
use std;
use std::collections::HashMap;
fn get_stake(_id: Pubkey) -> f64 {
1.0
}
#[test]
fn test_default() {
logger::setup();
// Initialize the filler keys
let key1 = Keypair::new().pubkey();
let remote: HashMap<Pubkey, u64> = HashMap::new();
let external_liveness: HashMap<Pubkey, HashMap<Pubkey, u64>> = HashMap::new();
let weighted_strategy =
ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
// If external_liveness table doesn't contain this entry,
// return the default weight
let result = weighted_strategy.calculate_weighted_remote_index(key1);
assert_eq!(result, DEFAULT_WEIGHT);
}
#[test]
fn test_only_external_liveness() {
logger::setup();
// Initialize the filler keys
let key1 = Keypair::new().pubkey();
let key2 = Keypair::new().pubkey();
let remote: HashMap<Pubkey, u64> = HashMap::new();
let mut external_liveness: HashMap<Pubkey, HashMap<Pubkey, u64>> = HashMap::new();
// If only the liveness table contains the entry, should return the
// weighted liveness entries
let test_value: u32 = 5;
let mut rumors: HashMap<Pubkey, u64> = HashMap::new();
rumors.insert(key2, test_value as u64);
external_liveness.insert(key1, rumors);
let weighted_strategy =
ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
let result = weighted_strategy.calculate_weighted_remote_index(key1);
assert_eq!(result, test_value + DEFAULT_WEIGHT);
}
#[test]
fn test_overflow_votes() {
logger::setup();
// Initialize the filler keys
let key1 = Keypair::new().pubkey();
let key2 = Keypair::new().pubkey();
let remote: HashMap<Pubkey, u64> = HashMap::new();
let mut external_liveness: HashMap<Pubkey, HashMap<Pubkey, u64>> = HashMap::new();
// If the vote index is greater than u32::MAX, default to u32::MAX
let test_value = (std::u32::MAX as u64) + 10;
let mut rumors: HashMap<Pubkey, u64> = HashMap::new();
rumors.insert(key2, test_value);
external_liveness.insert(key1, rumors);
let weighted_strategy =
ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
let result = weighted_strategy.calculate_weighted_remote_index(key1);
assert_eq!(result, std::u32::MAX);
}
#[test]
fn test_many_validators() {
logger::setup();
// Initialize the filler keys
let key1 = Keypair::new().pubkey();
let mut remote: HashMap<Pubkey, u64> = HashMap::new();
let mut external_liveness: HashMap<Pubkey, HashMap<Pubkey, u64>> = HashMap::new();
// Test many validators' rumors in external_liveness
let num_peers = 10;
let mut rumors: HashMap<Pubkey, u64> = HashMap::new();
remote.insert(key1, 0);
for i in 0..num_peers {
let pubkey = Keypair::new().pubkey();
rumors.insert(pubkey, i);
}
external_liveness.insert(key1, rumors);
let weighted_strategy =
ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
let result = weighted_strategy.calculate_weighted_remote_index(key1);
assert_eq!(result, (num_peers / 2) as u32);
}
#[test]
fn test_many_validators2() {
logger::setup();
// Initialize the filler keys
let key1 = Keypair::new().pubkey();
let mut remote: HashMap<Pubkey, u64> = HashMap::new();
let mut external_liveness: HashMap<Pubkey, HashMap<Pubkey, u64>> = HashMap::new();
// Test many validators' rumors in external_liveness
let num_peers = 10;
let old_index = 20;
let mut rumors: HashMap<Pubkey, u64> = HashMap::new();
remote.insert(key1, old_index);
for _i in 0..num_peers {
let pubkey = Keypair::new().pubkey();
rumors.insert(pubkey, old_index);
}
external_liveness.insert(key1, rumors);
let weighted_strategy =
ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
let result = weighted_strategy.calculate_weighted_remote_index(key1);
// If nobody has seen a newer update then revert to default
assert_eq!(result, DEFAULT_WEIGHT);
}
}


@@ -4,6 +4,5 @@ use thin_client::ThinClient;
pub fn mk_client(r: &NodeInfo) -> ThinClient {
let (_, transactions_socket) = bind_in_range(FULLNODE_PORT_RANGE).unwrap();
- ThinClient::new(r.contact_info.rpc, r.contact_info.tpu, transactions_socket)
+ ThinClient::new(r.rpc, r.tpu, transactions_socket)
}

(File diff suppressed because it is too large.)

src/contact_info.rs (new file, 237 lines)

@@ -0,0 +1,237 @@
use rpc::RPC_PORT;
use signature::{Keypair, KeypairUtil};
use solana_sdk::pubkey::Pubkey;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use timing::timestamp;
/// Structure representing a node on the network
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct ContactInfo {
pub id: Pubkey,
/// gossip address
pub ncp: SocketAddr,
/// address to connect to for replication
pub tvu: SocketAddr,
/// transactions address
pub tpu: SocketAddr,
/// storage data address
pub storage_addr: SocketAddr,
/// address to which to send JSON-RPC requests
pub rpc: SocketAddr,
/// websocket for JSON-RPC push notifications
pub rpc_pubsub: SocketAddr,
/// latest wallclock picked
pub wallclock: u64,
}
#[macro_export]
macro_rules! socketaddr {
($ip:expr, $port:expr) => {
SocketAddr::from((Ipv4Addr::from($ip), $port))
};
($str:expr) => {{
let a: SocketAddr = $str.parse().unwrap();
a
}};
}
#[macro_export]
macro_rules! socketaddr_any {
() => {
socketaddr!(0, 0)
};
}
impl Default for ContactInfo {
fn default() -> Self {
ContactInfo {
id: Pubkey::default(),
ncp: socketaddr_any!(),
tvu: socketaddr_any!(),
tpu: socketaddr_any!(),
storage_addr: socketaddr_any!(),
rpc: socketaddr_any!(),
rpc_pubsub: socketaddr_any!(),
wallclock: 0,
}
}
}
impl ContactInfo {
pub fn new(
id: Pubkey,
ncp: SocketAddr,
tvu: SocketAddr,
tpu: SocketAddr,
storage_addr: SocketAddr,
rpc: SocketAddr,
rpc_pubsub: SocketAddr,
now: u64,
) -> Self {
ContactInfo {
id,
ncp,
tvu,
tpu,
storage_addr,
rpc,
rpc_pubsub,
wallclock: now,
}
}
pub fn new_localhost(id: Pubkey, now: u64) -> Self {
Self::new(
id,
socketaddr!("127.0.0.1:1234"),
socketaddr!("127.0.0.1:1235"),
socketaddr!("127.0.0.1:1236"),
socketaddr!("127.0.0.1:1237"),
socketaddr!("127.0.0.1:1238"),
socketaddr!("127.0.0.1:1239"),
now,
)
}
#[cfg(test)]
/// ContactInfo with multicast addresses for adversarial testing.
pub fn new_multicast() -> Self {
let addr = socketaddr!("224.0.1.255:1000");
assert!(addr.ip().is_multicast());
Self::new(
Keypair::new().pubkey(),
addr,
addr,
addr,
addr,
addr,
addr,
0,
)
}
fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr {
let mut nxt_addr = *addr;
nxt_addr.set_port(addr.port() + nxt);
nxt_addr
}
pub fn new_with_pubkey_socketaddr(pubkey: Pubkey, bind_addr: &SocketAddr) -> Self {
let transactions_addr = *bind_addr;
let gossip_addr = Self::next_port(&bind_addr, 1);
let replicate_addr = Self::next_port(&bind_addr, 2);
let rpc_addr = SocketAddr::new(bind_addr.ip(), RPC_PORT);
let rpc_pubsub_addr = SocketAddr::new(bind_addr.ip(), RPC_PORT + 1);
ContactInfo::new(
pubkey,
gossip_addr,
replicate_addr,
transactions_addr,
"0.0.0.0:0".parse().unwrap(),
rpc_addr,
rpc_pubsub_addr,
timestamp(),
)
}
pub fn new_with_socketaddr(bind_addr: &SocketAddr) -> Self {
let keypair = Keypair::new();
Self::new_with_pubkey_socketaddr(keypair.pubkey(), bind_addr)
}
pub fn new_entry_point(gossip_addr: &SocketAddr) -> Self {
let daddr: SocketAddr = socketaddr!("0.0.0.0:0");
ContactInfo::new(
Pubkey::default(),
*gossip_addr,
daddr,
daddr,
daddr,
daddr,
daddr,
timestamp(),
)
}
fn is_valid_ip(addr: IpAddr) -> bool {
!(addr.is_unspecified() || addr.is_multicast())
// || (addr.is_loopback() && !cfg_test))
// TODO: boot loopback in production networks
}
/// port must not be 0
/// ip must be specified and not multicast
/// loopback ip is only allowed in tests
pub fn is_valid_address(addr: &SocketAddr) -> bool {
(addr.port() != 0) && Self::is_valid_ip(addr.ip())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_is_valid_address() {
assert!(cfg!(test));
let bad_address_port = socketaddr!("127.0.0.1:0");
assert!(!ContactInfo::is_valid_address(&bad_address_port));
let bad_address_unspecified = socketaddr!(0, 1234);
assert!(!ContactInfo::is_valid_address(&bad_address_unspecified));
let bad_address_multicast = socketaddr!([224, 254, 0, 0], 1234);
assert!(!ContactInfo::is_valid_address(&bad_address_multicast));
let loopback = socketaddr!("127.0.0.1:1234");
assert!(ContactInfo::is_valid_address(&loopback));
// assert!(!ContactInfo::is_valid_ip_internal(loopback.ip(), false));
}
#[test]
fn test_default() {
let ci = ContactInfo::default();
assert!(ci.ncp.ip().is_unspecified());
assert!(ci.tvu.ip().is_unspecified());
assert!(ci.rpc.ip().is_unspecified());
assert!(ci.rpc_pubsub.ip().is_unspecified());
assert!(ci.tpu.ip().is_unspecified());
assert!(ci.storage_addr.ip().is_unspecified());
}
#[test]
fn test_multicast() {
let ci = ContactInfo::new_multicast();
assert!(ci.ncp.ip().is_multicast());
assert!(ci.tvu.ip().is_multicast());
assert!(ci.rpc.ip().is_multicast());
assert!(ci.rpc_pubsub.ip().is_multicast());
assert!(ci.tpu.ip().is_multicast());
assert!(ci.storage_addr.ip().is_multicast());
}
#[test]
fn test_entry_point() {
let addr = socketaddr!("127.0.0.1:10");
let ci = ContactInfo::new_entry_point(&addr);
assert_eq!(ci.ncp, addr);
assert!(ci.tvu.ip().is_unspecified());
assert!(ci.rpc.ip().is_unspecified());
assert!(ci.rpc_pubsub.ip().is_unspecified());
assert!(ci.tpu.ip().is_unspecified());
assert!(ci.storage_addr.ip().is_unspecified());
}
#[test]
fn test_socketaddr() {
let addr = socketaddr!("127.0.0.1:10");
let ci = ContactInfo::new_with_socketaddr(&addr);
assert_eq!(ci.tpu, addr);
assert_eq!(ci.ncp.port(), 11);
assert_eq!(ci.tvu.port(), 12);
assert_eq!(ci.rpc.port(), 8899);
assert_eq!(ci.rpc_pubsub.port(), 8900);
assert!(ci.storage_addr.ip().is_unspecified());
}
#[test]
fn replicated_data_new_with_socketaddr_with_pubkey() {
let keypair = Keypair::new();
let d1 = ContactInfo::new_with_pubkey_socketaddr(
keypair.pubkey().clone(),
&socketaddr!("127.0.0.1:1234"),
);
assert_eq!(d1.id, keypair.pubkey());
assert_eq!(d1.ncp, socketaddr!("127.0.0.1:1235"));
assert_eq!(d1.tvu, socketaddr!("127.0.0.1:1236"));
assert_eq!(d1.tpu, socketaddr!("127.0.0.1:1234"));
assert_eq!(d1.rpc, socketaddr!("127.0.0.1:8899"));
assert_eq!(d1.rpc_pubsub, socketaddr!("127.0.0.1:8900"));
}
}

src/crds.rs (new file, 351 lines)

@@ -0,0 +1,351 @@
//! This module implements Cluster Replicated Data Store for
//! asynchronous updates in a distributed network.
//!
//! Data is stored in the CrdsValue type, each type has a specific
//! CrdsValueLabel. Labels are semantically grouped into a single record
//! that is identified by a Pubkey.
//! * 1 Pubkey maps many CrdsValueLabels
//! * 1 CrdsValueLabel maps to 1 CrdsValue
//! The Label, the record Pubkey, and all the record labels can be derived
//! from a single CrdsValue.
//!
//! The actual data is stored in a single map of
//! `CrdsValueLabel(Pubkey) -> CrdsValue` This allows for partial record
//! updates to be propagated through the network.
//!
//! This means that full `Record` updates are not atomic.
//!
//! Additional labels can be added by adding new variants to the CrdsValueLabel
//! and CrdsValue enums.
//!
//! Merge strategy is implemented in:
//! impl PartialOrd for VersionedCrdsValue
//!
//! A value is updated to a new version if the labels match, and the value
//! wallclock is later, or the value hash is greater.
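//!
//! For example, a node's `ContactInfo` and its `LeaderId` are two separate
//! labels under the same record `Pubkey`, and each one is updated and merged
//! independently.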
use bincode::serialize;
use crds_value::{CrdsValue, CrdsValueLabel};
use hash::{hash, Hash};
use indexmap::map::IndexMap;
use solana_sdk::pubkey::Pubkey;
use std::cmp;
pub struct Crds {
/// Stores the map of labels and values
pub table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
}
#[derive(PartialEq, Debug)]
pub enum CrdsError {
InsertFailed,
}
/// This structure stores some local metadata associated with the CrdsValue
/// The implementation of PartialOrd ensures that the "highest" version is always picked to be
/// stored in the Crds
#[derive(PartialEq, Debug)]
pub struct VersionedCrdsValue {
pub value: CrdsValue,
/// local time when inserted
pub insert_timestamp: u64,
/// local time when updated
pub local_timestamp: u64,
/// value hash
pub value_hash: Hash,
}
impl PartialOrd for VersionedCrdsValue {
fn partial_cmp(&self, other: &VersionedCrdsValue) -> Option<cmp::Ordering> {
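// Values with different labels are incomparable. For the same label, equal
// wallclocks tie-break on the value hash; otherwise the newer wallclock wins.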
if self.value.label() != other.value.label() {
None
} else if self.value.wallclock() == other.value.wallclock() {
Some(self.value_hash.cmp(&other.value_hash))
} else {
Some(self.value.wallclock().cmp(&other.value.wallclock()))
}
}
}
impl VersionedCrdsValue {
pub fn new(local_timestamp: u64, value: CrdsValue) -> Self {
let value_hash = hash(&serialize(&value).unwrap());
VersionedCrdsValue {
value,
insert_timestamp: local_timestamp,
local_timestamp,
value_hash,
}
}
}
impl Default for Crds {
fn default() -> Self {
Crds {
table: IndexMap::new(),
}
}
}
impl Crds {
/// must be called atomically with `insert_versioned`
pub fn new_versioned(&self, local_timestamp: u64, value: CrdsValue) -> VersionedCrdsValue {
VersionedCrdsValue::new(local_timestamp, value)
}
/// insert the new value, returns the old value if insert succeeds
pub fn insert_versioned(
&mut self,
new_value: VersionedCrdsValue,
) -> Result<Option<VersionedCrdsValue>, CrdsError> {
let label = new_value.value.label();
let wallclock = new_value.value.wallclock();
let do_insert = self
.table
.get(&label)
.map(|current| new_value > *current)
.unwrap_or(true);
if do_insert {
let old = self.table.insert(label, new_value);
Ok(old)
} else {
trace!("INSERT FAILED data: {} new.wallclock: {}", label, wallclock,);
Err(CrdsError::InsertFailed)
}
}
pub fn insert(
&mut self,
value: CrdsValue,
local_timestamp: u64,
) -> Result<Option<VersionedCrdsValue>, CrdsError> {
let new_value = self.new_versioned(local_timestamp, value);
self.insert_versioned(new_value)
}
pub fn lookup(&self, label: &CrdsValueLabel) -> Option<&CrdsValue> {
self.table.get(label).map(|x| &x.value)
}
pub fn lookup_versioned(&self, label: &CrdsValueLabel) -> Option<&VersionedCrdsValue> {
self.table.get(label)
}
fn update_label_timestamp(&mut self, id: &CrdsValueLabel, now: u64) {
if let Some(e) = self.table.get_mut(id) {
e.local_timestamp = cmp::max(e.local_timestamp, now);
}
}
/// Update the timestamps of all the labels that are associated with Pubkey
pub fn update_record_timestamp(&mut self, pubkey: Pubkey, now: u64) {
for label in &CrdsValue::record_labels(pubkey) {
self.update_label_timestamp(label, now);
}
}
/// find all the keys that are older or equal to min_ts
pub fn find_old_labels(&self, min_ts: u64) -> Vec<CrdsValueLabel> {
self.table
.iter()
.filter_map(|(k, v)| {
if v.local_timestamp <= min_ts {
Some(k)
} else {
None
}
}).cloned()
.collect()
}
pub fn remove(&mut self, key: &CrdsValueLabel) {
self.table.remove(key);
}
}
#[cfg(test)]
mod test {
use super::*;
use contact_info::ContactInfo;
use crds_value::LeaderId;
use signature::{Keypair, KeypairUtil};
#[test]
fn test_insert() {
let mut crds = Crds::default();
let val = CrdsValue::LeaderId(LeaderId::default());
assert_eq!(crds.insert(val.clone(), 0).ok(), Some(None));
assert_eq!(crds.table.len(), 1);
assert!(crds.table.contains_key(&val.label()));
assert_eq!(crds.table[&val.label()].local_timestamp, 0);
}
#[test]
fn test_update_old() {
let mut crds = Crds::default();
let val = CrdsValue::LeaderId(LeaderId::default());
assert_eq!(crds.insert(val.clone(), 0), Ok(None));
assert_eq!(crds.insert(val.clone(), 1), Err(CrdsError::InsertFailed));
assert_eq!(crds.table[&val.label()].local_timestamp, 0);
}
#[test]
fn test_update_new() {
let mut crds = Crds::default();
let original = CrdsValue::LeaderId(LeaderId::default());
assert_matches!(crds.insert(original.clone(), 0), Ok(_));
let val = CrdsValue::LeaderId(LeaderId {
id: Pubkey::default(),
leader_id: Pubkey::default(),
wallclock: 1,
});
assert_eq!(
crds.insert(val.clone(), 1).unwrap().unwrap().value,
original
);
assert_eq!(crds.table[&val.label()].local_timestamp, 1);
}
#[test]
fn test_update_timestamp() {
let mut crds = Crds::default();
let val = CrdsValue::LeaderId(LeaderId::default());
assert_eq!(crds.insert(val.clone(), 0), Ok(None));
crds.update_label_timestamp(&val.label(), 1);
assert_eq!(crds.table[&val.label()].local_timestamp, 1);
assert_eq!(crds.table[&val.label()].insert_timestamp, 0);
let val2 = CrdsValue::ContactInfo(ContactInfo::default());
assert_eq!(val2.label().pubkey(), val.label().pubkey());
assert_matches!(crds.insert(val2.clone(), 0), Ok(None));
crds.update_record_timestamp(val.label().pubkey(), 2);
assert_eq!(crds.table[&val.label()].local_timestamp, 2);
assert_eq!(crds.table[&val.label()].insert_timestamp, 0);
assert_eq!(crds.table[&val2.label()].local_timestamp, 2);
assert_eq!(crds.table[&val2.label()].insert_timestamp, 0);
crds.update_record_timestamp(val.label().pubkey(), 1);
assert_eq!(crds.table[&val.label()].local_timestamp, 2);
assert_eq!(crds.table[&val.label()].insert_timestamp, 0);
let mut ci = ContactInfo::default();
ci.wallclock += 1;
let val3 = CrdsValue::ContactInfo(ci);
assert_matches!(crds.insert(val3.clone(), 3), Ok(Some(_)));
assert_eq!(crds.table[&val2.label()].local_timestamp, 3);
assert_eq!(crds.table[&val2.label()].insert_timestamp, 3);
}
#[test]
fn test_find_old_records() {
let mut crds = Crds::default();
let val = CrdsValue::LeaderId(LeaderId::default());
assert_eq!(crds.insert(val.clone(), 1), Ok(None));
assert!(crds.find_old_labels(0).is_empty());
assert_eq!(crds.find_old_labels(1), vec![val.label()]);
assert_eq!(crds.find_old_labels(2), vec![val.label()]);
}
#[test]
fn test_remove() {
let mut crds = Crds::default();
let val = CrdsValue::LeaderId(LeaderId::default());
assert_matches!(crds.insert(val.clone(), 1), Ok(_));
assert_eq!(crds.find_old_labels(1), vec![val.label()]);
crds.remove(&val.label());
assert!(crds.find_old_labels(1).is_empty());
}
#[test]
fn test_equal() {
let key = Keypair::new();
let v1 = VersionedCrdsValue::new(
1,
CrdsValue::LeaderId(LeaderId {
id: key.pubkey(),
leader_id: Pubkey::default(),
wallclock: 0,
}),
);
let v2 = VersionedCrdsValue::new(
1,
CrdsValue::LeaderId(LeaderId {
id: key.pubkey(),
leader_id: Pubkey::default(),
wallclock: 0,
}),
);
assert!(!(v1 != v2));
assert!(v1 == v2);
}
#[test]
fn test_hash_order() {
let key = Keypair::new();
let v1 = VersionedCrdsValue::new(
1,
CrdsValue::LeaderId(LeaderId {
id: key.pubkey(),
leader_id: Pubkey::default(),
wallclock: 0,
}),
);
let v2 = VersionedCrdsValue::new(
1,
CrdsValue::LeaderId(LeaderId {
id: key.pubkey(),
leader_id: key.pubkey(),
wallclock: 0,
}),
);
assert!(v1 != v2);
assert!(!(v1 == v2));
if v1 > v2 {
assert!(v2 < v1)
} else {
assert!(v2 > v1)
}
}
#[test]
fn test_wallclock_order() {
let key = Keypair::new();
let v1 = VersionedCrdsValue::new(
1,
CrdsValue::LeaderId(LeaderId {
id: key.pubkey(),
leader_id: Pubkey::default(),
wallclock: 1,
}),
);
let v2 = VersionedCrdsValue::new(
1,
CrdsValue::LeaderId(LeaderId {
id: key.pubkey(),
leader_id: Pubkey::default(),
wallclock: 0,
}),
);
assert!(v1 > v2);
assert!(!(v1 < v2));
assert!(v1 != v2);
assert!(!(v1 == v2));
}
#[test]
fn test_label_order() {
let v1 = VersionedCrdsValue::new(
1,
CrdsValue::LeaderId(LeaderId {
id: Keypair::new().pubkey(),
leader_id: Pubkey::default(),
wallclock: 0,
}),
);
let v2 = VersionedCrdsValue::new(
1,
CrdsValue::LeaderId(LeaderId {
id: Keypair::new().pubkey(),
leader_id: Pubkey::default(),
wallclock: 0,
}),
);
assert!(v1 != v2);
assert!(!(v1 == v2));
assert!(!(v1 < v2));
assert!(!(v1 > v2));
assert!(!(v2 < v1));
assert!(!(v2 > v1));
}
}
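
A short usage sketch of the insert/merge flow described in the module docs above (illustrative, not part of this commit):

use crds::Crds;
use crds_value::{CrdsValue, LeaderId};

fn crds_usage_sketch() {
    let mut crds = Crds::default();
    let val = CrdsValue::LeaderId(LeaderId::default());
    // first insert succeeds; there was no previous value for this label
    assert_eq!(crds.insert(val.clone(), 100).ok(), Some(None));
    // re-inserting the identical value fails: same label, wallclock, and hash
    assert!(crds.insert(val.clone(), 200).is_err());
    assert_eq!(crds.lookup(&val.label()), Some(&val));
}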

src/crds_gossip.rs (new file, 486 lines)

@@ -0,0 +1,486 @@
//! Crds Gossip
//! This module ties together Crds and the push and pull gossip overlays. The interface is
//! designed to run with a simulator or over a UDP network connection with messages at most
//! `packet::BLOB_DATA_SIZE` bytes in size.
use bloom::Bloom;
use crds::Crds;
use crds_gossip_error::CrdsGossipError;
use crds_gossip_pull::CrdsGossipPull;
use crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE};
use crds_value::CrdsValue;
use hash::Hash;
use solana_sdk::pubkey::Pubkey;
pub struct CrdsGossip {
pub crds: Crds,
pub id: Pubkey,
push: CrdsGossipPush,
pull: CrdsGossipPull,
}
impl Default for CrdsGossip {
fn default() -> Self {
CrdsGossip {
crds: Crds::default(),
id: Pubkey::default(),
push: CrdsGossipPush::default(),
pull: CrdsGossipPull::default(),
}
}
}
impl CrdsGossip {
pub fn set_self(&mut self, id: Pubkey) {
self.id = id;
}
/// process a push message received from the network
pub fn process_push_message(&mut self, values: &[CrdsValue], now: u64) -> Vec<Pubkey> {
let results: Vec<_> = values
.iter()
.map(|val| {
self.push
.process_push_message(&mut self.crds, val.clone(), now)
}).collect();
results
.into_iter()
.zip(values)
.filter_map(|(r, d)| {
if r == Err(CrdsGossipError::PushMessagePrune) {
Some(d.label().pubkey())
} else if let Ok(Some(val)) = r {
self.pull
.record_old_hash(val.value_hash, val.local_timestamp);
None
} else {
None
}
}).collect()
}
pub fn new_push_messages(&mut self, now: u64) -> (Pubkey, Vec<Pubkey>, Vec<CrdsValue>) {
let (peers, values) = self.push.new_push_messages(&self.crds, now);
(self.id, peers, values)
}
/// add the `origin` values to `peer`'s prune filter of nodes
pub fn process_prune_msg(&mut self, peer: Pubkey, origin: &[Pubkey]) {
self.push.process_prune_msg(peer, origin)
}
/// refresh the push active set
pub fn refresh_push_active_set(&mut self) {
self.push.refresh_push_active_set(
&self.crds,
self.id,
self.pull.pull_request_time.len(),
CRDS_GOSSIP_NUM_ACTIVE,
)
}
/// generate a random request
pub fn new_pull_request(
&self,
now: u64,
) -> Result<(Pubkey, Bloom<Hash>, CrdsValue), CrdsGossipError> {
self.pull.new_pull_request(&self.crds, self.id, now)
}
/// time when a request to `from` was initiated
/// This is used for weighted random selection during `new_pull_request`
/// It's important to use the local node's request creation time as the weight
/// instead of the response received time; otherwise failed nodes will increase their weight.
pub fn mark_pull_request_creation_time(&mut self, from: Pubkey, now: u64) {
self.pull.mark_pull_request_creation_time(from, now)
}
/// process a pull request and create a response
pub fn process_pull_request(
&mut self,
caller: CrdsValue,
filter: Bloom<Hash>,
now: u64,
) -> Vec<CrdsValue> {
self.pull
.process_pull_request(&mut self.crds, caller, filter, now)
}
/// process a pull response
pub fn process_pull_response(
&mut self,
from: Pubkey,
response: Vec<CrdsValue>,
now: u64,
) -> usize {
self.pull
.process_pull_response(&mut self.crds, from, response, now)
}
pub fn purge(&mut self, now: u64) {
if now > self.push.msg_timeout {
let min = now - self.push.msg_timeout;
self.push.purge_old_pending_push_messages(&self.crds, min);
}
if now > 5 * self.push.msg_timeout {
let min = now - 5 * self.push.msg_timeout;
self.push.purge_old_pushed_once_messages(min);
}
if now > self.pull.crds_timeout {
let min = now - self.pull.crds_timeout;
self.pull.purge_active(&mut self.crds, self.id, min);
}
if now > 5 * self.pull.crds_timeout {
let min = now - 5 * self.pull.crds_timeout;
self.pull.purge_purged(min);
}
}
}
#[cfg(test)]
mod test {
use super::*;
use bincode::serialized_size;
use contact_info::ContactInfo;
use crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS;
use crds_value::CrdsValueLabel;
use rayon::prelude::*;
use signature::{Keypair, KeypairUtil};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
type Node = Arc<Mutex<CrdsGossip>>;
type Network = HashMap<Pubkey, Node>;
fn star_network_create(num: usize) -> Network {
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
let mut network: HashMap<_, _> = (1..num)
.map(|_| {
let new =
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), 0).unwrap();
node.crds.insert(entry.clone(), 0).unwrap();
node.set_self(id);
(new.label().pubkey(), Arc::new(Mutex::new(node)))
}).collect();
let mut node = CrdsGossip::default();
let id = entry.label().pubkey();
node.crds.insert(entry.clone(), 0).unwrap();
node.set_self(id);
network.insert(id, Arc::new(Mutex::new(node)));
network
}
fn rstar_network_create(num: usize) -> Network {
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
let mut origin = CrdsGossip::default();
let id = entry.label().pubkey();
origin.crds.insert(entry.clone(), 0).unwrap();
origin.set_self(id);
let mut network: HashMap<_, _> = (1..num)
.map(|_| {
let new =
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), 0).unwrap();
origin.crds.insert(new.clone(), 0).unwrap();
node.set_self(id);
(new.label().pubkey(), Arc::new(Mutex::new(node)))
}).collect();
network.insert(id, Arc::new(Mutex::new(origin)));
network
}
fn ring_network_create(num: usize) -> Network {
let mut network: HashMap<_, _> = (0..num)
.map(|_| {
let new =
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), 0).unwrap();
node.set_self(id);
(new.label().pubkey(), Arc::new(Mutex::new(node)))
}).collect();
let keys: Vec<Pubkey> = network.keys().cloned().collect();
for k in 0..keys.len() {
let start_info = {
let start = &network[&keys[k]];
let start_id = start.lock().unwrap().id.clone();
start
.lock()
.unwrap()
.crds
.lookup(&CrdsValueLabel::ContactInfo(start_id))
.unwrap()
.clone()
};
let end = network.get_mut(&keys[(k + 1) % keys.len()]).unwrap();
end.lock().unwrap().crds.insert(start_info, 0).unwrap();
}
network
}
fn network_simulator_pull_only(network: &mut Network) {
let num = network.len();
let (converged, bytes_tx) = network_run_pull(network, 0, num * 2, 0.9);
trace!(
"network_simulator_pull_{}: converged: {} total_bytes: {}",
num,
converged,
bytes_tx
);
assert!(converged >= 0.9);
}
fn network_simulator(network: &mut Network) {
let num = network.len();
// run for a small amount of time
let (converged, bytes_tx) = network_run_pull(network, 0, 10, 1.0);
trace!("network_simulator_push_{}: converged: {}", num, converged);
// make sure there is someone in the active set
let network_values: Vec<Node> = network.values().cloned().collect();
network_values.par_iter().for_each(|node| {
node.lock().unwrap().refresh_push_active_set();
});
let mut total_bytes = bytes_tx;
for second in 1..num {
let start = second * 10;
let end = (second + 1) * 10;
let now = (start * 100) as u64;
// push a message to the network
network_values.par_iter().for_each(|locked_node| {
let node = &mut locked_node.lock().unwrap();
let mut m = node
.crds
.lookup(&CrdsValueLabel::ContactInfo(node.id))
.and_then(|v| v.contact_info().cloned())
.unwrap();
m.wallclock = now;
node.process_push_message(&[CrdsValue::ContactInfo(m.clone())], now);
});
// push for a bit
let (queue_size, bytes_tx) = network_run_push(network, start, end);
total_bytes += bytes_tx;
trace!(
"network_simulator_push_{}: queue_size: {} bytes: {}",
num,
queue_size,
bytes_tx
);
// pull for a bit
let (converged, bytes_tx) = network_run_pull(network, start, end, 1.0);
total_bytes += bytes_tx;
trace!(
"network_simulator_push_{}: converged: {} bytes: {} total_bytes: {}",
num,
converged,
bytes_tx,
total_bytes
);
if converged > 0.9 {
break;
}
}
}
fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, usize) {
let mut bytes: usize = 0;
let mut num_msgs: usize = 0;
let mut total: usize = 0;
let num = network.len();
let mut prunes: usize = 0;
let mut delivered: usize = 0;
let network_values: Vec<Node> = network.values().cloned().collect();
for t in start..end {
let now = t as u64 * 100;
let requests: Vec<_> = network_values
.par_iter()
.map(|node| {
node.lock().unwrap().purge(now);
node.lock().unwrap().new_push_messages(now)
}).collect();
let transferred: Vec<_> = requests
.par_iter()
.map(|(from, peers, msgs)| {
let mut bytes: usize = 0;
let mut delivered: usize = 0;
let mut num_msgs: usize = 0;
let mut prunes: usize = 0;
for to in peers {
bytes += serialized_size(msgs).unwrap() as usize;
num_msgs += 1;
let rsps = network
.get(&to)
.map(|node| node.lock().unwrap().process_push_message(&msgs, now))
.unwrap();
bytes += serialized_size(&rsps).unwrap() as usize;
prunes += rsps.len();
network
.get(&from)
.map(|node| node.lock().unwrap().process_prune_msg(*to, &rsps))
.unwrap();
delivered += rsps.is_empty() as usize;
}
(bytes, delivered, num_msgs, prunes)
}).collect();
for (b, d, m, p) in transferred {
bytes += b;
delivered += d;
num_msgs += m;
prunes += p;
}
if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 {
network_values.par_iter().for_each(|node| {
node.lock().unwrap().refresh_push_active_set();
});
}
total = network_values
.par_iter()
.map(|v| v.lock().unwrap().push.num_pending())
.sum();
trace!(
"network_run_push_{}: now: {} queue: {} bytes: {} num_msgs: {} prunes: {} delivered: {}",
num,
now,
total,
bytes,
num_msgs,
prunes,
delivered,
);
}
(total, bytes)
}
fn network_run_pull(
network: &mut Network,
start: usize,
end: usize,
max_convergence: f64,
) -> (f64, usize) {
let mut bytes: usize = 0;
let mut msgs: usize = 0;
let mut overhead: usize = 0;
let mut convergence = 0f64;
let num = network.len();
let network_values: Vec<Node> = network.values().cloned().collect();
for t in start..end {
let now = t as u64 * 100;
let requests: Vec<_> = {
network_values
.par_iter()
.filter_map(|from| from.lock().unwrap().new_pull_request(now).ok())
.collect()
};
let transferred: Vec<_> = requests
.into_par_iter()
.map(|(to, request, caller_info)| {
let mut bytes: usize = 0;
let mut msgs: usize = 0;
let mut overhead: usize = 0;
let from = caller_info.label().pubkey();
bytes += request.keys.len();
bytes += (request.bits.len() / 8) as usize;
bytes += serialized_size(&caller_info).unwrap() as usize;
let rsp = network
.get(&to)
.map(|node| {
node.lock()
.unwrap()
.process_pull_request(caller_info, request, now)
}).unwrap();
bytes += serialized_size(&rsp).unwrap() as usize;
msgs += rsp.len();
network.get(&from).map(|node| {
node.lock()
.unwrap()
.mark_pull_request_creation_time(from, now);
overhead += node.lock().unwrap().process_pull_response(from, rsp, now);
});
(bytes, msgs, overhead)
}).collect();
for (b, m, o) in transferred {
bytes += b;
msgs += m;
overhead += o;
}
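// Convergence is the fraction of the full mesh that has been learned:
// total crds table entries across all nodes, divided by num^2.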
let total: usize = network_values
.par_iter()
.map(|v| v.lock().unwrap().crds.table.len())
.sum();
convergence = total as f64 / ((num * num) as f64);
if convergence > max_convergence {
break;
}
trace!(
"network_run_pull_{}: now: {} connections: {} convergance: {} bytes: {} msgs: {} overhead: {}",
num,
now,
total,
convergence,
bytes,
msgs,
overhead
);
}
(convergence, bytes)
}
#[test]
fn test_star_network_pull_50() {
let mut network = star_network_create(50);
network_simulator_pull_only(&mut network);
}
#[test]
fn test_star_network_pull_100() {
let mut network = star_network_create(100);
network_simulator_pull_only(&mut network);
}
#[test]
fn test_star_network_push_star_200() {
let mut network = star_network_create(200);
network_simulator(&mut network);
}
#[test]
fn test_star_network_push_rstar_200() {
let mut network = rstar_network_create(200);
network_simulator(&mut network);
}
#[test]
fn test_star_network_push_ring_200() {
let mut network = ring_network_create(200);
network_simulator(&mut network);
}
#[test]
#[ignore]
fn test_star_network_large_pull() {
use logger;
logger::setup();
let mut network = star_network_create(2000);
network_simulator_pull_only(&mut network);
}
#[test]
#[ignore]
fn test_rstar_network_large_push() {
use logger;
logger::setup();
let mut network = rstar_network_create(4000);
network_simulator(&mut network);
}
#[test]
#[ignore]
fn test_ring_network_large_push() {
use logger;
logger::setup();
let mut network = ring_network_create(4001);
network_simulator(&mut network);
}
#[test]
#[ignore]
fn test_star_network_large_push() {
use logger;
logger::setup();
let mut network = star_network_create(4002);
network_simulator(&mut network);
}
}
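
A hedged sketch of one push round between two in-process nodes, following the same steps as the simulator tests above (the names `a` and `b` are illustrative):

use contact_info::ContactInfo;
use crds_gossip::CrdsGossip;
use crds_value::CrdsValue;
use signature::{Keypair, KeypairUtil};

fn push_round_sketch() {
    let a_info = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
    let b_info = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
    let mut a = CrdsGossip::default();
    a.set_self(a_info.id);
    a.crds.insert(CrdsValue::ContactInfo(a_info.clone()), 0).unwrap();
    a.crds.insert(CrdsValue::ContactInfo(b_info.clone()), 0).unwrap();
    let mut b = CrdsGossip::default();
    b.set_self(b_info.id);
    b.crds.insert(CrdsValue::ContactInfo(b_info.clone()), 0).unwrap();
    // a needs an active set before it can push
    a.refresh_push_active_set();
    // queue a newer version of a's contact info, then deliver it to b
    let mut m = a_info.clone();
    m.wallclock = 1;
    a.process_push_message(&[CrdsValue::ContactInfo(m)], 1);
    let (_from, peers, msgs) = a.new_push_messages(1);
    for _to in peers {
        // b stores the values; an empty return means nothing to prune
        let prunes = b.process_push_message(&msgs, 1);
        assert!(prunes.is_empty());
    }
}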

src/crds_gossip_error.rs (new file, 7 lines)

@@ -0,0 +1,7 @@
#[derive(PartialEq, Debug)]
pub enum CrdsGossipError {
NoPeers,
PushMessageTimeout,
PushMessagePrune,
PushMessageOldVersion,
}

src/crds_gossip_pull.rs (new file, 378 lines)

@@ -0,0 +1,378 @@
//! Crds Gossip Pull overlay
//! This module implements the anti-entropy protocol for the network.
//!
//! The basic strategy is as follows:
//! 1. Construct a bloom filter of the local data set
//! 2. Randomly ask a node on the network for data that is not contained in the bloom filter.
//!
//! Bloom filters have a false positive rate. Each request uses a different bloom filter
//! with random hash functions. So each subsequent request will have a different distribution
//! of false positives.
use bincode::serialized_size;
use bloom::Bloom;
use crds::Crds;
use crds_gossip_error::CrdsGossipError;
use crds_value::{CrdsValue, CrdsValueLabel};
use hash::Hash;
use packet::BLOB_DATA_SIZE;
use rand;
use rand::distributions::{Distribution, Weighted, WeightedChoice};
use solana_sdk::pubkey::Pubkey;
use std::cmp;
use std::collections::HashMap;
use std::collections::VecDeque;
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
pub struct CrdsGossipPull {
/// timestamp of last request
pub pull_request_time: HashMap<Pubkey, u64>,
/// hash and insert time
purged_values: VecDeque<(Hash, u64)>,
/// max bytes per message
pub max_bytes: usize,
pub crds_timeout: u64,
}
impl Default for CrdsGossipPull {
fn default() -> Self {
Self {
purged_values: VecDeque::new(),
pull_request_time: HashMap::new(),
max_bytes: BLOB_DATA_SIZE,
crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
}
}
}
impl CrdsGossipPull {
/// generate a random request
pub fn new_pull_request(
&self,
crds: &Crds,
self_id: Pubkey,
now: u64,
) -> Result<(Pubkey, Bloom<Hash>, CrdsValue), CrdsGossipError> {
let mut options: Vec<_> = crds
.table
.values()
.filter_map(|v| v.value.contact_info())
.filter(|v| {
v.id != self_id && !v.ncp.ip().is_unspecified() && !v.ncp.ip().is_multicast()
}).map(|item| {
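// The weight grows with time since our last request to this node,
// roughly +1 per second, clamped to [1, u16::MAX - 1]; peers we
// pulled from recently are deprioritized.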
let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0);
let weight = cmp::max(
1,
cmp::min(u64::from(u16::max_value()) - 1, (now - req_time) / 1024) as u32,
);
Weighted { weight, item }
}).collect();
if options.is_empty() {
return Err(CrdsGossipError::NoPeers);
}
let filter = self.build_crds_filter(crds);
let random = WeightedChoice::new(&mut options).sample(&mut rand::thread_rng());
let self_info = crds
.lookup(&CrdsValueLabel::ContactInfo(self_id))
.unwrap_or_else(|| panic!("self_id invalid {}", self_id));
Ok((random.id, filter, self_info.clone()))
}
/// time when a request to `from` was initiated
/// This is used for weighted random selection during `new_pull_request`
/// It's important to use the local node's request creation time as the weight
/// instead of the response received time; otherwise failed nodes will increase their weight.
pub fn mark_pull_request_creation_time(&mut self, from: Pubkey, now: u64) {
self.pull_request_time.insert(from, now);
}
/// Store an old hash in the purged values set
pub fn record_old_hash(&mut self, hash: Hash, timestamp: u64) {
self.purged_values.push_back((hash, timestamp))
}
/// process a pull request and create a response
pub fn process_pull_request(
&mut self,
crds: &mut Crds,
caller: CrdsValue,
mut filter: Bloom<Hash>,
now: u64,
) -> Vec<CrdsValue> {
let rv = self.filter_crds_values(crds, &mut filter);
let key = caller.label().pubkey();
let old = crds.insert(caller, now);
if let Some(val) = old.ok().and_then(|opt| opt) {
self.purged_values
.push_back((val.value_hash, val.local_timestamp))
}
crds.update_record_timestamp(key, now);
rv
}
/// process a pull response
pub fn process_pull_response(
&mut self,
crds: &mut Crds,
from: Pubkey,
response: Vec<CrdsValue>,
now: u64,
) -> usize {
let mut failed = 0;
for r in response {
let owner = r.label().pubkey();
let old = crds.insert(r, now);
failed += old.is_err() as usize;
old.ok().map(|opt| {
crds.update_record_timestamp(owner, now);
opt.map(|val| {
self.purged_values
.push_back((val.value_hash, val.local_timestamp))
})
});
}
crds.update_record_timestamp(from, now);
failed
}
/// build a filter of the current crds table
fn build_crds_filter(&self, crds: &Crds) -> Bloom<Hash> {
let num = crds.table.values().count() + self.purged_values.len();
let mut bloom = Bloom::random(num, 0.1, 4 * 1024 * 8 - 1);
for v in crds.table.values() {
bloom.add(&v.value_hash);
}
for (value_hash, _insert_timestamp) in &self.purged_values {
bloom.add(value_hash);
}
bloom
}
/// filter values that fail the bloom filter up to max_bytes
fn filter_crds_values(&self, crds: &Crds, filter: &mut Bloom<Hash>) -> Vec<CrdsValue> {
let mut max_bytes = self.max_bytes as isize;
let mut ret = vec![];
for v in crds.table.values() {
if filter.contains(&v.value_hash) {
continue;
}
max_bytes -= serialized_size(&v.value).unwrap() as isize;
if max_bytes < 0 {
break;
}
ret.push(v.value.clone());
}
ret
}
/// Purge values from the crds that are older than `active_timeout`
/// The value_hash of an active item is put into self.purged_values queue
pub fn purge_active(&mut self, crds: &mut Crds, self_id: Pubkey, min_ts: u64) {
let old = crds.find_old_labels(min_ts);
let mut purged: VecDeque<_> = old
.iter()
.filter(|label| label.pubkey() != self_id)
.filter_map(|label| {
let rv = crds
.lookup_versioned(label)
.map(|val| (val.value_hash, val.local_timestamp));
crds.remove(label);
rv
}).collect();
self.purged_values.append(&mut purged);
}
/// Purge values from the `self.purged_values` queue that are older than `purge_timeout`
pub fn purge_purged(&mut self, min_ts: u64) {
let cnt = self
.purged_values
.iter()
.take_while(|v| v.1 < min_ts)
.count();
self.purged_values.drain(..cnt);
}
}
#[cfg(test)]
mod test {
use super::*;
use contact_info::ContactInfo;
use crds_value::LeaderId;
use signature::{Keypair, KeypairUtil};
#[test]
fn test_new_pull_request() {
let mut crds = Crds::default();
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
let id = entry.label().pubkey();
let node = CrdsGossipPull::default();
assert_eq!(
node.new_pull_request(&crds, id, 0),
Err(CrdsGossipError::NoPeers)
);
crds.insert(entry.clone(), 0).unwrap();
assert_eq!(
node.new_pull_request(&crds, id, 0),
Err(CrdsGossipError::NoPeers)
);
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
crds.insert(new.clone(), 0).unwrap();
let req = node.new_pull_request(&crds, id, 0);
let (to, _, self_info) = req.unwrap();
assert_eq!(to, new.label().pubkey());
assert_eq!(self_info, entry);
}
#[test]
fn test_new_mark_creation_time() {
let mut crds = Crds::default();
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
let node_id = entry.label().pubkey();
let mut node = CrdsGossipPull::default();
crds.insert(entry.clone(), 0).unwrap();
let old = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
crds.insert(old.clone(), 0).unwrap();
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
crds.insert(new.clone(), 0).unwrap();
// set request creation time to max_value
node.mark_pull_request_creation_time(new.label().pubkey(), u64::max_value());
// odds of getting the other request should be 1 in u64::max_value()
for _ in 0..10 {
let req = node.new_pull_request(&crds, node_id, u64::max_value());
let (to, _, self_info) = req.unwrap();
assert_eq!(to, old.label().pubkey());
assert_eq!(self_info, entry);
}
}
#[test]
fn test_process_pull_request() {
let mut node_crds = Crds::default();
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
let node_id = entry.label().pubkey();
let node = CrdsGossipPull::default();
node_crds.insert(entry.clone(), 0).unwrap();
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
node_crds.insert(new.clone(), 0).unwrap();
let req = node.new_pull_request(&node_crds, node_id, 0);
let mut dest_crds = Crds::default();
let mut dest = CrdsGossipPull::default();
let (_, filter, caller) = req.unwrap();
let rsp = dest.process_pull_request(&mut dest_crds, caller.clone(), filter, 1);
assert!(rsp.is_empty());
assert!(dest_crds.lookup(&caller.label()).is_some());
assert_eq!(
dest_crds
.lookup_versioned(&caller.label())
.unwrap()
.insert_timestamp,
1
);
assert_eq!(
dest_crds
.lookup_versioned(&caller.label())
.unwrap()
.local_timestamp,
1
);
}
#[test]
fn test_process_pull_request_response() {
let mut node_crds = Crds::default();
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
let node_id = entry.label().pubkey();
let mut node = CrdsGossipPull::default();
node_crds.insert(entry.clone(), 0).unwrap();
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
node_crds.insert(new.clone(), 0).unwrap();
let mut dest = CrdsGossipPull::default();
let mut dest_crds = Crds::default();
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
dest_crds.insert(new.clone(), 0).unwrap();
// node contains a key from the dest node, but at an older local timestamp
let dest_id = new.label().pubkey();
let same_key = CrdsValue::LeaderId(LeaderId {
id: dest_id,
leader_id: dest_id,
wallclock: 1,
});
node_crds.insert(same_key.clone(), 0).unwrap();
assert_eq!(
node_crds
.lookup_versioned(&same_key.label())
.unwrap()
.local_timestamp,
0
);
let mut done = false;
for _ in 0..30 {
// there is a chance of a false positive with bloom filters
let req = node.new_pull_request(&node_crds, node_id, 0);
let (_, filter, caller) = req.unwrap();
let rsp = dest.process_pull_request(&mut dest_crds, caller, filter, 0);
// if there is a false positive this is empty
// prob should be around 0.1 per iteration
if rsp.is_empty() {
continue;
}
assert_eq!(rsp.len(), 1);
let failed = node.process_pull_response(&mut node_crds, node_id, rsp, 1);
assert_eq!(failed, 0);
assert_eq!(
node_crds
.lookup_versioned(&new.label())
.unwrap()
.local_timestamp,
1
);
// verify that the whole record was updated for dest since this is a response from dest
assert_eq!(
node_crds
.lookup_versioned(&same_key.label())
.unwrap()
.local_timestamp,
1
);
done = true;
break;
}
assert!(done);
}
#[test]
fn test_gossip_purge() {
let mut node_crds = Crds::default();
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
let node_label = entry.label();
let node_id = node_label.pubkey();
let mut node = CrdsGossipPull::default();
node_crds.insert(entry.clone(), 0).unwrap();
let old = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
node_crds.insert(old.clone(), 0).unwrap();
let value_hash = node_crds.lookup_versioned(&old.label()).unwrap().value_hash;
//verify self is valid
assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);
// purge
node.purge_active(&mut node_crds, node_id, 1);
//verify self is still valid after purge
assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);
assert_eq!(node_crds.lookup_versioned(&old.label()), None);
assert_eq!(node.purged_values.len(), 1);
for _ in 0..30 {
// there is a chance of a false positive with bloom filters
// assert that purged value is still in the set
// chance of 30 consecutive false positives is 0.1^30
let mut filter = node.build_crds_filter(&node_crds);
assert!(filter.contains(&value_hash));
}
// purge the value
node.purge_purged(1);
assert_eq!(node.purged_values.len(), 0);
}
}
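For reference, one pull exchange round-trips through three calls; the sketch below is assembled from the tests above (test-style setup, illustrative only, not part of this commit):

// One pull exchange between a requester (`node`) and a responder (`dest`).
let mut node_crds = Crds::default();
let mut node = CrdsGossipPull::default();
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
let node_id = entry.label().pubkey();
node_crds.insert(entry, 0).unwrap();
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
node_crds.insert(peer, 0).unwrap();
// requester: pick a random peer and build a bloom filter of everything it already holds
let (_to, filter, caller) = node.new_pull_request(&node_crds, node_id, 0).unwrap();
// responder: record the caller's contact info and reply with values the filter lacks
let mut dest_crds = Crds::default();
let mut dest = CrdsGossipPull::default();
let fresh = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
dest_crds.insert(fresh, 0).unwrap();
let rsp = dest.process_pull_request(&mut dest_crds, caller, filter, 1);
// rsp normally carries `fresh`, modulo the ~0.1 bloom false-positive chance the tests retry around
// requester: merge the response; the return value counts entries that failed to insert
let failed = node.process_pull_response(&mut node_crds, node_id, rsp, 1);
assert_eq!(failed, 0);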

453
src/crds_gossip_push.rs Normal file
View File

@ -0,0 +1,453 @@
//! Crds Gossip Push overlay
//! This module is used to propagate recently created CrdsValues across the network
//! Eager push strategy is based on Plumtree
//! http://asc.di.fct.unl.pt/~jleitao/pdf/srds07-leitao.pdf
//!
//! Main differences are:
//! 1. There is no `max hop`. Messages are signed with a local wallclock. If they are outside of
//! the local node's wallclock window they are dropped silently.
//! 2. The prune set is stored in a Bloom filter.
use bincode::serialized_size;
use bloom::Bloom;
use crds::{Crds, VersionedCrdsValue};
use crds_gossip_error::CrdsGossipError;
use crds_value::{CrdsValue, CrdsValueLabel};
use hash::Hash;
use indexmap::map::IndexMap;
use packet::BLOB_DATA_SIZE;
use rand::{self, Rng};
use solana_sdk::pubkey::Pubkey;
use std::cmp;
use std::collections::HashMap;
pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30;
pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 5000;
pub struct CrdsGossipPush {
/// max bytes per message
pub max_bytes: usize,
/// active set of validators for push
active_set: IndexMap<Pubkey, Bloom<Pubkey>>,
/// push message queue
push_messages: HashMap<CrdsValueLabel, Hash>,
pushed_once: HashMap<Hash, u64>,
pub num_active: usize,
pub push_fanout: usize,
pub msg_timeout: u64,
}
impl Default for CrdsGossipPush {
fn default() -> Self {
Self {
max_bytes: BLOB_DATA_SIZE,
active_set: IndexMap::new(),
push_messages: HashMap::new(),
pushed_once: HashMap::new(),
num_active: CRDS_GOSSIP_NUM_ACTIVE,
push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
}
}
}
impl CrdsGossipPush {
pub fn num_pending(&self) -> usize {
self.push_messages.len()
}
/// process a push message to the network
pub fn process_push_message(
&mut self,
crds: &mut Crds,
value: CrdsValue,
now: u64,
) -> Result<Option<VersionedCrdsValue>, CrdsGossipError> {
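// reject values whose wallclock falls outside [now - msg_timeout, now + msg_timeout]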
if now > value.wallclock() + self.msg_timeout {
return Err(CrdsGossipError::PushMessageTimeout);
}
if now + self.msg_timeout < value.wallclock() {
return Err(CrdsGossipError::PushMessageTimeout);
}
let label = value.label();
let new_value = crds.new_versioned(now, value);
let value_hash = new_value.value_hash;
if self.pushed_once.get(&value_hash).is_some() {
return Err(CrdsGossipError::PushMessagePrune);
}
let old = crds.insert_versioned(new_value);
if old.is_err() {
return Err(CrdsGossipError::PushMessageOldVersion);
}
self.push_messages.insert(label, value_hash);
self.pushed_once.insert(value_hash, now);
Ok(old.ok().and_then(|opt| opt))
}
/// New push message to broadcast to peers.
/// Returns a list of Pubkeys for the selected peers and a list of values to send to all the
/// peers.
/// The list of push messages is built so that none of the randomly selected peers
/// have pruned a message's origin.
pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> (Vec<Pubkey>, Vec<CrdsValue>) {
let max = self.active_set.len();
let mut nodes: Vec<_> = (0..max).collect();
rand::thread_rng().shuffle(&mut nodes);
let peers: Vec<Pubkey> = nodes
.into_iter()
.filter_map(|n| self.active_set.get_index(n))
.take(self.push_fanout)
.map(|n| *n.0)
.collect();
let mut total_bytes: usize = 0;
let mut values = vec![];
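// forward a queued value only if none of the selected peers have pruned its origin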
for (label, hash) in &self.push_messages {
let mut failed = false;
for p in &peers {
let filter = self.active_set.get_mut(p);
failed |= filter.is_none() || filter.unwrap().contains(&label.pubkey());
}
if failed {
continue;
}
let res = crds.lookup_versioned(label);
if res.is_none() {
continue;
}
let version = res.unwrap();
if version.value_hash != *hash {
continue;
}
let value = &version.value;
if value.wallclock() > now || value.wallclock() + self.msg_timeout < now {
continue;
}
total_bytes += serialized_size(value).unwrap() as usize;
if total_bytes > self.max_bytes {
break;
}
values.push(value.clone());
}
for v in &values {
self.push_messages.remove(&v.label());
}
(peers, values)
}
/// add each of the `origins` to the `peer`'s prune filter
pub fn process_prune_msg(&mut self, peer: Pubkey, origins: &[Pubkey]) {
for origin in origins {
if let Some(p) = self.active_set.get_mut(&peer) {
p.add(origin)
}
}
}
fn compute_need(num_active: usize, active_set_len: usize, ratio: usize) -> usize {
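// need = the 1/ratio rotation slice plus any shortfall below num_active, capped at num_active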
let num = active_set_len / ratio;
cmp::min(num_active, (num_active - active_set_len) + num)
}
/// refresh the push active set
/// * ratio - active_set.len()/ratio is the number of active entries to rotate out
pub fn refresh_push_active_set(
&mut self,
crds: &Crds,
self_id: Pubkey,
network_size: usize,
ratio: usize,
) {
let need = Self::compute_need(self.num_active, self.active_set.len(), ratio);
let mut new_items = HashMap::new();
let mut ixs: Vec<_> = (0..crds.table.len()).collect();
rand::thread_rng().shuffle(&mut ixs);
for ix in ixs {
let item = crds.table.get_index(ix);
if item.is_none() {
continue;
}
let val = item.unwrap();
if val.0.pubkey() == self_id {
continue;
}
if self.active_set.get(&val.0.pubkey()).is_some() {
continue;
}
if new_items.get(&val.0.pubkey()).is_some() {
continue;
}
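// fresh prune filter for each new active peer: ~10% false-positive rate,
// capped at 32,768 bits (4 KiB)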
let bloom = Bloom::random(network_size, 0.1, 1024 * 8 * 4);
new_items.insert(val.0.pubkey(), bloom);
if new_items.len() == need {
break;
}
}
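// rotate out a random 1/ratio slice of the current active set, then splice in the new peers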
let mut keys: Vec<Pubkey> = self.active_set.keys().cloned().collect();
rand::thread_rng().shuffle(&mut keys);
let num = keys.len() / ratio;
for k in &keys[..num] {
self.active_set.remove(k);
}
for (k, v) in new_items {
self.active_set.insert(k, v);
}
}
/// purge old pending push messages
pub fn purge_old_pending_push_messages(&mut self, crds: &Crds, min_time: u64) {
let old_msgs: Vec<CrdsValueLabel> = self
.push_messages
.iter()
.filter_map(|(k, hash)| {
if let Some(versioned) = crds.lookup_versioned(k) {
if versioned.value.wallclock() < min_time || versioned.value_hash != *hash {
Some(k)
} else {
None
}
} else {
Some(k)
}
}).cloned()
.collect();
for k in old_msgs {
self.push_messages.remove(&k);
}
}
/// purge old pushed_once messages
pub fn purge_old_pushed_once_messages(&mut self, min_time: u64) {
let old_msgs: Vec<Hash> = self
.pushed_once
.iter()
.filter_map(|(k, v)| if *v < min_time { Some(k) } else { None })
.cloned()
.collect();
for k in old_msgs {
self.pushed_once.remove(&k);
}
}
}
#[cfg(test)]
mod test {
use super::*;
use contact_info::ContactInfo;
use signature::{Keypair, KeypairUtil};
#[test]
fn test_process_push() {
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let value = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
let label = value.label();
// push a new message
assert_eq!(
push.process_push_message(&mut crds, value.clone(), 0),
Ok(None)
);
assert_eq!(crds.lookup(&label), Some(&value));
// push it again
assert_eq!(
push.process_push_message(&mut crds, value.clone(), 0),
Err(CrdsGossipError::PushMessagePrune)
);
}
#[test]
fn test_process_push_old_version() {
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let mut ci = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
ci.wallclock = 1;
let value = CrdsValue::ContactInfo(ci.clone());
// push a new message
assert_eq!(push.process_push_message(&mut crds, value, 0), Ok(None));
// push an old version
ci.wallclock = 0;
let value = CrdsValue::ContactInfo(ci.clone());
assert_eq!(
push.process_push_message(&mut crds, value, 0),
Err(CrdsGossipError::PushMessageOldVersion)
);
}
#[test]
fn test_process_push_timeout() {
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let timeout = push.msg_timeout;
let mut ci = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
// push a version too far in the future
ci.wallclock = timeout + 1;
let value = CrdsValue::ContactInfo(ci.clone());
assert_eq!(
push.process_push_message(&mut crds, value, 0),
Err(CrdsGossipError::PushMessageTimeout)
);
// push a version too far in the past
ci.wallclock = 0;
let value = CrdsValue::ContactInfo(ci.clone());
assert_eq!(
push.process_push_message(&mut crds, value, timeout + 1),
Err(CrdsGossipError::PushMessageTimeout)
);
}
#[test]
fn test_process_push_update() {
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let mut ci = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
ci.wallclock = 0;
let value_old = CrdsValue::ContactInfo(ci.clone());
// push a new message
assert_eq!(
push.process_push_message(&mut crds, value_old.clone(), 0),
Ok(None)
);
// push a newer version; the previous value is returned
ci.wallclock = 1;
let value = CrdsValue::ContactInfo(ci.clone());
assert_eq!(
push.process_push_message(&mut crds, value, 0)
.unwrap()
.unwrap()
.value,
value_old
);
}
#[test]
fn test_compute_need() {
assert_eq!(CrdsGossipPush::compute_need(30, 0, 10), 30);
assert_eq!(CrdsGossipPush::compute_need(30, 1, 10), 29);
assert_eq!(CrdsGossipPush::compute_need(30, 30, 10), 3);
assert_eq!(CrdsGossipPush::compute_need(30, 29, 10), 3);
}
#[test]
fn test_refresh_active_set() {
use logger;
logger::setup();
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let value1 = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
assert_eq!(crds.insert(value1.clone(), 0), Ok(None));
push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1);
assert!(push.active_set.get(&value1.label().pubkey()).is_some());
let value2 = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
assert!(push.active_set.get(&value2.label().pubkey()).is_none());
assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
for _ in 0..30 {
push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1);
if push.active_set.get(&value2.label().pubkey()).is_some() {
break;
}
}
assert!(push.active_set.get(&value2.label().pubkey()).is_some());
for _ in 0..push.num_active {
let value2 =
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
}
push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1);
assert_eq!(push.active_set.len(), push.num_active);
}
#[test]
fn test_new_push_messages() {
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1);
let new_msg =
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
assert_eq!(
push.process_push_message(&mut crds, new_msg.clone(), 0),
Ok(None)
);
assert_eq!(push.active_set.len(), 1);
assert_eq!(
push.new_push_messages(&crds, 0),
(vec![peer.label().pubkey()], vec![new_msg])
);
}
#[test]
fn test_process_prune() {
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1);
let new_msg =
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
assert_eq!(
push.process_push_message(&mut crds, new_msg.clone(), 0),
Ok(None)
);
push.process_prune_msg(peer.label().pubkey(), &[new_msg.label().pubkey()]);
assert_eq!(
push.new_push_messages(&crds, 0),
(vec![peer.label().pubkey()], vec![])
);
}
#[test]
fn test_purge_old_pending_push_messages() {
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1);
let mut ci = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
ci.wallclock = 1;
let new_msg = CrdsValue::ContactInfo(ci.clone());
assert_eq!(
push.process_push_message(&mut crds, new_msg.clone(), 1),
Ok(None)
);
push.purge_old_pending_push_messages(&crds, 0);
assert_eq!(
push.new_push_messages(&crds, 0),
(vec![peer.label().pubkey()], vec![])
);
}
#[test]
fn test_purge_old_pushed_once_messages() {
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let mut ci = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
ci.wallclock = 0;
let value = CrdsValue::ContactInfo(ci.clone());
let label = value.label();
// push a new message
assert_eq!(
push.process_push_message(&mut crds, value.clone(), 0),
Ok(None)
);
assert_eq!(crds.lookup(&label), Some(&value));
// push it again
assert_eq!(
push.process_push_message(&mut crds, value.clone(), 0),
Err(CrdsGossipError::PushMessagePrune)
);
// purge the old pushed_once record
push.purge_old_pushed_once_messages(1);
// push it again
assert_eq!(
push.process_push_message(&mut crds, value.clone(), 0),
Err(CrdsGossipError::PushMessageOldVersion)
);
}
}
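End to end, the push half runs in three steps: refresh the active set, queue a value, drain the queue for the network layer. A minimal sketch of one round, using only the methods above with the same test-style setup (illustrative only, not part of this commit):

// One push round from the sender's point of view.
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
crds.insert(peer.clone(), 0).unwrap();
// 1. pick an active set to push to (network_size = 1, ratio = 1, as in the tests)
push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1);
// 2. accept a freshly created value and queue it for broadcast
let msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
push.process_push_message(&mut crds, msg.clone(), 0).unwrap();
// 3. drain the queue into (peers, values) for the network layer to transmit
let (peers, values) = push.new_push_messages(&crds, 0);
assert_eq!(peers, vec![peer.label().pubkey()]);
assert_eq!(values, vec![msg.clone()]);
// a receiver that already has `msg` answers with a prune for its origin,
// which stops future pushes of that origin toward this peer
push.process_prune_msg(peers[0], &[msg.label().pubkey()]);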

26
src/crds_traits_impls.rs Normal file
View File

@ -0,0 +1,26 @@
use bloom::BloomHashIndex;
use hash::Hash;
use solana_sdk::pubkey::Pubkey;
fn slice_hash(slice: &[u8], hash_index: u64) -> u64 {
let len = slice.len();
assert!(len < 256);
let mut rv = 0u64;
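// sample 8 byte positions derived from hash_index and fold the bytes into a u64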
for i in 0..8 {
let pos = (hash_index >> i) & 0xff;
rv |= u64::from(slice[pos as usize % len]) << i;
}
rv
}
impl BloomHashIndex for Pubkey {
fn hash(&self, hash_index: u64) -> u64 {
slice_hash(self.as_ref(), hash_index)
}
}
impl BloomHashIndex for Hash {
fn hash(&self, hash_index: u64) -> u64 {
slice_hash(self.as_ref(), hash_index)
}
}
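These impls let cluster keys index directly into the commit's Bloom filter; a short sketch of the round trip, using only the Bloom API already exercised above (`random`, `add`, `contains`):

// Sketch: a Pubkey-keyed bloom filter, as used for prune sets and pull filters.
use bloom::Bloom;
use signature::{Keypair, KeypairUtil};
use solana_sdk::pubkey::Pubkey;

let mut filter: Bloom<Pubkey> = Bloom::random(100, 0.1, 1024 * 8 * 4);
let key = Keypair::new().pubkey();
filter.add(&key);
// inserted keys always hit; absent keys hit with ~10% probability
assert!(filter.contains(&key));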

147
src/crds_value.rs Normal file
View File

@ -0,0 +1,147 @@
use contact_info::ContactInfo;
use solana_sdk::pubkey::Pubkey;
use std::fmt;
use transaction::Transaction;
/// CrdsValue that is replicated across the cluster
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum CrdsValue {
/// * Merge Strategy - Latest wallclock is picked
ContactInfo(ContactInfo),
/// TODO: Votes may need a height in the userdata
/// * Merge Strategy - Latest height is picked
Vote(Vote),
/// * Merge Strategy - Latest wallclock is picked
LeaderId(LeaderId),
}
#[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)]
pub struct LeaderId {
pub id: Pubkey,
pub leader_id: Pubkey,
pub wallclock: u64,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct Vote {
pub transaction: Transaction,
pub height: u64,
pub wallclock: u64,
}
/// Type of the replicated value
/// These are labels for values in a record that is associated with a `Pubkey`
#[derive(PartialEq, Hash, Eq, Clone, Debug)]
pub enum CrdsValueLabel {
ContactInfo(Pubkey),
Vote(Pubkey),
LeaderId(Pubkey),
}
impl fmt::Display for CrdsValueLabel {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()),
CrdsValueLabel::Vote(_) => write!(f, "Vote({})", self.pubkey()),
CrdsValueLabel::LeaderId(_) => write!(f, "LeaderId({})", self.pubkey()),
}
}
}
impl CrdsValueLabel {
pub fn pubkey(&self) -> Pubkey {
match self {
CrdsValueLabel::ContactInfo(p) => *p,
CrdsValueLabel::Vote(p) => *p,
CrdsValueLabel::LeaderId(p) => *p,
}
}
}
impl CrdsValue {
/// Totally insecure, unverifiable wallclock of the node that generated this message.
/// Latest wallclock is always picked.
/// This is used to time out push messages.
pub fn wallclock(&self) -> u64 {
match self {
CrdsValue::ContactInfo(contact_info) => contact_info.wallclock,
CrdsValue::Vote(vote) => vote.wallclock,
CrdsValue::LeaderId(leader_id) => leader_id.wallclock,
}
}
pub fn label(&self) -> CrdsValueLabel {
match self {
CrdsValue::ContactInfo(contact_info) => CrdsValueLabel::ContactInfo(contact_info.id),
CrdsValue::Vote(vote) => CrdsValueLabel::Vote(vote.transaction.account_keys[0]),
CrdsValue::LeaderId(leader_id) => CrdsValueLabel::LeaderId(leader_id.id),
}
}
pub fn contact_info(&self) -> Option<&ContactInfo> {
match self {
CrdsValue::ContactInfo(contact_info) => Some(contact_info),
_ => None,
}
}
pub fn leader_id(&self) -> Option<&LeaderId> {
match self {
CrdsValue::LeaderId(leader_id) => Some(leader_id),
_ => None,
}
}
pub fn vote(&self) -> Option<&Vote> {
match self {
CrdsValue::Vote(vote) => Some(vote),
_ => None,
}
}
/// Return all the possible labels for a record identified by Pubkey.
pub fn record_labels(key: Pubkey) -> [CrdsValueLabel; 3] {
[
CrdsValueLabel::ContactInfo(key),
CrdsValueLabel::Vote(key),
CrdsValueLabel::LeaderId(key),
]
}
}
#[cfg(test)]
mod test {
use super::*;
use contact_info::ContactInfo;
use system_transaction::test_tx;
#[test]
fn test_labels() {
let mut hits = [false; 3];
// this method should cover all the possible labels
for v in &CrdsValue::record_labels(Pubkey::default()) {
match v {
CrdsValueLabel::ContactInfo(_) => hits[0] = true,
CrdsValueLabel::Vote(_) => hits[1] = true,
CrdsValueLabel::LeaderId(_) => hits[2] = true,
}
}
assert!(hits.iter().all(|x| *x));
}
#[test]
fn test_keys_and_values() {
let v = CrdsValue::LeaderId(LeaderId::default());
let key = v.clone().leader_id().unwrap().id;
assert_eq!(v.wallclock(), 0);
assert_eq!(v.label(), CrdsValueLabel::LeaderId(key));
let v = CrdsValue::ContactInfo(ContactInfo::default());
assert_eq!(v.wallclock(), 0);
let key = v.clone().contact_info().unwrap().id;
assert_eq!(v.label(), CrdsValueLabel::ContactInfo(key));
let v = CrdsValue::Vote(Vote {
transaction: test_tx(),
height: 1,
wallclock: 0,
});
assert_eq!(v.wallclock(), 0);
let key = v.clone().vote().unwrap().transaction.account_keys[0];
assert_eq!(v.label(), CrdsValueLabel::Vote(key));
}
}
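Since a node's record is spread across up to three labels, reading the whole record back means probing each label; a hypothetical helper (not part of this commit) sketches that lookup:

// Hypothetical helper: collect every CrdsValue currently stored for `key`.
use crds::Crds;
use solana_sdk::pubkey::Pubkey;

fn lookup_record(crds: &Crds, key: Pubkey) -> Vec<&CrdsValue> {
    CrdsValue::record_labels(key)
        .iter()
        .filter_map(|label| crds.lookup(label))
        .collect()
}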

View File

@ -109,11 +109,7 @@ impl Drone {
let leader = poll_gossip_for_leader(self.network_addr, Some(10))
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let mut client = ThinClient::new(
leader.contact_info.rpc,
leader.contact_info.tpu,
transactions_socket,
);
let mut client = ThinClient::new(leader.rpc, leader.tpu, transactions_socket);
let last_id = client.get_last_id();
let mut tx = match req {
@ -343,22 +339,12 @@ mod tests {
let mut addr: SocketAddr = "0.0.0.0:9900".parse().expect("bind to drone socket");
addr.set_ip(get_ip_addr().expect("drone get_ip_addr"));
let mut drone = Drone::new(
alice.keypair(),
addr,
leader_data.contact_info.ncp,
None,
Some(150_000),
);
let mut drone = Drone::new(alice.keypair(), addr, leader_data.ncp, None, Some(150_000));
let transactions_socket =
UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket");
let mut client = ThinClient::new(
leader_data.contact_info.rpc,
leader_data.contact_info.tpu,
transactions_socket,
);
let mut client = ThinClient::new(leader_data.rpc, leader_data.tpu, transactions_socket);
let bob_req = DroneRequest::GetAirdrop {
airdrop_request_amount: 50,
@ -387,11 +373,7 @@ mod tests {
let transactions_socket =
UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket");
let mut client = ThinClient::new(
leader_data.contact_info.rpc,
leader_data.contact_info.tpu,
transactions_socket,
);
let mut client = ThinClient::new(leader_data.rpc, leader_data.tpu, transactions_socket);
let carlos_req = DroneRequest::GetAirdrop {
airdrop_request_amount: 5_000_000,

View File

@ -16,6 +16,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::Result;
use timing::timestamp;
use tpu::{Tpu, TpuReturnType};
use tvu::{Tvu, TvuReturnType};
use untrusted::Input;
@ -148,9 +149,9 @@ impl Fullnode {
info!(
"starting... local gossip address: {} (advertising {})",
local_gossip_addr, node.info.contact_info.ncp
local_gossip_addr, node.info.ncp
);
let mut rpc_addr = node.info.contact_info.rpc;
let mut rpc_addr = node.info.rpc;
if let Some(port) = rpc_port {
rpc_addr.set_port(port);
}
@ -198,16 +199,16 @@ impl Fullnode {
sigverify_disabled: bool,
rpc_port: Option<u16>,
) -> Self {
let mut rpc_addr = node.info.contact_info.rpc;
let mut rpc_pubsub_addr = node.info.contact_info.rpc_pubsub;
let mut rpc_addr = node.info.rpc;
let mut rpc_pubsub_addr = node.info.rpc_pubsub;
// Use custom RPC port, if provided (`Some(port)`)
// RPC port may be any valid open port on the node
// If rpc_port == `None`, node will listen on the ports set in NodeInfo
if let Some(port) = rpc_port {
rpc_addr.set_port(port);
node.info.contact_info.rpc = rpc_addr;
node.info.rpc = rpc_addr;
rpc_pubsub_addr.set_port(port + 1);
node.info.contact_info.rpc_pubsub = rpc_pubsub_addr;
node.info.rpc_pubsub = rpc_pubsub_addr;
}
let exit = Arc::new(AtomicBool::new(false));
@ -215,6 +216,7 @@ impl Fullnode {
let window = new_window(32 * 1024);
let shared_window = Arc::new(RwLock::new(window));
node.info.wallclock = timestamp();
let cluster_info = Arc::new(RwLock::new(
ClusterInfo::new(node.info).expect("ClusterInfo::new"),
));
@ -233,7 +235,10 @@ impl Fullnode {
// Insert the bootstrap leader info, should only be None if this node
// is the bootstrap leader
if let Some(bootstrap_leader_info) = bootstrap_leader_info_option {
cluster_info.write().unwrap().insert(bootstrap_leader_info);
cluster_info
.write()
.unwrap()
.insert_info(bootstrap_leader_info.clone());
}
// Get the scheduled leader
@ -738,7 +743,7 @@ mod tests {
&bootstrap_leader_ledger_path,
Arc::new(bootstrap_leader_keypair),
Arc::new(Keypair::new()),
Some(bootstrap_leader_info.contact_info.ncp),
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
@ -829,7 +834,7 @@ mod tests {
&bootstrap_leader_ledger_path,
bootstrap_leader_keypair,
leader_vote_account_keypair,
Some(bootstrap_leader_info.contact_info.ncp),
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
@ -848,7 +853,7 @@ mod tests {
&bootstrap_leader_ledger_path,
Arc::new(validator_keypair),
Arc::new(validator_vote_account_keypair),
Some(bootstrap_leader_info.contact_info.ncp),
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
@ -869,7 +874,7 @@ mod tests {
let leader_keypair = Keypair::new();
let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_id = leader_node.info.id;
let leader_ncp = leader_node.info.contact_info.ncp;
let leader_ncp = leader_node.info.ncp;
// Create validator identity
let num_ending_ticks = 1;
@ -954,7 +959,7 @@ mod tests {
// "extra_blobs" number of blobs to make sure the window stops in the right place.
let extra_blobs = cmp::max(leader_rotation_interval / 3, 1);
let total_blobs_to_send = bootstrap_height + extra_blobs;
let tvu_address = &validator_info.contact_info.tvu;
let tvu_address = &validator_info.tvu;
let msgs = make_consecutive_blobs(
leader_id,
total_blobs_to_send,

View File

@ -12,6 +12,7 @@ pub mod counter;
pub mod bank;
pub mod banking_stage;
pub mod blob_fetch_stage;
pub mod bloom;
pub mod bpf_loader;
pub mod broadcast_stage;
pub mod budget_expr;
@ -21,11 +22,18 @@ pub mod budget_transaction;
pub mod chacha;
#[cfg(all(feature = "chacha", feature = "cuda"))]
pub mod chacha_cuda;
pub mod choose_gossip_peer_strategy;
pub mod client;
pub mod crds;
pub mod crds_gossip;
pub mod crds_gossip_error;
pub mod crds_gossip_pull;
pub mod crds_gossip_push;
pub mod crds_traits_impls;
pub mod crds_value;
#[macro_use]
pub mod cluster_info;
pub mod contact_info;
pub mod budget_program;
pub mod cluster_info;
pub mod compute_leader_finality_service;
pub mod drone;
pub mod entry;
@ -82,6 +90,7 @@ pub mod window;
pub mod window_service;
extern crate bincode;
extern crate bs58;
extern crate bv;
extern crate byteorder;
extern crate bytes;
extern crate chrono;
@ -93,6 +102,7 @@ extern crate generic_array;
#[cfg(any(feature = "chacha", feature = "cuda"))]
#[macro_use]
extern crate hex_literal;
extern crate indexmap;
extern crate ipnetwork;
extern crate itertools;
extern crate libc;

View File

@ -26,8 +26,8 @@ impl Ncp {
let (request_sender, request_receiver) = channel();
let gossip_socket = Arc::new(gossip_socket);
trace!(
"Ncp: id: {:?}, listening on: {:?}",
&cluster_info.read().unwrap().id.as_ref()[..4],
"Ncp: id: {}, listening on: {:?}",
&cluster_info.read().unwrap().my_data().id,
gossip_socket.local_addr().unwrap()
);
let t_receiver =

View File

@ -89,9 +89,9 @@ impl Replicator {
let leader_info = network_addr.map(|i| NodeInfo::new_entry_point(&i));
let leader_pubkey;
if let Some(leader_info) = leader_info.as_ref() {
if let Some(leader_info) = leader_info {
leader_pubkey = leader_info.id;
cluster_info.write().unwrap().insert(leader_info);
cluster_info.write().unwrap().insert_info(leader_info);
} else {
panic!("No leader info!");
}

View File

@ -297,7 +297,7 @@ impl JsonRpcRequestProcessor {
fn get_leader_addr(cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<SocketAddr> {
if let Some(leader_data) = cluster_info.read().unwrap().leader_data() {
Ok(leader_data.contact_info.tpu)
Ok(leader_data.tpu)
} else {
Err(Error {
code: ErrorCode::InternalError,
@ -368,11 +368,9 @@ mod tests {
bank.process_transaction(&tx).expect("process transaction");
let request_processor = JsonRpcRequestProcessor::new(Arc::new(bank));
let cluster_info = Arc::new(RwLock::new(
ClusterInfo::new(NodeInfo::new_unspecified()).unwrap(),
));
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()).unwrap()));
let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
cluster_info.write().unwrap().insert(&leader);
cluster_info.write().unwrap().insert_info(leader.clone());
cluster_info.write().unwrap().set_leader(leader.id);
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let exit = Arc::new(AtomicBool::new(false));
@ -394,9 +392,7 @@ mod tests {
fn test_rpc_new() {
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let cluster_info = Arc::new(RwLock::new(
ClusterInfo::new(NodeInfo::new_unspecified()).unwrap(),
));
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()).unwrap()));
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 24680);
let rpc_service = JsonRpcService::new(&Arc::new(bank), &cluster_info, rpc_addr);
let thread = rpc_service.thread_hdl.thread();
@ -645,7 +641,7 @@ mod tests {
"method": "sendTransaction",
"params": json!(vec![serial_tx])
});
let rpc_addr = leader_data.contact_info.rpc;
let rpc_addr = leader_data.rpc;
let rpc_string = format!("http://{}", rpc_addr.to_string());
let mut response = client
.post(&rpc_string)
@ -689,9 +685,7 @@ mod tests {
io.extend_with(rpc.to_delegate());
let meta = Meta {
request_processor: JsonRpcRequestProcessor::new(Arc::new(bank)),
cluster_info: Arc::new(RwLock::new(
ClusterInfo::new(NodeInfo::new_unspecified()).unwrap(),
)),
cluster_info: Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()).unwrap())),
rpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
exit: Arc::new(AtomicBool::new(false)),
};
@ -710,9 +704,7 @@ mod tests {
#[test]
fn test_rpc_get_leader_addr() {
let cluster_info = Arc::new(RwLock::new(
ClusterInfo::new(NodeInfo::new_unspecified()).unwrap(),
));
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()).unwrap()));
assert_eq!(
get_leader_addr(&cluster_info),
Err(Error {
@ -722,7 +714,7 @@ mod tests {
})
);
let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
cluster_info.write().unwrap().insert(&leader);
cluster_info.write().unwrap().insert_info(leader.clone());
cluster_info.write().unwrap().set_leader(leader.id);
assert_eq!(
get_leader_addr(&cluster_info),

View File

@ -359,7 +359,10 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> R
);
let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp);
cluster_info.write().unwrap().insert(&leader_entry_point);
cluster_info
.write()
.unwrap()
.insert_info(leader_entry_point);
sleep(Duration::from_millis(100));
@ -475,11 +478,7 @@ mod tests {
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new(
leader_data.contact_info.rpc,
leader_data.contact_info.tpu,
transactions_socket,
);
let mut client = ThinClient::new(leader_data.rpc, leader_data.tpu, transactions_socket);
let transaction_count = client.transaction_count();
assert_eq!(transaction_count, 0);
let finality = client.get_finality();
@ -532,11 +531,7 @@ mod tests {
sleep(Duration::from_millis(300));
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new(
leader_data.contact_info.rpc,
leader_data.contact_info.tpu,
transactions_socket,
);
let mut client = ThinClient::new(leader_data.rpc, leader_data.tpu, transactions_socket);
let last_id = client.get_last_id();
let tx = Transaction::system_new(&alice.keypair(), bob_pubkey, 500, last_id);
@ -593,11 +588,7 @@ mod tests {
sleep(Duration::from_millis(300));
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new(
leader_data.contact_info.rpc,
leader_data.contact_info.tpu,
transactions_socket,
);
let mut client = ThinClient::new(leader_data.rpc, leader_data.tpu, transactions_socket);
let last_id = client.get_last_id();
let signature = client
.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
@ -642,11 +633,7 @@ mod tests {
sleep(Duration::from_millis(300));
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new(
leader_data.contact_info.rpc,
leader_data.contact_info.tpu,
transactions_socket,
);
let mut client = ThinClient::new(leader_data.rpc, leader_data.tpu, transactions_socket);
// Create the validator account, transfer some tokens to that account
let validator_keypair = Keypair::new();
@ -744,11 +731,7 @@ mod tests {
sleep(Duration::from_millis(900));
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new(
leader_data.contact_info.rpc,
leader_data.contact_info.tpu,
transactions_socket,
);
let mut client = ThinClient::new(leader_data.rpc, leader_data.tpu, transactions_socket);
let last_id = client.get_last_id();
// give bob 500 tokens

View File

@ -216,7 +216,7 @@ pub mod tests {
//start cluster_info2
let mut cluster_info2 = ClusterInfo::new(target2.info.clone()).expect("ClusterInfo::new");
cluster_info2.insert(&leader.info);
cluster_info2.insert_info(leader.info.clone());
cluster_info2.set_leader(leader.info.id);
let leader_id = leader.info.id;
let cref2 = Arc::new(RwLock::new(cluster_info2));
@ -245,7 +245,7 @@ pub mod tests {
let starting_balance = 10_000;
let mint = Mint::new(starting_balance);
let replicate_addr = target1.info.contact_info.tvu;
let replicate_addr = target1.info.tvu;
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
leader_id,
)));
@ -255,7 +255,7 @@ pub mod tests {
//start cluster_info1
let mut cluster_info1 = ClusterInfo::new(target1.info.clone()).expect("ClusterInfo::new");
cluster_info1.insert(&leader.info);
cluster_info1.insert_info(leader.info.clone());
cluster_info1.set_leader(leader.info.id);
let cref1 = Arc::new(RwLock::new(cluster_info1));
let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone());

View File

@ -34,7 +34,7 @@ pub fn create_new_signed_vote_blob(
let shared_blob = SharedBlob::default();
let tick_height = bank.tick_height();
let leader_tpu = get_leader_tpu(bank, cluster_info)?;
let leader_tpu = get_leader_tpu(&bank, cluster_info)?;
//TODO: doesn't seem like there is a synchronous call to get height and id
debug!("voting on {:?}", &last_id.as_ref()[..8]);
let vote = Vote { tick_height };
@ -58,11 +58,7 @@ fn get_leader_tpu(bank: &Bank, cluster_info: &Arc<RwLock<ClusterInfo>>) -> Resul
};
let rcluster_info = cluster_info.read().unwrap();
let leader_tpu = rcluster_info
.table
.get(&leader_id)
.map(|leader| leader.contact_info.tpu);
let leader_tpu = rcluster_info.lookup(leader_id).map(|leader| leader.tpu);
if let Some(leader_tpu) = leader_tpu {
Ok(leader_tpu)
} else {

View File

@ -317,9 +317,9 @@ pub fn process_command(config: &WalletConfig) -> Result<String, Box<error::Error
}
let leader = poll_gossip_for_leader(config.network, config.timeout)?;
let tpu_addr = leader.contact_info.tpu;
let tpu_addr = leader.tpu;
let drone_addr = config.drone_addr(tpu_addr);
let rpc_addr = config.rpc_addr(leader.contact_info.rpc);
let rpc_addr = config.rpc_addr(leader.rpc);
match config.command {
// Get address of this client
@ -1147,11 +1147,11 @@ mod tests {
sleep(Duration::from_millis(900));
let (sender, receiver) = channel();
run_local_drone(alice.keypair(), leader_data.contact_info.ncp, sender);
run_local_drone(alice.keypair(), leader_data.ncp, sender);
let drone_addr = receiver.recv().unwrap();
let mut config = WalletConfig::default();
config.network = leader_data.contact_info.ncp;
config.network = leader_data.ncp;
config.drone_port = Some(drone_addr.port());
let tokens = 50;
@ -1220,10 +1220,10 @@ mod tests {
sleep(Duration::from_millis(900));
let (sender, receiver) = channel();
run_local_drone(alice.keypair(), leader_data.contact_info.ncp, sender);
run_local_drone(alice.keypair(), leader_data.ncp, sender);
let drone_addr = receiver.recv().unwrap();
let rpc_addr = format!("http://{}", leader_data.contact_info.rpc.to_string());
let rpc_addr = format!("http://{}", leader_data.rpc.to_string());
let signature = request_airdrop(&drone_addr, &bob_pubkey, 50);
assert!(signature.is_ok());
@ -1295,17 +1295,17 @@ mod tests {
sleep(Duration::from_millis(900));
let (sender, receiver) = channel();
run_local_drone(alice.keypair(), leader_data.contact_info.ncp, sender);
run_local_drone(alice.keypair(), leader_data.ncp, sender);
let drone_addr = receiver.recv().unwrap();
let rpc_addr = format!("http://{}", leader_data.contact_info.rpc.to_string());
let rpc_addr = format!("http://{}", leader_data.rpc.to_string());
let mut config_payer = WalletConfig::default();
config_payer.network = leader_data.contact_info.ncp;
config_payer.network = leader_data.ncp;
config_payer.drone_port = Some(drone_addr.port());
let mut config_witness = WalletConfig::default();
config_witness.network = leader_data.contact_info.ncp;
config_witness.network = leader_data.ncp;
config_witness.drone_port = Some(drone_addr.port());
assert_ne!(config_payer.id.pubkey(), config_witness.id.pubkey());
@ -1419,10 +1419,10 @@ mod tests {
sleep(Duration::from_millis(900));
let (sender, receiver) = channel();
run_local_drone(alice.keypair(), leader_data.contact_info.ncp, sender);
run_local_drone(alice.keypair(), leader_data.ncp, sender);
let drone_addr = receiver.recv().unwrap();
let rpc_addr = format!("http://{}", leader_data.contact_info.rpc.to_string());
let rpc_addr = format!("http://{}", leader_data.rpc.to_string());
assert_ne!(config_payer.id.pubkey(), config_witness.id.pubkey());
@ -1532,17 +1532,17 @@ mod tests {
sleep(Duration::from_millis(900));
let (sender, receiver) = channel();
run_local_drone(alice.keypair(), leader_data.contact_info.ncp, sender);
run_local_drone(alice.keypair(), leader_data.ncp, sender);
let drone_addr = receiver.recv().unwrap();
let rpc_addr = format!("http://{}", leader_data.contact_info.rpc.to_string());
let rpc_addr = format!("http://{}", leader_data.rpc.to_string());
let mut config_payer = WalletConfig::default();
config_payer.network = leader_data.contact_info.ncp;
config_payer.network = leader_data.ncp;
config_payer.drone_port = Some(drone_addr.port());
let mut config_witness = WalletConfig::default();
config_witness.network = leader_data.contact_info.ncp;
config_witness.network = leader_data.ncp;
config_witness.drone_port = Some(drone_addr.port());
assert_ne!(config_payer.id.pubkey(), config_witness.id.pubkey());

View File

@ -177,8 +177,7 @@ impl WindowUtil for Window {
}
}
let num_peers = rcluster_info.table.len() as u64;
let num_peers = rcluster_info.tvu_peers().len() as u64;
let max_repair = if max_entry_height == 0 {
calculate_max_repair(
num_peers,

View File

@ -277,7 +277,7 @@ pub fn window_service(
let mut received = entry_height;
let mut last = entry_height;
let mut times = 0;
let id = cluster_info.read().unwrap().id;
let id = cluster_info.read().unwrap().my_data().id;
let mut pending_retransmits = false;
trace!("{}: RECV_WINDOW started", id);
loop {
@ -418,7 +418,7 @@ mod test {
let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
let mut num_blobs_to_make = 10;
let gossip_address = &tn.info.contact_info.ncp;
let gossip_address = &tn.info.ncp;
let msgs = make_consecutive_blobs(
me_id,
num_blobs_to_make,
@ -495,7 +495,7 @@ mod test {
w.set_id(&me_id).unwrap();
assert_eq!(i, w.index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.info.contact_info.ncp);
w.meta.set_addr(&tn.info.ncp);
}
msgs.push(b);
}
@ -559,7 +559,7 @@ mod test {
w.set_id(&me_id).unwrap();
assert_eq!(i, w.index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.info.contact_info.ncp);
w.meta.set_addr(&tn.info.ncp);
}
msgs.push(b);
}
@ -579,7 +579,7 @@ mod test {
w.set_id(&me_id).unwrap();
assert_eq!(i, w.index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.info.contact_info.ncp);
w.meta.set_addr(&tn.info.ncp);
}
msgs1.push(b);
}

View File

@ -10,6 +10,7 @@ use solana::ncp::Ncp;
use solana::packet::{Blob, SharedBlob};
use solana::result;
use solana::service::Service;
use solana::timing::timestamp;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
@ -22,6 +23,7 @@ fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<ClusterInfo>>, Ncp, UdpSocket
let c = Arc::new(RwLock::new(cluster_info));
let w = Arc::new(RwLock::new(vec![]));
let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit);
let _ = c.read().unwrap().my_data();
(c, d, tn.sockets.replicate.pop().unwrap())
}
@ -29,38 +31,31 @@ fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<ClusterInfo>>, Ncp, UdpSocket
/// Run until every node in the network has a full NodeInfo set.
/// Check that nodes stop sending updates after all the NodeInfo has been shared.
/// tests that actually use this function are below
fn run_gossip_topo<F>(topo: F)
fn run_gossip_topo<F>(num: usize, topo: F)
where
F: Fn(&Vec<(Arc<RwLock<ClusterInfo>>, Ncp, UdpSocket)>) -> (),
{
let num: usize = 5;
let exit = Arc::new(AtomicBool::new(false));
let listen: Vec<_> = (0..num).map(|_| test_node(exit.clone())).collect();
topo(&listen);
let mut done = true;
for i in 0..(num * 32) {
done = false;
trace!("round {}", i);
for (c, _, _) in &listen {
if num == c.read().unwrap().convergence() as usize {
done = true;
break;
}
}
//at least 1 node converged
if done == true {
done = true;
let total: usize = listen
.iter()
.map(|v| v.0.read().unwrap().ncp_peers().len())
.sum();
if (total + num) * 10 > num * num * 9 {
done = true;
break;
} else {
trace!("not converged {} {} {}", i, total + num, num * num);
}
sleep(Duration::new(1, 0));
}
exit.store(true, Ordering::Relaxed);
for (c, dr, _) in listen {
for (_, dr, _) in listen {
dr.join().unwrap();
// make it clear what failed
// protocol is too chatty, updates should stop after everyone receives `num`
assert!(c.read().unwrap().update_index <= num as u64);
// protocol is not chatty enough, everyone should get `num` entries
assert_eq!(c.read().unwrap().table.len(), num);
}
assert!(done);
}
@ -68,37 +63,57 @@ where
#[test]
fn gossip_ring() -> result::Result<()> {
logger::setup();
run_gossip_topo(|listen| {
run_gossip_topo(50, |listen| {
let num = listen.len();
for n in 0..num {
let y = n % listen.len();
let x = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut d = yv.table[&yv.id].clone();
d.version = 0;
xv.insert(&d);
let mut d = yv.lookup(yv.id()).unwrap().clone();
d.wallclock = timestamp();
xv.insert_info(d);
}
});
Ok(())
}
/// ring a -> b -> c -> d -> e -> a
#[test]
#[ignore]
fn gossip_ring_large() -> result::Result<()> {
logger::setup();
run_gossip_topo(600, |listen| {
let num = listen.len();
for n in 0..num {
let y = n % listen.len();
let x = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut d = yv.lookup(yv.id()).unwrap().clone();
d.wallclock = timestamp();
xv.insert_info(d);
}
});
Ok(())
}
/// star a -> (b,c,d,e)
#[test]
fn gossip_star() {
logger::setup();
run_gossip_topo(|listen| {
run_gossip_topo(50, |listen| {
let num = listen.len();
for n in 0..(num - 1) {
let x = 0;
let y = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut yd = yv.table[&yv.id].clone();
yd.version = 0;
xv.insert(&yd);
trace!("star leader {:?}", &xv.id.as_ref()[..4]);
let mut yd = yv.lookup(yv.id()).unwrap().clone();
yd.wallclock = timestamp();
xv.insert_info(yd);
trace!("star leader {}", &xv.id());
}
});
}
@ -107,22 +122,18 @@ fn gossip_star() {
#[test]
fn gossip_rstar() {
logger::setup();
run_gossip_topo(|listen| {
run_gossip_topo(50, |listen| {
let num = listen.len();
let xd = {
let xv = listen[0].0.read().unwrap();
xv.table[&xv.id].clone()
xv.lookup(xv.id()).unwrap().clone()
};
trace!("rstar leader {:?}", &xd.id.as_ref()[..4]);
trace!("rstar leader {}", xd.id);
for n in 0..(num - 1) {
let y = (n + 1) % listen.len();
let mut yv = listen[y].0.write().unwrap();
yv.insert(&xd);
trace!(
"rstar insert {:?} into {:?}",
&xd.id.as_ref()[..4],
&yv.id.as_ref()[..4]
);
yv.insert_info(xd.clone());
trace!("rstar insert {} into {}", xd.id, yv.id());
}
});
}
@ -140,19 +151,20 @@ pub fn cluster_info_retransmit() -> result::Result<()> {
let c1_data = c1.read().unwrap().my_data().clone();
c1.write().unwrap().set_leader(c1_data.id);
c2.write().unwrap().insert(&c1_data);
c3.write().unwrap().insert(&c1_data);
c2.write().unwrap().insert_info(c1_data.clone());
c3.write().unwrap().insert_info(c1_data.clone());
c2.write().unwrap().set_leader(c1_data.id);
c3.write().unwrap().set_leader(c1_data.id);
let num = 3;
//wait to converge
trace!("waiting to converge:");
let mut done = false;
for _ in 0..30 {
done = c1.read().unwrap().table.len() == 3
&& c2.read().unwrap().table.len() == 3
&& c3.read().unwrap().table.len() == 3;
done = c1.read().unwrap().ncp_peers().len() == num - 1
&& c2.read().unwrap().ncp_peers().len() == num - 1
&& c3.read().unwrap().ncp_peers().len() == num - 1;
if done {
break;
}
@ -180,102 +192,3 @@ pub fn cluster_info_retransmit() -> result::Result<()> {
Ok(())
}
#[test]
#[ignore]
fn test_external_liveness_table() {
logger::setup();
assert!(cfg!(feature = "test"));
let c1_c4_exit = Arc::new(AtomicBool::new(false));
let c2_c3_exit = Arc::new(AtomicBool::new(false));
trace!("c1:");
let (c1, dr1, _) = test_node(c1_c4_exit.clone());
trace!("c2:");
let (c2, dr2, _) = test_node(c2_c3_exit.clone());
trace!("c3:");
let (c3, dr3, _) = test_node(c2_c3_exit.clone());
trace!("c4:");
let (c4, dr4, _) = test_node(c1_c4_exit.clone());
let c1_data = c1.read().unwrap().my_data().clone();
c1.write().unwrap().set_leader(c1_data.id);
let c2_id = c2.read().unwrap().id;
let c3_id = c3.read().unwrap().id;
let c4_id = c4.read().unwrap().id;
// Insert the remote data about c4
let c2_index_for_c4 = 10;
c2.write().unwrap().remote.insert(c4_id, c2_index_for_c4);
let c3_index_for_c4 = 20;
c3.write().unwrap().remote.insert(c4_id, c3_index_for_c4);
// Set up the initial network topology
c2.write().unwrap().insert(&c1_data);
c3.write().unwrap().insert(&c1_data);
c2.write().unwrap().set_leader(c1_data.id);
c3.write().unwrap().set_leader(c1_data.id);
// Wait to converge
trace!("waiting to converge:");
let mut done = false;
for _ in 0..30 {
done = c1.read().unwrap().table.len() == 3
&& c2.read().unwrap().table.len() == 3
&& c3.read().unwrap().table.len() == 3;
if done {
break;
}
sleep(Duration::new(1, 0));
}
assert!(done);
// Validate c1's external liveness table, then release lock rc1
{
let rc1 = c1.read().unwrap();
let el = rc1.get_external_liveness_entry(&c4.read().unwrap().id);
// Make sure liveness table entry for c4 exists on node c1
assert!(el.is_some());
let liveness_map = el.unwrap();
// Make sure liveness table entry contains correct result for c2
let c2_index_result_for_c4 = liveness_map.get(&c2_id);
assert!(c2_index_result_for_c4.is_some());
assert_eq!(*(c2_index_result_for_c4.unwrap()), c2_index_for_c4);
// Make sure liveness table entry contains correct result for c3
let c3_index_result_for_c4 = liveness_map.get(&c3_id);
assert!(c3_index_result_for_c4.is_some());
assert_eq!(*(c3_index_result_for_c4.unwrap()), c3_index_for_c4);
}
// Shutdown validators c2 and c3
c2_c3_exit.store(true, Ordering::Relaxed);
dr2.join().unwrap();
dr3.join().unwrap();
// Allow communication between c1 and c4, make sure that c1's external_liveness table
// entry for c4 gets cleared
c4.write().unwrap().insert(&c1_data);
c4.write().unwrap().set_leader(c1_data.id);
for _ in 0..30 {
done = c1
.read()
.unwrap()
.get_external_liveness_entry(&c4_id)
.is_none();
if done {
break;
}
sleep(Duration::new(1, 0));
}
assert!(done);
// Shutdown validators c1 and c4
c1_c4_exit.store(true, Ordering::Relaxed);
dr1.join().unwrap();
dr4.join().unwrap();
}

View File

@ -45,9 +45,9 @@ fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<ClusterInfo>>, Pubkey) {
let mut spy = Node::new_localhost();
let me = spy.info.id.clone();
let daddr = "0.0.0.0:0".parse().unwrap();
spy.info.contact_info.tvu = daddr;
spy.info.tvu = daddr;
let mut spy_cluster_info = ClusterInfo::new(spy.info).expect("ClusterInfo::new");
spy_cluster_info.insert(&leader);
spy_cluster_info.insert_info(leader.clone());
spy_cluster_info.set_leader(leader.id);
let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info));
let spy_window = Arc::new(RwLock::new(default_window()));
@ -68,7 +68,7 @@ fn make_listening_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<ClusterInfo>>, Nod
let new_node_info = new_node.info.clone();
let me = new_node.info.id.clone();
let mut new_node_cluster_info = ClusterInfo::new(new_node_info).expect("ClusterInfo::new");
new_node_cluster_info.insert(&leader);
new_node_cluster_info.insert_info(leader.clone());
new_node_cluster_info.set_leader(leader.id);
let new_node_cluster_info_ref = Arc::new(RwLock::new(new_node_cluster_info));
let new_node_window = Arc::new(RwLock::new(default_window()));
@ -96,8 +96,8 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
let mut rv = vec![];
for _ in 0..30 {
let num = spy_ref.read().unwrap().convergence();
let mut v = spy_ref.read().unwrap().get_valid_peers();
if num >= num_nodes as u64 && v.len() >= num_nodes {
let mut v = spy_ref.read().unwrap().rpc_peers();
if num >= num_nodes && v.len() >= num_nodes {
rv.append(&mut v);
converged = true;
break;
@ -183,7 +183,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
&zero_ledger_path,
keypair,
Arc::new(Keypair::new()),
Some(leader_data.contact_info.ncp),
Some(leader_data.ncp),
false,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
@ -288,7 +288,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
&ledger_path,
keypair,
Arc::new(Keypair::new()),
Some(leader_data.contact_info.ncp),
Some(leader_data.ncp),
false,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
@ -326,7 +326,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
&zero_ledger_path,
keypair,
Arc::new(Keypair::new()),
Some(leader_data.contact_info.ncp),
Some(leader_data.ncp),
false,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
@ -420,7 +420,7 @@ fn test_multi_node_basic() {
&ledger_path,
keypair,
Arc::new(Keypair::new()),
Some(leader_data.contact_info.ncp),
Some(leader_data.ncp),
false,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
@ -496,7 +496,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
&ledger_path,
keypair,
Arc::new(Keypair::new()),
Some(leader_data.contact_info.ncp),
Some(leader_data.ncp),
false,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
@ -584,7 +584,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
&stale_ledger_path,
keypair,
Arc::new(Keypair::new()),
Some(leader_data.contact_info.ncp),
Some(leader_data.ncp),
false,
LeaderScheduler::from_bootstrap_leader(leader_data.id),
None,
@ -715,7 +715,7 @@ fn test_multi_node_dynamic_network() {
&ledger_path,
Arc::new(keypair),
Arc::new(Keypair::new()),
Some(leader_data.contact_info.ncp),
Some(leader_data.ncp),
true,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
None,
@ -861,7 +861,7 @@ fn test_leader_to_validator_transition() {
&leader_ledger_path,
leader_keypair,
Arc::new(Keypair::new()),
Some(leader_info.contact_info.ncp),
Some(leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
@ -875,10 +875,10 @@ fn test_leader_to_validator_transition() {
let mut converged = false;
for _ in 0..30 {
let num = spy_node.read().unwrap().convergence();
let mut v: Vec<NodeInfo> = spy_node.read().unwrap().get_valid_peers();
let mut v: Vec<NodeInfo> = spy_node.read().unwrap().rpc_peers();
// There's only one person excluding the spy node (the leader) who should see
// two nodes on the network
if num >= 2 as u64 && v.len() >= 1 {
if num >= 2 && v.len() >= 1 {
converged = true;
break;
}
@ -1001,7 +1001,7 @@ fn test_leader_validator_basic() {
&validator_ledger_path,
validator_keypair,
Arc::new(vote_account_keypair),
Some(leader_info.contact_info.ncp),
Some(leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
@ -1013,7 +1013,7 @@ fn test_leader_validator_basic() {
&leader_ledger_path,
leader_keypair,
Arc::new(Keypair::new()),
Some(leader_info.contact_info.ncp),
Some(leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
@ -1189,7 +1189,7 @@ fn test_dropped_handoff_recovery() {
&bootstrap_leader_ledger_path,
bootstrap_leader_keypair,
Arc::new(Keypair::new()),
Some(bootstrap_leader_info.contact_info.ncp),
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
@ -1212,7 +1212,7 @@ fn test_dropped_handoff_recovery() {
&validator_ledger_path,
kp,
Arc::new(Keypair::new()),
Some(bootstrap_leader_info.contact_info.ncp),
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
@ -1238,7 +1238,7 @@ fn test_dropped_handoff_recovery() {
&next_leader_ledger_path,
next_leader_keypair,
Arc::new(vote_account_keypair),
Some(bootstrap_leader_info.contact_info.ncp),
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
@ -1355,7 +1355,7 @@ fn test_full_leader_validator_network() {
&bootstrap_leader_ledger_path,
Arc::new(node_keypairs.pop_front().unwrap()),
Arc::new(vote_account_keypairs.pop_front().unwrap()),
Some(bootstrap_leader_info.contact_info.ncp),
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
@ -1382,7 +1382,7 @@ fn test_full_leader_validator_network() {
&validator_ledger_path,
Arc::new(kp),
Arc::new(vote_account_keypairs.pop_front().unwrap()),
Some(bootstrap_leader_info.contact_info.ncp),
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
@ -1559,7 +1559,7 @@ fn test_broadcast_last_tick() {
&bootstrap_leader_ledger_path,
Arc::new(bootstrap_leader_keypair),
Arc::new(Keypair::new()),
Some(bootstrap_leader_info.contact_info.ncp),
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
@ -1621,12 +1621,8 @@ fn test_broadcast_last_tick() {
fn mk_client(leader: &NodeInfo) -> ThinClient {
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
assert!(ClusterInfo::is_valid_address(&leader.contact_info.tpu));
ThinClient::new(
leader.contact_info.rpc,
leader.contact_info.tpu,
transactions_socket,
)
assert!(ClusterInfo::is_valid_address(&leader.tpu));
ThinClient::new(leader.rpc, leader.tpu, transactions_socket)
}
fn send_tx_and_retry_get_balance(