diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 3b26d6368..d3a143f7a 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -337,32 +337,23 @@ impl ClusterInfoVoteListener { votes: Vec, labels: Vec, ) -> (Vec, Vec<(CrdsValueLabel, Slot, Packets)>) { - let msgs = packet::to_packets_chunked(&votes, 1); - let r = sigverify::ed25519_verify_cpu(&msgs); + let mut msgs = packet::to_packets_chunked(&votes, 1); + sigverify::ed25519_verify_cpu(&mut msgs); - assert_eq!( - r.iter() - .map(|packets_results| packets_results.len()) - .sum::(), - votes.len() - ); + let (vote_txs, packets) = izip!(labels.into_iter(), votes.into_iter(), msgs,) + .filter_map(|(label, vote, packet)| { + let slot = vote_transaction::parse_vote_transaction(&vote) + .and_then(|(_, vote, _)| vote.slots.last().copied())?; - let (vote_txs, packets) = izip!( - labels.into_iter(), - votes.into_iter(), - r.iter().flatten(), - msgs, - ) - .filter_map(|(label, vote, verify_result, packet)| { - let slot = vote_transaction::parse_vote_transaction(&vote) - .and_then(|(_, vote, _)| vote.slots.last().copied())?; - if *verify_result != 0 { - Some((vote, (label, slot, packet))) - } else { - None - } - }) - .unzip(); + // to_packets_chunked() above split into 1 packet long chunks + assert_eq!(packet.packets.len(), 1); + if !packet.packets[0].meta.discard { + Some((vote, (label, slot, packet))) + } else { + None + } + }) + .unzip(); (vote_txs, packets) } diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 8228cbeeb..2103c1b3f 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -31,34 +31,7 @@ impl Default for TransactionSigVerifier { impl SigVerifier for TransactionSigVerifier { fn verify_batch(&self, mut batch: Vec) -> Vec { - let r = sigverify::ed25519_verify(&batch, &self.recycler, &self.recycler_out); - mark_disabled(&mut batch, &r); + sigverify::ed25519_verify(&mut batch, &self.recycler, &self.recycler_out); batch } } - -pub fn mark_disabled(batches: &mut Vec, r: &[Vec]) { - batches.iter_mut().zip(r).for_each(|(b, v)| { - b.packets - .iter_mut() - .zip(v) - .for_each(|(p, f)| p.meta.discard = *f == 0) - }); -} - -#[cfg(test)] -mod tests { - use super::*; - use solana_perf::packet::Packet; - - #[test] - fn test_mark_disabled() { - let mut batch = Packets::default(); - batch.packets.push(Packet::default()); - let mut batches: Vec = vec![batch]; - mark_disabled(&mut batches, &[vec![0]]); - assert_eq!(batches[0].packets[0].meta.discard, true); - mark_disabled(&mut batches, &[vec![1]]); - assert_eq!(batches[0].packets[0].meta.discard, false); - } -} diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index d7e44622c..33af8a966 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -4,8 +4,11 @@ use crate::sigverify_stage::SigVerifier; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; use solana_ledger::shred::{OFFSET_OF_SHRED_SLOT, SIZE_OF_SHRED_SLOT}; use solana_ledger::sigverify_shreds::verify_shreds_gpu; -use solana_perf::packet::{limited_deserialize, Packets}; -use solana_perf::recycler_cache::RecyclerCache; +use solana_perf::{ + self, + packet::{limited_deserialize, Packets}, + recycler_cache::RecyclerCache, +}; use solana_runtime::bank_forks::BankForks; use std::collections::{HashMap, HashSet}; use std::sync::{Arc, RwLock}; @@ -67,7 +70,7 @@ impl SigVerifier for ShredSigVerifier { leader_slots.insert(std::u64::MAX, [0u8; 32]); let r = verify_shreds_gpu(&batches, &leader_slots, &self.recycler_cache); - sigverify::mark_disabled(&mut batches, &r); + solana_perf::sigverify::mark_disabled(&mut batches, &r); batches } } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index cd67e5466..ff153b0bb 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -45,8 +45,7 @@ pub struct DisabledSigVerifier {} impl SigVerifier for DisabledSigVerifier { fn verify_batch(&self, mut batch: Vec) -> Vec { - let r = sigverify::ed25519_verify_disabled(&batch); - sigverify::mark_disabled(&mut batch, &r); + sigverify::ed25519_verify_disabled(&mut batch); batch } } diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index 2611272ef..75aea3eaa 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -13,13 +13,13 @@ fn bench_sigverify(bencher: &mut Bencher) { let tx = test_tx(); // generate packet vector - let batches = to_packets_chunked(&std::iter::repeat(tx).take(128).collect::>(), 128); + let mut batches = to_packets_chunked(&std::iter::repeat(tx).take(128).collect::>(), 128); let recycler = Recycler::new_without_limit(""); let recycler_out = Recycler::new_without_limit(""); // verify packets bencher.iter(|| { - let _ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); + let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out); }) } diff --git a/perf/src/cuda_runtime.rs b/perf/src/cuda_runtime.rs index a8f9b01ac..e850100ff 100644 --- a/perf/src/cuda_runtime.rs +++ b/perf/src/cuda_runtime.rs @@ -188,6 +188,14 @@ impl<'a, T: Clone + Send + Sync + Default + Sized> IntoParallelIterator for &'a } } +impl<'a, T: Clone + Send + Sync + Default + Sized> IntoParallelIterator for &'a mut PinnedVec { + type Iter = rayon::slice::IterMut<'a, T>; + type Item = &'a mut T; + fn into_par_iter(self) -> Self::Iter { + self.x.par_iter_mut() + } +} + impl IntoParallelIterator for PinnedVec { type Item = T; type Iter = rayon::vec::IntoIter; diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 861d0162f..1a2e1682f 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -20,6 +20,13 @@ use solana_sdk::signature::Signature; use solana_sdk::transaction::Transaction; use std::mem::size_of; +// Representing key tKeYE4wtowRb8yRroZShTipE18YVnqwXjsSAoNsFU6g +const TRACER_KEY_BYTES: [u8; 32] = [ + 13, 37, 180, 170, 252, 137, 36, 194, 183, 143, 161, 193, 201, 207, 211, 23, 189, 93, 33, 110, + 155, 90, 30, 39, 116, 115, 238, 38, 126, 21, 232, 133, +]; +const TRACER_KEY: Pubkey = Pubkey::new_from_array(TRACER_KEY_BYTES); + lazy_static! { static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new() .num_threads(get_thread_count()) @@ -79,18 +86,25 @@ pub fn init() { } } -fn verify_packet(packet: &Packet) -> u8 { +fn verify_packet(packet: &mut Packet) { let packet_offsets = get_packet_offsets(packet, 0); let mut sig_start = packet_offsets.sig_start as usize; let mut pubkey_start = packet_offsets.pubkey_start as usize; let msg_start = packet_offsets.msg_start as usize; + // If this packet was already marked as discard, drop it + if packet.meta.discard { + return; + } + if packet_offsets.sig_len == 0 { - return 0; + packet.meta.discard = true; + return; } if packet.meta.size <= msg_start { - return 0; + packet.meta.discard = true; + return; } let msg_end = packet.meta.size; @@ -99,20 +113,30 @@ fn verify_packet(packet: &Packet) -> u8 { let sig_end = sig_start as usize + size_of::(); if pubkey_end >= packet.meta.size || sig_end >= packet.meta.size { - return 0; + packet.meta.discard = true; + return; } let signature = Signature::new(&packet.data[sig_start..sig_end]); + if !signature.verify( &packet.data[pubkey_start..pubkey_end], &packet.data[msg_start..msg_end], ) { - return 0; + packet.meta.discard = true; + return; } + + // Check for tracer pubkey + if !packet.meta.is_tracer_tx + && &packet.data[pubkey_start..pubkey_end] == TRACER_KEY.as_ref() + { + packet.meta.is_tracer_tx = true; + } + pubkey_start += size_of::(); sig_start += size_of::(); } - 1 } pub fn batch_size(batches: &[Packets]) -> usize { @@ -242,30 +266,28 @@ pub fn generate_offsets(batches: &[Packets], recycler: &Recycler) -> T ) } -pub fn ed25519_verify_cpu(batches: &[Packets]) -> Vec> { +pub fn ed25519_verify_cpu(batches: &mut [Packets]) { use rayon::prelude::*; let count = batch_size(batches); debug!("CPU ECDSA for {}", batch_size(batches)); - let rv = PAR_THREAD_POOL.install(|| { + PAR_THREAD_POOL.install(|| { batches .into_par_iter() - .map(|p| p.packets.par_iter().map(verify_packet).collect()) - .collect() + .for_each(|p| p.packets.par_iter_mut().for_each(|p| verify_packet(p))) }); inc_new_counter_debug!("ed25519_verify_cpu", count); - rv } -pub fn ed25519_verify_disabled(batches: &[Packets]) -> Vec> { +pub fn ed25519_verify_disabled(batches: &mut [Packets]) { use rayon::prelude::*; let count = batch_size(batches); debug!("disabled ECDSA for {}", batch_size(batches)); - let rv = batches - .into_par_iter() - .map(|p| vec![1u8; p.packets.len()]) - .collect(); + batches.into_par_iter().for_each(|p| { + p.packets + .par_iter_mut() + .for_each(|p| p.meta.discard = false) + }); inc_new_counter_debug!("ed25519_verify_disabled", count); - rv } pub fn copy_return_values(sig_lens: &[Vec], out: &PinnedVec, rvs: &mut Vec>) { @@ -319,11 +341,19 @@ pub fn get_checked_scalar(scalar: &[u8; 32]) -> Result<[u8; 32], PacketError> { Ok(out) } +pub fn mark_disabled(batches: &mut [Packets], r: &[Vec]) { + batches.iter_mut().zip(r).for_each(|(b, v)| { + b.packets.iter_mut().zip(v).for_each(|(p, f)| { + p.meta.discard = *f == 0; + }) + }); +} + pub fn ed25519_verify( - batches: &[Packets], + batches: &mut [Packets], recycler: &Recycler, recycler_out: &Recycler>, -) -> Vec> { +) { let api = perf_libs::api(); if api.is_none() { return ed25519_verify_cpu(batches); @@ -353,7 +383,7 @@ pub fn ed25519_verify( let mut rvs = Vec::new(); let mut num_packets = 0; - for p in batches { + for p in batches.iter() { elems.push(perf_libs::Elems { elems: p.packets.as_ptr(), num: p.packets.len() as u32, @@ -389,8 +419,8 @@ pub fn ed25519_verify( } trace!("done verify"); copy_return_values(&sig_lens, &out, &mut rvs); + mark_disabled(batches, &rvs); inc_new_counter_debug!("ed25519_verify_gpu", count); - rvs } #[cfg(test)] @@ -430,6 +460,17 @@ mod tests { None } + #[test] + fn test_mark_disabled() { + let mut batch = Packets::default(); + batch.packets.push(Packet::default()); + let mut batches: Vec = vec![batch]; + mark_disabled(&mut batches, &[vec![0]]); + assert_eq!(batches[0].packets[0].meta.discard, true); + mark_disabled(&mut batches, &[vec![1]]); + assert_eq!(batches[0].packets[0].meta.discard, false); + } + #[test] fn test_layout() { let tx = test_tx(); @@ -676,16 +717,19 @@ mod tests { packet.data[20] = packet.data[20].wrapping_add(10); } - let batches = generate_packet_vec(&packet, n, 2); + let mut batches = generate_packet_vec(&packet, n, 2); let recycler = Recycler::new_without_limit(""); let recycler_out = Recycler::new_without_limit(""); // verify packets - let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); + sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out); // check result - let ref_ans = if modify_data { 0u8 } else { 1u8 }; - assert_eq!(ans, vec![vec![ref_ans; n], vec![ref_ans; n]]); + let should_discard = modify_data; + assert!(batches + .iter() + .flat_map(|p| &p.packets) + .all(|p| p.meta.discard == should_discard)); } #[test] @@ -695,14 +739,16 @@ mod tests { tx.signatures.pop(); let packet = sigverify::make_packet_from_transaction(tx); - let batches = generate_packet_vec(&packet, 1, 1); + let mut batches = generate_packet_vec(&packet, 1, 1); let recycler = Recycler::new_without_limit(""); let recycler_out = Recycler::new_without_limit(""); // verify packets - let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); - - assert_eq!(ans, vec![vec![0u8; 1]]); + sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out); + assert!(batches + .iter() + .flat_map(|p| &p.packets) + .all(|p| p.meta.discard)); } #[test] @@ -738,13 +784,23 @@ mod tests { let recycler = Recycler::new_without_limit(""); let recycler_out = Recycler::new_without_limit(""); // verify packets - let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); + sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out); // check result let ref_ans = 1u8; let mut ref_vec = vec![vec![ref_ans; n]; num_batches]; ref_vec[0].push(0u8); - assert_eq!(ans, ref_vec); + assert!(batches + .iter() + .flat_map(|p| &p.packets) + .zip(ref_vec.into_iter().flatten()) + .all(|(p, discard)| { + if discard == 0 { + p.meta.discard + } else { + !p.meta.discard + } + })); } #[test] @@ -772,14 +828,18 @@ mod tests { batches[batch].packets[packet].data[offset].wrapping_add(add); } - // verify packets - let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); + // verify from GPU verification pipeline (when GPU verification is enabled) are + // equivalent to the CPU verification pipeline. + let mut batches_cpu = batches.clone(); + sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out); + ed25519_verify_cpu(&mut batches_cpu); - let cpu_ref = ed25519_verify_cpu(&batches); - - debug!("ans: {:?} ref: {:?}", ans, cpu_ref); // check result - assert_eq!(ans, cpu_ref); + batches + .iter() + .flat_map(|p| &p.packets) + .zip(batches_cpu.iter().flat_map(|p| &p.packets)) + .for_each(|(p1, p2)| assert_eq!(p1, p2)); } } diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index dd5f1a269..31f6152a9 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -24,6 +24,7 @@ pub struct Meta { pub v6: bool, pub seed: [u8; 32], pub slot: Slot, + pub is_tracer_tx: bool, } #[derive(Clone)]