Fix fanout gossip bench (#10509)
* Gossip benchmark
* Rayon tweaking
* push pulls
* fanout to max nodes
* fixup! fanout to max nodes
* fixup! fixup! fanout to max nodes
* update
* multi vote test
* fixup prune
* fast propagation
* fixups
* compute up to 95%
* test for specific tx
* stats
* stats
* fixed tests
* rename
* track a lagging view of which nodes have the local node in their active set in the local received_cache
* test fixups
* dups are old now
* dont prune your own origin
* send vote to tpu
* tests
* fixed tests
* fixed test
* update
* ignore scale
* lint
* fixup
* fixup
* fixup
* cleanup

Co-authored-by: Stephen Akridge <sakridge@gmail.com>
This commit is contained in:
parent 07e8e8af38
commit ba83e4ca50
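Note on the core data-structure change: the crds_gossip_push.rs hunks below re-key received_cache from message hash to origin pubkey. Per origin, the cache now maps each relaying peer to (pruned, last_received_ms) — the "lagging view of which nodes have the local node in their active set" mentioned in the commit message. A minimal standalone sketch of that bookkeeping (stand-in Pubkey type, none of the stake-weighted prune logic):

    use std::collections::HashMap;

    // Stand-in for solana_sdk::pubkey::Pubkey.
    type Pubkey = [u8; 32];

    /// Per origin: each relaying peer -> (pruned, last_received_ms).
    struct ReceivedCache(HashMap<Pubkey, HashMap<Pubkey, (bool, u64)>>);

    impl ReceivedCache {
        /// Record that `from` relayed a message originating at `origin`.
        fn record(&mut self, origin: Pubkey, from: Pubkey, now: u64) {
            self.0
                .entry(origin)
                .or_insert_with(HashMap::new)
                .entry(from)
                .or_insert((false, 0))
                .1 = now;
        }

        /// Mark `from` as pruned for `origin`; the entry is kept (not removed)
        /// so the peer stays excluded from prune accounting until it ages out.
        fn prune(&mut self, origin: &Pubkey, from: &Pubkey) {
            if let Some(entry) = self.0.get_mut(origin).and_then(|m| m.get_mut(from)) {
                entry.0 = true;
            }
        }

        /// Drop stale per-peer entries, then empty origins,
        /// mirroring the new purge_old_received_cache below.
        fn purge(&mut self, min_time: u64) {
            self.0.values_mut().for_each(|m| m.retain(|_, v| v.1 > min_time));
            self.0.retain(|_, m| !m.is_empty());
        }
    }

    fn main() {
        let mut cache = ReceivedCache(HashMap::new());
        let (origin, from) = ([1u8; 32], [2u8; 32]);
        cache.record(origin, from, 100);
        cache.prune(&origin, &from);
        cache.purge(50); // entry survives (100 > 50) but stays marked pruned
        assert!(cache.0[&origin][&from].0);
    }

Keeping pruned entries rather than deleting them is what makes the cache a lagging view: a pruned peer that keeps relaying only refreshes its timestamp, it does not flap back into the unpruned set.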
core/src/banking_stage.rs
@@ -51,7 +51,7 @@ type PacketsAndOffsets = (Packets, Vec<usize>);
 pub type UnprocessedPackets = Vec<PacketsAndOffsets>;
 
 /// Transaction forwarding
-pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 4;
+pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 1;
 
 // Fixed thread size seems to be fastest on GCP setup
 pub const NUM_THREADS: u32 = 4;

core/src/cluster_info.rs
@@ -252,6 +252,7 @@ pub struct ClusterInfo {
     my_contact_info: RwLock<ContactInfo>,
     id: Pubkey,
     stats: GossipStats,
+    socket: UdpSocket,
 }
 
 impl Default for ClusterInfo {
@@ -407,6 +408,7 @@ impl ClusterInfo {
             my_contact_info: RwLock::new(contact_info),
             id,
             stats: GossipStats::default(),
+            socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
         };
         {
             let mut gossip = me.gossip.write().unwrap();
@@ -432,6 +434,7 @@ impl ClusterInfo {
             my_contact_info: RwLock::new(my_contact_info),
             id: *new_id,
             stats: GossipStats::default(),
+            socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
         }
     }
 
@@ -737,6 +740,13 @@ impl ClusterInfo {
             .process_push_message(&self.id(), vec![entry], now);
     }
 
+    pub fn send_vote(&self, vote: &Transaction) -> Result<()> {
+        let tpu = self.my_contact_info().tpu;
+        let buf = serialize(vote)?;
+        self.socket.send_to(&buf, &tpu)?;
+        Ok(())
+    }
+
     /// Get votes in the crds
     /// * since - The timestamp of when the vote inserted must be greater than
     /// since. This allows the bank to query for new votes only.
@@ -2250,7 +2260,7 @@ impl ClusterInfo {
             .name("solana-listen".to_string())
             .spawn(move || {
                 let thread_pool = rayon::ThreadPoolBuilder::new()
-                    .num_threads(get_thread_count())
+                    .num_threads(std::cmp::min(get_thread_count(), 8))
                     .thread_name(|i| format!("sol-gossip-work-{}", i))
                     .build()
                     .unwrap();

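The "send vote to tpu" item shows up above as ClusterInfo::send_vote: the vote transaction is bincode-serialized and fired at this node's own TPU port over a throwaway UDP socket, so it enters the banking stage directly instead of waiting for gossip propagation. A rough standalone equivalent (std::io::Result in place of the crate's Result alias, TPU address assumed):

    use std::net::{SocketAddr, UdpSocket};

    /// One vote = one datagram to the TPU, as in ClusterInfo::send_vote.
    fn send_vote(socket: &UdpSocket, tpu: SocketAddr, vote_bytes: &[u8]) -> std::io::Result<()> {
        socket.send_to(vote_bytes, tpu)?;
        Ok(())
    }

    fn main() -> std::io::Result<()> {
        // Ephemeral local socket, like the new ClusterInfo::socket field.
        let socket = UdpSocket::bind("0.0.0.0:0")?;
        // Hypothetical TPU address; a real node would use my_contact_info().tpu.
        let tpu: SocketAddr = "127.0.0.1:8003".parse().unwrap();
        send_vote(&socket, tpu, b"bincode-serialized vote tx")
    }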
core/src/crds.rs
@@ -36,6 +36,7 @@ use std::collections::HashMap;
 pub struct Crds {
     /// Stores the map of labels and values
     pub table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
+    pub num_inserts: usize,
 }
 
 #[derive(PartialEq, Debug)]
@@ -84,6 +85,7 @@ impl Default for Crds {
     fn default() -> Self {
         Crds {
             table: IndexMap::new(),
+            num_inserts: 0,
         }
     }
 }
@@ -125,6 +127,7 @@ impl Crds {
             .unwrap_or(true);
         if do_insert {
             let old = self.table.insert(label, new_value);
+            self.num_inserts += 1;
             Ok(old)
         } else {
             trace!("INSERT FAILED data: {} new.wallclock: {}", label, wallclock,);

core/src/crds_gossip.rs
@@ -76,17 +76,10 @@ impl CrdsGossip {
         stakes: &HashMap<Pubkey, u64>,
     ) -> HashMap<Pubkey, HashSet<Pubkey>> {
         let id = &self.id;
-        let crds = &self.crds;
         let push = &mut self.push;
-        let versioned = labels
-            .into_iter()
-            .filter_map(|label| crds.lookup_versioned(&label));
-
         let mut prune_map: HashMap<Pubkey, HashSet<_>> = HashMap::new();
-        for val in versioned {
-            let origin = val.value.pubkey();
-            let hash = val.value_hash;
-            let peers = push.prune_received_cache(id, &origin, hash, stakes);
+        for origin in labels.iter().map(|k| k.pubkey()) {
+            let peers = push.prune_received_cache(id, &origin, stakes);
             for from in peers {
                 prune_map.entry(from).or_default().insert(origin);
             }
@@ -113,7 +106,7 @@ impl CrdsGossip {
             return Err(CrdsGossipError::PruneMessageTimeout);
         }
         if self.id == *destination {
-            self.push.process_prune_msg(peer, origin);
+            self.push.process_prune_msg(&self.id, peer, origin);
             Ok(())
         } else {
             Err(CrdsGossipError::BadPruneDestination)
@@ -190,14 +183,15 @@ impl CrdsGossip {
         now: u64,
         process_pull_stats: &mut ProcessPullStats,
     ) {
-        self.pull.process_pull_responses(
+        let success = self.pull.process_pull_responses(
             &mut self.crds,
             from,
             responses,
             responses_expired_timeout,
             now,
             process_pull_stats,
-        )
+        );
+        self.push.push_pull_responses(success, now);
     }
 
     pub fn make_timeouts_test(&self) -> HashMap<Pubkey, u64> {

core/src/crds_gossip_error.rs
@@ -2,7 +2,6 @@
 pub enum CrdsGossipError {
     NoPeers,
     PushMessageTimeout,
-    PushMessageAlreadyReceived,
     PushMessageOldVersion,
     BadPruneDestination,
     PruneMessageTimeout,

core/src/crds_gossip_pull.rs
@@ -134,6 +134,7 @@ pub struct CrdsGossipPull {
     purged_values: VecDeque<(Hash, u64)>,
     pub crds_timeout: u64,
     pub msg_timeout: u64,
+    pub num_pulls: usize,
 }
 
 impl Default for CrdsGossipPull {
@@ -143,6 +144,7 @@ impl Default for CrdsGossipPull {
             pull_request_time: HashMap::new(),
             crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
             msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
+            num_pulls: 0,
         }
     }
 }
@@ -313,18 +315,24 @@ impl CrdsGossipPull {
         responses_expired_timeout: Vec<VersionedCrdsValue>,
         now: u64,
         stats: &mut ProcessPullStats,
-    ) {
+    ) -> Vec<(CrdsValueLabel, Hash, u64)> {
+        let mut success = vec![];
         let mut owners = HashSet::new();
         for r in responses_expired_timeout {
             stats.failed_insert += crds.insert_versioned(r).is_err() as usize;
         }
         for r in responses {
             let owner = r.value.label().pubkey();
+            let label = r.value.label();
+            let wc = r.value.wallclock();
+            let hash = r.value_hash;
             let old = crds.insert_versioned(r);
             if old.is_err() {
                 stats.failed_insert += 1;
             } else {
                 stats.success += 1;
+                self.num_pulls += 1;
+                success.push((label, hash, wc));
             }
             old.ok().map(|opt| {
                 owners.insert(owner);
@@ -338,6 +346,7 @@ impl CrdsGossipPull {
         for owner in owners {
             crds.update_record_timestamp(&owner, now);
         }
+        success
     }
     // build a set of filters of the current crds table
     // num_filters - used to increase the likelyhood of a value in crds being added to some filter

core/src/crds_gossip_push.rs
@@ -35,6 +35,7 @@ pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
 pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000;
 pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
 pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15;
+pub const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2;
 
 #[derive(Clone)]
 pub struct CrdsGossipPush {
@@ -44,12 +45,18 @@ pub struct CrdsGossipPush {
     active_set: IndexMap<Pubkey, Bloom<Pubkey>>,
     /// push message queue
     push_messages: HashMap<CrdsValueLabel, Hash>,
-    /// cache that tracks which validators a message was received from
-    received_cache: HashMap<Hash, (u64, HashSet<Pubkey>)>,
+    /// Cache that tracks which validators a message was received from
+    /// bool indicates it has been pruned.
+    /// This cache represents a lagging view of which validators
+    /// currently have this node in their `active_set`
+    received_cache: HashMap<Pubkey, HashMap<Pubkey, (bool, u64)>>,
     pub num_active: usize,
     pub push_fanout: usize,
     pub msg_timeout: u64,
     pub prune_timeout: u64,
+    pub num_total: usize,
+    pub num_old: usize,
+    pub num_pushes: usize,
 }
 
 impl Default for CrdsGossipPush {
@@ -64,6 +71,9 @@ impl Default for CrdsGossipPush {
             push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
             msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
             prune_timeout: CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS,
+            num_total: 0,
+            num_old: 0,
+            num_pushes: 0,
         }
     }
 }
@@ -81,18 +91,21 @@ impl CrdsGossipPush {
         &mut self,
         self_pubkey: &Pubkey,
         origin: &Pubkey,
-        hash: Hash,
         stakes: &HashMap<Pubkey, u64>,
     ) -> Vec<Pubkey> {
         let origin_stake = stakes.get(origin).unwrap_or(&0);
         let self_stake = stakes.get(self_pubkey).unwrap_or(&0);
-        let cache = self.received_cache.get(&hash);
+        let cache = self.received_cache.get(origin);
         if cache.is_none() {
             return Vec::new();
         }
-        let peers = &cache.unwrap().1;
+        let peers = cache.unwrap();
 
-        let peer_stake_total: u64 = peers.iter().map(|p| stakes.get(p).unwrap_or(&0)).sum();
+        let peer_stake_total: u64 = peers
+            .iter()
+            .filter(|v| !(v.1).0)
+            .map(|v| stakes.get(v.0).unwrap_or(&0))
+            .sum();
         let prune_stake_threshold = Self::prune_stake_threshold(*self_stake, *origin_stake);
         if peer_stake_total < prune_stake_threshold {
             return Vec::new();
@@ -100,7 +113,8 @@ impl CrdsGossipPush {
 
         let staked_peers: Vec<(Pubkey, u64)> = peers
             .iter()
-            .filter_map(|p| stakes.get(p).map(|s| (*p, *s)))
+            .filter(|v| !(v.1).0)
+            .filter_map(|p| stakes.get(p.0).map(|s| (*p.0, *s)))
            .filter(|(_, s)| *s > 0)
            .collect();
 
@@ -117,16 +131,27 @@ impl CrdsGossipPush {
             let (next_peer, next_stake) = staked_peers[next];
             keep.insert(next_peer);
             peer_stake_sum += next_stake;
-            if peer_stake_sum >= prune_stake_threshold {
+            if peer_stake_sum >= prune_stake_threshold
+                && keep.len() >= CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES
+            {
                 break;
             }
         }
 
-        peers
-            .iter()
+        let pruned_peers: Vec<Pubkey> = peers
+            .keys()
            .filter(|p| !keep.contains(p))
            .cloned()
-            .collect()
+            .collect();
+        pruned_peers.iter().for_each(|p| {
+            self.received_cache
+                .get_mut(origin)
+                .unwrap()
+                .get_mut(p)
+                .unwrap()
+                .0 = true;
+        });
+        pruned_peers
     }
 
     /// process a push message to the network
@@ -137,6 +162,7 @@ impl CrdsGossipPush {
         value: CrdsValue,
         now: u64,
     ) -> Result<Option<VersionedCrdsValue>, CrdsGossipError> {
+        self.num_total += 1;
         if now
             > value
                 .wallclock()
@@ -149,21 +175,32 @@ impl CrdsGossipPush {
             return Err(CrdsGossipError::PushMessageTimeout);
         }
         let label = value.label();
+        let origin = label.pubkey();
         let new_value = crds.new_versioned(now, value);
         let value_hash = new_value.value_hash;
-        if let Some((_, ref mut received_set)) = self.received_cache.get_mut(&value_hash) {
-            received_set.insert(*from);
-            return Err(CrdsGossipError::PushMessageAlreadyReceived);
-        }
+        let received_set = self
+            .received_cache
+            .entry(origin)
+            .or_insert_with(HashMap::new);
+        received_set.entry(*from).or_insert((false, 0)).1 = now;
+
         let old = crds.insert_versioned(new_value);
         if old.is_err() {
+            self.num_old += 1;
             return Err(CrdsGossipError::PushMessageOldVersion);
         }
-        let mut received_set = HashSet::new();
-        received_set.insert(*from);
         self.push_messages.insert(label, value_hash);
-        self.received_cache.insert(value_hash, (now, received_set));
-        Ok(old.ok().and_then(|opt| opt))
+        Ok(old.unwrap())
+    }
+
+    /// push pull responses
+    pub fn push_pull_responses(&mut self, values: Vec<(CrdsValueLabel, Hash, u64)>, now: u64) {
+        for (label, value_hash, wc) in values {
+            if now > wc.checked_add(self.msg_timeout).unwrap_or_else(|| 0) {
+                continue;
+            }
+            self.push_messages.insert(label, value_hash);
+        }
     }
 
     /// New push message to broadcast to peers.
@@ -172,18 +209,10 @@ impl CrdsGossipPush {
     /// The list of push messages is created such that all the randomly selected peers have not
     /// pruned the source addresses.
     pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap<Pubkey, Vec<CrdsValue>> {
-        let max = self.active_set.len();
-        let mut nodes: Vec<_> = (0..max).collect();
-        nodes.shuffle(&mut rand::thread_rng());
-        let peers: Vec<Pubkey> = nodes
-            .into_iter()
-            .filter_map(|n| self.active_set.get_index(n))
-            .take(self.push_fanout)
-            .map(|n| *n.0)
-            .collect();
         let mut total_bytes: usize = 0;
         let mut values = vec![];
         let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
+        trace!("new_push_messages {}", self.push_messages.len());
         for (label, hash) in &self.push_messages {
             let res = crds.lookup_versioned(label);
             if res.is_none() {
@@ -203,21 +232,37 @@ impl CrdsGossipPush {
             }
             values.push(value.clone());
         }
+        trace!(
+            "new_push_messages {} {}",
+            values.len(),
+            self.active_set.len()
+        );
         for v in values {
-            for p in peers.iter() {
-                let filter = self.active_set.get_mut(p);
-                if filter.is_some() && !filter.unwrap().contains(&v.label().pubkey()) {
+            //use a consistent index for the same origin so
+            //the active set learns the MST for that origin
+            let start = v.label().pubkey().as_ref()[0] as usize;
+            let max = self.push_fanout.min(self.active_set.len());
+            for i in start..(start + max) {
+                let ix = i % self.active_set.len();
+                if let Some((p, filter)) = self.active_set.get_index(ix) {
+                    if !filter.contains(&v.label().pubkey()) {
                         trace!("new_push_messages insert {} {:?}", *p, v);
                         push_messages.entry(*p).or_default().push(v.clone());
+                        self.num_pushes += 1;
+                    }
                 }
                 self.push_messages.remove(&v.label());
             }
         }
         push_messages
     }
 
     /// add the `from` to the peer's filter of nodes
-    pub fn process_prune_msg(&mut self, peer: &Pubkey, origins: &[Pubkey]) {
+    pub fn process_prune_msg(&mut self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) {
         for origin in origins {
+            if origin == self_pubkey {
+                continue;
+            }
             if let Some(p) = self.active_set.get_mut(peer) {
                 p.add(origin)
             }
@@ -339,15 +384,11 @@ impl CrdsGossipPush {
 
     /// purge received push message cache
     pub fn purge_old_received_cache(&mut self, min_time: u64) {
-        let old_msgs: Vec<Hash> = self
-            .received_cache
-            .iter()
-            .filter_map(|(k, (rcvd_time, _))| if *rcvd_time < min_time { Some(k) } else { None })
-            .cloned()
-            .collect();
-        for k in old_msgs {
-            self.received_cache.remove(&k);
-        }
+        self.received_cache
+            .iter_mut()
+            .for_each(|v| v.1.retain(|_, v| v.1 > min_time));
+
+        self.received_cache.retain(|_, v| !v.is_empty());
     }
 }
@@ -371,7 +412,6 @@ mod test {
         let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
             &origin, 0,
         )));
-        let label = value.label();
         let low_staked_peers = (0..10).map(|_| Pubkey::new_rand());
         let mut low_staked_set = HashSet::new();
         low_staked_peers.for_each(|p| {
@@ -380,11 +420,7 @@ mod test {
             stakes.insert(p, 1);
         });
 
-        let versioned = crds
-            .lookup_versioned(&label)
-            .expect("versioned value should exist");
-        let hash = versioned.value_hash;
-        let pruned = push.prune_received_cache(&self_id, &origin, hash, &stakes);
+        let pruned = push.prune_received_cache(&self_id, &origin, &stakes);
         assert!(
             pruned.is_empty(),
             "should not prune if min threshold has not been reached"
@@ -395,7 +431,7 @@ mod test {
         stakes.insert(high_staked_peer, high_stake);
         let _ = push.process_push_message(&mut crds, &high_staked_peer, value, 0);
 
-        let pruned = push.prune_received_cache(&self_id, &origin, hash, &stakes);
+        let pruned = push.prune_received_cache(&self_id, &origin, &stakes);
         assert!(
             pruned.len() < low_staked_set.len() + 1,
             "should not prune all peers"
@@ -409,7 +445,7 @@ mod test {
     }
 
     #[test]
-    fn test_process_push() {
+    fn test_process_push_one() {
         let mut crds = Crds::default();
         let mut push = CrdsGossipPush::default();
         let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
@@ -425,9 +461,9 @@ mod test {
         assert_eq!(crds.lookup(&label), Some(&value));
 
         // push it again
-        assert_eq!(
+        assert_matches!(
             push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
-            Err(CrdsGossipError::PushMessageAlreadyReceived)
+            Err(CrdsGossipError::PushMessageOldVersion)
         );
     }
     #[test]
@@ -690,6 +726,7 @@ mod test {
     #[test]
     fn test_process_prune() {
         let mut crds = Crds::default();
+        let self_id = Pubkey::new_rand();
         let mut push = CrdsGossipPush::default();
         let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
             &Pubkey::new_rand(),
@@ -707,7 +744,11 @@ mod test {
             push.process_push_message(&mut crds, &Pubkey::default(), new_msg.clone(), 0),
             Ok(None)
         );
-        push.process_prune_msg(&peer.label().pubkey(), &[new_msg.label().pubkey()]);
+        push.process_prune_msg(
+            &self_id,
+            &peer.label().pubkey(),
+            &[new_msg.label().pubkey()],
+        );
         assert_eq!(push.new_push_messages(&crds, 0), expected);
     }
     #[test]
@@ -749,9 +790,9 @@ mod test {
         assert_eq!(crds.lookup(&label), Some(&value));
 
         // push it again
-        assert_eq!(
+        assert_matches!(
            push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0),
-            Err(CrdsGossipError::PushMessageAlreadyReceived)
+            Err(CrdsGossipError::PushMessageOldVersion)
        );
 
        // purge the old pushed

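The other half of the fanout fix sits in new_push_messages above: instead of shuffling the active set on every call, the first byte of the origin's pubkey picks a fixed start index, so the same push_fanout peers are chosen for a given origin each round. Per the in-code comment, the stable selection is what lets prune messages carve out a spanning tree ("the active set learns the MST for that origin") instead of chasing a fresh random peer set every time. A toy version of just the selection logic:

    /// Deterministic fanout: start at the origin's first pubkey byte and
    /// take a fixed window of the active set, wrapping modulo its length.
    fn fanout_peers<T: Copy>(origin_first_byte: u8, active_set: &[T], fanout: usize) -> Vec<T> {
        let n = active_set.len();
        let max = fanout.min(n);
        let start = origin_first_byte as usize;
        (start..start + max).map(|i| active_set[i % n]).collect()
    }

    fn main() {
        let active_set = ["a", "b", "c", "d", "e"];
        // The same origin byte selects the same peers, call after call.
        assert_eq!(fanout_peers(7, &active_set, 3), ["c", "d", "e"]);
        assert_eq!(fanout_peers(7, &active_set, 3), ["c", "d", "e"]);
    }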
core/src/replay_stage.rs
@@ -952,7 +952,6 @@ impl ReplayStage {
                 progress.get_fork_stats(bank.slot()).unwrap().total_staked,
                 lockouts_sender,
             );
-
             Self::push_vote(
                 cluster_info,
                 bank,
@@ -1044,6 +1043,7 @@ impl ReplayStage {
         let blockhash = bank.last_blockhash();
         vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash);
         vote_tx.partial_sign(&[authorized_voter_keypair.as_ref()], blockhash);
+        let _ = cluster_info.send_vote(&vote_tx);
         cluster_info.push_vote(tower_index, vote_tx);
     }
 

core/tests/gossip.rs
@@ -4,13 +4,14 @@ extern crate log;
 use rayon::iter::*;
 use solana_core::cluster_info::{ClusterInfo, Node};
 use solana_core::gossip_service::GossipService;
+use solana_ledger::bank_forks::BankForks;
 
 use solana_perf::packet::Packet;
 use solana_sdk::signature::{Keypair, Signer};
 use solana_sdk::timing::timestamp;
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
 use std::thread::sleep;
 use std::time::Duration;
 
@@ -27,6 +28,28 @@ fn test_node(exit: &Arc<AtomicBool>) -> (Arc<ClusterInfo>, GossipService, UdpSocket) {
     )
 }
 
+fn test_node_with_bank(
+    node_keypair: Keypair,
+    exit: &Arc<AtomicBool>,
+    bank_forks: Arc<RwLock<BankForks>>,
+) -> (Arc<ClusterInfo>, GossipService, UdpSocket) {
+    let keypair = Arc::new(node_keypair);
+    let mut test_node = Node::new_localhost_with_pubkey(&keypair.pubkey());
+    let cluster_info = Arc::new(ClusterInfo::new(test_node.info.clone(), keypair));
+    let gossip_service = GossipService::new(
+        &cluster_info,
+        Some(bank_forks),
+        test_node.sockets.gossip,
+        exit,
+    );
+    let _ = cluster_info.my_contact_info();
+    (
+        cluster_info,
+        gossip_service,
+        test_node.sockets.tvu.pop().unwrap(),
+    )
+}
+
 /// Test that the network converges.
 /// Run until every node in the network has a full ContactInfo set.
 /// Check that nodes stop sending updates after all the ContactInfo has been shared.
@@ -181,3 +204,120 @@ pub fn cluster_info_retransmit() {
     dr2.join().unwrap();
     dr3.join().unwrap();
 }
+
+#[test]
+#[ignore]
+pub fn cluster_info_scale() {
+    use solana_measure::measure::Measure;
+    use solana_perf::test_tx::test_tx;
+    use solana_runtime::bank::Bank;
+    use solana_runtime::genesis_utils::{
+        create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
+    };
+    solana_logger::setup();
+    let exit = Arc::new(AtomicBool::new(false));
+    let num_nodes: usize = std::env::var("NUM_NODES")
+        .unwrap_or_else(|_| "10".to_string())
+        .parse()
+        .expect("could not parse NUM_NODES as a number");
+
+    let vote_keypairs: Vec<_> = (0..num_nodes)
+        .map(|_| ValidatorVoteKeypairs::new_rand())
+        .collect();
+    let genesis_config_info = create_genesis_config_with_vote_accounts(10_000, &vote_keypairs, 100);
+    let bank0 = Bank::new(&genesis_config_info.genesis_config);
+    let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank0)));
+
+    let nodes: Vec<_> = vote_keypairs
+        .into_iter()
+        .map(|keypairs| test_node_with_bank(keypairs.node_keypair, &exit, bank_forks.clone()))
+        .collect();
+    let ci0 = nodes[0].0.my_contact_info();
+    for node in &nodes[1..] {
+        node.0.insert_info(ci0.clone());
+    }
+
+    let mut time = Measure::start("time");
+    let mut done;
+    let mut success = false;
+    for _ in 0..30 {
+        done = true;
+        for (i, node) in nodes.iter().enumerate() {
+            warn!("node {} peers: {}", i, node.0.gossip_peers().len());
+            if node.0.gossip_peers().len() != num_nodes - 1 {
+                done = false;
+                break;
+            }
+        }
+        if done {
+            success = true;
+            break;
+        }
+        sleep(Duration::from_secs(1));
+    }
+    time.stop();
+    warn!("found {} nodes in {} success: {}", num_nodes, time, success);
+
+    for num_votes in 1..1000 {
+        let mut time = Measure::start("votes");
+        let tx = test_tx();
+        warn!("tx.message.account_keys: {:?}", tx.message.account_keys);
+        nodes[0].0.push_vote(0, tx.clone());
+        let mut success = false;
+        for _ in 0..(30 * 5) {
+            let mut not_done = 0;
+            let mut num_old = 0;
+            let mut num_push_total = 0;
+            let mut num_pushes = 0;
+            let mut num_pulls = 0;
+            let mut num_inserts = 0;
+            for node in nodes.iter() {
+                //if node.0.get_votes(0).1.len() != (num_nodes * num_votes) {
+                let has_tx = node
+                    .0
+                    .get_votes(0)
+                    .1
+                    .iter()
+                    .filter(|v| v.message.account_keys == tx.message.account_keys)
+                    .count();
+                num_old += node.0.gossip.read().unwrap().push.num_old;
+                num_push_total += node.0.gossip.read().unwrap().push.num_total;
+                num_pushes += node.0.gossip.read().unwrap().push.num_pushes;
+                num_pulls += node.0.gossip.read().unwrap().pull.num_pulls;
+                num_inserts += node.0.gossip.read().unwrap().crds.num_inserts;
+                if has_tx == 0 {
+                    not_done += 1;
+                }
+            }
+            warn!("not_done: {}/{}", not_done, nodes.len());
+            warn!("num_old: {}", num_old);
+            warn!("num_push_total: {}", num_push_total);
+            warn!("num_pushes: {}", num_pushes);
+            warn!("num_pulls: {}", num_pulls);
+            warn!("num_inserts: {}", num_inserts);
+            success = not_done < (nodes.len() / 20);
+            if success {
+                break;
+            }
+            sleep(Duration::from_millis(200));
+        }
+        time.stop();
+        warn!(
+            "propagated vote {} in {} success: {}",
+            num_votes, time, success
+        );
+        sleep(Duration::from_millis(200));
+        for node in nodes.iter() {
+            node.0.gossip.write().unwrap().push.num_old = 0;
+            node.0.gossip.write().unwrap().push.num_total = 0;
+            node.0.gossip.write().unwrap().push.num_pushes = 0;
+            node.0.gossip.write().unwrap().pull.num_pulls = 0;
+            node.0.gossip.write().unwrap().crds.num_inserts = 0;
+        }
+    }
+
+    exit.store(true, Ordering::Relaxed);
+    for node in nodes {
+        node.1.join().unwrap();
+    }
+}

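Since cluster_info_scale is marked #[ignore] (the "ignore scale" item in the commit message), it only runs on demand; something like `NUM_NODES=50 cargo test --release --package solana-core --test gossip cluster_info_scale -- --ignored --nocapture` (package and test-target names assumed from the tree layout) exercises it at larger cluster sizes, with the warn!-level output reporting the per-vote propagation and push/pull counters added above.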