locks crds table only once to process push messages (#29218)
Processing push messages is locking and unlocking crds table for each push message: https://github.com/solana-labs/solana/blob/536b879aa/gossip/src/cluster_info.rs#L2266-L2276 https://github.com/solana-labs/solana/blob/536b879aa/gossip/src/crds_gossip_push.rs#L215C9-L260 This commit instead locks the crds table once for all the received push messages.
This commit is contained in:
parent
95d3393008
commit
a5c8c7c536
|
@ -2263,12 +2263,7 @@ impl ClusterInfo {
|
||||||
let origins: HashSet<_> = {
|
let origins: HashSet<_> = {
|
||||||
let _st = ScopedTimer::from(&self.stats.process_push_message);
|
let _st = ScopedTimer::from(&self.stats.process_push_message);
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
messages
|
self.gossip.process_push_message(messages, now)
|
||||||
.into_iter()
|
|
||||||
.flat_map(|(from, crds_values)| {
|
|
||||||
self.gossip.process_push_message(&from, crds_values, now)
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
};
|
};
|
||||||
// Generate prune messages.
|
// Generate prune messages.
|
||||||
let self_pubkey = self.id();
|
let self_pubkey = self.id();
|
||||||
|
|
|
@ -47,15 +47,10 @@ impl CrdsGossip {
|
||||||
/// Returns unique origins' pubkeys of upserted values.
|
/// Returns unique origins' pubkeys of upserted values.
|
||||||
pub fn process_push_message(
|
pub fn process_push_message(
|
||||||
&self,
|
&self,
|
||||||
from: &Pubkey,
|
messages: Vec<(/*from:*/ Pubkey, Vec<CrdsValue>)>,
|
||||||
values: Vec<CrdsValue>,
|
|
||||||
now: u64,
|
now: u64,
|
||||||
) -> HashSet<Pubkey> {
|
) -> HashSet<Pubkey> {
|
||||||
self.push
|
self.push.process_push_message(&self.crds, messages, now)
|
||||||
.process_push_message(&self.crds, from, values, now)
|
|
||||||
.into_iter()
|
|
||||||
.filter_map(Result::ok)
|
|
||||||
.collect()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove redundant paths in the network.
|
/// Remove redundant paths in the network.
|
||||||
|
|
|
@ -17,7 +17,6 @@ use {
|
||||||
contact_info::ContactInfo,
|
contact_info::ContactInfo,
|
||||||
crds::{Crds, CrdsError, Cursor, GossipRoute},
|
crds::{Crds, CrdsError, Cursor, GossipRoute},
|
||||||
crds_gossip::{get_stake, get_weight},
|
crds_gossip::{get_stake, get_weight},
|
||||||
crds_gossip_error::CrdsGossipError,
|
|
||||||
crds_value::CrdsValue,
|
crds_value::CrdsValue,
|
||||||
ping_pong::PingCache,
|
ping_pong::PingCache,
|
||||||
received_cache::ReceivedCache,
|
received_cache::ReceivedCache,
|
||||||
|
@ -144,38 +143,36 @@ impl CrdsGossipPush {
|
||||||
pub(crate) fn process_push_message(
|
pub(crate) fn process_push_message(
|
||||||
&self,
|
&self,
|
||||||
crds: &RwLock<Crds>,
|
crds: &RwLock<Crds>,
|
||||||
from: &Pubkey,
|
messages: Vec<(/*from:*/ Pubkey, Vec<CrdsValue>)>,
|
||||||
values: Vec<CrdsValue>,
|
|
||||||
now: u64,
|
now: u64,
|
||||||
) -> Vec<Result<Pubkey, CrdsGossipError>> {
|
) -> HashSet<Pubkey> {
|
||||||
self.num_total.fetch_add(values.len(), Ordering::Relaxed);
|
|
||||||
let mut received_cache = self.received_cache.lock().unwrap();
|
let mut received_cache = self.received_cache.lock().unwrap();
|
||||||
let mut crds = crds.write().unwrap();
|
let mut crds = crds.write().unwrap();
|
||||||
let wallclock_window = self.wallclock_window(now);
|
let wallclock_window = self.wallclock_window(now);
|
||||||
values
|
let mut origins = HashSet::with_capacity(messages.len());
|
||||||
.into_iter()
|
for (from, values) in messages {
|
||||||
.map(|value| {
|
self.num_total.fetch_add(values.len(), Ordering::Relaxed);
|
||||||
|
for value in values {
|
||||||
if !wallclock_window.contains(&value.wallclock()) {
|
if !wallclock_window.contains(&value.wallclock()) {
|
||||||
return Err(CrdsGossipError::PushMessageTimeout);
|
continue;
|
||||||
}
|
}
|
||||||
let origin = value.pubkey();
|
let origin = value.pubkey();
|
||||||
match crds.insert(value, now, GossipRoute::PushMessage) {
|
match crds.insert(value, now, GossipRoute::PushMessage) {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
received_cache.record(origin, *from, /*num_dups:*/ 0);
|
received_cache.record(origin, from, /*num_dups:*/ 0);
|
||||||
Ok(origin)
|
origins.insert(origin);
|
||||||
}
|
}
|
||||||
Err(CrdsError::DuplicatePush(num_dups)) => {
|
Err(CrdsError::DuplicatePush(num_dups)) => {
|
||||||
received_cache.record(origin, *from, usize::from(num_dups));
|
received_cache.record(origin, from, usize::from(num_dups));
|
||||||
self.num_old.fetch_add(1, Ordering::Relaxed);
|
self.num_old.fetch_add(1, Ordering::Relaxed);
|
||||||
Err(CrdsGossipError::PushMessageOldVersion)
|
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
self.num_old.fetch_add(1, Ordering::Relaxed);
|
self.num_old.fetch_add(1, Ordering::Relaxed);
|
||||||
Err(CrdsGossipError::PushMessageOldVersion)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
.collect()
|
}
|
||||||
|
origins
|
||||||
}
|
}
|
||||||
|
|
||||||
/// New push message to broadcast to peers.
|
/// New push message to broadcast to peers.
|
||||||
|
@ -455,16 +452,15 @@ mod tests {
|
||||||
let label = value.label();
|
let label = value.label();
|
||||||
// push a new message
|
// push a new message
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&crds, &Pubkey::default(), vec![value.clone()], 0),
|
push.process_push_message(&crds, vec![(Pubkey::default(), vec![value.clone()])], 0),
|
||||||
[Ok(label.pubkey())],
|
[label.pubkey()].into_iter().collect(),
|
||||||
);
|
);
|
||||||
assert_eq!(crds.read().unwrap().get::<&CrdsValue>(&label), Some(&value));
|
assert_eq!(crds.read().unwrap().get::<&CrdsValue>(&label), Some(&value));
|
||||||
|
|
||||||
// push it again
|
// push it again
|
||||||
assert_eq!(
|
assert!(push
|
||||||
push.process_push_message(&crds, &Pubkey::default(), vec![value], 0),
|
.process_push_message(&crds, vec![(Pubkey::default(), vec![value])], 0)
|
||||||
[Err(CrdsGossipError::PushMessageOldVersion)],
|
.is_empty());
|
||||||
);
|
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
fn test_process_push_old_version() {
|
fn test_process_push_old_version() {
|
||||||
|
@ -476,17 +472,16 @@ mod tests {
|
||||||
|
|
||||||
// push a new message
|
// push a new message
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&crds, &Pubkey::default(), vec![value], 0),
|
push.process_push_message(&crds, vec![(Pubkey::default(), vec![value])], 0),
|
||||||
[Ok(ci.id)],
|
[ci.id].into_iter().collect()
|
||||||
);
|
);
|
||||||
|
|
||||||
// push an old version
|
// push an old version
|
||||||
ci.wallclock = 0;
|
ci.wallclock = 0;
|
||||||
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
||||||
assert_eq!(
|
assert!(push
|
||||||
push.process_push_message(&crds, &Pubkey::default(), vec![value], 0),
|
.process_push_message(&crds, vec![(Pubkey::default(), vec![value])], 0)
|
||||||
[Err(CrdsGossipError::PushMessageOldVersion)],
|
.is_empty());
|
||||||
);
|
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
fn test_process_push_timeout() {
|
fn test_process_push_timeout() {
|
||||||
|
@ -498,18 +493,16 @@ mod tests {
|
||||||
// push a version to far in the future
|
// push a version to far in the future
|
||||||
ci.wallclock = timeout + 1;
|
ci.wallclock = timeout + 1;
|
||||||
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
|
||||||
assert_eq!(
|
assert!(push
|
||||||
push.process_push_message(&crds, &Pubkey::default(), vec![value], 0),
|
.process_push_message(&crds, vec![(Pubkey::default(), vec![value])], 0)
|
||||||
[Err(CrdsGossipError::PushMessageTimeout)],
|
.is_empty());
|
||||||
);
|
|
||||||
|
|
||||||
// push a version to far in the past
|
// push a version to far in the past
|
||||||
ci.wallclock = 0;
|
ci.wallclock = 0;
|
||||||
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
||||||
assert_eq!(
|
assert!(push
|
||||||
push.process_push_message(&crds, &Pubkey::default(), vec![value], timeout + 1),
|
.process_push_message(&crds, vec![(Pubkey::default(), vec![value])], timeout + 1)
|
||||||
[Err(CrdsGossipError::PushMessageTimeout)]
|
.is_empty());
|
||||||
);
|
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
fn test_process_push_update() {
|
fn test_process_push_update() {
|
||||||
|
@ -522,16 +515,16 @@ mod tests {
|
||||||
|
|
||||||
// push a new message
|
// push a new message
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&crds, &Pubkey::default(), vec![value_old], 0),
|
push.process_push_message(&crds, vec![(Pubkey::default(), vec![value_old])], 0),
|
||||||
[Ok(origin)],
|
[origin].into_iter().collect()
|
||||||
);
|
);
|
||||||
|
|
||||||
// push an old version
|
// push an old version
|
||||||
ci.wallclock = 1;
|
ci.wallclock = 1;
|
||||||
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&crds, &Pubkey::default(), vec![value], 0),
|
push.process_push_message(&crds, vec![(Pubkey::default(), vec![value])], 0),
|
||||||
[Ok(origin)],
|
[origin].into_iter().collect()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -846,8 +839,8 @@ mod tests {
|
||||||
expected.insert(peer.label().pubkey(), vec![new_msg.clone()]);
|
expected.insert(peer.label().pubkey(), vec![new_msg.clone()]);
|
||||||
let origin = new_msg.pubkey();
|
let origin = new_msg.pubkey();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&crds, &Pubkey::default(), vec![new_msg], 0),
|
push.process_push_message(&crds, vec![(Pubkey::default(), vec![new_msg])], 0),
|
||||||
[Ok(origin)]
|
[origin].into_iter().collect()
|
||||||
);
|
);
|
||||||
assert_eq!(push.active_set.read().unwrap().len(), 1);
|
assert_eq!(push.active_set.read().unwrap().len(), 1);
|
||||||
assert_eq!(push.new_push_messages(&crds, 0).0, expected);
|
assert_eq!(push.new_push_messages(&crds, 0).0, expected);
|
||||||
|
@ -879,8 +872,12 @@ mod tests {
|
||||||
);
|
);
|
||||||
let crds = RwLock::new(crds);
|
let crds = RwLock::new(crds);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&crds, &Pubkey::default(), vec![peers[2].clone()], now),
|
push.process_push_message(
|
||||||
[Ok(origin[2])],
|
&crds,
|
||||||
|
vec![(Pubkey::default(), vec![peers[2].clone()])],
|
||||||
|
now
|
||||||
|
),
|
||||||
|
[origin[2]].into_iter().collect()
|
||||||
);
|
);
|
||||||
let ping_cache = Mutex::new(ping_cache);
|
let ping_cache = Mutex::new(ping_cache);
|
||||||
push.refresh_push_active_set(
|
push.refresh_push_active_set(
|
||||||
|
@ -941,8 +938,8 @@ mod tests {
|
||||||
let expected = HashMap::new();
|
let expected = HashMap::new();
|
||||||
let origin = new_msg.pubkey();
|
let origin = new_msg.pubkey();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&crds, &Pubkey::default(), vec![new_msg.clone()], 0),
|
push.process_push_message(&crds, vec![(Pubkey::default(), vec![new_msg.clone()])], 0),
|
||||||
[Ok(origin)],
|
[origin].into_iter().collect()
|
||||||
);
|
);
|
||||||
push.process_prune_msg(
|
push.process_prune_msg(
|
||||||
&self_id,
|
&self_id,
|
||||||
|
@ -981,8 +978,8 @@ mod tests {
|
||||||
let expected = HashMap::new();
|
let expected = HashMap::new();
|
||||||
let origin = new_msg.pubkey();
|
let origin = new_msg.pubkey();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&crds, &Pubkey::default(), vec![new_msg], 1),
|
push.process_push_message(&crds, vec![(Pubkey::default(), vec![new_msg])], 1),
|
||||||
[Ok(origin)],
|
[origin].into_iter().collect()
|
||||||
);
|
);
|
||||||
assert_eq!(push.new_push_messages(&crds, 0).0, expected);
|
assert_eq!(push.new_push_messages(&crds, 0).0, expected);
|
||||||
}
|
}
|
||||||
|
@ -997,8 +994,8 @@ mod tests {
|
||||||
let label = value.label();
|
let label = value.label();
|
||||||
// push a new message
|
// push a new message
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&crds, &Pubkey::default(), vec![value.clone()], 0),
|
push.process_push_message(&crds, vec![(Pubkey::default(), vec![value.clone()])], 0),
|
||||||
[Ok(label.pubkey())]
|
[label.pubkey()].into_iter().collect()
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
crds.write().unwrap().get::<&CrdsValue>(&label),
|
crds.write().unwrap().get::<&CrdsValue>(&label),
|
||||||
|
@ -1006,15 +1003,13 @@ mod tests {
|
||||||
);
|
);
|
||||||
|
|
||||||
// push it again
|
// push it again
|
||||||
assert_eq!(
|
assert!(push
|
||||||
push.process_push_message(&crds, &Pubkey::default(), vec![value.clone()], 0),
|
.process_push_message(&crds, vec![(Pubkey::default(), vec![value.clone()])], 0)
|
||||||
[Err(CrdsGossipError::PushMessageOldVersion)],
|
.is_empty());
|
||||||
);
|
|
||||||
|
|
||||||
// push it again
|
// push it again
|
||||||
assert_eq!(
|
assert!(push
|
||||||
push.process_push_message(&crds, &Pubkey::default(), vec![value], 0),
|
.process_push_message(&crds, vec![(Pubkey::default(), vec![value])], 0)
|
||||||
[Err(CrdsGossipError::PushMessageOldVersion)],
|
.is_empty());
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -290,8 +290,10 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver
|
||||||
};
|
};
|
||||||
m.wallclock = now;
|
m.wallclock = now;
|
||||||
node.gossip.process_push_message(
|
node.gossip.process_push_message(
|
||||||
&Pubkey::default(),
|
vec![(
|
||||||
vec![CrdsValue::new_unsigned(CrdsData::ContactInfo(m))],
|
Pubkey::default(),
|
||||||
|
vec![CrdsValue::new_unsigned(CrdsData::ContactInfo(m))],
|
||||||
|
)],
|
||||||
now,
|
now,
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
@ -364,7 +366,7 @@ fn network_run_push(
|
||||||
.get(&to)
|
.get(&to)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.gossip
|
.gossip
|
||||||
.process_push_message(&from, msgs.clone(), now)
|
.process_push_message(vec![(from, msgs.clone())], now)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.collect();
|
.collect();
|
||||||
let prunes_map = network
|
let prunes_map = network
|
||||||
|
|
Loading…
Reference in New Issue