Make gossip selection stake based (#2848)
This commit is contained in:
parent
33c7f92f56
commit
a484c87354
|
@ -41,7 +41,8 @@ fn converge(
|
||||||
spy_cluster_info.insert_info(leader.clone());
|
spy_cluster_info.insert_info(leader.clone());
|
||||||
spy_cluster_info.set_leader(leader.id);
|
spy_cluster_info.set_leader(leader.id);
|
||||||
let spy_ref = Arc::new(RwLock::new(spy_cluster_info));
|
let spy_ref = Arc::new(RwLock::new(spy_cluster_info));
|
||||||
let gossip_service = GossipService::new(&spy_ref, None, gossip_socket, exit_signal.clone());
|
let gossip_service =
|
||||||
|
GossipService::new(&spy_ref, None, None, gossip_socket, exit_signal.clone());
|
||||||
let mut v: Vec<NodeInfo> = vec![];
|
let mut v: Vec<NodeInfo> = vec![];
|
||||||
// wait for the network to converge, 30 seconds should be plenty
|
// wait for the network to converge, 30 seconds should be plenty
|
||||||
for _ in 0..30 {
|
for _ in 0..30 {
|
||||||
|
|
|
@ -174,16 +174,16 @@ impl ClusterInfo {
|
||||||
let id = node_info.id;
|
let id = node_info.id;
|
||||||
me.gossip.set_self(id);
|
me.gossip.set_self(id);
|
||||||
me.insert_info(node_info);
|
me.insert_info(node_info);
|
||||||
me.push_self();
|
me.push_self(None);
|
||||||
me
|
me
|
||||||
}
|
}
|
||||||
pub fn push_self(&mut self) {
|
pub fn push_self(&mut self, bank: Option<&Arc<Bank>>) {
|
||||||
let mut my_data = self.my_data();
|
let mut my_data = self.my_data();
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
my_data.wallclock = now;
|
my_data.wallclock = now;
|
||||||
let mut entry = CrdsValue::ContactInfo(my_data);
|
let mut entry = CrdsValue::ContactInfo(my_data);
|
||||||
entry.sign(&self.keypair);
|
entry.sign(&self.keypair);
|
||||||
self.gossip.refresh_push_active_set();
|
self.gossip.refresh_push_active_set(bank);
|
||||||
self.gossip.process_push_message(&[entry], now);
|
self.gossip.process_push_message(&[entry], now);
|
||||||
}
|
}
|
||||||
pub fn insert_info(&mut self, node_info: NodeInfo) {
|
pub fn insert_info(&mut self, node_info: NodeInfo) {
|
||||||
|
@ -756,9 +756,14 @@ impl ClusterInfo {
|
||||||
Ok((addr, out))
|
Ok((addr, out))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new_pull_requests(&mut self) -> Vec<(SocketAddr, Protocol)> {
|
fn new_pull_requests(&mut self, bank: Option<&Arc<Bank>>) -> Vec<(SocketAddr, Protocol)> {
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
let pulls: Vec<_> = self.gossip.new_pull_request(now).ok().into_iter().collect();
|
let pulls: Vec<_> = self
|
||||||
|
.gossip
|
||||||
|
.new_pull_request(now, bank)
|
||||||
|
.ok()
|
||||||
|
.into_iter()
|
||||||
|
.collect();
|
||||||
|
|
||||||
let pr: Vec<_> = pulls
|
let pr: Vec<_> = pulls
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -795,15 +800,19 @@ impl ClusterInfo {
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn gossip_request(&mut self) -> Vec<(SocketAddr, Protocol)> {
|
fn gossip_request(&mut self, bank: Option<&Arc<Bank>>) -> Vec<(SocketAddr, Protocol)> {
|
||||||
let pulls: Vec<_> = self.new_pull_requests();
|
let pulls: Vec<_> = self.new_pull_requests(bank);
|
||||||
let pushes: Vec<_> = self.new_push_requests();
|
let pushes: Vec<_> = self.new_push_requests();
|
||||||
vec![pulls, pushes].into_iter().flat_map(|x| x).collect()
|
vec![pulls, pushes].into_iter().flat_map(|x| x).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// At random pick a node and try to get updated changes from them
|
/// At random pick a node and try to get updated changes from them
|
||||||
fn run_gossip(obj: &Arc<RwLock<Self>>, blob_sender: &BlobSender) -> Result<()> {
|
fn run_gossip(
|
||||||
let reqs = obj.write().unwrap().gossip_request();
|
obj: &Arc<RwLock<Self>>,
|
||||||
|
bank: Option<&Arc<Bank>>,
|
||||||
|
blob_sender: &BlobSender,
|
||||||
|
) -> Result<()> {
|
||||||
|
let reqs = obj.write().unwrap().gossip_request(bank);
|
||||||
let blobs = reqs
|
let blobs = reqs
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|(remote_gossip_addr, req)| to_shared_blob(req, remote_gossip_addr).ok())
|
.filter_map(|(remote_gossip_addr, req)| to_shared_blob(req, remote_gossip_addr).ok())
|
||||||
|
@ -845,6 +854,7 @@ impl ClusterInfo {
|
||||||
/// randomly pick a node and ask them for updates asynchronously
|
/// randomly pick a node and ask them for updates asynchronously
|
||||||
pub fn gossip(
|
pub fn gossip(
|
||||||
obj: Arc<RwLock<Self>>,
|
obj: Arc<RwLock<Self>>,
|
||||||
|
bank: Option<Arc<Bank>>,
|
||||||
blob_sender: BlobSender,
|
blob_sender: BlobSender,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
|
@ -854,7 +864,7 @@ impl ClusterInfo {
|
||||||
let mut last_push = timestamp();
|
let mut last_push = timestamp();
|
||||||
loop {
|
loop {
|
||||||
let start = timestamp();
|
let start = timestamp();
|
||||||
let _ = Self::run_gossip(&obj, &blob_sender);
|
let _ = Self::run_gossip(&obj, bank.as_ref(), &blob_sender);
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -862,7 +872,7 @@ impl ClusterInfo {
|
||||||
//TODO: possibly tune this parameter
|
//TODO: possibly tune this parameter
|
||||||
//we saw a deadlock passing an obj.read().unwrap().timeout into sleep
|
//we saw a deadlock passing an obj.read().unwrap().timeout into sleep
|
||||||
if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
|
if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
|
||||||
obj.write().unwrap().push_self();
|
obj.write().unwrap().push_self(bank.as_ref());
|
||||||
last_push = timestamp();
|
last_push = timestamp();
|
||||||
}
|
}
|
||||||
let elapsed = timestamp() - start;
|
let elapsed = timestamp() - start;
|
||||||
|
@ -1451,8 +1461,8 @@ mod tests {
|
||||||
.write()
|
.write()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.gossip
|
.gossip
|
||||||
.refresh_push_active_set();
|
.refresh_push_active_set(None);
|
||||||
let reqs = cluster_info.write().unwrap().gossip_request();
|
let reqs = cluster_info.write().unwrap().gossip_request(None);
|
||||||
//assert none of the addrs are invalid.
|
//assert none of the addrs are invalid.
|
||||||
reqs.iter().all(|(addr, _)| {
|
reqs.iter().all(|(addr, _)| {
|
||||||
let res = ContactInfo::is_valid_address(addr);
|
let res = ContactInfo::is_valid_address(addr);
|
||||||
|
@ -1730,7 +1740,7 @@ mod tests {
|
||||||
|
|
||||||
let (_, _, val) = cluster_info
|
let (_, _, val) = cluster_info
|
||||||
.gossip
|
.gossip
|
||||||
.new_pull_request(timestamp())
|
.new_pull_request(timestamp(), None)
|
||||||
.ok()
|
.ok()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(val.verify());
|
assert!(val.verify());
|
||||||
|
|
|
@ -8,9 +8,11 @@ use crate::crds_gossip_error::CrdsGossipError;
|
||||||
use crate::crds_gossip_pull::CrdsGossipPull;
|
use crate::crds_gossip_pull::CrdsGossipPull;
|
||||||
use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE};
|
use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE};
|
||||||
use crate::crds_value::CrdsValue;
|
use crate::crds_value::CrdsValue;
|
||||||
|
use solana_runtime::bank::Bank;
|
||||||
use solana_runtime::bloom::Bloom;
|
use solana_runtime::bloom::Bloom;
|
||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
///The min size for bloom filters
|
///The min size for bloom filters
|
||||||
pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000;
|
pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000;
|
||||||
|
@ -92,9 +94,10 @@ impl CrdsGossip {
|
||||||
|
|
||||||
/// refresh the push active set
|
/// refresh the push active set
|
||||||
/// * ratio - number of actives to rotate
|
/// * ratio - number of actives to rotate
|
||||||
pub fn refresh_push_active_set(&mut self) {
|
pub fn refresh_push_active_set(&mut self, bank: Option<&Arc<Bank>>) {
|
||||||
self.push.refresh_push_active_set(
|
self.push.refresh_push_active_set(
|
||||||
&self.crds,
|
&self.crds,
|
||||||
|
bank,
|
||||||
self.id,
|
self.id,
|
||||||
self.pull.pull_request_time.len(),
|
self.pull.pull_request_time.len(),
|
||||||
CRDS_GOSSIP_NUM_ACTIVE,
|
CRDS_GOSSIP_NUM_ACTIVE,
|
||||||
|
@ -105,8 +108,9 @@ impl CrdsGossip {
|
||||||
pub fn new_pull_request(
|
pub fn new_pull_request(
|
||||||
&self,
|
&self,
|
||||||
now: u64,
|
now: u64,
|
||||||
|
bank: Option<&Arc<Bank>>,
|
||||||
) -> Result<(Pubkey, Bloom<Hash>, CrdsValue), CrdsGossipError> {
|
) -> Result<(Pubkey, Bloom<Hash>, CrdsValue), CrdsGossipError> {
|
||||||
self.pull.new_pull_request(&self.crds, self.id, now)
|
self.pull.new_pull_request(&self.crds, self.id, now, bank)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// time when a request to `from` was initiated
|
/// time when a request to `from` was initiated
|
||||||
|
@ -156,6 +160,28 @@ impl CrdsGossip {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Computes a normalized(log of bank balance) stake
|
||||||
|
pub fn get_stake(id: &Pubkey, bank: Option<&Arc<Bank>>) -> f32 {
|
||||||
|
match bank {
|
||||||
|
Some(bank) => {
|
||||||
|
// cap the max balance to u32 max (it should be plenty)
|
||||||
|
let bal = f64::from(u32::max_value()).min(bank.get_balance(id) as f64);
|
||||||
|
1_f32.max((bal as f32).ln())
|
||||||
|
}
|
||||||
|
_ => 1.0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Computes bounded weight given some max, a time since last selected, and a stake value
|
||||||
|
/// The minimum stake is 1 and not 0 to allow 'time since last' picked to factor in.
|
||||||
|
pub fn get_weight(max_weight: f32, time_since_last_selected: u32, stake: f32) -> f32 {
|
||||||
|
let mut weight = time_since_last_selected as f32 * stake;
|
||||||
|
if weight.is_infinite() {
|
||||||
|
weight = max_weight;
|
||||||
|
}
|
||||||
|
1.0_f32.max(weight.min(max_weight))
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -174,7 +200,7 @@ mod test {
|
||||||
.crds
|
.crds
|
||||||
.insert(CrdsValue::ContactInfo(ci.clone()), 0)
|
.insert(CrdsValue::ContactInfo(ci.clone()), 0)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
crds_gossip.refresh_push_active_set();
|
crds_gossip.refresh_push_active_set(None);
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
//incorrect dest
|
//incorrect dest
|
||||||
let mut res = crds_gossip.process_prune_msg(
|
let mut res = crds_gossip.process_prune_msg(
|
||||||
|
|
|
@ -9,8 +9,9 @@
|
||||||
//! with random hash functions. So each subsequent request will have a different distribution
|
//! with random hash functions. So each subsequent request will have a different distribution
|
||||||
//! of false positives.
|
//! of false positives.
|
||||||
|
|
||||||
|
use crate::contact_info::ContactInfo;
|
||||||
use crate::crds::Crds;
|
use crate::crds::Crds;
|
||||||
use crate::crds_gossip::CRDS_GOSSIP_BLOOM_SIZE;
|
use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_BLOOM_SIZE};
|
||||||
use crate::crds_gossip_error::CrdsGossipError;
|
use crate::crds_gossip_error::CrdsGossipError;
|
||||||
use crate::crds_value::{CrdsValue, CrdsValueLabel};
|
use crate::crds_value::{CrdsValue, CrdsValueLabel};
|
||||||
use crate::packet::BLOB_DATA_SIZE;
|
use crate::packet::BLOB_DATA_SIZE;
|
||||||
|
@ -18,11 +19,13 @@ use bincode::serialized_size;
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use rand;
|
use rand;
|
||||||
use rand::distributions::{Distribution, WeightedIndex};
|
use rand::distributions::{Distribution, WeightedIndex};
|
||||||
|
use solana_runtime::bank::Bank;
|
||||||
use solana_runtime::bloom::Bloom;
|
use solana_runtime::bloom::Bloom;
|
||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
|
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
|
||||||
|
|
||||||
|
@ -54,20 +57,19 @@ impl CrdsGossipPull {
|
||||||
crds: &Crds,
|
crds: &Crds,
|
||||||
self_id: Pubkey,
|
self_id: Pubkey,
|
||||||
now: u64,
|
now: u64,
|
||||||
|
bank: Option<&Arc<Bank>>,
|
||||||
) -> Result<(Pubkey, Bloom<Hash>, CrdsValue), CrdsGossipError> {
|
) -> Result<(Pubkey, Bloom<Hash>, CrdsValue), CrdsGossipError> {
|
||||||
let options: Vec<_> = crds
|
let options: Vec<_> = crds
|
||||||
.table
|
.table
|
||||||
.values()
|
.values()
|
||||||
.filter_map(|v| v.value.contact_info())
|
.filter_map(|v| v.value.contact_info())
|
||||||
.filter(|v| {
|
.filter(|v| v.id != self_id && ContactInfo::is_valid_address(&v.gossip))
|
||||||
v.id != self_id && !v.gossip.ip().is_unspecified() && !v.gossip.ip().is_multicast()
|
|
||||||
})
|
|
||||||
.map(|item| {
|
.map(|item| {
|
||||||
|
let max_weight = f32::from(u16::max_value()) - 1.0;
|
||||||
let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0);
|
let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0);
|
||||||
let weight = cmp::max(
|
let since = ((now - req_time) / 1024) as u32;
|
||||||
1,
|
let stake = get_stake(&item.id, bank);
|
||||||
cmp::min(u64::from(u16::max_value()) - 1, (now - req_time) / 1024) as u32,
|
let weight = get_weight(max_weight, since, stake);
|
||||||
);
|
|
||||||
(weight, item)
|
(weight, item)
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
@ -201,7 +203,41 @@ mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::contact_info::ContactInfo;
|
use crate::contact_info::ContactInfo;
|
||||||
use crate::crds_value::LeaderId;
|
use crate::crds_value::LeaderId;
|
||||||
|
use solana_sdk::genesis_block::GenesisBlock;
|
||||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||||
|
use std::f32::consts::E;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_new_pull_with_bank() {
|
||||||
|
let (block, mint_keypair) = GenesisBlock::new(500_000);
|
||||||
|
let bank = Arc::new(Bank::new(&block));
|
||||||
|
let mut crds = Crds::default();
|
||||||
|
let node = CrdsGossipPull::default();
|
||||||
|
let me = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
||||||
|
crds.insert(me.clone(), 0).unwrap();
|
||||||
|
for i in 1..=30 {
|
||||||
|
let entry =
|
||||||
|
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
||||||
|
let id = entry.label().pubkey();
|
||||||
|
crds.insert(entry.clone(), 0).unwrap();
|
||||||
|
bank.transfer(i * 100, &mint_keypair, id, bank.last_id())
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
// The min balance of the heaviest nodes is at least ln(3000) - 0.5
|
||||||
|
// This is because the heaviest nodes will have very similar weights
|
||||||
|
let min_balance = E.powf(3000_f32.ln() - 0.5);
|
||||||
|
let now = 1024;
|
||||||
|
// try upto 10 times because of rng
|
||||||
|
for _ in 0..10 {
|
||||||
|
let msg = node
|
||||||
|
.new_pull_request(&crds, me.label().pubkey(), now, Some(&bank))
|
||||||
|
.unwrap();
|
||||||
|
if bank.get_balance(&msg.0) >= min_balance.round() as u64 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert!(false, "weighted nodes didn't get picked");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_new_pull_request() {
|
fn test_new_pull_request() {
|
||||||
|
@ -210,19 +246,19 @@ mod test {
|
||||||
let id = entry.label().pubkey();
|
let id = entry.label().pubkey();
|
||||||
let node = CrdsGossipPull::default();
|
let node = CrdsGossipPull::default();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
node.new_pull_request(&crds, id, 0),
|
node.new_pull_request(&crds, id, 0, None),
|
||||||
Err(CrdsGossipError::NoPeers)
|
Err(CrdsGossipError::NoPeers)
|
||||||
);
|
);
|
||||||
|
|
||||||
crds.insert(entry.clone(), 0).unwrap();
|
crds.insert(entry.clone(), 0).unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
node.new_pull_request(&crds, id, 0),
|
node.new_pull_request(&crds, id, 0, None),
|
||||||
Err(CrdsGossipError::NoPeers)
|
Err(CrdsGossipError::NoPeers)
|
||||||
);
|
);
|
||||||
|
|
||||||
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
||||||
crds.insert(new.clone(), 0).unwrap();
|
crds.insert(new.clone(), 0).unwrap();
|
||||||
let req = node.new_pull_request(&crds, id, 0);
|
let req = node.new_pull_request(&crds, id, 0, None);
|
||||||
let (to, _, self_info) = req.unwrap();
|
let (to, _, self_info) = req.unwrap();
|
||||||
assert_eq!(to, new.label().pubkey());
|
assert_eq!(to, new.label().pubkey());
|
||||||
assert_eq!(self_info, entry);
|
assert_eq!(self_info, entry);
|
||||||
|
@ -245,7 +281,7 @@ mod test {
|
||||||
|
|
||||||
// odds of getting the other request should be 1 in u64::max_value()
|
// odds of getting the other request should be 1 in u64::max_value()
|
||||||
for _ in 0..10 {
|
for _ in 0..10 {
|
||||||
let req = node.new_pull_request(&crds, node_id, u64::max_value());
|
let req = node.new_pull_request(&crds, node_id, u64::max_value(), None);
|
||||||
let (to, _, self_info) = req.unwrap();
|
let (to, _, self_info) = req.unwrap();
|
||||||
assert_eq!(to, old.label().pubkey());
|
assert_eq!(to, old.label().pubkey());
|
||||||
assert_eq!(self_info, entry);
|
assert_eq!(self_info, entry);
|
||||||
|
@ -261,7 +297,7 @@ mod test {
|
||||||
node_crds.insert(entry.clone(), 0).unwrap();
|
node_crds.insert(entry.clone(), 0).unwrap();
|
||||||
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
||||||
node_crds.insert(new.clone(), 0).unwrap();
|
node_crds.insert(new.clone(), 0).unwrap();
|
||||||
let req = node.new_pull_request(&node_crds, node_id, 0);
|
let req = node.new_pull_request(&node_crds, node_id, 0, None);
|
||||||
|
|
||||||
let mut dest_crds = Crds::default();
|
let mut dest_crds = Crds::default();
|
||||||
let mut dest = CrdsGossipPull::default();
|
let mut dest = CrdsGossipPull::default();
|
||||||
|
@ -313,7 +349,7 @@ mod test {
|
||||||
let mut done = false;
|
let mut done = false;
|
||||||
for _ in 0..30 {
|
for _ in 0..30 {
|
||||||
// there is a chance of a false positive with bloom filters
|
// there is a chance of a false positive with bloom filters
|
||||||
let req = node.new_pull_request(&node_crds, node_id, 0);
|
let req = node.new_pull_request(&node_crds, node_id, 0, None);
|
||||||
let (_, filter, caller) = req.unwrap();
|
let (_, filter, caller) = req.unwrap();
|
||||||
let rsp = dest.process_pull_request(&mut dest_crds, caller, filter, 0);
|
let rsp = dest.process_pull_request(&mut dest_crds, caller, filter, 0);
|
||||||
// if there is a false positive this is empty
|
// if there is a false positive this is empty
|
||||||
|
|
|
@ -10,7 +10,7 @@
|
||||||
|
|
||||||
use crate::contact_info::ContactInfo;
|
use crate::contact_info::ContactInfo;
|
||||||
use crate::crds::{Crds, VersionedCrdsValue};
|
use crate::crds::{Crds, VersionedCrdsValue};
|
||||||
use crate::crds_gossip::CRDS_GOSSIP_BLOOM_SIZE;
|
use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_BLOOM_SIZE};
|
||||||
use crate::crds_gossip_error::CrdsGossipError;
|
use crate::crds_gossip_error::CrdsGossipError;
|
||||||
use crate::crds_value::{CrdsValue, CrdsValueLabel};
|
use crate::crds_value::{CrdsValue, CrdsValueLabel};
|
||||||
use crate::packet::BLOB_DATA_SIZE;
|
use crate::packet::BLOB_DATA_SIZE;
|
||||||
|
@ -18,11 +18,15 @@ use bincode::serialized_size;
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use indexmap::map::IndexMap;
|
use indexmap::map::IndexMap;
|
||||||
use rand;
|
use rand;
|
||||||
|
use rand::distributions::{Distribution, WeightedIndex};
|
||||||
use rand::seq::SliceRandom;
|
use rand::seq::SliceRandom;
|
||||||
|
use solana_runtime::bank::Bank;
|
||||||
use solana_runtime::bloom::Bloom;
|
use solana_runtime::bloom::Bloom;
|
||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
|
use solana_sdk::timing::timestamp;
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30;
|
pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30;
|
||||||
pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
|
pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
|
||||||
|
@ -160,41 +164,50 @@ impl CrdsGossipPush {
|
||||||
pub fn refresh_push_active_set(
|
pub fn refresh_push_active_set(
|
||||||
&mut self,
|
&mut self,
|
||||||
crds: &Crds,
|
crds: &Crds,
|
||||||
|
bank: Option<&Arc<Bank>>,
|
||||||
self_id: Pubkey,
|
self_id: Pubkey,
|
||||||
network_size: usize,
|
network_size: usize,
|
||||||
ratio: usize,
|
ratio: usize,
|
||||||
) {
|
) {
|
||||||
let need = Self::compute_need(self.num_active, self.active_set.len(), ratio);
|
let need = Self::compute_need(self.num_active, self.active_set.len(), ratio);
|
||||||
let mut new_items = HashMap::new();
|
let mut new_items = HashMap::new();
|
||||||
let mut ixs: Vec<_> = (0..crds.table.len()).collect();
|
|
||||||
ixs.shuffle(&mut rand::thread_rng());
|
|
||||||
|
|
||||||
for ix in ixs {
|
let mut options: Vec<_> = crds
|
||||||
let item = crds.table.get_index(ix);
|
.table
|
||||||
if item.is_none() {
|
.values()
|
||||||
|
.filter(|v| v.value.contact_info().is_some())
|
||||||
|
.map(|v| (v.value.contact_info().unwrap(), v))
|
||||||
|
.filter(|(info, _)| info.id != self_id && ContactInfo::is_valid_address(&info.gossip))
|
||||||
|
.map(|(info, value)| {
|
||||||
|
let max_weight = f32::from(u16::max_value()) - 1.0;
|
||||||
|
let last_updated: u64 = value.local_timestamp;
|
||||||
|
let since = ((timestamp() - last_updated) / 1024) as u32;
|
||||||
|
let stake = get_stake(&info.id, bank);
|
||||||
|
let weight = get_weight(max_weight, since, stake);
|
||||||
|
(weight, info)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
if options.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
while new_items.len() < need {
|
||||||
|
let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0));
|
||||||
|
if index.is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let index = index.unwrap();
|
||||||
|
let index = index.sample(&mut rand::thread_rng());
|
||||||
|
let item = options[index].1;
|
||||||
|
options.remove(index);
|
||||||
|
if self.active_set.get(&item.id).is_some() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let val = item.unwrap();
|
if new_items.get(&item.id).is_some() {
|
||||||
if val.0.pubkey() == self_id {
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if self.active_set.get(&val.0.pubkey()).is_some() {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if new_items.get(&val.0.pubkey()).is_some() {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if let Some(contact) = val.1.value.contact_info() {
|
|
||||||
if !ContactInfo::is_valid_address(&contact.gossip) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size);
|
let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size);
|
||||||
let bloom = Bloom::random(size, 0.1, 1024 * 8 * 4);
|
let bloom = Bloom::random(size, 0.1, 1024 * 8 * 4);
|
||||||
new_items.insert(val.0.pubkey(), bloom);
|
new_items.insert(item.id, bloom);
|
||||||
if new_items.len() == need {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
let mut keys: Vec<Pubkey> = self.active_set.keys().cloned().collect();
|
let mut keys: Vec<Pubkey> = self.active_set.keys().cloned().collect();
|
||||||
keys.shuffle(&mut rand::thread_rng());
|
keys.shuffle(&mut rand::thread_rng());
|
||||||
|
@ -247,7 +260,10 @@ impl CrdsGossipPush {
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::contact_info::ContactInfo;
|
use crate::contact_info::ContactInfo;
|
||||||
|
use solana_sdk::genesis_block::GenesisBlock;
|
||||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||||
|
use std::f32::consts::E;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_process_push() {
|
fn test_process_push() {
|
||||||
let mut crds = Crds::default();
|
let mut crds = Crds::default();
|
||||||
|
@ -349,14 +365,14 @@ mod test {
|
||||||
let value1 = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
let value1 = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
||||||
|
|
||||||
assert_eq!(crds.insert(value1.clone(), 0), Ok(None));
|
assert_eq!(crds.insert(value1.clone(), 0), Ok(None));
|
||||||
push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1);
|
push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1);
|
||||||
|
|
||||||
assert!(push.active_set.get(&value1.label().pubkey()).is_some());
|
assert!(push.active_set.get(&value1.label().pubkey()).is_some());
|
||||||
let value2 = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
let value2 = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
||||||
assert!(push.active_set.get(&value2.label().pubkey()).is_none());
|
assert!(push.active_set.get(&value2.label().pubkey()).is_none());
|
||||||
assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
|
assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
|
||||||
for _ in 0..30 {
|
for _ in 0..30 {
|
||||||
push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1);
|
push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1);
|
||||||
if push.active_set.get(&value2.label().pubkey()).is_some() {
|
if push.active_set.get(&value2.label().pubkey()).is_some() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -368,16 +384,54 @@ mod test {
|
||||||
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
||||||
assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
|
assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
|
||||||
}
|
}
|
||||||
push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1);
|
push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1);
|
||||||
assert_eq!(push.active_set.len(), push.num_active);
|
assert_eq!(push.active_set.len(), push.num_active);
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
|
fn test_active_set_refresh_with_bank() {
|
||||||
|
let (block, mint_keypair) = GenesisBlock::new(100_000_000);
|
||||||
|
let bank = Arc::new(Bank::new(&block));
|
||||||
|
let time = timestamp() - 1024; //make sure there's at least a 1 second delay
|
||||||
|
let mut crds = Crds::default();
|
||||||
|
let mut push = CrdsGossipPush::default();
|
||||||
|
for i in 1..=100 {
|
||||||
|
let peer =
|
||||||
|
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), time));
|
||||||
|
let id = peer.label().pubkey();
|
||||||
|
crds.insert(peer.clone(), time).unwrap();
|
||||||
|
bank.transfer(i * 100, &mint_keypair, id, bank.last_id())
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
let min_balance = E.powf(7000_f32.ln() - 0.5);
|
||||||
|
// try upto 10 times because of rng
|
||||||
|
for _ in 0..10 {
|
||||||
|
push.refresh_push_active_set(&crds, Some(&bank), Pubkey::default(), 100, 30);
|
||||||
|
let mut num_correct = 0;
|
||||||
|
let mut num_wrong = 0;
|
||||||
|
push.active_set.iter().for_each(|peer| {
|
||||||
|
if bank.get_balance(peer.0) >= min_balance as u64 {
|
||||||
|
num_correct += 1;
|
||||||
|
} else {
|
||||||
|
num_wrong += 1;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// at least half of the heaviest nodes should be picked
|
||||||
|
if num_wrong <= num_correct {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert!(
|
||||||
|
false,
|
||||||
|
"expected at 50% of the active set to contain the heaviest nodes"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
fn test_new_push_messages() {
|
fn test_new_push_messages() {
|
||||||
let mut crds = Crds::default();
|
let mut crds = Crds::default();
|
||||||
let mut push = CrdsGossipPush::default();
|
let mut push = CrdsGossipPush::default();
|
||||||
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
||||||
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
|
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
|
||||||
push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1);
|
push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1);
|
||||||
|
|
||||||
let new_msg =
|
let new_msg =
|
||||||
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
||||||
|
@ -397,7 +451,7 @@ mod test {
|
||||||
let mut push = CrdsGossipPush::default();
|
let mut push = CrdsGossipPush::default();
|
||||||
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
||||||
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
|
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
|
||||||
push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1);
|
push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1);
|
||||||
|
|
||||||
let new_msg =
|
let new_msg =
|
||||||
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
||||||
|
@ -417,7 +471,7 @@ mod test {
|
||||||
let mut push = CrdsGossipPush::default();
|
let mut push = CrdsGossipPush::default();
|
||||||
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
|
||||||
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
|
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
|
||||||
push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1);
|
push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1);
|
||||||
|
|
||||||
let mut ci = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
|
let mut ci = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
|
||||||
ci.wallclock = 1;
|
ci.wallclock = 1;
|
||||||
|
|
|
@ -181,6 +181,7 @@ impl Fullnode {
|
||||||
let gossip_service = GossipService::new(
|
let gossip_service = GossipService::new(
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
Some(blocktree.clone()),
|
Some(blocktree.clone()),
|
||||||
|
Some(bank.clone()),
|
||||||
node.sockets.gossip,
|
node.sockets.gossip,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
);
|
);
|
||||||
|
|
|
@ -4,6 +4,7 @@ use crate::blocktree::Blocktree;
|
||||||
use crate::cluster_info::{ClusterInfo, Node, NodeInfo};
|
use crate::cluster_info::{ClusterInfo, Node, NodeInfo};
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::streamer;
|
use crate::streamer;
|
||||||
|
use solana_runtime::bank::Bank;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
|
@ -23,6 +24,7 @@ impl GossipService {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
blocktree: Option<Arc<Blocktree>>,
|
blocktree: Option<Arc<Blocktree>>,
|
||||||
|
bank: Option<Arc<Bank>>,
|
||||||
gossip_socket: UdpSocket,
|
gossip_socket: UdpSocket,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
@ -44,7 +46,8 @@ impl GossipService {
|
||||||
response_sender.clone(),
|
response_sender.clone(),
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
);
|
);
|
||||||
let t_gossip = ClusterInfo::gossip(cluster_info.clone(), response_sender, exit.clone());
|
let t_gossip =
|
||||||
|
ClusterInfo::gossip(cluster_info.clone(), bank, response_sender, exit.clone());
|
||||||
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
|
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
|
||||||
Self { exit, thread_hdls }
|
Self { exit, thread_hdls }
|
||||||
}
|
}
|
||||||
|
@ -70,6 +73,7 @@ pub fn make_listening_node(
|
||||||
let gossip_service = GossipService::new(
|
let gossip_service = GossipService::new(
|
||||||
&new_node_cluster_info_ref,
|
&new_node_cluster_info_ref,
|
||||||
None,
|
None,
|
||||||
|
None,
|
||||||
new_node
|
new_node
|
||||||
.sockets
|
.sockets
|
||||||
.gossip
|
.gossip
|
||||||
|
@ -124,6 +128,7 @@ pub fn make_spy_node(leader: &NodeInfo) -> (GossipService, Arc<RwLock<ClusterInf
|
||||||
let gossip_service = GossipService::new(
|
let gossip_service = GossipService::new(
|
||||||
&spy_cluster_info_ref,
|
&spy_cluster_info_ref,
|
||||||
None,
|
None,
|
||||||
|
None,
|
||||||
spy.sockets.gossip,
|
spy.sockets.gossip,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
);
|
);
|
||||||
|
@ -157,7 +162,7 @@ mod tests {
|
||||||
let tn = Node::new_localhost();
|
let tn = Node::new_localhost();
|
||||||
let cluster_info = ClusterInfo::new(tn.info.clone());
|
let cluster_info = ClusterInfo::new(tn.info.clone());
|
||||||
let c = Arc::new(RwLock::new(cluster_info));
|
let c = Arc::new(RwLock::new(cluster_info));
|
||||||
let d = GossipService::new(&c, None, tn.sockets.gossip, exit.clone());
|
let d = GossipService::new(&c, None, None, tn.sockets.gossip, exit.clone());
|
||||||
d.close().expect("thread join");
|
d.close().expect("thread join");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -136,9 +136,11 @@ impl Replicator {
|
||||||
|
|
||||||
let blocktree = Arc::new(blocktree);
|
let blocktree = Arc::new(blocktree);
|
||||||
|
|
||||||
|
//TODO(sagar) Does replicator need a bank also ?
|
||||||
let gossip_service = GossipService::new(
|
let gossip_service = GossipService::new(
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
Some(blocktree.clone()),
|
Some(blocktree.clone()),
|
||||||
|
None,
|
||||||
node.sockets.gossip,
|
node.sockets.gossip,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
);
|
);
|
||||||
|
|
|
@ -373,8 +373,13 @@ pub fn poll_gossip_for_leader(leader_gossip: SocketAddr, timeout: Option<u64>) -
|
||||||
let (node, gossip_socket) = ClusterInfo::spy_node();
|
let (node, gossip_socket) = ClusterInfo::spy_node();
|
||||||
let my_addr = gossip_socket.local_addr().unwrap();
|
let my_addr = gossip_socket.local_addr().unwrap();
|
||||||
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node)));
|
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node)));
|
||||||
let gossip_service =
|
let gossip_service = GossipService::new(
|
||||||
GossipService::new(&cluster_info.clone(), None, gossip_socket, exit.clone());
|
&cluster_info.clone(),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
gossip_socket,
|
||||||
|
exit.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
let leader_entry_point = NodeInfo::new_entry_point(&leader_gossip);
|
let leader_entry_point = NodeInfo::new_entry_point(&leader_gossip);
|
||||||
cluster_info
|
cluster_info
|
||||||
|
|
|
@ -112,7 +112,7 @@ fn network_simulator(network: &mut Network) {
|
||||||
// make sure there is someone in the active set
|
// make sure there is someone in the active set
|
||||||
let network_values: Vec<Node> = network.values().cloned().collect();
|
let network_values: Vec<Node> = network.values().cloned().collect();
|
||||||
network_values.par_iter().for_each(|node| {
|
network_values.par_iter().for_each(|node| {
|
||||||
node.lock().unwrap().refresh_push_active_set();
|
node.lock().unwrap().refresh_push_active_set(None);
|
||||||
});
|
});
|
||||||
let mut total_bytes = bytes_tx;
|
let mut total_bytes = bytes_tx;
|
||||||
for second in 1..num {
|
for second in 1..num {
|
||||||
|
@ -211,7 +211,7 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize,
|
||||||
}
|
}
|
||||||
if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 {
|
if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 {
|
||||||
network_values.par_iter().for_each(|node| {
|
network_values.par_iter().for_each(|node| {
|
||||||
node.lock().unwrap().refresh_push_active_set();
|
node.lock().unwrap().refresh_push_active_set(None);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
total = network_values
|
total = network_values
|
||||||
|
@ -249,7 +249,7 @@ fn network_run_pull(
|
||||||
let requests: Vec<_> = {
|
let requests: Vec<_> = {
|
||||||
network_values
|
network_values
|
||||||
.par_iter()
|
.par_iter()
|
||||||
.filter_map(|from| from.lock().unwrap().new_pull_request(now).ok())
|
.filter_map(|from| from.lock().unwrap().new_pull_request(now, None).ok())
|
||||||
.collect()
|
.collect()
|
||||||
};
|
};
|
||||||
let transfered: Vec<_> = requests
|
let transfered: Vec<_> = requests
|
||||||
|
@ -372,7 +372,7 @@ fn test_prune_errors() {
|
||||||
.crds
|
.crds
|
||||||
.insert(CrdsValue::ContactInfo(ci.clone()), 0)
|
.insert(CrdsValue::ContactInfo(ci.clone()), 0)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
crds_gossip.refresh_push_active_set();
|
crds_gossip.refresh_push_active_set(None);
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
//incorrect dest
|
//incorrect dest
|
||||||
let mut res = crds_gossip.process_prune_msg(
|
let mut res = crds_gossip.process_prune_msg(
|
||||||
|
|
|
@ -21,7 +21,7 @@ fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<ClusterInfo>>, GossipService,
|
||||||
let mut tn = Node::new_localhost_with_pubkey(keypair.pubkey());
|
let mut tn = Node::new_localhost_with_pubkey(keypair.pubkey());
|
||||||
let cluster_info = ClusterInfo::new_with_keypair(tn.info.clone(), Arc::new(keypair));
|
let cluster_info = ClusterInfo::new_with_keypair(tn.info.clone(), Arc::new(keypair));
|
||||||
let c = Arc::new(RwLock::new(cluster_info));
|
let c = Arc::new(RwLock::new(cluster_info));
|
||||||
let d = GossipService::new(&c.clone(), None, tn.sockets.gossip, exit);
|
let d = GossipService::new(&c.clone(), None, None, tn.sockets.gossip, exit);
|
||||||
let _ = c.read().unwrap().my_data();
|
let _ = c.read().unwrap().my_data();
|
||||||
(c, d, tn.sockets.tvu.pop().unwrap())
|
(c, d, tn.sockets.tvu.pop().unwrap())
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,7 @@ fn new_gossip(
|
||||||
gossip: UdpSocket,
|
gossip: UdpSocket,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
) -> GossipService {
|
) -> GossipService {
|
||||||
GossipService::new(&cluster_info, None, gossip, exit)
|
GossipService::new(&cluster_info, None, None, gossip, exit)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Test that message sent from leader to target1 and replayed to target2
|
/// Test that message sent from leader to target1 and replayed to target2
|
||||||
|
|
Loading…
Reference in New Issue