diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index fba87ae0b..48899ef9e 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -41,7 +41,8 @@ fn converge( spy_cluster_info.insert_info(leader.clone()); spy_cluster_info.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_cluster_info)); - let gossip_service = GossipService::new(&spy_ref, None, gossip_socket, exit_signal.clone()); + let gossip_service = + GossipService::new(&spy_ref, None, None, gossip_socket, exit_signal.clone()); let mut v: Vec = vec![]; // wait for the network to converge, 30 seconds should be plenty for _ in 0..30 { diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 3ad819869..721005508 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -174,16 +174,16 @@ impl ClusterInfo { let id = node_info.id; me.gossip.set_self(id); me.insert_info(node_info); - me.push_self(); + me.push_self(None); me } - pub fn push_self(&mut self) { + pub fn push_self(&mut self, bank: Option<&Arc>) { let mut my_data = self.my_data(); let now = timestamp(); my_data.wallclock = now; let mut entry = CrdsValue::ContactInfo(my_data); entry.sign(&self.keypair); - self.gossip.refresh_push_active_set(); + self.gossip.refresh_push_active_set(bank); self.gossip.process_push_message(&[entry], now); } pub fn insert_info(&mut self, node_info: NodeInfo) { @@ -756,9 +756,14 @@ impl ClusterInfo { Ok((addr, out)) } - fn new_pull_requests(&mut self) -> Vec<(SocketAddr, Protocol)> { + fn new_pull_requests(&mut self, bank: Option<&Arc>) -> Vec<(SocketAddr, Protocol)> { let now = timestamp(); - let pulls: Vec<_> = self.gossip.new_pull_request(now).ok().into_iter().collect(); + let pulls: Vec<_> = self + .gossip + .new_pull_request(now, bank) + .ok() + .into_iter() + .collect(); let pr: Vec<_> = pulls .into_iter() @@ -795,15 +800,19 @@ impl ClusterInfo { .collect() } - fn gossip_request(&mut self) -> Vec<(SocketAddr, Protocol)> { - let pulls: Vec<_> = self.new_pull_requests(); + fn gossip_request(&mut self, bank: Option<&Arc>) -> Vec<(SocketAddr, Protocol)> { + let pulls: Vec<_> = self.new_pull_requests(bank); let pushes: Vec<_> = self.new_push_requests(); vec![pulls, pushes].into_iter().flat_map(|x| x).collect() } /// At random pick a node and try to get updated changes from them - fn run_gossip(obj: &Arc>, blob_sender: &BlobSender) -> Result<()> { - let reqs = obj.write().unwrap().gossip_request(); + fn run_gossip( + obj: &Arc>, + bank: Option<&Arc>, + blob_sender: &BlobSender, + ) -> Result<()> { + let reqs = obj.write().unwrap().gossip_request(bank); let blobs = reqs .into_iter() .filter_map(|(remote_gossip_addr, req)| to_shared_blob(req, remote_gossip_addr).ok()) @@ -845,6 +854,7 @@ impl ClusterInfo { /// randomly pick a node and ask them for updates asynchronously pub fn gossip( obj: Arc>, + bank: Option>, blob_sender: BlobSender, exit: Arc, ) -> JoinHandle<()> { @@ -854,7 +864,7 @@ impl ClusterInfo { let mut last_push = timestamp(); loop { let start = timestamp(); - let _ = Self::run_gossip(&obj, &blob_sender); + let _ = Self::run_gossip(&obj, bank.as_ref(), &blob_sender); if exit.load(Ordering::Relaxed) { return; } @@ -862,7 +872,7 @@ impl ClusterInfo { //TODO: possibly tune this parameter //we saw a deadlock passing an obj.read().unwrap().timeout into sleep if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 { - obj.write().unwrap().push_self(); + obj.write().unwrap().push_self(bank.as_ref()); last_push = timestamp(); } let elapsed = timestamp() - start; @@ -1451,8 +1461,8 @@ mod tests { .write() .unwrap() .gossip - .refresh_push_active_set(); - let reqs = cluster_info.write().unwrap().gossip_request(); + .refresh_push_active_set(None); + let reqs = cluster_info.write().unwrap().gossip_request(None); //assert none of the addrs are invalid. reqs.iter().all(|(addr, _)| { let res = ContactInfo::is_valid_address(addr); @@ -1730,7 +1740,7 @@ mod tests { let (_, _, val) = cluster_info .gossip - .new_pull_request(timestamp()) + .new_pull_request(timestamp(), None) .ok() .unwrap(); assert!(val.verify()); diff --git a/src/crds_gossip.rs b/src/crds_gossip.rs index bd91949a8..b5cc55574 100644 --- a/src/crds_gossip.rs +++ b/src/crds_gossip.rs @@ -8,9 +8,11 @@ use crate::crds_gossip_error::CrdsGossipError; use crate::crds_gossip_pull::CrdsGossipPull; use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}; use crate::crds_value::CrdsValue; +use solana_runtime::bank::Bank; use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; +use std::sync::Arc; ///The min size for bloom filters pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000; @@ -92,9 +94,10 @@ impl CrdsGossip { /// refresh the push active set /// * ratio - number of actives to rotate - pub fn refresh_push_active_set(&mut self) { + pub fn refresh_push_active_set(&mut self, bank: Option<&Arc>) { self.push.refresh_push_active_set( &self.crds, + bank, self.id, self.pull.pull_request_time.len(), CRDS_GOSSIP_NUM_ACTIVE, @@ -105,8 +108,9 @@ impl CrdsGossip { pub fn new_pull_request( &self, now: u64, + bank: Option<&Arc>, ) -> Result<(Pubkey, Bloom, CrdsValue), CrdsGossipError> { - self.pull.new_pull_request(&self.crds, self.id, now) + self.pull.new_pull_request(&self.crds, self.id, now, bank) } /// time when a request to `from` was initiated @@ -156,6 +160,28 @@ impl CrdsGossip { } } +/// Computes a normalized(log of bank balance) stake +pub fn get_stake(id: &Pubkey, bank: Option<&Arc>) -> f32 { + match bank { + Some(bank) => { + // cap the max balance to u32 max (it should be plenty) + let bal = f64::from(u32::max_value()).min(bank.get_balance(id) as f64); + 1_f32.max((bal as f32).ln()) + } + _ => 1.0, + } +} + +/// Computes bounded weight given some max, a time since last selected, and a stake value +/// The minimum stake is 1 and not 0 to allow 'time since last' picked to factor in. +pub fn get_weight(max_weight: f32, time_since_last_selected: u32, stake: f32) -> f32 { + let mut weight = time_since_last_selected as f32 * stake; + if weight.is_infinite() { + weight = max_weight; + } + 1.0_f32.max(weight.min(max_weight)) +} + #[cfg(test)] mod test { use super::*; @@ -174,7 +200,7 @@ mod test { .crds .insert(CrdsValue::ContactInfo(ci.clone()), 0) .unwrap(); - crds_gossip.refresh_push_active_set(); + crds_gossip.refresh_push_active_set(None); let now = timestamp(); //incorrect dest let mut res = crds_gossip.process_prune_msg( diff --git a/src/crds_gossip_pull.rs b/src/crds_gossip_pull.rs index e015c0085..406a05448 100644 --- a/src/crds_gossip_pull.rs +++ b/src/crds_gossip_pull.rs @@ -9,8 +9,9 @@ //! with random hash functions. So each subsequent request will have a different distribution //! of false positives. +use crate::contact_info::ContactInfo; use crate::crds::Crds; -use crate::crds_gossip::CRDS_GOSSIP_BLOOM_SIZE; +use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_BLOOM_SIZE}; use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; use crate::packet::BLOB_DATA_SIZE; @@ -18,11 +19,13 @@ use bincode::serialized_size; use hashbrown::HashMap; use rand; use rand::distributions::{Distribution, WeightedIndex}; +use solana_runtime::bank::Bank; use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use std::cmp; use std::collections::VecDeque; +use std::sync::Arc; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; @@ -54,20 +57,19 @@ impl CrdsGossipPull { crds: &Crds, self_id: Pubkey, now: u64, + bank: Option<&Arc>, ) -> Result<(Pubkey, Bloom, CrdsValue), CrdsGossipError> { let options: Vec<_> = crds .table .values() .filter_map(|v| v.value.contact_info()) - .filter(|v| { - v.id != self_id && !v.gossip.ip().is_unspecified() && !v.gossip.ip().is_multicast() - }) + .filter(|v| v.id != self_id && ContactInfo::is_valid_address(&v.gossip)) .map(|item| { + let max_weight = f32::from(u16::max_value()) - 1.0; let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0); - let weight = cmp::max( - 1, - cmp::min(u64::from(u16::max_value()) - 1, (now - req_time) / 1024) as u32, - ); + let since = ((now - req_time) / 1024) as u32; + let stake = get_stake(&item.id, bank); + let weight = get_weight(max_weight, since, stake); (weight, item) }) .collect(); @@ -201,7 +203,41 @@ mod test { use super::*; use crate::contact_info::ContactInfo; use crate::crds_value::LeaderId; + use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::signature::{Keypair, KeypairUtil}; + use std::f32::consts::E; + + #[test] + fn test_new_pull_with_bank() { + let (block, mint_keypair) = GenesisBlock::new(500_000); + let bank = Arc::new(Bank::new(&block)); + let mut crds = Crds::default(); + let node = CrdsGossipPull::default(); + let me = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); + crds.insert(me.clone(), 0).unwrap(); + for i in 1..=30 { + let entry = + CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); + let id = entry.label().pubkey(); + crds.insert(entry.clone(), 0).unwrap(); + bank.transfer(i * 100, &mint_keypair, id, bank.last_id()) + .unwrap(); + } + // The min balance of the heaviest nodes is at least ln(3000) - 0.5 + // This is because the heaviest nodes will have very similar weights + let min_balance = E.powf(3000_f32.ln() - 0.5); + let now = 1024; + // try upto 10 times because of rng + for _ in 0..10 { + let msg = node + .new_pull_request(&crds, me.label().pubkey(), now, Some(&bank)) + .unwrap(); + if bank.get_balance(&msg.0) >= min_balance.round() as u64 { + return; + } + } + assert!(false, "weighted nodes didn't get picked"); + } #[test] fn test_new_pull_request() { @@ -210,19 +246,19 @@ mod test { let id = entry.label().pubkey(); let node = CrdsGossipPull::default(); assert_eq!( - node.new_pull_request(&crds, id, 0), + node.new_pull_request(&crds, id, 0, None), Err(CrdsGossipError::NoPeers) ); crds.insert(entry.clone(), 0).unwrap(); assert_eq!( - node.new_pull_request(&crds, id, 0), + node.new_pull_request(&crds, id, 0, None), Err(CrdsGossipError::NoPeers) ); let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); crds.insert(new.clone(), 0).unwrap(); - let req = node.new_pull_request(&crds, id, 0); + let req = node.new_pull_request(&crds, id, 0, None); let (to, _, self_info) = req.unwrap(); assert_eq!(to, new.label().pubkey()); assert_eq!(self_info, entry); @@ -245,7 +281,7 @@ mod test { // odds of getting the other request should be 1 in u64::max_value() for _ in 0..10 { - let req = node.new_pull_request(&crds, node_id, u64::max_value()); + let req = node.new_pull_request(&crds, node_id, u64::max_value(), None); let (to, _, self_info) = req.unwrap(); assert_eq!(to, old.label().pubkey()); assert_eq!(self_info, entry); @@ -261,7 +297,7 @@ mod test { node_crds.insert(entry.clone(), 0).unwrap(); let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); node_crds.insert(new.clone(), 0).unwrap(); - let req = node.new_pull_request(&node_crds, node_id, 0); + let req = node.new_pull_request(&node_crds, node_id, 0, None); let mut dest_crds = Crds::default(); let mut dest = CrdsGossipPull::default(); @@ -313,7 +349,7 @@ mod test { let mut done = false; for _ in 0..30 { // there is a chance of a false positive with bloom filters - let req = node.new_pull_request(&node_crds, node_id, 0); + let req = node.new_pull_request(&node_crds, node_id, 0, None); let (_, filter, caller) = req.unwrap(); let rsp = dest.process_pull_request(&mut dest_crds, caller, filter, 0); // if there is a false positive this is empty diff --git a/src/crds_gossip_push.rs b/src/crds_gossip_push.rs index 6411e9512..856854913 100644 --- a/src/crds_gossip_push.rs +++ b/src/crds_gossip_push.rs @@ -10,7 +10,7 @@ use crate::contact_info::ContactInfo; use crate::crds::{Crds, VersionedCrdsValue}; -use crate::crds_gossip::CRDS_GOSSIP_BLOOM_SIZE; +use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_BLOOM_SIZE}; use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; use crate::packet::BLOB_DATA_SIZE; @@ -18,11 +18,15 @@ use bincode::serialized_size; use hashbrown::HashMap; use indexmap::map::IndexMap; use rand; +use rand::distributions::{Distribution, WeightedIndex}; use rand::seq::SliceRandom; +use solana_runtime::bank::Bank; use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; +use solana_sdk::timing::timestamp; use std::cmp; +use std::sync::Arc; pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30; pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6; @@ -160,41 +164,50 @@ impl CrdsGossipPush { pub fn refresh_push_active_set( &mut self, crds: &Crds, + bank: Option<&Arc>, self_id: Pubkey, network_size: usize, ratio: usize, ) { let need = Self::compute_need(self.num_active, self.active_set.len(), ratio); let mut new_items = HashMap::new(); - let mut ixs: Vec<_> = (0..crds.table.len()).collect(); - ixs.shuffle(&mut rand::thread_rng()); - for ix in ixs { - let item = crds.table.get_index(ix); - if item.is_none() { + let mut options: Vec<_> = crds + .table + .values() + .filter(|v| v.value.contact_info().is_some()) + .map(|v| (v.value.contact_info().unwrap(), v)) + .filter(|(info, _)| info.id != self_id && ContactInfo::is_valid_address(&info.gossip)) + .map(|(info, value)| { + let max_weight = f32::from(u16::max_value()) - 1.0; + let last_updated: u64 = value.local_timestamp; + let since = ((timestamp() - last_updated) / 1024) as u32; + let stake = get_stake(&info.id, bank); + let weight = get_weight(max_weight, since, stake); + (weight, info) + }) + .collect(); + if options.is_empty() { + return; + } + while new_items.len() < need { + let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0)); + if index.is_err() { + break; + } + let index = index.unwrap(); + let index = index.sample(&mut rand::thread_rng()); + let item = options[index].1; + options.remove(index); + if self.active_set.get(&item.id).is_some() { continue; } - let val = item.unwrap(); - if val.0.pubkey() == self_id { + if new_items.get(&item.id).is_some() { continue; } - if self.active_set.get(&val.0.pubkey()).is_some() { - continue; - } - if new_items.get(&val.0.pubkey()).is_some() { - continue; - } - if let Some(contact) = val.1.value.contact_info() { - if !ContactInfo::is_valid_address(&contact.gossip) { - continue; - } - } let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size); let bloom = Bloom::random(size, 0.1, 1024 * 8 * 4); - new_items.insert(val.0.pubkey(), bloom); - if new_items.len() == need { - break; - } + new_items.insert(item.id, bloom); } let mut keys: Vec = self.active_set.keys().cloned().collect(); keys.shuffle(&mut rand::thread_rng()); @@ -247,7 +260,10 @@ impl CrdsGossipPush { mod test { use super::*; use crate::contact_info::ContactInfo; + use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::signature::{Keypair, KeypairUtil}; + use std::f32::consts::E; + #[test] fn test_process_push() { let mut crds = Crds::default(); @@ -349,14 +365,14 @@ mod test { let value1 = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); assert_eq!(crds.insert(value1.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); assert!(push.active_set.get(&value1.label().pubkey()).is_some()); let value2 = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); assert!(push.active_set.get(&value2.label().pubkey()).is_none()); assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); for _ in 0..30 { - push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); if push.active_set.get(&value2.label().pubkey()).is_some() { break; } @@ -368,16 +384,54 @@ mod test { CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); } - push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); assert_eq!(push.active_set.len(), push.num_active); } #[test] + fn test_active_set_refresh_with_bank() { + let (block, mint_keypair) = GenesisBlock::new(100_000_000); + let bank = Arc::new(Bank::new(&block)); + let time = timestamp() - 1024; //make sure there's at least a 1 second delay + let mut crds = Crds::default(); + let mut push = CrdsGossipPush::default(); + for i in 1..=100 { + let peer = + CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), time)); + let id = peer.label().pubkey(); + crds.insert(peer.clone(), time).unwrap(); + bank.transfer(i * 100, &mint_keypair, id, bank.last_id()) + .unwrap(); + } + let min_balance = E.powf(7000_f32.ln() - 0.5); + // try upto 10 times because of rng + for _ in 0..10 { + push.refresh_push_active_set(&crds, Some(&bank), Pubkey::default(), 100, 30); + let mut num_correct = 0; + let mut num_wrong = 0; + push.active_set.iter().for_each(|peer| { + if bank.get_balance(peer.0) >= min_balance as u64 { + num_correct += 1; + } else { + num_wrong += 1; + } + }); + // at least half of the heaviest nodes should be picked + if num_wrong <= num_correct { + return; + } + } + assert!( + false, + "expected at 50% of the active set to contain the heaviest nodes" + ); + } + #[test] fn test_new_push_messages() { let mut crds = Crds::default(); let mut push = CrdsGossipPush::default(); let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); @@ -397,7 +451,7 @@ mod test { let mut push = CrdsGossipPush::default(); let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); @@ -417,7 +471,7 @@ mod test { let mut push = CrdsGossipPush::default(); let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); let mut ci = ContactInfo::new_localhost(Keypair::new().pubkey(), 0); ci.wallclock = 1; diff --git a/src/fullnode.rs b/src/fullnode.rs index efd951b23..9aa6a30c0 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -181,6 +181,7 @@ impl Fullnode { let gossip_service = GossipService::new( &cluster_info, Some(blocktree.clone()), + Some(bank.clone()), node.sockets.gossip, exit.clone(), ); diff --git a/src/gossip_service.rs b/src/gossip_service.rs index 62915e828..ad7ead61e 100644 --- a/src/gossip_service.rs +++ b/src/gossip_service.rs @@ -4,6 +4,7 @@ use crate::blocktree::Blocktree; use crate::cluster_info::{ClusterInfo, Node, NodeInfo}; use crate::service::Service; use crate::streamer; +use solana_runtime::bank::Bank; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::net::UdpSocket; @@ -23,6 +24,7 @@ impl GossipService { pub fn new( cluster_info: &Arc>, blocktree: Option>, + bank: Option>, gossip_socket: UdpSocket, exit: Arc, ) -> Self { @@ -44,7 +46,8 @@ impl GossipService { response_sender.clone(), exit.clone(), ); - let t_gossip = ClusterInfo::gossip(cluster_info.clone(), response_sender, exit.clone()); + let t_gossip = + ClusterInfo::gossip(cluster_info.clone(), bank, response_sender, exit.clone()); let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; Self { exit, thread_hdls } } @@ -70,6 +73,7 @@ pub fn make_listening_node( let gossip_service = GossipService::new( &new_node_cluster_info_ref, None, + None, new_node .sockets .gossip @@ -124,6 +128,7 @@ pub fn make_spy_node(leader: &NodeInfo) -> (GossipService, Arc) - let (node, gossip_socket) = ClusterInfo::spy_node(); let my_addr = gossip_socket.local_addr().unwrap(); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node))); - let gossip_service = - GossipService::new(&cluster_info.clone(), None, gossip_socket, exit.clone()); + let gossip_service = GossipService::new( + &cluster_info.clone(), + None, + None, + gossip_socket, + exit.clone(), + ); let leader_entry_point = NodeInfo::new_entry_point(&leader_gossip); cluster_info diff --git a/tests/crds_gossip.rs b/tests/crds_gossip.rs index 1a3acc1cc..c236c94df 100644 --- a/tests/crds_gossip.rs +++ b/tests/crds_gossip.rs @@ -112,7 +112,7 @@ fn network_simulator(network: &mut Network) { // make sure there is someone in the active set let network_values: Vec = network.values().cloned().collect(); network_values.par_iter().for_each(|node| { - node.lock().unwrap().refresh_push_active_set(); + node.lock().unwrap().refresh_push_active_set(None); }); let mut total_bytes = bytes_tx; for second in 1..num { @@ -211,7 +211,7 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, } if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 { network_values.par_iter().for_each(|node| { - node.lock().unwrap().refresh_push_active_set(); + node.lock().unwrap().refresh_push_active_set(None); }); } total = network_values @@ -249,7 +249,7 @@ fn network_run_pull( let requests: Vec<_> = { network_values .par_iter() - .filter_map(|from| from.lock().unwrap().new_pull_request(now).ok()) + .filter_map(|from| from.lock().unwrap().new_pull_request(now, None).ok()) .collect() }; let transfered: Vec<_> = requests @@ -372,7 +372,7 @@ fn test_prune_errors() { .crds .insert(CrdsValue::ContactInfo(ci.clone()), 0) .unwrap(); - crds_gossip.refresh_push_active_set(); + crds_gossip.refresh_push_active_set(None); let now = timestamp(); //incorrect dest let mut res = crds_gossip.process_prune_msg( diff --git a/tests/gossip.rs b/tests/gossip.rs index e324a0150..f6471e438 100644 --- a/tests/gossip.rs +++ b/tests/gossip.rs @@ -21,7 +21,7 @@ fn test_node(exit: Arc) -> (Arc>, GossipService, let mut tn = Node::new_localhost_with_pubkey(keypair.pubkey()); let cluster_info = ClusterInfo::new_with_keypair(tn.info.clone(), Arc::new(keypair)); let c = Arc::new(RwLock::new(cluster_info)); - let d = GossipService::new(&c.clone(), None, tn.sockets.gossip, exit); + let d = GossipService::new(&c.clone(), None, None, tn.sockets.gossip, exit); let _ = c.read().unwrap().my_data(); (c, d, tn.sockets.tvu.pop().unwrap()) } diff --git a/tests/tvu.rs b/tests/tvu.rs index ac63c24cf..fc642e669 100644 --- a/tests/tvu.rs +++ b/tests/tvu.rs @@ -32,7 +32,7 @@ fn new_gossip( gossip: UdpSocket, exit: Arc, ) -> GossipService { - GossipService::new(&cluster_info, None, gossip, exit) + GossipService::new(&cluster_info, None, None, gossip, exit) } /// Test that message sent from leader to target1 and replayed to target2