improves threads' utilization in processing gossip packets (#12962)

ClusterInfo::process_packets handles incoming packets in a thread_pool:
https://github.com/solana-labs/solana/blob/87311cce7/core/src/cluster_info.rs#L2118-L2134

However, runtime profiling shows that the threads are not well utilized and
much of the processing effectively runs sequentially.

This commit redistributes how the work is divided across the thread pool.
Testing on a GCE cluster shows a 20%+ improvement in gossip-packet
processing time, with much smaller variance.
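
In outline, the change flattens the per-batch packet stream into a single rayon pipeline so that deserialization, sanitization, and signature verification parallelize per packet, and only the stateful handling stays sequential. A minimal sketch of that restructuring, using illustrative stand-in types rather than the actual crate types:

use rayon::prelude::*;

// Stand-in for a raw gossip packet.
struct Packet(Vec<u8>);

// Stand-in for sanitize + signature verification (the expensive step).
fn verify(packet: &Packet) -> bool {
    !packet.0.is_empty()
}

fn main() {
    let batches: Vec<Vec<Packet>> = vec![
        (1..=100u8).map(|i| Packet(vec![i])).collect(),
        vec![Packet(vec![])], // a batch containing one invalid packet
    ];
    // Flatten the batches so rayon balances work per packet rather than
    // per batch; one oversized batch no longer serializes a worker thread.
    let verified: Vec<Packet> = batches
        .into_par_iter()
        .flat_map(|batch| batch.into_par_iter())
        .filter(verify)
        .collect();
    // Stateful handling (locks, table updates) remains sequential.
    assert_eq!(verified.len(), 100);
}

The diff below applies this pattern directly: packets are deserialized, sanitized, and verified inside one flattened into_par_iter pipeline, Protocol::par_verify additionally parallelizes over the values batched inside PullResponse and PushMessage, and handle_packets then consumes the verified (SocketAddr, Protocol) pairs sequentially.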
behzad nouri authored 2020-10-19 19:03:38 +00:00, committed by GitHub
parent cca318f805
commit 75d62ca095
3 changed files with 174 additions and 114 deletions

core/src/cluster_info.rs

@@ -211,6 +211,7 @@ struct GossipStats {
new_push_requests2: Counter,
new_push_requests_num: Counter,
filter_pull_response: Counter,
process_gossip_packets_time: Counter,
process_pull_response: Counter,
process_pull_response_count: Counter,
process_pull_response_len: Counter,
@@ -366,6 +367,59 @@ enum Protocol {
PruneMessage(Pubkey, PruneData),
}
impl Protocol {
fn par_verify(self) -> Option<Self> {
match self {
Protocol::PullRequest(_, ref caller) => {
if caller.verify() {
Some(self)
} else {
inc_new_counter_info!("cluster_info-gossip_pull_request_verify_fail", 1);
None
}
}
Protocol::PullResponse(from, data) => {
let size = data.len();
let data: Vec<_> = data.into_par_iter().filter(Signable::verify).collect();
if size != data.len() {
inc_new_counter_info!(
"cluster_info-gossip_pull_response_verify_fail",
size - data.len()
);
}
if data.is_empty() {
None
} else {
Some(Protocol::PullResponse(from, data))
}
}
Protocol::PushMessage(from, data) => {
let size = data.len();
let data: Vec<_> = data.into_par_iter().filter(Signable::verify).collect();
if size != data.len() {
inc_new_counter_info!(
"cluster_info-gossip_push_msg_verify_fail",
size - data.len()
);
}
if data.is_empty() {
None
} else {
Some(Protocol::PushMessage(from, data))
}
}
Protocol::PruneMessage(_, ref data) => {
if data.verify() {
Some(self)
} else {
inc_new_counter_debug!("cluster_info-gossip_prune_msg_verify_fail", 1);
None
}
}
}
}
}
impl Sanitize for Protocol {
fn sanitize(&self) -> std::result::Result<(), SanitizeError> {
match self {
@@ -1650,7 +1704,7 @@ impl ClusterInfo {
&self,
recycler: &PacketsRecycler,
stakes: &HashMap<Pubkey, u64>,
-        packets: Packets,
+        packets: Vec<(SocketAddr, Protocol)>,
response_sender: &PacketSender,
feature_set: Option<&FeatureSet>,
epoch_time_ms: u64,
@@ -1664,115 +1718,82 @@ impl ClusterInfo {
.unwrap()
.make_timeouts(&stakes, epoch_time_ms);
let mut pull_responses = HashMap::new();
packets.packets.iter().for_each(|packet| {
let from_addr = packet.meta.addr();
limited_deserialize(&packet.data[..packet.meta.size])
.into_iter()
.filter(|r: &Protocol| r.sanitize().is_ok())
.for_each(|request| match request {
Protocol::PullRequest(filter, caller) => {
let start = allocated.get();
if !caller.verify() {
inc_new_counter_info!(
"cluster_info-gossip_pull_request_verify_fail",
1
);
} else if let Some(contact_info) = caller.contact_info() {
if contact_info.id == self.id() {
warn!("PullRequest ignored, I'm talking to myself");
inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
} else if contact_info.shred_version == 0
|| contact_info.shred_version == self.my_shred_version()
|| self.my_shred_version() == 0
{
gossip_pull_data.push(PullData {
from_addr,
caller,
filter,
});
} else {
self.stats.skip_pull_shred_version.add_relaxed(1);
}
}
datapoint_debug!(
"solana-gossip-listen-memory",
("pull_request", (allocated.get() - start) as i64, i64),
);
}
Protocol::PullResponse(from, mut data) => {
let start = allocated.get();
data.retain(|v| {
let ret = v.verify();
if !ret {
inc_new_counter_info!(
"cluster_info-gossip_pull_response_verify_fail",
1
);
}
ret
});
let pull_entry = pull_responses.entry(from).or_insert_with(Vec::new);
pull_entry.extend(data);
datapoint_debug!(
"solana-gossip-listen-memory",
("pull_response", (allocated.get() - start) as i64, i64),
);
}
Protocol::PushMessage(from, mut data) => {
let start = allocated.get();
data.retain(|v| {
let ret = v.verify();
if !ret {
inc_new_counter_info!(
"cluster_info-gossip_push_msg_verify_fail",
1
);
}
ret
});
let rsp = self.handle_push_message(recycler, &from, data, stakes);
if let Some(rsp) = rsp {
let _ignore_disconnect = response_sender.send(rsp);
}
datapoint_debug!(
"solana-gossip-listen-memory",
("push_message", (allocated.get() - start) as i64, i64),
);
}
Protocol::PruneMessage(from, data) => {
let start = allocated.get();
if data.verify() {
self.stats.prune_message_count.add_relaxed(1);
self.stats
.prune_message_len
.add_relaxed(data.prunes.len() as u64);
match self
.time_gossip_write_lock("process_prune", &self.stats.process_prune)
.process_prune_msg(
&from,
&data.destination,
&data.prunes,
data.wallclock,
timestamp(),
) {
                            ) {
                                Err(CrdsGossipError::PruneMessageTimeout) => {
inc_new_counter_debug!("cluster_info-prune_message_timeout", 1)
}
Err(CrdsGossipError::BadPruneDestination) => {
inc_new_counter_debug!("cluster_info-bad_prune_destination", 1)
}
_ => (),
}
                        } else {
                            inc_new_counter_debug!("cluster_info-gossip_prune_msg_verify_fail", 1);
                        }
                        datapoint_debug!(
                            "solana-gossip-listen-memory",
                            ("prune_message", (allocated.get() - start) as i64, i64),
                        );
                    }
                })
        });
        for (from_addr, packet) in packets {
            match packet {
                Protocol::PullRequest(filter, caller) => {
                    let start = allocated.get();
                    if let Some(contact_info) = caller.contact_info() {
                        if contact_info.id == self.id() {
                            warn!("PullRequest ignored, I'm talking to myself");
                            inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
                        } else if contact_info.shred_version == 0
                            || contact_info.shred_version == self.my_shred_version()
                            || self.my_shred_version() == 0
                        {
                            gossip_pull_data.push(PullData {
                                from_addr,
                                caller,
                                filter,
                            });
                        } else {
                            self.stats.skip_pull_shred_version.add_relaxed(1);
                        }
                    }
                    datapoint_debug!(
                        "solana-gossip-listen-memory",
                        ("pull_request", (allocated.get() - start) as i64, i64),
                    );
                }
Protocol::PullResponse(from, data) => {
let start = allocated.get();
let pull_entry = pull_responses.entry(from).or_insert_with(Vec::new);
pull_entry.extend(data);
datapoint_debug!(
"solana-gossip-listen-memory",
("pull_response", (allocated.get() - start) as i64, i64),
);
}
Protocol::PushMessage(from, data) => {
let start = allocated.get();
let rsp = self.handle_push_message(recycler, &from, data, stakes);
if let Some(rsp) = rsp {
let _ignore_disconnect = response_sender.send(rsp);
}
datapoint_debug!(
"solana-gossip-listen-memory",
("push_message", (allocated.get() - start) as i64, i64),
);
}
Protocol::PruneMessage(from, data) => {
let start = allocated.get();
self.stats.prune_message_count.add_relaxed(1);
self.stats
.prune_message_len
.add_relaxed(data.prunes.len() as u64);
match self
.time_gossip_write_lock("process_prune", &self.stats.process_prune)
.process_prune_msg(
&from,
&data.destination,
&data.prunes,
data.wallclock,
timestamp(),
) {
Err(CrdsGossipError::PruneMessageTimeout) => {
inc_new_counter_debug!("cluster_info-prune_message_timeout", 1)
}
Err(CrdsGossipError::BadPruneDestination) => {
inc_new_counter_debug!("cluster_info-bad_prune_destination", 1)
}
_ => (),
}
datapoint_debug!(
"solana-gossip-listen-memory",
("prune_message", (allocated.get() - start) as i64, i64),
);
}
}
}
for (from, data) in pull_responses {
self.handle_pull_response(&from, data, &timeouts);
@@ -2125,12 +2146,31 @@ impl ClusterInfo {
feature_set: Option<&FeatureSet>,
epoch_time_ms: u64,
) {
let sender = response_sender.clone();
thread_pool.install(|| {
requests.into_par_iter().for_each_with(sender, |s, reqs| {
self.handle_packets(&recycler, &stakes, reqs, s, feature_set, epoch_time_ms)
            });
        });
let mut timer = Measure::start("process_gossip_packets_time");
let packets: Vec<_> = thread_pool.install(|| {
requests
.into_par_iter()
.flat_map(|request| request.packets.into_par_iter())
.filter_map(|packet| {
let protocol: Protocol =
limited_deserialize(&packet.data[..packet.meta.size]).ok()?;
protocol.sanitize().ok()?;
let protocol = protocol.par_verify()?;
Some((packet.meta.addr(), protocol))
})
.collect()
});
self.handle_packets(
recycler,
&stakes,
packets,
response_sender,
feature_set,
epoch_time_ms,
);
self.stats
.process_gossip_packets_time
.add_measure(&mut timer);
}
/// Process messages from the network
@@ -2241,6 +2281,11 @@ impl ClusterInfo {
i64
),
("purge", self.stats.purge.clear(), i64),
(
"process_gossip_packets_time",
self.stats.process_gossip_packets_time.clear(),
i64
),
(
"process_pull_resp",
self.stats.process_pull_response.clear(),

ledger/src/entry.rs

@@ -273,9 +273,9 @@ impl EntryVerificationState {
.zip(entries)
.all(|((hash, tx_hash), answer)| {
if answer.num_hashes == 0 {
-                        *hash == answer.hash
+                        hash == answer.hash
} else {
-                        let mut poh = Poh::new(*hash, None);
+                        let mut poh = Poh::new(hash, None);
if let Some(mixin) = tx_hash {
poh.record(*mixin).unwrap().hash == answer.hash
} else {

perf/src/cuda_runtime.rs

@@ -165,6 +165,21 @@ impl<'a, T: Clone + Send + Sync + Default + Sized> IntoParallelIterator for &'a
}
}
impl<T: Clone + Default + Send + Sized> IntoParallelIterator for PinnedVec<T> {
type Item = T;
type Iter = rayon::vec::IntoIter<T>;
fn into_par_iter(mut self) -> Self::Iter {
if self.pinned {
unpin(self.x.as_mut_ptr());
self.pinned = false;
}
self.pinnable = false;
self.recycler = None;
std::mem::take(&mut self.x).into_par_iter()
}
}
impl<T: Clone + Default + Sized> PinnedVec<T> {
pub fn reserve_and_pin(&mut self, size: usize) {
if self.x.capacity() < size {