diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 273752e77..72c83f882 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -211,6 +211,7 @@ struct GossipStats { new_push_requests2: Counter, new_push_requests_num: Counter, filter_pull_response: Counter, + process_gossip_packets_time: Counter, process_pull_response: Counter, process_pull_response_count: Counter, process_pull_response_len: Counter, @@ -366,6 +367,59 @@ enum Protocol { PruneMessage(Pubkey, PruneData), } +impl Protocol { + fn par_verify(self) -> Option { + match self { + Protocol::PullRequest(_, ref caller) => { + if caller.verify() { + Some(self) + } else { + inc_new_counter_info!("cluster_info-gossip_pull_request_verify_fail", 1); + None + } + } + Protocol::PullResponse(from, data) => { + let size = data.len(); + let data: Vec<_> = data.into_par_iter().filter(Signable::verify).collect(); + if size != data.len() { + inc_new_counter_info!( + "cluster_info-gossip_pull_response_verify_fail", + size - data.len() + ); + } + if data.is_empty() { + None + } else { + Some(Protocol::PullResponse(from, data)) + } + } + Protocol::PushMessage(from, data) => { + let size = data.len(); + let data: Vec<_> = data.into_par_iter().filter(Signable::verify).collect(); + if size != data.len() { + inc_new_counter_info!( + "cluster_info-gossip_push_msg_verify_fail", + size - data.len() + ); + } + if data.is_empty() { + None + } else { + Some(Protocol::PushMessage(from, data)) + } + } + Protocol::PruneMessage(_, ref data) => { + if data.verify() { + Some(self) + } else { + inc_new_counter_debug!("cluster_info-gossip_prune_msg_verify_fail", 1); + None + } + } + } + } +} + impl Sanitize for Protocol { fn sanitize(&self) -> std::result::Result<(), SanitizeError> { match self { @@ -1650,7 +1704,7 @@ impl ClusterInfo { &self, recycler: &PacketsRecycler, stakes: &HashMap, - packets: Packets, + packets: Vec<(SocketAddr, Protocol)>, response_sender: &PacketSender, feature_set: Option<&FeatureSet>, epoch_time_ms: u64, @@ -1664,115 +1718,82 @@ impl ClusterInfo { .unwrap() .make_timeouts(&stakes, epoch_time_ms); let mut pull_responses = HashMap::new(); - packets.packets.iter().for_each(|packet| { - let from_addr = packet.meta.addr(); - limited_deserialize(&packet.data[..packet.meta.size]) - .into_iter() - .filter(|r: &Protocol| r.sanitize().is_ok()) - .for_each(|request| match request { - Protocol::PullRequest(filter, caller) => { - let start = allocated.get(); - if !caller.verify() { - inc_new_counter_info!( - "cluster_info-gossip_pull_request_verify_fail", - 1 - ); - } else if let Some(contact_info) = caller.contact_info() { - if contact_info.id == self.id() { - warn!("PullRequest ignored, I'm talking to myself"); - inc_new_counter_debug!("cluster_info-window-request-loopback", 1); - } else if contact_info.shred_version == 0 - || contact_info.shred_version == self.my_shred_version() - || self.my_shred_version() == 0 - { - gossip_pull_data.push(PullData { - from_addr, - caller, - filter, - }); - } else { - self.stats.skip_pull_shred_version.add_relaxed(1); - } - } - datapoint_debug!( - "solana-gossip-listen-memory", - ("pull_request", (allocated.get() - start) as i64, i64), - ); - } - Protocol::PullResponse(from, mut data) => { - let start = allocated.get(); - data.retain(|v| { - let ret = v.verify(); - if !ret { - inc_new_counter_info!( - "cluster_info-gossip_pull_response_verify_fail", - 1 - ); - } - ret - }); - let pull_entry = pull_responses.entry(from).or_insert_with(Vec::new); - pull_entry.extend(data); - datapoint_debug!( - "solana-gossip-listen-memory", - ("pull_response", (allocated.get() - start) as i64, i64), - ); - } - Protocol::PushMessage(from, mut data) => { - let start = allocated.get(); - data.retain(|v| { - let ret = v.verify(); - if !ret { - inc_new_counter_info!( - "cluster_info-gossip_push_msg_verify_fail", - 1 - ); - } - ret - }); - let rsp = self.handle_push_message(recycler, &from, data, stakes); - if let Some(rsp) = rsp { - let _ignore_disconnect = response_sender.send(rsp); - } - datapoint_debug!( - "solana-gossip-listen-memory", - ("push_message", (allocated.get() - start) as i64, i64), - ); - } - Protocol::PruneMessage(from, data) => { - let start = allocated.get(); - if data.verify() { - self.stats.prune_message_count.add_relaxed(1); - self.stats - .prune_message_len - .add_relaxed(data.prunes.len() as u64); - match self - .time_gossip_write_lock("process_prune", &self.stats.process_prune) - .process_prune_msg( - &from, - &data.destination, - &data.prunes, - data.wallclock, - timestamp(), - ) { - Err(CrdsGossipError::PruneMessageTimeout) => { - inc_new_counter_debug!("cluster_info-prune_message_timeout", 1) - } - Err(CrdsGossipError::BadPruneDestination) => { - inc_new_counter_debug!("cluster_info-bad_prune_destination", 1) - } - _ => (), - } + for (from_addr, packet) in packets { + match packet { + Protocol::PullRequest(filter, caller) => { + let start = allocated.get(); + if let Some(contact_info) = caller.contact_info() { + if contact_info.id == self.id() { + warn!("PullRequest ignored, I'm talking to myself"); + inc_new_counter_debug!("cluster_info-window-request-loopback", 1); + } else if contact_info.shred_version == 0 + || contact_info.shred_version == self.my_shred_version() + || self.my_shred_version() == 0 + { + gossip_pull_data.push(PullData { + from_addr, + caller, + filter, + }); } else { - inc_new_counter_debug!("cluster_info-gossip_prune_msg_verify_fail", 1); + self.stats.skip_pull_shred_version.add_relaxed(1); } - datapoint_debug!( - "solana-gossip-listen-memory", - ("prune_message", (allocated.get() - start) as i64, i64), - ); } - }) - }); + datapoint_debug!( + "solana-gossip-listen-memory", + ("pull_request", (allocated.get() - start) as i64, i64), + ); + } + Protocol::PullResponse(from, data) => { + let start = allocated.get(); + let pull_entry = pull_responses.entry(from).or_insert_with(Vec::new); + pull_entry.extend(data); + datapoint_debug!( + "solana-gossip-listen-memory", + ("pull_response", (allocated.get() - start) as i64, i64), + ); + } + Protocol::PushMessage(from, data) => { + let start = allocated.get(); + let rsp = self.handle_push_message(recycler, &from, data, stakes); + if let Some(rsp) = rsp { + let _ignore_disconnect = response_sender.send(rsp); + } + datapoint_debug!( + "solana-gossip-listen-memory", + ("push_message", (allocated.get() - start) as i64, i64), + ); + } + Protocol::PruneMessage(from, data) => { + let start = allocated.get(); + self.stats.prune_message_count.add_relaxed(1); + self.stats + .prune_message_len + .add_relaxed(data.prunes.len() as u64); + match self + .time_gossip_write_lock("process_prune", &self.stats.process_prune) + .process_prune_msg( + &from, + &data.destination, + &data.prunes, + data.wallclock, + timestamp(), + ) { + Err(CrdsGossipError::PruneMessageTimeout) => { + inc_new_counter_debug!("cluster_info-prune_message_timeout", 1) + } + Err(CrdsGossipError::BadPruneDestination) => { + inc_new_counter_debug!("cluster_info-bad_prune_destination", 1) + } + _ => (), + } + datapoint_debug!( + "solana-gossip-listen-memory", + ("prune_message", (allocated.get() - start) as i64, i64), + ); + } + } + } for (from, data) in pull_responses { self.handle_pull_response(&from, data, &timeouts); @@ -2125,12 +2146,31 @@ impl ClusterInfo { feature_set: Option<&FeatureSet>, epoch_time_ms: u64, ) { - let sender = response_sender.clone(); - thread_pool.install(|| { - requests.into_par_iter().for_each_with(sender, |s, reqs| { - self.handle_packets(&recycler, &stakes, reqs, s, feature_set, epoch_time_ms) - }); + let mut timer = Measure::start("process_gossip_packets_time"); + let packets: Vec<_> = thread_pool.install(|| { + requests + .into_par_iter() + .flat_map(|request| request.packets.into_par_iter()) + .filter_map(|packet| { + let protocol: Protocol = + limited_deserialize(&packet.data[..packet.meta.size]).ok()?; + protocol.sanitize().ok()?; + let protocol = protocol.par_verify()?; + Some((packet.meta.addr(), protocol)) + }) + .collect() }); + self.handle_packets( + recycler, + &stakes, + packets, + response_sender, + feature_set, + epoch_time_ms, + ); + self.stats + .process_gossip_packets_time + .add_measure(&mut timer); } /// Process messages from the network @@ -2241,6 +2281,11 @@ impl ClusterInfo { i64 ), ("purge", self.stats.purge.clear(), i64), + ( + "process_gossip_packets_time", + self.stats.process_gossip_packets_time.clear(), + i64 + ), ( "process_pull_resp", self.stats.process_pull_response.clear(), diff --git a/ledger/src/entry.rs b/ledger/src/entry.rs index 3928749d2..23036e2af 100644 --- a/ledger/src/entry.rs +++ b/ledger/src/entry.rs @@ -273,9 +273,9 @@ impl EntryVerificationState { .zip(entries) .all(|((hash, tx_hash), answer)| { if answer.num_hashes == 0 { - *hash == answer.hash + hash == answer.hash } else { - let mut poh = Poh::new(*hash, None); + let mut poh = Poh::new(hash, None); if let Some(mixin) = tx_hash { poh.record(*mixin).unwrap().hash == answer.hash } else { diff --git a/perf/src/cuda_runtime.rs b/perf/src/cuda_runtime.rs index a4d146cd9..c9ba77535 100644 --- a/perf/src/cuda_runtime.rs +++ b/perf/src/cuda_runtime.rs @@ -165,6 +165,21 @@ impl<'a, T: Clone + Send + Sync + Default + Sized> IntoParallelIterator for &'a } } +impl IntoParallelIterator for PinnedVec { + type Item = T; + type Iter = rayon::vec::IntoIter; + + fn into_par_iter(mut self) -> Self::Iter { + if self.pinned { + unpin(self.x.as_mut_ptr()); + self.pinned = false; + } + self.pinnable = false; + self.recycler = None; + std::mem::take(&mut self.x).into_par_iter() + } +} + impl PinnedVec { pub fn reserve_and_pin(&mut self, size: usize) { if self.x.capacity() < size {