removes id and shred_version from CrdsGossip (#18505)
ClusterInfo is the gateway to CrdsGossip function calls, and it already has node's pubkey and shred version (full ContactInfo and Keypair in fact). Duplicating these data in CrdsGossip adds redundancy and possibility for bugs should they not be consistent with ClusterInfo.
This commit is contained in:
parent
a5b91ef4c3
commit
4e1333fbe6
|
@ -485,11 +485,6 @@ impl ClusterInfo {
|
||||||
contact_info_path: PathBuf::default(),
|
contact_info_path: PathBuf::default(),
|
||||||
contact_save_interval: 0, // disabled
|
contact_save_interval: 0, // disabled
|
||||||
};
|
};
|
||||||
{
|
|
||||||
let mut gossip = me.gossip.write().unwrap();
|
|
||||||
gossip.set_self(&id);
|
|
||||||
gossip.set_shred_version(me.my_shred_version());
|
|
||||||
}
|
|
||||||
me.insert_self();
|
me.insert_self();
|
||||||
me.push_self(&HashMap::new(), None);
|
me.push_self(&HashMap::new(), None);
|
||||||
me
|
me
|
||||||
|
@ -497,8 +492,7 @@ impl ClusterInfo {
|
||||||
|
|
||||||
// Should only be used by tests and simulations
|
// Should only be used by tests and simulations
|
||||||
pub fn clone_with_id(&self, new_id: &Pubkey) -> Self {
|
pub fn clone_with_id(&self, new_id: &Pubkey) -> Self {
|
||||||
let mut gossip = self.gossip.read().unwrap().mock_clone();
|
let gossip = self.gossip.read().unwrap().mock_clone();
|
||||||
gossip.id = *new_id;
|
|
||||||
let mut my_contact_info = self.my_contact_info.read().unwrap().clone();
|
let mut my_contact_info = self.my_contact_info.read().unwrap().clone();
|
||||||
my_contact_info.id = *new_id;
|
my_contact_info.id = *new_id;
|
||||||
ClusterInfo {
|
ClusterInfo {
|
||||||
|
@ -545,10 +539,17 @@ impl ClusterInfo {
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.extend(entries);
|
.extend(entries);
|
||||||
self.gossip
|
let ContactInfo {
|
||||||
.write()
|
id: self_pubkey,
|
||||||
.unwrap()
|
shred_version,
|
||||||
.refresh_push_active_set(stakes, gossip_validators);
|
..
|
||||||
|
} = *self.my_contact_info.read().unwrap();
|
||||||
|
self.gossip.write().unwrap().refresh_push_active_set(
|
||||||
|
&self_pubkey,
|
||||||
|
shred_version,
|
||||||
|
stakes,
|
||||||
|
gossip_validators,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO kill insert_info, only used by tests
|
// TODO kill insert_info, only used by tests
|
||||||
|
@ -686,8 +687,6 @@ impl ClusterInfo {
|
||||||
|
|
||||||
pub fn set_keypair(&self, new_keypair: Arc<Keypair>) {
|
pub fn set_keypair(&self, new_keypair: Arc<Keypair>) {
|
||||||
let id = new_keypair.pubkey();
|
let id = new_keypair.pubkey();
|
||||||
|
|
||||||
self.gossip.write().unwrap().set_self(&id);
|
|
||||||
{
|
{
|
||||||
let mut instance = self.instance.write().unwrap();
|
let mut instance = self.instance.write().unwrap();
|
||||||
*instance = NodeInstance::new(&mut thread_rng(), id, timestamp());
|
*instance = NodeInstance::new(&mut thread_rng(), id, timestamp());
|
||||||
|
@ -1499,6 +1498,7 @@ impl ClusterInfo {
|
||||||
match gossip.new_pull_request(
|
match gossip.new_pull_request(
|
||||||
thread_pool,
|
thread_pool,
|
||||||
self.keypair().deref(),
|
self.keypair().deref(),
|
||||||
|
self.my_shred_version(),
|
||||||
now,
|
now,
|
||||||
gossip_validators,
|
gossip_validators,
|
||||||
stakes,
|
stakes,
|
||||||
|
@ -1676,10 +1676,6 @@ impl ClusterInfo {
|
||||||
entrypoint.shred_version, entrypoint.id
|
entrypoint.shred_version, entrypoint.id
|
||||||
);
|
);
|
||||||
self.my_contact_info.write().unwrap().shred_version = entrypoint.shred_version;
|
self.my_contact_info.write().unwrap().shred_version = entrypoint.shred_version;
|
||||||
self.gossip
|
|
||||||
.write()
|
|
||||||
.unwrap()
|
|
||||||
.set_shred_version(entrypoint.shred_version);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.my_shred_version() != 0
|
self.my_shred_version() != 0
|
||||||
|
@ -1694,14 +1690,15 @@ impl ClusterInfo {
|
||||||
bank_forks: Option<&RwLock<BankForks>>,
|
bank_forks: Option<&RwLock<BankForks>>,
|
||||||
stakes: &HashMap<Pubkey, u64>,
|
stakes: &HashMap<Pubkey, u64>,
|
||||||
) {
|
) {
|
||||||
|
let self_pubkey = self.id();
|
||||||
let epoch_duration = get_epoch_duration(bank_forks);
|
let epoch_duration = get_epoch_duration(bank_forks);
|
||||||
let timeouts = {
|
let timeouts = {
|
||||||
let gossip = self.gossip.read().unwrap();
|
let gossip = self.gossip.read().unwrap();
|
||||||
gossip.make_timeouts(stakes, epoch_duration)
|
gossip.make_timeouts(self_pubkey, stakes, epoch_duration)
|
||||||
};
|
};
|
||||||
let num_purged = self
|
let num_purged = self
|
||||||
.time_gossip_write_lock("purge", &self.stats.purge)
|
.time_gossip_write_lock("purge", &self.stats.purge)
|
||||||
.purge(thread_pool, timestamp(), &timeouts);
|
.purge(&self_pubkey, thread_pool, timestamp(), &timeouts);
|
||||||
inc_new_counter_info!("cluster_info-purge-count", num_purged);
|
inc_new_counter_info!("cluster_info-purge-count", num_purged);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1849,11 +1846,13 @@ impl ClusterInfo {
|
||||||
);
|
);
|
||||||
let mut prune_message_timeout = 0;
|
let mut prune_message_timeout = 0;
|
||||||
let mut bad_prune_destination = 0;
|
let mut bad_prune_destination = 0;
|
||||||
|
let self_pubkey = self.id();
|
||||||
{
|
{
|
||||||
let gossip = self.time_gossip_read_lock("process_prune", &self.stats.process_prune);
|
let gossip = self.time_gossip_read_lock("process_prune", &self.stats.process_prune);
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
for (from, data) in messages {
|
for (from, data) in messages {
|
||||||
match gossip.process_prune_msg(
|
match gossip.process_prune_msg(
|
||||||
|
&self_pubkey,
|
||||||
&from,
|
&from,
|
||||||
&data.destination,
|
&data.destination,
|
||||||
&data.prunes,
|
&data.prunes,
|
||||||
|
@ -2140,9 +2139,10 @@ impl ClusterInfo {
|
||||||
.reduce(HashMap::new, merge)
|
.reduce(HashMap::new, merge)
|
||||||
});
|
});
|
||||||
if !responses.is_empty() {
|
if !responses.is_empty() {
|
||||||
|
let self_pubkey = self.id();
|
||||||
let timeouts = {
|
let timeouts = {
|
||||||
let gossip = self.gossip.read().unwrap();
|
let gossip = self.gossip.read().unwrap();
|
||||||
gossip.make_timeouts(stakes, epoch_duration)
|
gossip.make_timeouts(self_pubkey, stakes, epoch_duration)
|
||||||
};
|
};
|
||||||
for (from, data) in responses {
|
for (from, data) in responses {
|
||||||
self.handle_pull_response(&from, data, &timeouts);
|
self.handle_pull_response(&from, data, &timeouts);
|
||||||
|
@ -2290,9 +2290,10 @@ impl ClusterInfo {
|
||||||
.collect()
|
.collect()
|
||||||
};
|
};
|
||||||
// Generate prune messages.
|
// Generate prune messages.
|
||||||
|
let self_pubkey = self.id();
|
||||||
let prunes = self
|
let prunes = self
|
||||||
.time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache)
|
.time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache)
|
||||||
.prune_received_cache(origins, stakes);
|
.prune_received_cache(&self_pubkey, origins, stakes);
|
||||||
let prunes: Vec<(Pubkey /*from*/, Vec<Pubkey> /*origins*/)> = prunes
|
let prunes: Vec<(Pubkey /*from*/, Vec<Pubkey> /*origins*/)> = prunes
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.flat_map(|(from, prunes)| {
|
.flat_map(|(from, prunes)| {
|
||||||
|
@ -3377,17 +3378,21 @@ mod tests {
|
||||||
let (spy, _, _) = ClusterInfo::spy_node(solana_sdk::pubkey::new_rand(), 0);
|
let (spy, _, _) = ClusterInfo::spy_node(solana_sdk::pubkey::new_rand(), 0);
|
||||||
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info));
|
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info));
|
||||||
cluster_info.insert_info(spy);
|
cluster_info.insert_info(spy);
|
||||||
cluster_info
|
{
|
||||||
.gossip
|
let mut gossip = cluster_info.gossip.write().unwrap();
|
||||||
.write()
|
gossip.refresh_push_active_set(
|
||||||
.unwrap()
|
&cluster_info.id(),
|
||||||
.refresh_push_active_set(&HashMap::new(), None);
|
cluster_info.my_shred_version(),
|
||||||
|
&HashMap::new(), // stakes
|
||||||
|
None, // gossip validators
|
||||||
|
);
|
||||||
|
}
|
||||||
let reqs = cluster_info.generate_new_gossip_requests(
|
let reqs = cluster_info.generate_new_gossip_requests(
|
||||||
&thread_pool,
|
&thread_pool,
|
||||||
None, // gossip_validators
|
None, // gossip_validators
|
||||||
&HashMap::new(),
|
&HashMap::new(), // stakes
|
||||||
true, // generate_pull_requests
|
true, // generate_pull_requests
|
||||||
false, // require_stake_for_gossip
|
false, // require_stake_for_gossip
|
||||||
);
|
);
|
||||||
//assert none of the addrs are invalid.
|
//assert none of the addrs are invalid.
|
||||||
reqs.iter().all(|(addr, _)| {
|
reqs.iter().all(|(addr, _)| {
|
||||||
|
@ -3493,11 +3498,15 @@ mod tests {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.mock_pong(peer.id, peer.gossip, Instant::now());
|
.mock_pong(peer.id, peer.gossip, Instant::now());
|
||||||
cluster_info.insert_info(peer);
|
cluster_info.insert_info(peer);
|
||||||
cluster_info
|
{
|
||||||
.gossip
|
let mut gossip = cluster_info.gossip.write().unwrap();
|
||||||
.write()
|
gossip.refresh_push_active_set(
|
||||||
.unwrap()
|
&cluster_info.id(),
|
||||||
.refresh_push_active_set(&HashMap::new(), None);
|
cluster_info.my_shred_version(),
|
||||||
|
&HashMap::new(), // stakes
|
||||||
|
None, // gossip validators
|
||||||
|
);
|
||||||
|
}
|
||||||
//check that all types of gossip messages are signed correctly
|
//check that all types of gossip messages are signed correctly
|
||||||
let push_messages = cluster_info
|
let push_messages = cluster_info
|
||||||
.gossip
|
.gossip
|
||||||
|
@ -3518,6 +3527,7 @@ mod tests {
|
||||||
.new_pull_request(
|
.new_pull_request(
|
||||||
&thread_pool,
|
&thread_pool,
|
||||||
cluster_info.keypair().deref(),
|
cluster_info.keypair().deref(),
|
||||||
|
cluster_info.my_shred_version(),
|
||||||
timestamp(),
|
timestamp(),
|
||||||
None,
|
None,
|
||||||
&HashMap::new(),
|
&HashMap::new(),
|
||||||
|
@ -3818,6 +3828,7 @@ mod tests {
|
||||||
let timeouts = {
|
let timeouts = {
|
||||||
let gossip = cluster_info.gossip.read().unwrap();
|
let gossip = cluster_info.gossip.read().unwrap();
|
||||||
gossip.make_timeouts(
|
gossip.make_timeouts(
|
||||||
|
cluster_info.id(),
|
||||||
&HashMap::default(), // stakes,
|
&HashMap::default(), // stakes,
|
||||||
Duration::from_millis(gossip.pull.crds_timeout),
|
Duration::from_millis(gossip.pull.crds_timeout),
|
||||||
)
|
)
|
||||||
|
|
|
@ -32,34 +32,14 @@ use {
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
pub struct CrdsGossip {
|
pub struct CrdsGossip {
|
||||||
pub crds: Crds,
|
pub crds: Crds,
|
||||||
pub id: Pubkey,
|
|
||||||
pub shred_version: u16,
|
|
||||||
pub push: CrdsGossipPush,
|
pub push: CrdsGossipPush,
|
||||||
pub pull: CrdsGossipPull,
|
pub pull: CrdsGossipPull,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for CrdsGossip {
|
|
||||||
fn default() -> Self {
|
|
||||||
CrdsGossip {
|
|
||||||
crds: Crds::default(),
|
|
||||||
id: Pubkey::default(),
|
|
||||||
shred_version: 0,
|
|
||||||
push: CrdsGossipPush::default(),
|
|
||||||
pull: CrdsGossipPull::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CrdsGossip {
|
impl CrdsGossip {
|
||||||
pub fn set_self(&mut self, id: &Pubkey) {
|
|
||||||
self.id = *id;
|
|
||||||
}
|
|
||||||
pub fn set_shred_version(&mut self, shred_version: u16) {
|
|
||||||
self.shred_version = shred_version;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// process a push message to the network
|
/// process a push message to the network
|
||||||
/// Returns unique origins' pubkeys of upserted values.
|
/// Returns unique origins' pubkeys of upserted values.
|
||||||
pub fn process_push_message(
|
pub fn process_push_message(
|
||||||
|
@ -83,18 +63,18 @@ impl CrdsGossip {
|
||||||
/// remove redundant paths in the network
|
/// remove redundant paths in the network
|
||||||
pub fn prune_received_cache<I>(
|
pub fn prune_received_cache<I>(
|
||||||
&mut self,
|
&mut self,
|
||||||
|
self_pubkey: &Pubkey,
|
||||||
origins: I, // Unique pubkeys of crds values' owners.
|
origins: I, // Unique pubkeys of crds values' owners.
|
||||||
stakes: &HashMap<Pubkey, u64>,
|
stakes: &HashMap<Pubkey, u64>,
|
||||||
) -> HashMap</*gossip peer:*/ Pubkey, /*origins:*/ Vec<Pubkey>>
|
) -> HashMap</*gossip peer:*/ Pubkey, /*origins:*/ Vec<Pubkey>>
|
||||||
where
|
where
|
||||||
I: IntoIterator<Item = Pubkey>,
|
I: IntoIterator<Item = Pubkey>,
|
||||||
{
|
{
|
||||||
let self_pubkey = self.id;
|
|
||||||
origins
|
origins
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.flat_map(|origin| {
|
.flat_map(|origin| {
|
||||||
self.push
|
self.push
|
||||||
.prune_received_cache(&self_pubkey, &origin, stakes)
|
.prune_received_cache(self_pubkey, &origin, stakes)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.zip(std::iter::repeat(origin))
|
.zip(std::iter::repeat(origin))
|
||||||
})
|
})
|
||||||
|
@ -179,6 +159,7 @@ impl CrdsGossip {
|
||||||
/// add the `from` to the peer's filter of nodes
|
/// add the `from` to the peer's filter of nodes
|
||||||
pub fn process_prune_msg(
|
pub fn process_prune_msg(
|
||||||
&self,
|
&self,
|
||||||
|
self_pubkey: &Pubkey,
|
||||||
peer: &Pubkey,
|
peer: &Pubkey,
|
||||||
destination: &Pubkey,
|
destination: &Pubkey,
|
||||||
origin: &[Pubkey],
|
origin: &[Pubkey],
|
||||||
|
@ -189,8 +170,8 @@ impl CrdsGossip {
|
||||||
if expired {
|
if expired {
|
||||||
return Err(CrdsGossipError::PruneMessageTimeout);
|
return Err(CrdsGossipError::PruneMessageTimeout);
|
||||||
}
|
}
|
||||||
if self.id == *destination {
|
if self_pubkey == destination {
|
||||||
self.push.process_prune_msg(&self.id, peer, origin);
|
self.push.process_prune_msg(self_pubkey, peer, origin);
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
Err(CrdsGossipError::BadPruneDestination)
|
Err(CrdsGossipError::BadPruneDestination)
|
||||||
|
@ -201,6 +182,8 @@ impl CrdsGossip {
|
||||||
/// * ratio - number of actives to rotate
|
/// * ratio - number of actives to rotate
|
||||||
pub fn refresh_push_active_set(
|
pub fn refresh_push_active_set(
|
||||||
&mut self,
|
&mut self,
|
||||||
|
self_pubkey: &Pubkey,
|
||||||
|
self_shred_version: u16,
|
||||||
stakes: &HashMap<Pubkey, u64>,
|
stakes: &HashMap<Pubkey, u64>,
|
||||||
gossip_validators: Option<&HashSet<Pubkey>>,
|
gossip_validators: Option<&HashSet<Pubkey>>,
|
||||||
) {
|
) {
|
||||||
|
@ -208,18 +191,20 @@ impl CrdsGossip {
|
||||||
&self.crds,
|
&self.crds,
|
||||||
stakes,
|
stakes,
|
||||||
gossip_validators,
|
gossip_validators,
|
||||||
&self.id,
|
self_pubkey,
|
||||||
self.shred_version,
|
self_shred_version,
|
||||||
self.crds.num_nodes(),
|
self.crds.num_nodes(),
|
||||||
CRDS_GOSSIP_NUM_ACTIVE,
|
CRDS_GOSSIP_NUM_ACTIVE,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// generate a random request
|
/// generate a random request
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub fn new_pull_request(
|
pub fn new_pull_request(
|
||||||
&self,
|
&self,
|
||||||
thread_pool: &ThreadPool,
|
thread_pool: &ThreadPool,
|
||||||
self_keypair: &Keypair,
|
self_keypair: &Keypair,
|
||||||
|
self_shred_version: u16,
|
||||||
now: u64,
|
now: u64,
|
||||||
gossip_validators: Option<&HashSet<Pubkey>>,
|
gossip_validators: Option<&HashSet<Pubkey>>,
|
||||||
stakes: &HashMap<Pubkey, u64>,
|
stakes: &HashMap<Pubkey, u64>,
|
||||||
|
@ -231,7 +216,7 @@ impl CrdsGossip {
|
||||||
thread_pool,
|
thread_pool,
|
||||||
&self.crds,
|
&self.crds,
|
||||||
self_keypair,
|
self_keypair,
|
||||||
self.shred_version,
|
self_shred_version,
|
||||||
now,
|
now,
|
||||||
gossip_validators,
|
gossip_validators,
|
||||||
stakes,
|
stakes,
|
||||||
|
@ -305,14 +290,16 @@ impl CrdsGossip {
|
||||||
|
|
||||||
pub fn make_timeouts(
|
pub fn make_timeouts(
|
||||||
&self,
|
&self,
|
||||||
|
self_pubkey: Pubkey,
|
||||||
stakes: &HashMap<Pubkey, u64>,
|
stakes: &HashMap<Pubkey, u64>,
|
||||||
epoch_duration: Duration,
|
epoch_duration: Duration,
|
||||||
) -> HashMap<Pubkey, u64> {
|
) -> HashMap<Pubkey, u64> {
|
||||||
self.pull.make_timeouts(self.id, stakes, epoch_duration)
|
self.pull.make_timeouts(self_pubkey, stakes, epoch_duration)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn purge(
|
pub fn purge(
|
||||||
&mut self,
|
&mut self,
|
||||||
|
self_pubkey: &Pubkey,
|
||||||
thread_pool: &ThreadPool,
|
thread_pool: &ThreadPool,
|
||||||
now: u64,
|
now: u64,
|
||||||
timeouts: &HashMap<Pubkey, u64>,
|
timeouts: &HashMap<Pubkey, u64>,
|
||||||
|
@ -324,7 +311,7 @@ impl CrdsGossip {
|
||||||
}
|
}
|
||||||
if now > self.pull.crds_timeout {
|
if now > self.pull.crds_timeout {
|
||||||
//sanity check
|
//sanity check
|
||||||
assert_eq!(timeouts[&self.id], std::u64::MAX);
|
assert_eq!(timeouts[self_pubkey], std::u64::MAX);
|
||||||
assert!(timeouts.contains_key(&Pubkey::default()));
|
assert!(timeouts.contains_key(&Pubkey::default()));
|
||||||
rv = self
|
rv = self
|
||||||
.pull
|
.pull
|
||||||
|
@ -342,7 +329,6 @@ impl CrdsGossip {
|
||||||
crds: self.crds.clone(),
|
crds: self.crds.clone(),
|
||||||
push: self.push.mock_clone(),
|
push: self.push.mock_clone(),
|
||||||
pull: self.pull.mock_clone(),
|
pull: self.pull.mock_clone(),
|
||||||
..*self
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -374,11 +360,8 @@ mod test {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_prune_errors() {
|
fn test_prune_errors() {
|
||||||
let mut crds_gossip = CrdsGossip {
|
let mut crds_gossip = CrdsGossip::default();
|
||||||
id: Pubkey::new(&[0; 32]),
|
let id = Pubkey::new(&[0; 32]);
|
||||||
..CrdsGossip::default()
|
|
||||||
};
|
|
||||||
let id = crds_gossip.id;
|
|
||||||
let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0);
|
let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0);
|
||||||
let prune_pubkey = Pubkey::new(&[2; 32]);
|
let prune_pubkey = Pubkey::new(&[2; 32]);
|
||||||
crds_gossip
|
crds_gossip
|
||||||
|
@ -388,10 +371,16 @@ mod test {
|
||||||
0,
|
0,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
crds_gossip.refresh_push_active_set(&HashMap::new(), None);
|
crds_gossip.refresh_push_active_set(
|
||||||
|
&id,
|
||||||
|
0, // shred version
|
||||||
|
&HashMap::new(), // stakes
|
||||||
|
None, // gossip validators
|
||||||
|
);
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
//incorrect dest
|
//incorrect dest
|
||||||
let mut res = crds_gossip.process_prune_msg(
|
let mut res = crds_gossip.process_prune_msg(
|
||||||
|
&id,
|
||||||
&ci.id,
|
&ci.id,
|
||||||
&Pubkey::new(hash(&[1; 32]).as_ref()),
|
&Pubkey::new(hash(&[1; 32]).as_ref()),
|
||||||
&[prune_pubkey],
|
&[prune_pubkey],
|
||||||
|
@ -400,11 +389,25 @@ mod test {
|
||||||
);
|
);
|
||||||
assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination));
|
assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination));
|
||||||
//correct dest
|
//correct dest
|
||||||
res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, now);
|
res = crds_gossip.process_prune_msg(
|
||||||
|
&id, // self_pubkey
|
||||||
|
&ci.id, // peer
|
||||||
|
&id, // destination
|
||||||
|
&[prune_pubkey], // origins
|
||||||
|
now,
|
||||||
|
now,
|
||||||
|
);
|
||||||
res.unwrap();
|
res.unwrap();
|
||||||
//test timeout
|
//test timeout
|
||||||
let timeout = now + crds_gossip.push.prune_timeout * 2;
|
let timeout = now + crds_gossip.push.prune_timeout * 2;
|
||||||
res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, timeout);
|
res = crds_gossip.process_prune_msg(
|
||||||
|
&id, // self_pubkey
|
||||||
|
&ci.id, // peer
|
||||||
|
&id, // destination
|
||||||
|
&[prune_pubkey], // origins
|
||||||
|
now,
|
||||||
|
timeout,
|
||||||
|
);
|
||||||
assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout));
|
assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -116,11 +116,9 @@ fn star_network_create(num: usize) -> Network {
|
||||||
let node_keypair = Arc::new(Keypair::new());
|
let node_keypair = Arc::new(Keypair::new());
|
||||||
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
|
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
|
||||||
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
|
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
|
||||||
let id = new.label().pubkey();
|
|
||||||
let mut node = CrdsGossip::default();
|
let mut node = CrdsGossip::default();
|
||||||
node.crds.insert(new.clone(), timestamp()).unwrap();
|
node.crds.insert(new.clone(), timestamp()).unwrap();
|
||||||
node.crds.insert(entry.clone(), timestamp()).unwrap();
|
node.crds.insert(entry.clone(), timestamp()).unwrap();
|
||||||
node.set_self(&id);
|
|
||||||
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
|
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
|
||||||
(new.label().pubkey(), node)
|
(new.label().pubkey(), node)
|
||||||
})
|
})
|
||||||
|
@ -128,7 +126,6 @@ fn star_network_create(num: usize) -> Network {
|
||||||
let mut node = CrdsGossip::default();
|
let mut node = CrdsGossip::default();
|
||||||
let id = entry.label().pubkey();
|
let id = entry.label().pubkey();
|
||||||
node.crds.insert(entry, timestamp()).unwrap();
|
node.crds.insert(entry, timestamp()).unwrap();
|
||||||
node.set_self(&id);
|
|
||||||
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
|
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
|
||||||
network.insert(id, node);
|
network.insert(id, node);
|
||||||
Network::new(network)
|
Network::new(network)
|
||||||
|
@ -141,18 +138,14 @@ fn rstar_network_create(num: usize) -> Network {
|
||||||
let mut origin = CrdsGossip::default();
|
let mut origin = CrdsGossip::default();
|
||||||
let id = entry.label().pubkey();
|
let id = entry.label().pubkey();
|
||||||
origin.crds.insert(entry, timestamp()).unwrap();
|
origin.crds.insert(entry, timestamp()).unwrap();
|
||||||
origin.set_self(&id);
|
|
||||||
let mut network: HashMap<_, _> = (1..num)
|
let mut network: HashMap<_, _> = (1..num)
|
||||||
.map(|_| {
|
.map(|_| {
|
||||||
let node_keypair = Arc::new(Keypair::new());
|
let node_keypair = Arc::new(Keypair::new());
|
||||||
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
|
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
|
||||||
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
|
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
|
||||||
let id = new.label().pubkey();
|
|
||||||
let mut node = CrdsGossip::default();
|
let mut node = CrdsGossip::default();
|
||||||
node.crds.insert(new.clone(), timestamp()).unwrap();
|
node.crds.insert(new.clone(), timestamp()).unwrap();
|
||||||
origin.crds.insert(new.clone(), timestamp()).unwrap();
|
origin.crds.insert(new.clone(), timestamp()).unwrap();
|
||||||
node.set_self(&id);
|
|
||||||
|
|
||||||
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
|
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
|
||||||
(new.label().pubkey(), node)
|
(new.label().pubkey(), node)
|
||||||
})
|
})
|
||||||
|
@ -168,10 +161,8 @@ fn ring_network_create(num: usize) -> Network {
|
||||||
let node_keypair = Arc::new(Keypair::new());
|
let node_keypair = Arc::new(Keypair::new());
|
||||||
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
|
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
|
||||||
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
|
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
|
||||||
let id = new.label().pubkey();
|
|
||||||
let mut node = CrdsGossip::default();
|
let mut node = CrdsGossip::default();
|
||||||
node.crds.insert(new.clone(), timestamp()).unwrap();
|
node.crds.insert(new.clone(), timestamp()).unwrap();
|
||||||
node.set_self(&id);
|
|
||||||
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
|
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
|
||||||
(new.label().pubkey(), node)
|
(new.label().pubkey(), node)
|
||||||
})
|
})
|
||||||
|
@ -180,7 +171,7 @@ fn ring_network_create(num: usize) -> Network {
|
||||||
for k in 0..keys.len() {
|
for k in 0..keys.len() {
|
||||||
let start_info = {
|
let start_info = {
|
||||||
let start = &network[&keys[k]];
|
let start = &network[&keys[k]];
|
||||||
let start_id = start.lock().unwrap().id;
|
let start_id = keys[k];
|
||||||
let label = CrdsValueLabel::ContactInfo(start_id);
|
let label = CrdsValueLabel::ContactInfo(start_id);
|
||||||
let gossip = start.gossip.lock().unwrap();
|
let gossip = start.gossip.lock().unwrap();
|
||||||
gossip.crds.get(&label).unwrap().value.clone()
|
gossip.crds.get(&label).unwrap().value.clone()
|
||||||
|
@ -202,10 +193,8 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network {
|
||||||
let node_keypair = Arc::new(Keypair::new());
|
let node_keypair = Arc::new(Keypair::new());
|
||||||
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
|
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
|
||||||
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
|
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
|
||||||
let id = new.label().pubkey();
|
|
||||||
let mut node = CrdsGossip::default();
|
let mut node = CrdsGossip::default();
|
||||||
node.crds.insert(new.clone(), timestamp()).unwrap();
|
node.crds.insert(new.clone(), timestamp()).unwrap();
|
||||||
node.set_self(&id);
|
|
||||||
let node = Node::staked(
|
let node = Node::staked(
|
||||||
node_keypair,
|
node_keypair,
|
||||||
contact_info,
|
contact_info,
|
||||||
|
@ -221,15 +210,14 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network {
|
||||||
.iter()
|
.iter()
|
||||||
.map(|k| {
|
.map(|k| {
|
||||||
let start = &network[k].lock().unwrap();
|
let start = &network[k].lock().unwrap();
|
||||||
let start_id = start.id;
|
let start_label = CrdsValueLabel::ContactInfo(*k);
|
||||||
let start_label = CrdsValueLabel::ContactInfo(start_id);
|
|
||||||
start.crds.get(&start_label).unwrap().value.clone()
|
start.crds.get(&start_label).unwrap().value.clone()
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
for end in network.values_mut() {
|
for (end_pubkey, end) in network.iter_mut() {
|
||||||
for k in 0..keys.len() {
|
for k in 0..keys.len() {
|
||||||
let mut end = end.lock().unwrap();
|
let mut end = end.lock().unwrap();
|
||||||
if keys[k] != end.id {
|
if keys[k] != *end_pubkey {
|
||||||
let start_info = start_entries[k].clone();
|
let start_info = start_entries[k].clone();
|
||||||
end.crds.insert(start_info, timestamp()).unwrap();
|
end.crds.insert(start_info, timestamp()).unwrap();
|
||||||
}
|
}
|
||||||
|
@ -258,9 +246,13 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver
|
||||||
// make sure there is someone in the active set
|
// make sure there is someone in the active set
|
||||||
let network_values: Vec<Node> = network.values().cloned().collect();
|
let network_values: Vec<Node> = network.values().cloned().collect();
|
||||||
network_values.par_iter().for_each(|node| {
|
network_values.par_iter().for_each(|node| {
|
||||||
node.lock()
|
let node_pubkey = node.keypair.pubkey();
|
||||||
.unwrap()
|
node.lock().unwrap().refresh_push_active_set(
|
||||||
.refresh_push_active_set(&HashMap::new(), None);
|
&node_pubkey,
|
||||||
|
0, // shred version
|
||||||
|
&HashMap::new(), // stakes
|
||||||
|
None, // gossip validators
|
||||||
|
);
|
||||||
});
|
});
|
||||||
let mut total_bytes = bytes_tx;
|
let mut total_bytes = bytes_tx;
|
||||||
let mut ts = timestamp();
|
let mut ts = timestamp();
|
||||||
|
@ -271,8 +263,9 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver
|
||||||
ts += 1000;
|
ts += 1000;
|
||||||
// push a message to the network
|
// push a message to the network
|
||||||
network_values.par_iter().for_each(|locked_node| {
|
network_values.par_iter().for_each(|locked_node| {
|
||||||
|
let node_pubkey = locked_node.keypair.pubkey();
|
||||||
let node = &mut locked_node.lock().unwrap();
|
let node = &mut locked_node.lock().unwrap();
|
||||||
let label = CrdsValueLabel::ContactInfo(node.id);
|
let label = CrdsValueLabel::ContactInfo(node_pubkey);
|
||||||
let entry = node.crds.get(&label).unwrap();
|
let entry = node.crds.get(&label).unwrap();
|
||||||
let mut m = entry.value.contact_info().cloned().unwrap();
|
let mut m = entry.value.contact_info().cloned().unwrap();
|
||||||
m.wallclock = now;
|
m.wallclock = now;
|
||||||
|
@ -327,13 +320,15 @@ fn network_run_push(
|
||||||
let requests: Vec<_> = network_values
|
let requests: Vec<_> = network_values
|
||||||
.par_iter()
|
.par_iter()
|
||||||
.map(|node| {
|
.map(|node| {
|
||||||
|
let node_pubkey = node.keypair.pubkey();
|
||||||
let mut node_lock = node.lock().unwrap();
|
let mut node_lock = node.lock().unwrap();
|
||||||
let timeouts = node_lock.make_timeouts(
|
let timeouts = node_lock.make_timeouts(
|
||||||
|
node_pubkey,
|
||||||
&HashMap::default(), // stakes
|
&HashMap::default(), // stakes
|
||||||
Duration::from_millis(node_lock.pull.crds_timeout),
|
Duration::from_millis(node_lock.pull.crds_timeout),
|
||||||
);
|
);
|
||||||
node_lock.purge(thread_pool, now, &timeouts);
|
node_lock.purge(&node_pubkey, thread_pool, now, &timeouts);
|
||||||
(node_lock.id, node_lock.new_push_messages(vec![], now))
|
(node_pubkey, node_lock.new_push_messages(vec![], now))
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
let transfered: Vec<_> = requests
|
let transfered: Vec<_> = requests
|
||||||
|
@ -356,7 +351,14 @@ fn network_run_push(
|
||||||
.collect();
|
.collect();
|
||||||
let prunes_map = network
|
let prunes_map = network
|
||||||
.get(&to)
|
.get(&to)
|
||||||
.map(|node| node.lock().unwrap().prune_received_cache(origins, &stakes))
|
.map(|node| {
|
||||||
|
let node_pubkey = node.keypair.pubkey();
|
||||||
|
node.lock().unwrap().prune_received_cache(
|
||||||
|
&node_pubkey,
|
||||||
|
origins,
|
||||||
|
&stakes,
|
||||||
|
)
|
||||||
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
for (from, prune_set) in prunes_map {
|
for (from, prune_set) in prunes_map {
|
||||||
|
@ -371,11 +373,19 @@ fn network_run_push(
|
||||||
network
|
network
|
||||||
.get(&from)
|
.get(&from)
|
||||||
.map(|node| {
|
.map(|node| {
|
||||||
|
let node_pubkey = node.keypair.pubkey();
|
||||||
let node = node.lock().unwrap();
|
let node = node.lock().unwrap();
|
||||||
let destination = node.id;
|
let destination = node_pubkey;
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
node.process_prune_msg(&to, &destination, &prune_keys, now, now)
|
node.process_prune_msg(
|
||||||
.unwrap()
|
&node_pubkey,
|
||||||
|
&to,
|
||||||
|
&destination,
|
||||||
|
&prune_keys,
|
||||||
|
now,
|
||||||
|
now,
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
@ -399,9 +409,13 @@ fn network_run_push(
|
||||||
}
|
}
|
||||||
if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 {
|
if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 {
|
||||||
network_values.par_iter().for_each(|node| {
|
network_values.par_iter().for_each(|node| {
|
||||||
node.lock()
|
let node_pubkey = node.keypair.pubkey();
|
||||||
.unwrap()
|
node.lock().unwrap().refresh_push_active_set(
|
||||||
.refresh_push_active_set(&HashMap::new(), None);
|
&node_pubkey,
|
||||||
|
0, // shred version
|
||||||
|
&HashMap::new(), // stakes
|
||||||
|
None, // gossip validators
|
||||||
|
);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
total = network_values
|
total = network_values
|
||||||
|
@ -468,6 +482,7 @@ fn network_run_pull(
|
||||||
.new_pull_request(
|
.new_pull_request(
|
||||||
thread_pool,
|
thread_pool,
|
||||||
from.keypair.deref(),
|
from.keypair.deref(),
|
||||||
|
0, // shred version.
|
||||||
now,
|
now,
|
||||||
None,
|
None,
|
||||||
&HashMap::new(),
|
&HashMap::new(),
|
||||||
|
@ -476,8 +491,9 @@ fn network_run_pull(
|
||||||
&mut pings,
|
&mut pings,
|
||||||
)
|
)
|
||||||
.ok()?;
|
.ok()?;
|
||||||
|
let from_pubkey = from.keypair.pubkey();
|
||||||
let gossip = from.gossip.lock().unwrap();
|
let gossip = from.gossip.lock().unwrap();
|
||||||
let label = CrdsValueLabel::ContactInfo(gossip.id);
|
let label = CrdsValueLabel::ContactInfo(from_pubkey);
|
||||||
let self_info = gossip.crds.get(&label).unwrap().value.clone();
|
let self_info = gossip.crds.get(&label).unwrap().value.clone();
|
||||||
Some((peer.id, filters, self_info))
|
Some((peer.id, filters, self_info))
|
||||||
})
|
})
|
||||||
|
@ -676,11 +692,8 @@ fn test_star_network_large_push() {
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
fn test_prune_errors() {
|
fn test_prune_errors() {
|
||||||
let mut crds_gossip = CrdsGossip {
|
let mut crds_gossip = CrdsGossip::default();
|
||||||
id: Pubkey::new(&[0; 32]),
|
let id = Pubkey::new(&[0; 32]);
|
||||||
..CrdsGossip::default()
|
|
||||||
};
|
|
||||||
let id = crds_gossip.id;
|
|
||||||
let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0);
|
let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0);
|
||||||
let prune_pubkey = Pubkey::new(&[2; 32]);
|
let prune_pubkey = Pubkey::new(&[2; 32]);
|
||||||
crds_gossip
|
crds_gossip
|
||||||
|
@ -690,22 +703,42 @@ fn test_prune_errors() {
|
||||||
0,
|
0,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
crds_gossip.refresh_push_active_set(&HashMap::new(), None);
|
crds_gossip.refresh_push_active_set(
|
||||||
|
&id,
|
||||||
|
0, // shred version
|
||||||
|
&HashMap::new(), // stakes
|
||||||
|
None, // gossip validators
|
||||||
|
);
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
//incorrect dest
|
//incorrect dest
|
||||||
let mut res = crds_gossip.process_prune_msg(
|
let mut res = crds_gossip.process_prune_msg(
|
||||||
&ci.id,
|
&id, // self_pubkey
|
||||||
&Pubkey::new(hash(&[1; 32]).as_ref()),
|
&ci.id, // peer
|
||||||
&[prune_pubkey],
|
&Pubkey::new(hash(&[1; 32]).as_ref()), // destination
|
||||||
|
&[prune_pubkey], // origins
|
||||||
now,
|
now,
|
||||||
now,
|
now,
|
||||||
);
|
);
|
||||||
assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination));
|
assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination));
|
||||||
//correct dest
|
//correct dest
|
||||||
res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, now);
|
res = crds_gossip.process_prune_msg(
|
||||||
|
&id, // self_pubkey
|
||||||
|
&ci.id, // peer
|
||||||
|
&id, // destination
|
||||||
|
&[prune_pubkey], // origins
|
||||||
|
now,
|
||||||
|
now,
|
||||||
|
);
|
||||||
res.unwrap();
|
res.unwrap();
|
||||||
//test timeout
|
//test timeout
|
||||||
let timeout = now + crds_gossip.push.prune_timeout * 2;
|
let timeout = now + crds_gossip.push.prune_timeout * 2;
|
||||||
res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, timeout);
|
res = crds_gossip.process_prune_msg(
|
||||||
|
&id, // self_pubkey
|
||||||
|
&ci.id, // peer
|
||||||
|
&id, // destination
|
||||||
|
&[prune_pubkey], // origins
|
||||||
|
now,
|
||||||
|
timeout,
|
||||||
|
);
|
||||||
assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout));
|
assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue